This is an automated email from the ASF dual-hosted git repository.
zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c7a384af2b [Improve][Connector-v2] Use checkpointId as the commit's
identifier instead of the hash for streaming write of paimon sink (#7835)
c7a384af2b is described below
commit c7a384af2b8bff37861ec249bff6670100bda9b2
Author: dailai <[email protected]>
AuthorDate: Tue Oct 15 17:26:51 2024 +0800
[Improve][Connector-v2] Use checkpointId as the commit's identifier instead
of the hash for streaming write of paimon sink (#7835)
---
.../org/apache/seatunnel/api/sink/SinkWriter.java | 15 ++++++++
.../sink/multitablesink/MultiTableSinkWriter.java | 9 ++++-
.../multitablesink/MultiTableSinkWriterTest.java | 2 +-
.../seatunnel/paimon/sink/PaimonSinkWriter.java | 12 +++++--
.../sink/commit/PaimonAggregatedCommitInfo.java | 4 ++-
.../sink/commit/PaimonAggregatedCommitter.java | 42 +++++++++++++++-------
.../paimon/sink/commit/PaimonCommitInfo.java | 2 ++
.../engine/server/task/flow/SinkFlowLifeCycle.java | 2 +-
.../translation/flink/sink/FlinkSinkWriter.java | 2 +-
.../spark/sink/writer/SparkDataWriter.java | 2 +-
.../spark/sink/write/SeaTunnelSparkDataWriter.java | 2 +-
11 files changed, 72 insertions(+), 22 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
index 4567e98cbf..330580b980 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
@@ -62,8 +62,23 @@ public interface SinkWriter<T, CommitInfoT, StateT> {
*
* @return the commit info need to commit
*/
+ @Deprecated
Optional<CommitInfoT> prepareCommit() throws IOException;
+ /**
+ * prepare the commit, will be called before {@link #snapshotState(long
checkpointId)}. If you
+ * need to use 2pc, you can return the commit info in this method, and
receive the commit info
+ * in {@link SinkCommitter#commit(List)}. If this method failed (by throw
exception), **Only**
+ * Spark engine will call {@link #abortPrepare()}
+ *
+ * @param checkpointId checkpointId
+ * @return the commit info need to commit
+ * @throws IOException If fail to prepareCommit
+ */
+ default Optional<CommitInfoT> prepareCommit(long checkpointId) throws
IOException {
+ return prepareCommit();
+ }
+
/**
* @return The writer's state.
* @throws IOException if fail to snapshot writer's state.
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
index f01c3d65dc..f5b30be537 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
@@ -220,6 +220,11 @@ public class MultiTableSinkWriter
@Override
public Optional<MultiTableCommitInfo> prepareCommit() throws IOException {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<MultiTableCommitInfo> prepareCommit(long checkpointId)
throws IOException {
checkQueueRemain();
subSinkErrorCheck();
MultiTableCommitInfo multiTableCommitInfo =
@@ -238,7 +243,9 @@ public class MultiTableSinkWriter
.entrySet()) {
Optional<?> commit;
try {
- commit =
sinkWriterEntry.getValue().prepareCommit();
+ SinkWriter<SeaTunnelRow, ?, ?>
sinkWriter =
+ sinkWriterEntry.getValue();
+ commit =
sinkWriter.prepareCommit(checkpointId);
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriterTest.java
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriterTest.java
index 66e0ff0d4e..86722eb246 100644
---
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriterTest.java
+++
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriterTest.java
@@ -57,7 +57,7 @@ public class MultiTableSinkWriterTest {
DefaultSerializer<Serializable> defaultSerializer = new
DefaultSerializer<>();
for (int i = 0; i < 100; i++) {
- byte[] bytes =
defaultSerializer.serialize(multiTableSinkWriter.prepareCommit().get());
+ byte[] bytes =
defaultSerializer.serialize(multiTableSinkWriter.prepareCommit(i).get());
defaultSerializer.deserialize(bytes);
}
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index 7a3fe6d033..8cc6d0d485 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -130,6 +130,7 @@ public class PaimonSinkWriter
return;
}
this.commitUser = states.get(0).getCommitUser();
+ long checkpointId = states.get(0).getCheckpointId();
try (TableCommit tableCommit = tableWriteBuilder.newCommit()) {
List<CommitMessage> commitables =
states.stream()
@@ -142,7 +143,7 @@ public class PaimonSinkWriter
((BatchTableCommit) tableCommit).commit(commitables);
} else {
log.debug("Trying to recommit states streaming mode");
- ((StreamTableCommit)
tableCommit).commit(Objects.hash(commitables), commitables);
+ ((StreamTableCommit) tableCommit).commit(checkpointId,
commitables);
}
} catch (Exception e) {
throw new PaimonConnectorException(
@@ -174,16 +175,21 @@ public class PaimonSinkWriter
@Override
public Optional<PaimonCommitInfo> prepareCommit() throws IOException {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<PaimonCommitInfo> prepareCommit(long checkpointId) throws
IOException {
try {
List<CommitMessage> fileCommittables;
if (JobContextUtil.isBatchJob(jobContext)) {
fileCommittables = ((BatchTableWrite)
tableWrite).prepareCommit();
} else {
fileCommittables =
- ((StreamTableWrite) tableWrite).prepareCommit(false,
committables.size());
+ ((StreamTableWrite) tableWrite).prepareCommit(false,
checkpointId);
}
committables.addAll(fileCommittables);
- return Optional.of(new PaimonCommitInfo(fileCommittables));
+ return Optional.of(new PaimonCommitInfo(fileCommittables,
checkpointId));
} catch (Exception e) {
throw new PaimonConnectorException(
PaimonConnectorErrorCode.TABLE_PRE_COMMIT_FAILED,
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitInfo.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitInfo.java
index 8a7ad84a2e..83ed71f615 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitInfo.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitInfo.java
@@ -24,6 +24,7 @@ import lombok.Data;
import java.io.Serializable;
import java.util.List;
+import java.util.Map;
/** Paimon connector aggregate commit information class. */
@Data
@@ -32,5 +33,6 @@ public class PaimonAggregatedCommitInfo implements
Serializable {
private static final long serialVersionUID = 1;
- private List<List<CommitMessage>> committables;
+ // key: checkpointId value: Paimon commit message List
+ private Map<Long, List<CommitMessage>> committablesMap;
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
index 5c3f68f336..a3e457907e 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
@@ -36,10 +36,11 @@ import org.apache.paimon.table.sink.WriteBuilder;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
-import java.util.Objects;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
/** Paimon connector aggregated committer class */
@@ -70,21 +71,32 @@ public class PaimonAggregatedCommitter
public List<PaimonAggregatedCommitInfo> commit(
List<PaimonAggregatedCommitInfo> aggregatedCommitInfo) throws
IOException {
try (TableCommit tableCommit = tableWriteBuilder.newCommit()) {
- List<CommitMessage> fileCommittables =
- aggregatedCommitInfo.stream()
- .map(PaimonAggregatedCommitInfo::getCommittables)
- .flatMap(List::stream)
- .flatMap(List::stream)
- .collect(Collectors.toList());
PaimonSecurityContext.runSecured(
() -> {
if (JobContextUtil.isBatchJob(jobContext)) {
log.debug("Trying to commit states batch mode");
+ List<CommitMessage> fileCommittables =
+ aggregatedCommitInfo.stream()
+ .flatMap(
+ info ->
+
info.getCommittablesMap().values()
+ .stream())
+ .flatMap(List::stream)
+ .collect(Collectors.toList());
((BatchTableCommit)
tableCommit).commit(fileCommittables);
} else {
log.debug("Trying to commit states streaming
mode");
- ((StreamTableCommit) tableCommit)
- .commit(Objects.hash(fileCommittables),
fileCommittables);
+ aggregatedCommitInfo.stream()
+ .flatMap(
+ paimonAggregatedCommitInfo ->
+
paimonAggregatedCommitInfo.getCommittablesMap()
+
.entrySet().stream())
+ .forEach(
+ entry ->
+ ((StreamTableCommit)
tableCommit)
+ .commit(
+
entry.getKey(),
+
entry.getValue()));
}
return null;
});
@@ -99,8 +111,14 @@ public class PaimonAggregatedCommitter
@Override
public PaimonAggregatedCommitInfo combine(List<PaimonCommitInfo>
commitInfos) {
- List<List<CommitMessage>> committables = new ArrayList<>();
- commitInfos.forEach(commitInfo ->
committables.add(commitInfo.getCommittables()));
+ Map<Long, List<CommitMessage>> committables = new HashMap<>();
+ commitInfos.forEach(
+ commitInfo ->
+ committables
+ .computeIfAbsent(
+ commitInfo.getCheckpointId(),
+ id -> new CopyOnWriteArrayList<>())
+ .addAll(commitInfo.getCommittables()));
return new PaimonAggregatedCommitInfo(committables);
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonCommitInfo.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonCommitInfo.java
index 9927973821..1d9844103f 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonCommitInfo.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonCommitInfo.java
@@ -32,4 +32,6 @@ public class PaimonCommitInfo implements Serializable {
private static final long serialVersionUID = 1L;
List<CommitMessage> committables;
+
+ Long checkpointId;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index bce6e9f637..5970d9a745 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -185,7 +185,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
}
if (barrier.snapshot()) {
try {
- lastCommitInfo = writer.prepareCommit();
+ lastCommitInfo = writer.prepareCommit(barrier.getId());
} catch (Exception e) {
writer.abortPrepare();
throw e;
diff --git
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
index 7a47052c01..3ee6e3533c 100644
---
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
+++
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
@@ -101,7 +101,7 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT>
@Override
public List<CommitWrapper<CommT>> prepareCommit(boolean flush) throws
IOException {
- Optional<CommT> commTOptional = sinkWriter.prepareCommit();
+ Optional<CommT> commTOptional = sinkWriter.prepareCommit(checkpointId);
return commTOptional
.map(CommitWrapper::new)
.map(Collections::singletonList)
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java
index a9eac50062..c34a0783eb 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java
@@ -87,7 +87,7 @@ public class SparkDataWriter<CommitInfoT, StateT> implements
DataWriter<Internal
// 2. commit fails
// 2.1. We have the commit info, we need to execute the
sinkCommitter#abort to rollback
// the transaction.
- Optional<CommitInfoT> commitInfoTOptional = sinkWriter.prepareCommit();
+ Optional<CommitInfoT> commitInfoTOptional =
sinkWriter.prepareCommit(epochId);
commitInfoTOptional.ifPresent(commitInfoT -> latestCommitInfoT =
commitInfoT);
sinkWriter.snapshotState(epochId++);
if (sinkCommitter != null) {
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java
index c2c24aa914..1a97f6e618 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java
@@ -79,7 +79,7 @@ public class SeaTunnelSparkDataWriter<CommitInfoT, StateT>
implements DataWriter
@Override
public WriterCommitMessage commit() throws IOException {
- Optional<CommitInfoT> commitInfoTOptional = sinkWriter.prepareCommit();
+ Optional<CommitInfoT> commitInfoTOptional =
sinkWriter.prepareCommit(epochId);
commitInfoTOptional.ifPresent(commitInfoT -> latestCommitInfoT =
commitInfoT);
sinkWriter.snapshotState(epochId++);
if (sinkCommitter != null) {