This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 82460c0bf0 [Feature][Connector-V2] Support multi-table sink feature
for kudu (#5951)
82460c0bf0 is described below
commit 82460c0bf0e88a40070590c3b0615d9335aa4ceb
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Thu Dec 14 13:36:49 2023 +0800
[Feature][Connector-V2] Support multi-table sink feature for kudu (#5951)
---
docs/en/connector-v2/sink/Kudu.md | 72 ++++++++++++++++
.../connectors/seatunnel/kudu/sink/KuduSink.java | 9 +-
.../seatunnel/kudu/sink/KuduSinkFactory.java | 12 ++-
.../seatunnel/kudu/sink/KuduSinkWriter.java | 5 +-
.../seatunnel/kudu/source/KuduSource.java | 2 -
.../e2e/connector/kudu/KuduCDCSinkIT.java | 2 +
.../seatunnel/e2e/connector/kudu/KuduIT.java | 2 +
.../connector/kudu/KuduWIthMultipleTableIT.java | 97 +++++++++++++++++++---
.../resources/fake_to_kudu_with_multipletable.conf | 87 +++++++++++++++++++
.../kudu_to_assert_with_multipletable.conf | 3 +-
10 files changed, 270 insertions(+), 21 deletions(-)
diff --git a/docs/en/connector-v2/sink/Kudu.md
b/docs/en/connector-v2/sink/Kudu.md
index b6e4eee24c..f885240498 100644
--- a/docs/en/connector-v2/sink/Kudu.md
+++ b/docs/en/connector-v2/sink/Kudu.md
@@ -123,6 +123,78 @@ sink {
}
```
+### Multiple Table
+
+```hocon
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ tables_configs = [
+ {
+ schema = {
+ table = "kudu_sink_1"
+ fields {
+ id = int
+ val_bool = boolean
+ val_int8 = tinyint
+ val_int16 = smallint
+ val_int32 = int
+ val_int64 = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ val_unixtime_micros = timestamp
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ }
+ ]
+ },
+ {
+ schema = {
+ table = "kudu_sink_2"
+ fields {
+ id = int
+ val_bool = boolean
+ val_int8 = tinyint
+ val_int16 = smallint
+ val_int32 = int
+ val_int64 = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ val_unixtime_micros = timestamp
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ }
+ ]
+ }
+ ]
+ }
+}
+
+
+sink {
+ kudu{
+ kudu_masters = "kudu-master-multiple:7051"
+ }
+}
+```
+
## Changelog
### 2.2.0-beta 2022-09-26
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
index 0cc827f1ed..898016b5cf 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.kudu.sink;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -28,18 +29,16 @@ import
org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduAggregatedCommit
import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSinkState;
-import com.google.auto.service.AutoService;
-
import java.io.IOException;
/**
* Kudu Sink implementation by using SeaTunnel sink API. This class contains
the method to create
* {@link AbstractSimpleSink}.
*/
-@AutoService(SeaTunnelSink.class)
public class KuduSink
implements SeaTunnelSink<
- SeaTunnelRow, KuduSinkState, KuduCommitInfo,
KuduAggregatedCommitInfo> {
+ SeaTunnelRow, KuduSinkState, KuduCommitInfo,
KuduAggregatedCommitInfo>,
+ SupportMultiTableSink {
private KuduSinkConfig kuduSinkConfig;
private SeaTunnelRowType seaTunnelRowType;
@@ -51,7 +50,7 @@ public class KuduSink
@Override
public String getPluginName() {
- return "kudu";
+ return "Kudu";
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java
index fc94e94b88..3917d1cd62 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java
@@ -29,6 +29,8 @@ import
org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
import com.google.auto.service.AutoService;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
import static
org.apache.kudu.client.SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND;
import static
org.apache.kudu.client.SessionConfiguration.FlushMode.MANUAL_FLUSH;
@@ -43,7 +45,8 @@ public class KuduSinkFactory implements TableSinkFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(KuduSinkConfig.MASTER, KuduSinkConfig.TABLE_NAME)
+ .required(KuduSinkConfig.MASTER)
+ .optional(KuduSinkConfig.TABLE_NAME)
.optional(KuduSinkConfig.WORKER_COUNT)
.optional(KuduSinkConfig.OPERATION_TIMEOUT)
.optional(KuduSinkConfig.ADMIN_OPERATION_TIMEOUT)
@@ -73,6 +76,13 @@ public class KuduSinkFactory implements TableSinkFactory {
public TableSink createSink(TableSinkFactoryContext context) {
ReadonlyConfig config = context.getOptions();
CatalogTable catalogTable = context.getCatalogTable();
+ if (!config.getOptional(KuduSinkConfig.TABLE_NAME).isPresent()) {
+ Map<String, String> map = config.toMap();
+ map.put(
+ KuduSinkConfig.TABLE_NAME.key(),
+ catalogTable.getTableId().toTablePath().getFullName());
+ config = ReadonlyConfig.fromMap(new HashMap<>(map));
+ }
KuduSinkConfig kuduSinkConfig = new KuduSinkConfig(config);
return () -> new KuduSink(kuduSinkConfig, catalogTable);
}
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java
index acf5b4ca2e..82a28c0451 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.kudu.sink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
@@ -32,7 +33,9 @@ import java.io.IOException;
import java.util.Optional;
@Slf4j
-public class KuduSinkWriter implements SinkWriter<SeaTunnelRow,
KuduCommitInfo, KuduSinkState> {
+public class KuduSinkWriter
+ implements SinkWriter<SeaTunnelRow, KuduCommitInfo, KuduSinkState>,
+ SupportMultiTableSinkWriter<Void> {
private SeaTunnelRowType seaTunnelRowType;
private KuduOutputFormat fileWriter;
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
index 57b7925b0d..9490208af5 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
@@ -28,14 +28,12 @@ import
org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceTableConfig;
import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSourceState;
-import com.google.auto.service.AutoService;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.stream.Collectors;
@Slf4j
-@AutoService(SeaTunnelSource.class)
public class KuduSource
implements SeaTunnelSource<SeaTunnelRow, KuduSourceSplit,
KuduSourceState>,
SupportParallelism {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduCDCSinkIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduCDCSinkIT.java
index dd45471060..733e0d39c9 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduCDCSinkIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduCDCSinkIT.java
@@ -36,6 +36,7 @@ import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
@@ -273,6 +274,7 @@ public class KuduCDCSinkIT extends TestSuiteBase implements
TestResource {
}
@Override
+ @AfterAll
public void tearDown() throws Exception {
if (kuduClient != null) {
kuduClient.close();
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
index ae26aff796..b0c8cba1fe 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
@@ -39,6 +39,7 @@ import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
@@ -274,6 +275,7 @@ public class KuduIT extends TestSuiteBase implements
TestResource {
}
@Override
+ @AfterAll
public void tearDown() throws Exception {
if (kuduClient != null) {
kuduClient.close();
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduWIthMultipleTableIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduWIthMultipleTableIT.java
index 11cd819b6c..cc05653d5c 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduWIthMultipleTableIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduWIthMultipleTableIT.java
@@ -32,11 +32,15 @@ import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.RowResultIterator;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
@@ -62,9 +66,11 @@ import java.util.Arrays;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.lang.String.format;
+import static org.awaitility.Awaitility.await;
@Slf4j
@DisabledOnContainer(
@@ -158,12 +164,11 @@ public class KuduWIthMultipleTableIT extends
TestSuiteBase implements TestResour
row.addObject("val_decimal", new BigDecimal("1.1212"));
row.addObject("val_string", "test");
row.addObject("val_unixtime_micros", new
java.sql.Timestamp(1693477266998L));
- row.addObject("val_binary", "NEW".getBytes());
OperationResponse response = kuduSession.apply(insert);
}
}
- private void initializeKuduTable() throws KuduException {
+ private void initializeKuduTable(String tableName) throws KuduException {
List<ColumnSchema> columns = new ArrayList();
@@ -210,10 +215,6 @@ public class KuduWIthMultipleTableIT extends TestSuiteBase
implements TestResour
new ColumnSchema.ColumnSchemaBuilder("val_unixtime_micros",
Type.UNIXTIME_MICROS)
.nullable(true)
.build());
- columns.add(
- new ColumnSchema.ColumnSchemaBuilder("val_binary", Type.BINARY)
- .nullable(true)
- .build());
Schema schema = new Schema(columns);
@@ -222,8 +223,7 @@ public class KuduWIthMultipleTableIT extends TestSuiteBase
implements TestResour
tableOptions.addHashPartitions(hashKeys, 2);
tableOptions.setNumReplicas(1);
- kuduClient.createTable("kudu_source_table_1", schema, tableOptions);
- kuduClient.createTable("kudu_source_table_2", schema, tableOptions);
+ kuduClient.createTable(tableName, schema, tableOptions);
}
private void getKuduClient() {
@@ -238,8 +238,10 @@ public class KuduWIthMultipleTableIT extends TestSuiteBase
implements TestResour
}
@TestTemplate
- public void testKudu(TestContainer container) throws IOException,
InterruptedException {
- initializeKuduTable();
+ public void testKuduMultipleRead(TestContainer container)
+ throws IOException, InterruptedException {
+ initializeKuduTable("kudu_source_table_1");
+ initializeKuduTable("kudu_source_table_2");
batchInsertData("kudu_source_table_1");
batchInsertData("kudu_source_table_2");
Container.ExecResult execResult =
@@ -249,7 +251,62 @@ public class KuduWIthMultipleTableIT extends TestSuiteBase
implements TestResour
kuduClient.deleteTable("kudu_source_table_2");
}
+ /**
+  * Runs the multi-table FakeSource -> Kudu job and verifies that both sink tables end up
+  * holding the single row declared in {@code fake_to_kudu_with_multipletable.conf}.
+  *
+  * <p>The sink tables are dropped in a {@code finally} block: this template runs once per
+  * container, and a table left behind by a failed run would make the next run's
+  * {@code initializeKuduTable} call fail.
+  */
+ @TestTemplate
+ public void testKuduMultipleWrite(TestContainer container)
+         throws IOException, InterruptedException {
+     initializeKuduTable("kudu_sink_1");
+     initializeKuduTable("kudu_sink_2");
+     try {
+         Container.ExecResult execResult =
+                 container.executeJob("/fake_to_kudu_with_multipletable.conf");
+         Assertions.assertEquals(0, execResult.getExitCode());
+
+         // Both tables are fed the same row by the job config, so one expectation serves both.
+         List<List<Object>> expectedRows =
+                 Stream.<List<Object>>of(
+                                 Arrays.asList(
+                                         "1",
+                                         "true",
+                                         "1",
+                                         "2",
+                                         "3",
+                                         "4",
+                                         "4.3",
+                                         "5.3",
+                                         "6.30000",
+                                         "NEW",
+                                         "2020-02-02 02:02:02.0"))
+                         .collect(Collectors.toList());
+
+         await().atMost(60000, TimeUnit.MILLISECONDS)
+                 .untilAsserted(
+                         () ->
+                                 Assertions.assertAll(
+                                         () ->
+                                                 Assertions.assertIterableEquals(
+                                                         expectedRows, readData("kudu_sink_1")),
+                                         () ->
+                                                 Assertions.assertIterableEquals(
+                                                         expectedRows, readData("kudu_sink_2"))));
+     } finally {
+         kuduClient.deleteTable("kudu_sink_1");
+         kuduClient.deleteTable("kudu_sink_2");
+     }
+ }
+
@Override
+ @AfterAll
public void tearDown() throws Exception {
if (kuduClient != null) {
kuduClient.close();
@@ -264,6 +321,26 @@ public class KuduWIthMultipleTableIT extends TestSuiteBase
implements TestResour
}
}
+ /**
+  * Scans the whole Kudu table and returns its content as strings.
+  *
+  * @param tableName name of the Kudu table to scan
+  * @return one inner list per table row, holding that row's column values in schema order,
+  *     each rendered with {@link String#valueOf(Object)}
+  * @throws KuduException if the table cannot be opened or the scan fails
+  */
+ public List<List<Object>> readData(String tableName) throws KuduException {
+     List<List<Object>> result = new ArrayList<>();
+     KuduTable kuduTable = kuduClient.openTable(tableName);
+     KuduScanner scanner = kuduClient.newScannerBuilder(kuduTable).build();
+     try {
+         while (scanner.hasMoreRows()) {
+             RowResultIterator rowResults = scanner.nextRows();
+             while (rowResults.hasNext()) {
+                 RowResult rowResult = rowResults.next();
+                 int columnCount = rowResult.getSchema().getColumns().size();
+                 // A fresh list per RowResult: sharing one list across a scanner batch
+                 // would flatten every row of that batch into a single entry.
+                 List<Object> row = new ArrayList<>(columnCount);
+                 for (int i = 0; i < columnCount; i++) {
+                     // String.valueOf renders a NULL cell as "null" instead of throwing.
+                     row.add(String.valueOf(rowResult.getObject(i)));
+                 }
+                 result.add(row);
+             }
+         }
+     } finally {
+         // Close the scanner even when the scan fails part-way.
+         scanner.close();
+     }
+     return result;
+ }
+
private static String getHostIPAddress() {
try {
Enumeration<NetworkInterface> networkInterfaceEnumeration =
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/fake_to_kudu_with_multipletable.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/fake_to_kudu_with_multipletable.conf
new file mode 100644
index 0000000000..61dd950214
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/fake_to_kudu_with_multipletable.conf
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in
seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ tables_configs = [
+ {
+ schema = {
+ table = "kudu_sink_1"
+ fields {
+ id = int
+ val_bool = boolean
+ val_int8 = tinyint
+ val_int16 = smallint
+ val_int32 = int
+ val_int64 = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ val_unixtime_micros = timestamp
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ }
+ ]
+ },
+ {
+ schema = {
+ table = "kudu_sink_2"
+ fields {
+ id = int
+ val_bool = boolean
+ val_int8 = tinyint
+ val_int16 = smallint
+ val_int32 = int
+ val_int64 = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ val_unixtime_micros = timestamp
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ }
+ ]
+ }
+ ]
+ }
+}
+
+
+sink {
+ kudu{
+ kudu_masters = "kudu-master-multiple:7051"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_multipletable.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_multipletable.conf
index 8e1fb641a7..93c7114850 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_multipletable.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_multipletable.conf
@@ -21,8 +21,7 @@
env {
# You can set engine configuration here
execution.parallelism = 1
- job.mode = "STREAMING"
- execution.checkpoint.interval = 5000
+ job.mode = "BATCH"
}
source {