This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7027162dec [Feature][Connector-V2] Datahub support multi-table sink
(#9212)
7027162dec is described below
commit 7027162dec8b928677140f2ccbe7f5c42a1ab5b9
Author: jiazhang <[email protected]>
AuthorDate: Wed Apr 23 11:29:15 2025 +0800
[Feature][Connector-V2] Datahub support multi-table sink (#9212)
Co-authored-by: jia zhang <[email protected]>
---
.../seatunnel/datahub/sink/DataHubSink.java | 7 ++-
.../seatunnel/datahub/sink/DataHubSinkFactory.java | 2 +
.../seatunnel/datahub/sink/DataHubWriter.java | 4 +-
.../seatunnel/e2e/connector/datahub/DatahubIT.java | 6 ++
.../resources/fakesource_to_multi_datahub.conf | 65 ++++++++++++++++++++++
5 files changed, 80 insertions(+), 4 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubSink.java
b/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubSink.java
index d3d8b06e66..af21ecf2cc 100644
---
a/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubSink.java
+++
b/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubSink.java
@@ -19,10 +19,10 @@ package
org.apache.seatunnel.connectors.seatunnel.datahub.sink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter.Context;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
-import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import java.io.IOException;
import java.util.Optional;
@@ -35,7 +35,8 @@ import static
org.apache.seatunnel.connectors.seatunnel.datahub.config.DataHubSi
import static
org.apache.seatunnel.connectors.seatunnel.datahub.config.DataHubSinkOptions.TIMEOUT;
import static
org.apache.seatunnel.connectors.seatunnel.datahub.config.DataHubSinkOptions.TOPIC;
-public class DataHubSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+public class DataHubSink extends AbstractSimpleSink<SeaTunnelRow, Void>
+ implements SupportMultiTableSink {
private final ReadonlyConfig pluginConfig;
private final CatalogTable catalogTable;
@@ -51,7 +52,7 @@ public class DataHubSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
}
@Override
- public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(Context
context) throws IOException {
+ public DataHubWriter createWriter(Context context) throws IOException {
return new DataHubWriter(
catalogTable.getSeaTunnelRowType(),
pluginConfig.get(ENDPOINT),
diff --git
a/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubSinkFactory.java
b/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubSinkFactory.java
index d239ac0031..fcdcddf63d 100644
---
a/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubSinkFactory.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.datahub.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.options.SinkConnectorCommonOptions;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -44,6 +45,7 @@ public class DataHubSinkFactory implements TableSinkFactory {
public OptionRule optionRule() {
return OptionRule.builder()
.required(ENDPOINT, ACCESS_ID, ACCESS_KEY, PROJECT, TOPIC,
TIMEOUT, RETRY_TIMES)
+ .optional(SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubWriter.java
b/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubWriter.java
index 172bbc2a51..9144100b50 100644
---
a/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubWriter.java
+++
b/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubWriter.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.datahub.sink;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
@@ -39,7 +40,8 @@ import java.util.List;
/** DataHub write class */
@Slf4j
-public class DataHubWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+public class DataHubWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
+ implements SupportMultiTableSinkWriter<Void> {
private final DatahubClient dataHubClient;
private final String project;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-datahub-e2e/src/test/java/org/apache/seatunnel/e2e/connector/datahub/DatahubIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-datahub-e2e/src/test/java/org/apache/seatunnel/e2e/connector/datahub/DatahubIT.java
index 7777039ba8..a351a3e125 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-datahub-e2e/src/test/java/org/apache/seatunnel/e2e/connector/datahub/DatahubIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-datahub-e2e/src/test/java/org/apache/seatunnel/e2e/connector/datahub/DatahubIT.java
@@ -46,4 +46,10 @@ public class DatahubIT extends TestSuiteBase implements
TestResource {
Container.ExecResult execResult =
container.executeJob("/fakesource_to_datahub.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}
+
+ @TestTemplate
+ public void testDatahubMulti(TestContainer container) throws IOException,
InterruptedException {
+ Container.ExecResult execResult =
container.executeJob("/fakesource_to_multi_datahub.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-datahub-e2e/src/test/resources/fakesource_to_multi_datahub.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-datahub-e2e/src/test/resources/fakesource_to_multi_datahub.conf
new file mode 100644
index 0000000000..5b0c0c217c
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-datahub-e2e/src/test/resources/fakesource_to_multi_datahub.conf
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ plugin_output = "fake"
+
+ tables_configs = [
+ {
+ row.num = 100
+ schema = {
+ table = "test3"
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ },
+ {
+ row.num = 200
+ schema = {
+ table = "test2"
+ fields {
+ name = "string"
+ id = "int"
+ }
+ }
+ }
+ ]
+ }
+}
+
+transform {
+}
+
+sink {
+ DataHub {
+ endpoint = "xxx"
+ accessId = "xxx"
+ accessKey = "xxx"
+ project = "xxx"
+ topic = "${table_name}"
+ timeout = 3000
+ retryTimes = 3
+ }
+}
\ No newline at end of file