This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f2145dcc4f [improve] dingtalk sink options (#8742)
f2145dcc4f is described below
commit f2145dcc4fea13b883d655afbd07d7a995899a84
Author: Jarvis <[email protected]>
AuthorDate: Mon Feb 17 17:24:55 2025 +0800
[improve] dingtalk sink options (#8742)
---
.../seatunnel/api/ConnectorOptionCheckTest.java | 1 -
...ingTalkConfig.java => DingTalkSinkOptions.java} | 2 +-
.../connectors/seatunnel/sink/DingTalkSink.java | 48 +++++-----------------
.../seatunnel/sink/DingTalkSinkFactory.java | 11 ++++-
4 files changed, 21 insertions(+), 41 deletions(-)
diff --git
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 3f08796f76..e8ea468741 100644
---
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -178,7 +178,6 @@ public class ConnectorOptionCheckTest {
whiteList.add("PulsarSinkOptions");
whiteList.add("HttpSinkOptions");
whiteList.add("SlsSinkOptions");
- whiteList.add("DingTalkSinkOptions");
whiteList.add("Neo4jSinkOptions");
whiteList.add("MaxcomputeSinkOptions");
whiteList.add("PaimonSinkOptions");
diff --git
a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/config/DingTalkConfig.java
b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/config/DingTalkSinkOptions.java
similarity index 97%
rename from
seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/config/DingTalkConfig.java
rename to
seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/config/DingTalkSinkOptions.java
index f1dba52731..4d0e9c72d2 100644
---
a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/config/DingTalkConfig.java
+++
b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/config/DingTalkSinkOptions.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-public class DingTalkConfig {
+public class DingTalkSinkOptions {
public static final Option<String> URL =
Options.key("url")
diff --git
a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
index 42ec1e688a..5884063bd1 100644
---
a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
+++
b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
@@ -17,67 +17,41 @@
package org.apache.seatunnel.connectors.seatunnel.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter.Context;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import
org.apache.seatunnel.connectors.seatunnel.exception.DingTalkConnectorException;
-
-import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.Optional;
-import static
org.apache.seatunnel.connectors.seatunnel.config.DingTalkConfig.SECRET;
-import static
org.apache.seatunnel.connectors.seatunnel.config.DingTalkConfig.URL;
+import static
org.apache.seatunnel.connectors.seatunnel.config.DingTalkSinkOptions.SECRET;
+import static
org.apache.seatunnel.connectors.seatunnel.config.DingTalkSinkOptions.URL;
-/** DingTalk sink class */
-@AutoService(SeaTunnelSink.class)
public class DingTalkSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
- private Config pluginConfig;
- private SeaTunnelRowType seaTunnelRowType;
-
- @Override
- public String getPluginName() {
- return "DingTalk";
- }
+ private final ReadonlyConfig pluginConfig;
+ private final CatalogTable catalogTable;
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- if (pluginConfig.getIsNull(URL.key())) {
- throw new DingTalkConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format("Config must include column : %s",
URL.key()));
- }
- if (pluginConfig.getIsNull(SECRET.key())) {
- throw new DingTalkConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format("Config must include column : %s",
SECRET.key()));
- }
+ public DingTalkSink(ReadonlyConfig pluginConfig, CatalogTable
catalogTable) {
this.pluginConfig = pluginConfig;
+ this.catalogTable = catalogTable;
}
@Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
+ public String getPluginName() {
+ return "DingTalk";
}
@Override
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(Context
context) throws IOException {
- return new DingTalkWriter(
- pluginConfig.getString(URL.key()),
pluginConfig.getString(SECRET.key()));
+ return new DingTalkWriter(pluginConfig.get(URL),
pluginConfig.get(SECRET));
}
@Override
public Optional<CatalogTable> getWriteCatalogTable() {
- return super.getWriteCatalogTable();
+ return Optional.of(catalogTable);
}
}
diff --git
a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSinkFactory.java
b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSinkFactory.java
index c013be46dd..355bfcc425 100644
---
a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSinkFactory.java
@@ -18,13 +18,15 @@
package org.apache.seatunnel.connectors.seatunnel.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import com.google.auto.service.AutoService;
-import static
org.apache.seatunnel.connectors.seatunnel.config.DingTalkConfig.SECRET;
-import static
org.apache.seatunnel.connectors.seatunnel.config.DingTalkConfig.URL;
+import static
org.apache.seatunnel.connectors.seatunnel.config.DingTalkSinkOptions.SECRET;
+import static
org.apache.seatunnel.connectors.seatunnel.config.DingTalkSinkOptions.URL;
@AutoService(Factory.class)
public class DingTalkSinkFactory implements TableSinkFactory {
@@ -37,4 +39,9 @@ public class DingTalkSinkFactory implements TableSinkFactory {
public OptionRule optionRule() {
return OptionRule.builder().required(URL, SECRET).build();
}
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ return () -> new DingTalkSink(context.getOptions(),
context.getCatalogTable());
+ }
}