This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 19b9d5bcce [Improve][Connectors-v2] Refactor Slack sink using Factory to create instance (#10514)
19b9d5bcce is described below
commit 19b9d5bcce404dd2a42fe7ff8816f5a10b7cf602
Author: Jarvis <[email protected]>
AuthorDate: Wed Mar 25 17:06:11 2026 +0800
[Improve][Connectors-v2] Refactor Slack sink using Factory to create instance (#10514)
---
.../seatunnel/slack/client/SlackClient.java | 13 +++---
.../connectors/seatunnel/slack/sink/SlackSink.java | 47 ++++------------------
.../seatunnel/slack/sink/SlackSinkFactory.java | 7 ++++
.../seatunnel/slack/sink/SlackWriter.java | 5 +--
4 files changed, 23 insertions(+), 49 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/client/SlackClient.java b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/client/SlackClient.java
index 565d03bb69..8b037bbb5c 100644
--- a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/client/SlackClient.java
+++ b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/client/SlackClient.java
@@ -17,8 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.slack.client;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import
org.apache.seatunnel.connectors.seatunnel.slack.exception.SlackConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.slack.exception.SlackConnectorException;
@@ -39,10 +38,10 @@ import static
org.apache.seatunnel.connectors.seatunnel.slack.config.SlackSinkOp
@Slf4j
public class SlackClient {
- private final Config pluginConfig;
+ private final ReadonlyConfig pluginConfig;
private final MethodsClient methodsClient;
- public SlackClient(Config pluginConfig) {
+ public SlackClient(ReadonlyConfig pluginConfig) {
this.pluginConfig = pluginConfig;
this.methodsClient = Slack.getInstance().methods();
}
@@ -58,10 +57,10 @@ public class SlackClient {
r ->
r
// The Token used to initialize app
-
.token(pluginConfig.getString(OAUTH_TOKEN.key())));
+
.token(pluginConfig.get(OAUTH_TOKEN)));
channels = conversationsListResponse.getChannels();
for (Conversation channel : channels) {
- if
(channel.getName().equals(pluginConfig.getString(SLACK_CHANNEL.key()))) {
+ if (channel.getName().equals(pluginConfig.get(SLACK_CHANNEL)))
{
conversionId = channel.getId();
// Break from for loop
break;
@@ -84,7 +83,7 @@ public class SlackClient {
r ->
r
// The Token used to initialize app
-
.token(pluginConfig.getString(SLACK_CHANNEL.key()))
+
.token(pluginConfig.get(SLACK_CHANNEL))
.channel(channelId)
.text(text));
publishMessageSuccess = chatPostMessageResponse.isOk();
diff --git a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
index 214a08c403..eca6806d86 100644
--- a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
+++ b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
@@ -17,44 +17,31 @@
package org.apache.seatunnel.connectors.seatunnel.slack.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import org.apache.seatunnel.connectors.seatunnel.slack.config.SlackSinkOptions;
-import
org.apache.seatunnel.connectors.seatunnel.slack.exception.SlackConnectorException;
-
-import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.Optional;
/** Slack sink class */
-@AutoService(SeaTunnelSink.class)
public class SlackSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
- private Config pluginConfig;
- private SeaTunnelRowType seaTunnelRowType;
+ private ReadonlyConfig pluginConfig;
+ private CatalogTable catalogTable;
- @Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
+ public SlackSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
+ this.pluginConfig = pluginConfig;
+ this.catalogTable = catalogTable;
}
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
throws IOException {
- return new SlackWriter(seaTunnelRowType, pluginConfig);
+ return new SlackWriter(catalogTable.getSeaTunnelRowType(),
pluginConfig);
}
@Override
@@ -62,26 +49,8 @@ public class SlackSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
return "SlackSink";
}
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult checkResult =
- CheckConfigUtil.checkAllExists(
- pluginConfig,
- SlackSinkOptions.WEBHOOKS_URL.key(),
- SlackSinkOptions.OAUTH_TOKEN.key(),
- SlackSinkOptions.SLACK_CHANNEL.key());
- if (!checkResult.isSuccess()) {
- throw new SlackConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SINK,
checkResult.getMsg()));
- }
- this.pluginConfig = pluginConfig;
- }
-
@Override
public Optional<CatalogTable> getWriteCatalogTable() {
- return super.getWriteCatalogTable();
+ return Optional.of(catalogTable);
}
}
diff --git a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSinkFactory.java b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSinkFactory.java
index a9dd6accb6..4fbe80562e 100644
--- a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSinkFactory.java
@@ -18,8 +18,10 @@
package org.apache.seatunnel.connectors.seatunnel.slack.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.slack.config.SlackSinkOptions;
import com.google.auto.service.AutoService;
@@ -40,4 +42,9 @@ public class SlackSinkFactory implements TableSinkFactory {
SlackSinkOptions.SLACK_CHANNEL)
.build();
}
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ return () -> new SlackSink(context.getOptions(),
context.getCatalogTable());
+ }
}
diff --git a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackWriter.java b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackWriter.java
index 2fa7faec42..7c9cf69b62 100644
--- a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackWriter.java
+++ b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackWriter.java
@@ -17,8 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.slack.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.ExceptionUtils;
@@ -39,7 +38,7 @@ public class SlackWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
private final SeaTunnelRowType seaTunnelRowType;
private static final long POST_MSG_WAITING_TIME = 1500L;
- public SlackWriter(SeaTunnelRowType seaTunnelRowType, Config pluginConfig)
{
+ public SlackWriter(SeaTunnelRowType seaTunnelRowType, ReadonlyConfig
pluginConfig) {
this.seaTunnelRowType = seaTunnelRowType;
this.slackClient = new SlackClient(pluginConfig);
this.conversationId = slackClient.findConversation();