This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f2145dcc4f [improve] dingtalk sink options (#8742)
f2145dcc4f is described below

commit f2145dcc4fea13b883d655afbd07d7a995899a84
Author: Jarvis <[email protected]>
AuthorDate: Mon Feb 17 17:24:55 2025 +0800

    [improve] dingtalk sink options (#8742)
---
 .../seatunnel/api/ConnectorOptionCheckTest.java    |  1 -
 ...ingTalkConfig.java => DingTalkSinkOptions.java} |  2 +-
 .../connectors/seatunnel/sink/DingTalkSink.java    | 48 +++++-----------------
 .../seatunnel/sink/DingTalkSinkFactory.java        | 11 ++++-
 4 files changed, 21 insertions(+), 41 deletions(-)

diff --git a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 3f08796f76..e8ea468741 100644
--- a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -178,7 +178,6 @@ public class ConnectorOptionCheckTest {
         whiteList.add("PulsarSinkOptions");
         whiteList.add("HttpSinkOptions");
         whiteList.add("SlsSinkOptions");
-        whiteList.add("DingTalkSinkOptions");
         whiteList.add("Neo4jSinkOptions");
         whiteList.add("MaxcomputeSinkOptions");
         whiteList.add("PaimonSinkOptions");
diff --git a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/config/DingTalkConfig.java b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/config/DingTalkSinkOptions.java
similarity index 97%
rename from seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/config/DingTalkConfig.java
rename to seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/config/DingTalkSinkOptions.java
index f1dba52731..4d0e9c72d2 100644
--- a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/config/DingTalkConfig.java
+++ b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/config/DingTalkSinkOptions.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.config;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
-public class DingTalkConfig {
+public class DingTalkSinkOptions {
 
     public static final Option<String> URL =
             Options.key("url")
diff --git a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
index 42ec1e688a..5884063bd1 100644
--- a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
+++ b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
@@ -17,67 +17,41 @@
 
 package org.apache.seatunnel.connectors.seatunnel.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SinkWriter.Context;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import org.apache.seatunnel.connectors.seatunnel.exception.DingTalkConnectorException;
-
-import com.google.auto.service.AutoService;
 
 import java.io.IOException;
 import java.util.Optional;
 
-import static org.apache.seatunnel.connectors.seatunnel.config.DingTalkConfig.SECRET;
-import static org.apache.seatunnel.connectors.seatunnel.config.DingTalkConfig.URL;
+import static org.apache.seatunnel.connectors.seatunnel.config.DingTalkSinkOptions.SECRET;
+import static org.apache.seatunnel.connectors.seatunnel.config.DingTalkSinkOptions.URL;
 
-/** DingTalk sink class */
-@AutoService(SeaTunnelSink.class)
 public class DingTalkSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
 
-    private Config pluginConfig;
-    private SeaTunnelRowType seaTunnelRowType;
-
-    @Override
-    public String getPluginName() {
-        return "DingTalk";
-    }
+    private final ReadonlyConfig pluginConfig;
+    private final CatalogTable catalogTable;
 
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        if (pluginConfig.getIsNull(URL.key())) {
-            throw new DingTalkConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format("Config must include column : %s", URL.key()));
-        }
-        if (pluginConfig.getIsNull(SECRET.key())) {
-            throw new DingTalkConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format("Config must include column : %s", SECRET.key()));
-        }
+    public DingTalkSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
         this.pluginConfig = pluginConfig;
+        this.catalogTable = catalogTable;
     }
 
     @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
+    public String getPluginName() {
+        return "DingTalk";
     }
 
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(Context context) throws IOException {
-        return new DingTalkWriter(
-                pluginConfig.getString(URL.key()), pluginConfig.getString(SECRET.key()));
+        return new DingTalkWriter(pluginConfig.get(URL), pluginConfig.get(SECRET));
     }
 
     @Override
     public Optional<CatalogTable> getWriteCatalogTable() {
-        return super.getWriteCatalogTable();
+        return Optional.of(catalogTable);
     }
 }
diff --git a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSinkFactory.java b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSinkFactory.java
index c013be46dd..355bfcc425 100644
--- a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSinkFactory.java
@@ -18,13 +18,15 @@
 package org.apache.seatunnel.connectors.seatunnel.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 
 import com.google.auto.service.AutoService;
 
-import static org.apache.seatunnel.connectors.seatunnel.config.DingTalkConfig.SECRET;
-import static org.apache.seatunnel.connectors.seatunnel.config.DingTalkConfig.URL;
+import static org.apache.seatunnel.connectors.seatunnel.config.DingTalkSinkOptions.SECRET;
+import static org.apache.seatunnel.connectors.seatunnel.config.DingTalkSinkOptions.URL;
 
 @AutoService(Factory.class)
 public class DingTalkSinkFactory implements TableSinkFactory {
@@ -37,4 +39,9 @@ public class DingTalkSinkFactory implements TableSinkFactory {
     public OptionRule optionRule() {
         return OptionRule.builder().required(URL, SECRET).build();
     }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        return () -> new DingTalkSink(context.getOptions(), context.getCatalogTable());
+    }
 }

Reply via email to