This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 82460c0bf0 [Feature][Connector-V2] Support multi-table sink feature 
for kudu (#5951)
82460c0bf0 is described below

commit 82460c0bf0e88a40070590c3b0615d9335aa4ceb
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Thu Dec 14 13:36:49 2023 +0800

    [Feature][Connector-V2] Support multi-table sink feature for kudu (#5951)
---
 docs/en/connector-v2/sink/Kudu.md                  | 72 ++++++++++++++++
 .../connectors/seatunnel/kudu/sink/KuduSink.java   |  9 +-
 .../seatunnel/kudu/sink/KuduSinkFactory.java       | 12 ++-
 .../seatunnel/kudu/sink/KuduSinkWriter.java        |  5 +-
 .../seatunnel/kudu/source/KuduSource.java          |  2 -
 .../e2e/connector/kudu/KuduCDCSinkIT.java          |  2 +
 .../seatunnel/e2e/connector/kudu/KuduIT.java       |  2 +
 .../connector/kudu/KuduWIthMultipleTableIT.java    | 97 +++++++++++++++++++---
 .../resources/fake_to_kudu_with_multipletable.conf | 87 +++++++++++++++++++
 .../kudu_to_assert_with_multipletable.conf         |  3 +-
 10 files changed, 270 insertions(+), 21 deletions(-)

diff --git a/docs/en/connector-v2/sink/Kudu.md 
b/docs/en/connector-v2/sink/Kudu.md
index b6e4eee24c..f885240498 100644
--- a/docs/en/connector-v2/sink/Kudu.md
+++ b/docs/en/connector-v2/sink/Kudu.md
@@ -123,6 +123,78 @@ sink {
 }
 ```
 
+### Multiple Table
+
+```hocon
+env {
+  # You can set engine configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    tables_configs = [
+       {
+        schema = {
+          table = "kudu_sink_1"
+         fields {
+                id = int
+                val_bool = boolean
+                val_int8 = tinyint
+                val_int16 = smallint
+                val_int32 = int
+                val_int64 = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+                val_string = string
+                val_unixtime_micros = timestamp
+      }
+        }
+            rows = [
+              {
+                kind = INSERT
+                fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+              }
+              ]
+       },
+       {
+       schema = {
+         table = "kudu_sink_2"
+              fields {
+                        id = int
+                        val_bool = boolean
+                        val_int8 = tinyint
+                        val_int16 = smallint
+                        val_int32 = int
+                        val_int64 = bigint
+                        val_float = float
+                        val_double = double
+                        val_decimal = "decimal(16, 1)"
+                        val_string = string
+                        val_unixtime_micros = timestamp
+              }
+       }
+           rows = [
+             {
+               kind = INSERT
+               fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+             }
+             ]
+      }
+    ]
+  }
+}
+
+
+sink {
+   kudu{
+    kudu_masters = "kudu-master-multiple:7051"
+ }
+}
+```
+
 ## Changelog
 
 ### 2.2.0-beta 2022-09-26
diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
index 0cc827f1ed..898016b5cf 100644
--- 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.kudu.sink;
 
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -28,18 +29,16 @@ import 
org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduAggregatedCommit
 import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSinkState;
 
-import com.google.auto.service.AutoService;
-
 import java.io.IOException;
 
 /**
  * Kudu Sink implementation by using SeaTunnel sink API. This class contains 
the method to create
  * {@link AbstractSimpleSink}.
  */
-@AutoService(SeaTunnelSink.class)
 public class KuduSink
         implements SeaTunnelSink<
-                SeaTunnelRow, KuduSinkState, KuduCommitInfo, 
KuduAggregatedCommitInfo> {
+                        SeaTunnelRow, KuduSinkState, KuduCommitInfo, 
KuduAggregatedCommitInfo>,
+                SupportMultiTableSink {
 
     private KuduSinkConfig kuduSinkConfig;
     private SeaTunnelRowType seaTunnelRowType;
@@ -51,7 +50,7 @@ public class KuduSink
 
     @Override
     public String getPluginName() {
-        return "kudu";
+        return "Kudu";
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java
index fc94e94b88..3917d1cd62 100644
--- 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java
@@ -29,6 +29,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
 import com.google.auto.service.AutoService;
 
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 
 import static 
org.apache.kudu.client.SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND;
 import static 
org.apache.kudu.client.SessionConfiguration.FlushMode.MANUAL_FLUSH;
@@ -43,7 +45,8 @@ public class KuduSinkFactory implements TableSinkFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(KuduSinkConfig.MASTER, KuduSinkConfig.TABLE_NAME)
+                .required(KuduSinkConfig.MASTER)
+                .optional(KuduSinkConfig.TABLE_NAME)
                 .optional(KuduSinkConfig.WORKER_COUNT)
                 .optional(KuduSinkConfig.OPERATION_TIMEOUT)
                 .optional(KuduSinkConfig.ADMIN_OPERATION_TIMEOUT)
@@ -73,6 +76,13 @@ public class KuduSinkFactory implements TableSinkFactory {
     public TableSink createSink(TableSinkFactoryContext context) {
         ReadonlyConfig config = context.getOptions();
         CatalogTable catalogTable = context.getCatalogTable();
+        if (!config.getOptional(KuduSinkConfig.TABLE_NAME).isPresent()) {
+            Map<String, String> map = config.toMap();
+            map.put(
+                    KuduSinkConfig.TABLE_NAME.key(),
+                    catalogTable.getTableId().toTablePath().getFullName());
+            config = ReadonlyConfig.fromMap(new HashMap<>(map));
+        }
         KuduSinkConfig kuduSinkConfig = new KuduSinkConfig(config);
         return () -> new KuduSink(kuduSinkConfig, catalogTable);
     }
diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java
index acf5b4ca2e..82a28c0451 100644
--- 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.kudu.sink;
 
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
@@ -32,7 +33,9 @@ import java.io.IOException;
 import java.util.Optional;
 
 @Slf4j
-public class KuduSinkWriter implements SinkWriter<SeaTunnelRow, 
KuduCommitInfo, KuduSinkState> {
+public class KuduSinkWriter
+        implements SinkWriter<SeaTunnelRow, KuduCommitInfo, KuduSinkState>,
+                SupportMultiTableSinkWriter<Void> {
 
     private SeaTunnelRowType seaTunnelRowType;
     private KuduOutputFormat fileWriter;
diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
index 57b7925b0d..9490208af5 100644
--- 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
@@ -28,14 +28,12 @@ import 
org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceTableConfig;
 import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSourceState;
 
-import com.google.auto.service.AutoService;
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.List;
 import java.util.stream.Collectors;
 
 @Slf4j
-@AutoService(SeaTunnelSource.class)
 public class KuduSource
         implements SeaTunnelSource<SeaTunnelRow, KuduSourceSplit, 
KuduSourceState>,
                 SupportParallelism {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduCDCSinkIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduCDCSinkIT.java
index dd45471060..733e0d39c9 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduCDCSinkIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduCDCSinkIT.java
@@ -36,6 +36,7 @@ import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.RowResult;
 import org.apache.kudu.client.RowResultIterator;
 
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.TestTemplate;
@@ -273,6 +274,7 @@ public class KuduCDCSinkIT extends TestSuiteBase implements 
TestResource {
     }
 
     @Override
+    @AfterAll
     public void tearDown() throws Exception {
         if (kuduClient != null) {
             kuduClient.close();
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
index ae26aff796..b0c8cba1fe 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
@@ -39,6 +39,7 @@ import org.apache.kudu.client.PartialRow;
 import org.apache.kudu.client.RowResult;
 import org.apache.kudu.client.RowResultIterator;
 
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.TestTemplate;
@@ -274,6 +275,7 @@ public class KuduIT extends TestSuiteBase implements 
TestResource {
     }
 
     @Override
+    @AfterAll
     public void tearDown() throws Exception {
         if (kuduClient != null) {
             kuduClient.close();
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduWIthMultipleTableIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduWIthMultipleTableIT.java
index 11cd819b6c..cc05653d5c 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduWIthMultipleTableIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduWIthMultipleTableIT.java
@@ -32,11 +32,15 @@ import org.apache.kudu.client.CreateTableOptions;
 import org.apache.kudu.client.Insert;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduScanner;
 import org.apache.kudu.client.KuduSession;
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.OperationResponse;
 import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.RowResultIterator;
 
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.TestTemplate;
@@ -62,9 +66,11 @@ import java.util.Arrays;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static java.lang.String.format;
+import static org.awaitility.Awaitility.await;
 
 @Slf4j
 @DisabledOnContainer(
@@ -158,12 +164,11 @@ public class KuduWIthMultipleTableIT extends 
TestSuiteBase implements TestResour
             row.addObject("val_decimal", new BigDecimal("1.1212"));
             row.addObject("val_string", "test");
             row.addObject("val_unixtime_micros", new 
java.sql.Timestamp(1693477266998L));
-            row.addObject("val_binary", "NEW".getBytes());
             OperationResponse response = kuduSession.apply(insert);
         }
     }
 
-    private void initializeKuduTable() throws KuduException {
+    private void initializeKuduTable(String tableName) throws KuduException {
 
         List<ColumnSchema> columns = new ArrayList();
 
@@ -210,10 +215,6 @@ public class KuduWIthMultipleTableIT extends TestSuiteBase 
implements TestResour
                 new ColumnSchema.ColumnSchemaBuilder("val_unixtime_micros", 
Type.UNIXTIME_MICROS)
                         .nullable(true)
                         .build());
-        columns.add(
-                new ColumnSchema.ColumnSchemaBuilder("val_binary", Type.BINARY)
-                        .nullable(true)
-                        .build());
 
         Schema schema = new Schema(columns);
 
@@ -222,8 +223,7 @@ public class KuduWIthMultipleTableIT extends TestSuiteBase 
implements TestResour
 
         tableOptions.addHashPartitions(hashKeys, 2);
         tableOptions.setNumReplicas(1);
-        kuduClient.createTable("kudu_source_table_1", schema, tableOptions);
-        kuduClient.createTable("kudu_source_table_2", schema, tableOptions);
+        kuduClient.createTable(tableName, schema, tableOptions);
     }
 
     private void getKuduClient() {
@@ -238,8 +238,10 @@ public class KuduWIthMultipleTableIT extends TestSuiteBase 
implements TestResour
     }
 
     @TestTemplate
-    public void testKudu(TestContainer container) throws IOException, 
InterruptedException {
-        initializeKuduTable();
+    public void testKuduMultipleRead(TestContainer container)
+            throws IOException, InterruptedException {
+        initializeKuduTable("kudu_source_table_1");
+        initializeKuduTable("kudu_source_table_2");
         batchInsertData("kudu_source_table_1");
         batchInsertData("kudu_source_table_2");
         Container.ExecResult execResult =
@@ -249,7 +251,62 @@ public class KuduWIthMultipleTableIT extends TestSuiteBase 
implements TestResour
         kuduClient.deleteTable("kudu_source_table_2");
     }
 
+    @TestTemplate
+    public void testKuduMultipleWrite(TestContainer container)
+            throws IOException, InterruptedException {
+        initializeKuduTable("kudu_sink_1");
+        initializeKuduTable("kudu_sink_2");
+        Container.ExecResult execResult =
+                container.executeJob("/fake_to_kudu_with_multipletable.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertAll(
+                                        () -> {
+                                            Assertions.assertIterableEquals(
+                                                    Stream.<List<Object>>of(
+                                                                    
Arrays.asList(
+                                                                            
"1",
+                                                                            
"true",
+                                                                            
"1",
+                                                                            
"2",
+                                                                            
"3",
+                                                                            
"4",
+                                                                            
"4.3",
+                                                                            
"5.3",
+                                                                            
"6.30000",
+                                                                            
"NEW",
+                                                                            
"2020-02-02 02:02:02.0"))
+                                                            
.collect(Collectors.toList()),
+                                                    readData("kudu_sink_1"));
+                                        },
+                                        () -> {
+                                            Assertions.assertIterableEquals(
+                                                    Stream.<List<Object>>of(
+                                                                    
Arrays.asList(
+                                                                            
"1",
+                                                                            
"true",
+                                                                            
"1",
+                                                                            
"2",
+                                                                            
"3",
+                                                                            
"4",
+                                                                            
"4.3",
+                                                                            
"5.3",
+                                                                            
"6.30000",
+                                                                            
"NEW",
+                                                                            
"2020-02-02 02:02:02.0"))
+                                                            
.collect(Collectors.toList()),
+                                                    readData("kudu_sink_2"));
+                                        }));
+
+        kuduClient.deleteTable("kudu_sink_1");
+        kuduClient.deleteTable("kudu_sink_2");
+    }
+
     @Override
+    @AfterAll
     public void tearDown() throws Exception {
         if (kuduClient != null) {
             kuduClient.close();
@@ -264,6 +321,26 @@ public class KuduWIthMultipleTableIT extends TestSuiteBase 
implements TestResour
         }
     }
 
+    public List<List<Object>> readData(String tableName) throws KuduException {
+        List<List<Object>> result = new ArrayList<>();
+        KuduTable kuduTable = kuduClient.openTable(tableName);
+        KuduScanner scanner = kuduClient.newScannerBuilder(kuduTable).build();
+        while (scanner.hasMoreRows()) {
+            RowResultIterator rowResults = scanner.nextRows();
+            List<Object> row = new ArrayList<>();
+            while (rowResults.hasNext()) {
+                RowResult rowResult = rowResults.next();
+                for (int i = 0; i < rowResult.getSchema().getColumns().size(); 
i++) {
+                    row.add(rowResult.getObject(i).toString());
+                }
+            }
+            if (!row.isEmpty()) {
+                result.add(row);
+            }
+        }
+        return result;
+    }
+
     private static String getHostIPAddress() {
         try {
             Enumeration<NetworkInterface> networkInterfaceEnumeration =
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/fake_to_kudu_with_multipletable.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/fake_to_kudu_with_multipletable.conf
new file mode 100644
index 0000000000..61dd950214
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/fake_to_kudu_with_multipletable.conf
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    tables_configs = [
+       {
+        schema = {
+          table = "kudu_sink_1"
+         fields {
+                id = int
+                val_bool = boolean
+                val_int8 = tinyint
+                val_int16 = smallint
+                val_int32 = int
+                val_int64 = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+                val_string = string
+                val_unixtime_micros = timestamp
+      }
+        }
+            rows = [
+              {
+                kind = INSERT
+                fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+              }
+              ]
+       },
+       {
+       schema = {
+         table = "kudu_sink_2"
+              fields {
+                        id = int
+                        val_bool = boolean
+                        val_int8 = tinyint
+                        val_int16 = smallint
+                        val_int32 = int
+                        val_int64 = bigint
+                        val_float = float
+                        val_double = double
+                        val_decimal = "decimal(16, 1)"
+                        val_string = string
+                        val_unixtime_micros = timestamp
+              }
+       }
+           rows = [
+             {
+               kind = INSERT
+               fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+             }
+             ]
+      }
+    ]
+  }
+}
+
+
+sink {
+   kudu{
+    kudu_masters = "kudu-master-multiple:7051"
+ }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_multipletable.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_multipletable.conf
index 8e1fb641a7..93c7114850 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_multipletable.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_multipletable.conf
@@ -21,8 +21,7 @@
 env {
   # You can set engine configuration here
   execution.parallelism = 1
-  job.mode = "STREAMING"
-  execution.checkpoint.interval = 5000
+  job.mode = "BATCH"
 }
 
 source {

Reply via email to