This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 84be0f9fd [improve][SelectDB] Add a jobId to the selectDB label to
distinguish between tasks (#4864)
84be0f9fd is described below
commit 84be0f9fd057b1680d001de38802ce1c28d79f04
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Thu Jun 1 11:56:26 2023 +0800
[improve][SelectDB] Add a jobId to the selectDB label to distinguish
between tasks (#4864)
Co-authored-by: zhouyao <[email protected]>
---
.../connectors/selectdb/sink/SelectDBSink.java | 23 ++++++++++++++--------
.../sink/writer/SelectDBSinkStateSerializer.java | 5 +++--
.../selectdb/sink/writer/SelectDBSinkWriter.java | 6 ++++--
3 files changed, 22 insertions(+), 12 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
index e8789fb2e..c095f9401 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.selectdb.sink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.serialization.Serializer;
@@ -59,6 +60,7 @@ public class SelectDBSink
SeaTunnelRow, SelectDBSinkState, SelectDBCommitInfo,
SelectDBCommitInfo> {
private Config pluginConfig;
private SeaTunnelRowType seaTunnelRowType;
+ private String jobId;
@Override
public String getPluginName() {
@@ -85,6 +87,11 @@ public class SelectDBSink
}
}
+ @Override
+ public void setJobContext(JobContext jobContext) {
+ this.jobId = jobContext.getJobId();
+ }
+
@Override
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
@@ -98,20 +105,20 @@ public class SelectDBSink
@Override
public SinkWriter<SeaTunnelRow, SelectDBCommitInfo, SelectDBSinkState>
createWriter(
SinkWriter.Context context) throws IOException {
- SelectDBSinkWriter dorisWriter =
+ SelectDBSinkWriter selectDBSinkWriter =
new SelectDBSinkWriter(
- context, Collections.emptyList(), seaTunnelRowType,
pluginConfig);
- dorisWriter.initializeLoad(Collections.emptyList());
- return dorisWriter;
+ context, Collections.emptyList(), seaTunnelRowType,
pluginConfig, jobId);
+ selectDBSinkWriter.initializeLoad(Collections.emptyList());
+ return selectDBSinkWriter;
}
@Override
public SinkWriter<SeaTunnelRow, SelectDBCommitInfo, SelectDBSinkState>
restoreWriter(
SinkWriter.Context context, List<SelectDBSinkState> states) throws
IOException {
- SelectDBSinkWriter dorisWriter =
- new SelectDBSinkWriter(context, states, seaTunnelRowType,
pluginConfig);
- dorisWriter.initializeLoad(states);
- return dorisWriter;
+ SelectDBSinkWriter selectDBSinkWriter =
+ new SelectDBSinkWriter(context, states, seaTunnelRowType,
pluginConfig, jobId);
+ selectDBSinkWriter.initializeLoad(states);
+ return selectDBSinkWriter;
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkStateSerializer.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkStateSerializer.java
index 1546d3b4d..640a77e6a 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkStateSerializer.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkStateSerializer.java
@@ -29,10 +29,11 @@ import java.io.IOException;
public class SelectDBSinkStateSerializer implements
Serializer<SelectDBSinkState> {
@Override
- public byte[] serialize(SelectDBSinkState obj) throws IOException {
+ public byte[] serialize(SelectDBSinkState selectDBSinkState) throws
IOException {
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(baos)) {
- out.writeUTF(obj.getLabelPrefix());
+ out.writeUTF(selectDBSinkState.getLabelPrefix());
+ out.writeLong(selectDBSinkState.getCheckpointId());
out.flush();
return baos.toByteArray();
}
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java
index c284694c5..8a0bd0440 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java
@@ -54,7 +54,8 @@ public class SelectDBSinkWriter
SinkWriter.Context context,
List<SelectDBSinkState> state,
SeaTunnelRowType seaTunnelRowType,
- Config pluginConfig) {
+ Config pluginConfig,
+ String jobId) {
this.selectdbConfig = SelectDBConfig.loadConfig(pluginConfig);
this.lastCheckpointId = state.size() != 0 ?
state.get(0).getCheckpointId() : 0;
log.info("restore checkpointId {}", lastCheckpointId);
@@ -62,7 +63,8 @@ public class SelectDBSinkWriter
log.info("labelPrefix " + selectdbConfig.getLabelPrefix());
this.selectdbSinkState =
new SelectDBSinkState(selectdbConfig.getLabelPrefix(),
lastCheckpointId);
- this.labelPrefix = selectdbConfig.getLabelPrefix() + "_" +
context.getIndexOfSubtask();
+ this.labelPrefix =
+ selectdbConfig.getLabelPrefix() + "_" + jobId + "_" +
context.getIndexOfSubtask();
this.lineDelimiter =
selectdbConfig
.getStageLoadProps()