This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 84be0f9fd [improve][SelectDB] Add a jobId to the selectDB label to 
distinguish between tasks (#4864)
84be0f9fd is described below

commit 84be0f9fd057b1680d001de38802ce1c28d79f04
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Thu Jun 1 11:56:26 2023 +0800

    [improve][SelectDB] Add a jobId to the selectDB label to distinguish 
between tasks (#4864)
    
    Co-authored-by: zhouyao <[email protected]>
---
 .../connectors/selectdb/sink/SelectDBSink.java     | 23 ++++++++++++++--------
 .../sink/writer/SelectDBSinkStateSerializer.java   |  5 +++--
 .../selectdb/sink/writer/SelectDBSinkWriter.java   |  6 ++++--
 3 files changed, 22 insertions(+), 12 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
 
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
index e8789fb2e..c095f9401 100644
--- 
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
+++ 
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.selectdb.sink;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.serialization.Serializer;
@@ -59,6 +60,7 @@ public class SelectDBSink
                 SeaTunnelRow, SelectDBSinkState, SelectDBCommitInfo, 
SelectDBCommitInfo> {
     private Config pluginConfig;
     private SeaTunnelRowType seaTunnelRowType;
+    private String jobId;
 
     @Override
     public String getPluginName() {
@@ -85,6 +87,11 @@ public class SelectDBSink
         }
     }
 
+    @Override
+    public void setJobContext(JobContext jobContext) {
+        this.jobId = jobContext.getJobId();
+    }
+
     @Override
     public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
         this.seaTunnelRowType = seaTunnelRowType;
@@ -98,20 +105,20 @@ public class SelectDBSink
     @Override
     public SinkWriter<SeaTunnelRow, SelectDBCommitInfo, SelectDBSinkState> 
createWriter(
             SinkWriter.Context context) throws IOException {
-        SelectDBSinkWriter dorisWriter =
+        SelectDBSinkWriter selectDBSinkWriter =
                 new SelectDBSinkWriter(
-                        context, Collections.emptyList(), seaTunnelRowType, 
pluginConfig);
-        dorisWriter.initializeLoad(Collections.emptyList());
-        return dorisWriter;
+                        context, Collections.emptyList(), seaTunnelRowType, 
pluginConfig, jobId);
+        selectDBSinkWriter.initializeLoad(Collections.emptyList());
+        return selectDBSinkWriter;
     }
 
     @Override
     public SinkWriter<SeaTunnelRow, SelectDBCommitInfo, SelectDBSinkState> 
restoreWriter(
             SinkWriter.Context context, List<SelectDBSinkState> states) throws 
IOException {
-        SelectDBSinkWriter dorisWriter =
-                new SelectDBSinkWriter(context, states, seaTunnelRowType, 
pluginConfig);
-        dorisWriter.initializeLoad(states);
-        return dorisWriter;
+        SelectDBSinkWriter selectDBSinkWriter =
+                new SelectDBSinkWriter(context, states, seaTunnelRowType, 
pluginConfig, jobId);
+        selectDBSinkWriter.initializeLoad(states);
+        return selectDBSinkWriter;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkStateSerializer.java
 
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkStateSerializer.java
index 1546d3b4d..640a77e6a 100644
--- 
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkStateSerializer.java
+++ 
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkStateSerializer.java
@@ -29,10 +29,11 @@ import java.io.IOException;
 public class SelectDBSinkStateSerializer implements 
Serializer<SelectDBSinkState> {
 
     @Override
-    public byte[] serialize(SelectDBSinkState obj) throws IOException {
+    public byte[] serialize(SelectDBSinkState selectDBSinkState) throws 
IOException {
         try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
                 final DataOutputStream out = new DataOutputStream(baos)) {
-            out.writeUTF(obj.getLabelPrefix());
+            out.writeUTF(selectDBSinkState.getLabelPrefix());
+            out.writeLong(selectDBSinkState.getCheckpointId());
             out.flush();
             return baos.toByteArray();
         }
diff --git 
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java
 
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java
index c284694c5..8a0bd0440 100644
--- 
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java
@@ -54,7 +54,8 @@ public class SelectDBSinkWriter
             SinkWriter.Context context,
             List<SelectDBSinkState> state,
             SeaTunnelRowType seaTunnelRowType,
-            Config pluginConfig) {
+            Config pluginConfig,
+            String jobId) {
         this.selectdbConfig = SelectDBConfig.loadConfig(pluginConfig);
         this.lastCheckpointId = state.size() != 0 ? 
state.get(0).getCheckpointId() : 0;
         log.info("restore checkpointId {}", lastCheckpointId);
@@ -62,7 +63,8 @@ public class SelectDBSinkWriter
         log.info("labelPrefix " + selectdbConfig.getLabelPrefix());
         this.selectdbSinkState =
                 new SelectDBSinkState(selectdbConfig.getLabelPrefix(), 
lastCheckpointId);
-        this.labelPrefix = selectdbConfig.getLabelPrefix() + "_" + 
context.getIndexOfSubtask();
+        this.labelPrefix =
+                selectdbConfig.getLabelPrefix() + "_" + jobId + "_" + 
context.getIndexOfSubtask();
         this.lineDelimiter =
                 selectdbConfig
                         .getStageLoadProps()

Reply via email to