This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 70c7040746 [Improve][Common] Improve SinkFlowTestUtils Checkpoint Logic (#10071)
70c7040746 is described below
commit 70c7040746f89f72a2b21797eb280b79c55318b4
Author: Jast <[email protected]>
AuthorDate: Thu Jan 29 20:02:40 2026 +0800
[Improve][Common] Improve SinkFlowTestUtils Checkpoint Logic (#10071)
---
.github/workflows/backend.yml | 4 +-
.../seatunnel/sink/SinkFlowTestUtils.java | 338 +++++++++++++++++++--
.../connector-mongodb-e2e/pom.xml | 7 +
.../e2e/connector/v2/mongodb/MongodbIT.java | 103 +++++--
.../fake_source_to_transaction_sink_mongodb.conf | 74 -----
5 files changed, 403 insertions(+), 123 deletions(-)
diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 364e03ab6f..7389fe43f8 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -470,7 +470,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
- timeout-minutes: 180
+ timeout-minutes: 210
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
@@ -561,7 +561,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
- timeout-minutes: 180
+ timeout-minutes: 210
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
diff --git
a/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java
b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java
index 982ad50f67..2bd65c8e29 100644
---
a/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java
+++
b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java
@@ -35,8 +35,11 @@ import org.apache.seatunnel.common.constants.JobMode;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
public class SinkFlowTestUtils {
@@ -58,11 +61,25 @@ public class SinkFlowTestUtils {
TableSinkFactory<SeaTunnelRow, ?, ?, ?> factory,
List<SeaTunnelRow> rows)
throws IOException {
+ runBatchWithCheckpointEnabled(
+ catalogTable,
+ options,
+ factory,
+ rows,
+ PeriodicCheckpointOptions.defaultSingleCheckpoint());
+ }
+
+ public static void runBatchWithCheckpointEnabled(
+ CatalogTable catalogTable,
+ ReadonlyConfig options,
+ TableSinkFactory<SeaTunnelRow, ?, ?, ?> factory,
+ List<SeaTunnelRow> rows,
+ PeriodicCheckpointOptions checkpointOptions)
+ throws IOException {
JobContext context = new JobContext(System.currentTimeMillis());
context.setJobMode(JobMode.BATCH);
context.setEnableCheckpoint(true);
- // TODO trigger checkpoint with interval
- runWithContext(catalogTable, options, factory, rows, context, 1);
+        runWithContext(catalogTable, options, factory, rows, context, 1, checkpointOptions);
}
public static void runParallelSubtasksBatchWithCheckpointDisabled(
@@ -85,10 +102,35 @@ public class SinkFlowTestUtils {
boolean checkpointEnabled,
int parallelism)
throws IOException {
+ runBatchWithMultiTableSink(
+ factory,
+ tableSinkFactoryContext,
+ rows,
+ checkpointEnabled,
+ parallelism,
+ checkpointEnabled
+ ? PeriodicCheckpointOptions.defaultSingleCheckpoint()
+ : PeriodicCheckpointOptions.neverTrigger());
+ }
+
+ public static void runBatchWithMultiTableSink(
+ TableSinkFactory<SeaTunnelRow, ?, ?, ?> factory,
+ TableSinkFactoryContext tableSinkFactoryContext,
+ List<SeaTunnelRow> rows,
+ boolean checkpointEnabled,
+ int parallelism,
+ PeriodicCheckpointOptions checkpointOptions)
+ throws IOException {
JobContext context = new JobContext(System.currentTimeMillis());
context.setJobMode(JobMode.BATCH);
context.setEnableCheckpoint(checkpointEnabled);
-        runWithContext(factory, tableSinkFactoryContext, rows, context, parallelism);
+ runWithContext(
+ factory,
+ tableSinkFactoryContext,
+ rows,
+ context,
+ parallelism,
+                checkpointEnabled ? checkpointOptions : PeriodicCheckpointOptions.neverTrigger());
}
private static void runWithContext(
@@ -104,7 +146,33 @@ public class SinkFlowTestUtils {
new TableSinkFactoryContext(
catalogTable, options,
Thread.currentThread().getContextClassLoader());
-        runWithContext(factory, tableSinkFactoryContext, rows, context, parallelism);
+ runWithContext(
+ factory,
+ tableSinkFactoryContext,
+ rows,
+ context,
+ parallelism,
+ context.isEnableCheckpoint()
+ ? PeriodicCheckpointOptions.defaultSingleCheckpoint()
+ : PeriodicCheckpointOptions.neverTrigger());
+ }
+
+ private static void runWithContext(
+ CatalogTable catalogTable,
+ ReadonlyConfig options,
+ TableSinkFactory<SeaTunnelRow, ?, ?, ?> factory,
+ List<SeaTunnelRow> rows,
+ JobContext context,
+ int parallelism,
+ PeriodicCheckpointOptions checkpointOptions)
+ throws IOException {
+
+ TableSinkFactoryContext tableSinkFactoryContext =
+ new TableSinkFactoryContext(
+ catalogTable, options,
Thread.currentThread().getContextClassLoader());
+
+ runWithContext(
+                factory, tableSinkFactoryContext, rows, context, parallelism, checkpointOptions);
}
private static void runWithContext(
@@ -112,31 +180,49 @@ public class SinkFlowTestUtils {
TableSinkFactoryContext tableSinkFactoryContext,
List<SeaTunnelRow> rows,
JobContext context,
- int parallelism)
+ int parallelism,
+ PeriodicCheckpointOptions checkpointOptions)
throws IOException {
SeaTunnelSink<SeaTunnelRow, ?, ?, ?> sink =
factory.createSink(tableSinkFactoryContext).createSink();
sink.setJobContext(context);
- List<Object> commitInfos = new ArrayList<>();
- for (int i = 0; i < parallelism; i++) {
- SinkWriter<SeaTunnelRow, ?, ?> sinkWriter =
-                    sink.createWriter(new DefaultSinkWriterContext(i, parallelism));
- for (SeaTunnelRow row : rows) {
- sinkWriter.write(row);
- }
- Optional<?> commitInfo = sinkWriter.prepareCommit(1);
- sinkWriter.snapshotState(1);
- sinkWriter.close();
- if (commitInfo.isPresent()) {
- commitInfos.add(commitInfo.get());
- }
+ List<List<Object>> writerCheckpointInfos =
+ IntStream.range(0, parallelism)
+ .mapToObj(i -> Collections.synchronizedList(new
ArrayList<>()))
+ .collect(Collectors.toList());
+
+ List<Throwable> asyncErrors = Collections.synchronizedList(new
ArrayList<>());
+ IntStream.range(0, parallelism)
+ .parallel()
+ .forEach(
+ writerIndex -> {
+ try {
+ runWriter(
+ sink,
+ rows,
+ checkpointOptions,
+ writerIndex,
+ parallelism,
+
writerCheckpointInfos.get(writerIndex));
+ } catch (Throwable t) {
+ t.addSuppressed(
+ new RuntimeException("Writer " +
writerIndex + " failed"));
+ asyncErrors.add(t);
+ }
+ });
+
+ if (!asyncErrors.isEmpty()) {
+ rethrow(asyncErrors.get(0));
}
+ LinkedHashMap<Long, List<Object>> checkpointCommitInfos =
+ buildCheckpointMap(writerCheckpointInfos);
+
Optional<? extends SinkCommitter<?>> sinkCommitter =
sink.createCommitter();
Optional<? extends SinkAggregatedCommitter<?, ?>>
aggregatedCommitterOptional =
sink.createAggregatedCommitter();
- if (!commitInfos.isEmpty()) {
+ if (!checkpointCommitInfos.isEmpty()) {
if (aggregatedCommitterOptional.isPresent()) {
SinkAggregatedCommitter<?, ?> aggregatedCommitter =
aggregatedCommitterOptional.get();
@@ -152,16 +238,220 @@ public class SinkFlowTestUtils {
.setMultiTableResourceManager(resourceManager, 0);
}
- Object aggregatedCommitInfoT =
- ((SinkAggregatedCommitter)
aggregatedCommitter).combine(commitInfos);
- ((SinkAggregatedCommitter) aggregatedCommitter)
-
.commit(Collections.singletonList(aggregatedCommitInfoT));
+ for (List<Object> commitInfos :
checkpointCommitInfos.values()) {
+ Object aggregatedCommitInfoT =
+ ((SinkAggregatedCommitter)
aggregatedCommitter).combine(commitInfos);
+ ((SinkAggregatedCommitter) aggregatedCommitter)
+
.commit(Collections.singletonList(aggregatedCommitInfoT));
+ }
aggregatedCommitter.close();
} else if (sinkCommitter.isPresent()) {
- ((SinkCommitter) sinkCommitter.get()).commit(commitInfos);
+ SinkCommitter sinkCommitterInstance = (SinkCommitter)
sinkCommitter.get();
+ for (List<Object> commitInfos :
checkpointCommitInfos.values()) {
+ sinkCommitterInstance.commit(commitInfos);
+ }
} else {
throw new RuntimeException("No committer found");
}
}
}
+
+ private static void runWriter(
+ SeaTunnelSink<SeaTunnelRow, ?, ?, ?> sink,
+ List<SeaTunnelRow> rows,
+ PeriodicCheckpointOptions checkpointOptions,
+ int writerIndex,
+ int parallelism,
+ List<Object> currentWriterCommits)
+ throws IOException {
+ SinkWriter<SeaTunnelRow, ?, ?> sinkWriter =
+                sink.createWriter(new DefaultSinkWriterContext(writerIndex, parallelism));
+ long lastCheckpointTs = System.currentTimeMillis();
+ int recordsSinceLastCheckpoint = 0;
+ CheckpointState checkpointState = new CheckpointState();
+ for (SeaTunnelRow row : rows) {
+ sinkWriter.write(row);
+ recordsSinceLastCheckpoint++;
+ if (shouldTriggerCheckpoint(
+ checkpointOptions, recordsSinceLastCheckpoint,
lastCheckpointTs)
+ && triggerCheckpoint(
+ sinkWriter,
+ checkpointOptions,
+ checkpointState,
+ currentWriterCommits,
+ false)) {
+ recordsSinceLastCheckpoint = 0;
+ lastCheckpointTs = System.currentTimeMillis();
+ }
+ }
+ boolean needsFinalCheckpoint =
+ recordsSinceLastCheckpoint > 0
+ || checkpointState.triggeredCount == 0
+ || checkpointOptions.isTriggerOnFinish();
+ if (needsFinalCheckpoint) {
+ triggerCheckpoint(
+ sinkWriter, checkpointOptions, checkpointState,
currentWriterCommits, true);
+ }
+ sinkWriter.close();
+ }
+
+ private static boolean shouldTriggerCheckpoint(
+ PeriodicCheckpointOptions options,
+ int recordsSinceLastCheckpoint,
+ long lastCheckpointTs) {
+ if (!options.enablePeriodicTrigger()) {
+ return false;
+ }
+ boolean triggerByRecord =
+ options.getRecordsPerCheckpoint() > 0
+ && recordsSinceLastCheckpoint >=
options.getRecordsPerCheckpoint();
+ boolean triggerByInterval =
+ options.getIntervalMillis() > 0
+ && (System.currentTimeMillis() - lastCheckpointTs)
+ >= options.getIntervalMillis();
+ return triggerByRecord || triggerByInterval;
+ }
+
+ private static boolean triggerCheckpoint(
+ SinkWriter<SeaTunnelRow, ?, ?> sinkWriter,
+ PeriodicCheckpointOptions options,
+ CheckpointState checkpointState,
+ List<Object> writerCheckpointInfos,
+ boolean force)
+ throws IOException {
+ if (!force && !options.canTrigger(checkpointState.triggeredCount)) {
+ return false;
+ }
+ long checkpointId = checkpointState.nextCheckpointId();
+ Optional<?> commitInfo = sinkWriter.prepareCommit(checkpointId);
+ sinkWriter.snapshotState(checkpointId);
+ if (commitInfo.isPresent()) {
+ writerCheckpointInfos.add(commitInfo.get());
+ }
+ checkpointState.incrementTriggeredCount();
+ return true;
+ }
+
+ private static LinkedHashMap<Long, List<Object>> buildCheckpointMap(
+ List<List<Object>> writerCheckpointInfos) {
+ LinkedHashMap<Long, List<Object>> checkpointCommitInfos = new
LinkedHashMap<>();
+ int rounds = 0;
+ for (List<Object> infos : writerCheckpointInfos) {
+ rounds = Math.max(rounds, infos.size());
+ }
+ long checkpointId = 1L;
+ for (int round = 0; round < rounds; round++) {
+ List<Object> aggregatedInfos = new ArrayList<>();
+ for (List<Object> writerInfos : writerCheckpointInfos) {
+ if (round < writerInfos.size()) {
+ aggregatedInfos.add(writerInfos.get(round));
+ }
+ }
+ if (!aggregatedInfos.isEmpty()) {
+ checkpointCommitInfos.put(checkpointId++, aggregatedInfos);
+ }
+ }
+ return checkpointCommitInfos;
+ }
+
+ private static class CheckpointState {
+ private long checkpointId = 1L;
+ private int triggeredCount = 0;
+
+ private long nextCheckpointId() {
+ return checkpointId++;
+ }
+
+ private void incrementTriggeredCount() {
+ triggeredCount++;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+    private static <E extends Throwable> void rethrow(Throwable throwable) throws E {
+ throw (E) throwable;
+ }
+
+ public static final class PeriodicCheckpointOptions {
+ private final int recordsPerCheckpoint;
+ private final long intervalMillis;
+ private final int maxCheckpointCount;
+ private final boolean triggerOnFinish;
+
+ private PeriodicCheckpointOptions(Builder builder) {
+ this.recordsPerCheckpoint = builder.recordsPerCheckpoint;
+ this.intervalMillis = builder.intervalMillis;
+ this.maxCheckpointCount = builder.maxCheckpointCount;
+ this.triggerOnFinish = builder.triggerOnFinish;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static PeriodicCheckpointOptions defaultSingleCheckpoint() {
+            return builder().maxCheckpointCount(1).triggerOnFinish(true).build();
+ }
+
+ public static PeriodicCheckpointOptions neverTrigger() {
+            return builder().maxCheckpointCount(0).triggerOnFinish(false).build();
+ }
+
+ public int getRecordsPerCheckpoint() {
+ return recordsPerCheckpoint;
+ }
+
+ public long getIntervalMillis() {
+ return intervalMillis;
+ }
+
+ public boolean isTriggerOnFinish() {
+ return triggerOnFinish;
+ }
+
+ private boolean enablePeriodicTrigger() {
+ return recordsPerCheckpoint > 0 || intervalMillis > 0;
+ }
+
+ private boolean canTrigger(int triggeredCount) {
+            return maxCheckpointCount <= 0 || triggeredCount < maxCheckpointCount;
+ }
+
+ public static final class Builder {
+ private int recordsPerCheckpoint = 0;
+ private long intervalMillis = 0L;
+ private int maxCheckpointCount = 1;
+ private boolean triggerOnFinish = true;
+
+ public Builder recordsPerCheckpoint(int recordsPerCheckpoint) {
+ if (recordsPerCheckpoint < 0) {
+                    throw new IllegalArgumentException("recordsPerCheckpoint must be >= 0");
+ }
+ this.recordsPerCheckpoint = recordsPerCheckpoint;
+ return this;
+ }
+
+ public Builder intervalMillis(long intervalMillis) {
+ if (intervalMillis < 0) {
+                    throw new IllegalArgumentException("intervalMillis must be >= 0");
+ }
+ this.intervalMillis = intervalMillis;
+ return this;
+ }
+
+ public Builder maxCheckpointCount(int maxCheckpointCount) {
+ this.maxCheckpointCount = maxCheckpointCount;
+ return this;
+ }
+
+ public Builder triggerOnFinish(boolean triggerOnFinish) {
+ this.triggerOnFinish = triggerOnFinish;
+ return this;
+ }
+
+ public PeriodicCheckpointOptions build() {
+ return new PeriodicCheckpointOptions(this);
+ }
+ }
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/pom.xml
index 96c097c7fa..35439ba63c 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/pom.xml
@@ -44,5 +44,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
index 711d6203bb..070b04c301 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.e2e.connector.v2.mongodb;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SinkWriter;
@@ -26,16 +27,21 @@ import
org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbBaseOptions;
+import
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbSinkOptions;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataDocumentSerializer;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataToBsonConverters;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongoKeyExtractor;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongodbSink;
+import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongodbSinkFactory;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongodbWriterOptions;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.DocumentBulk;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.sink.SinkFlowTestUtils;
+import
org.apache.seatunnel.connectors.seatunnel.sink.SinkFlowTestUtils.PeriodicCheckpointOptions;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
@@ -47,12 +53,14 @@ import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.Sorts;
import com.mongodb.client.model.WriteModel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
@@ -225,28 +233,8 @@ public class MongodbIT extends AbstractMongodbIT {
@TestTemplate
public void testTransactionSinkAndUpsert(TestContainer container)
throws IOException, InterruptedException {
- Container.ExecResult insertResult =
-
container.executeJob("/transactionIT/fake_source_to_transaction_sink_mongodb.conf");
- Assertions.assertEquals(0, insertResult.getExitCode(),
insertResult.getStderr());
-
- Container.ExecResult assertSinkResult =
- container.executeJob(
-
"/transactionIT/mongodb_source_transaction_sink_to_assert.conf");
- Assertions.assertEquals(0, assertSinkResult.getExitCode(),
assertSinkResult.getStderr());
-
- Container.ExecResult upsertResult =
- container.executeJob(
-
"/transactionIT/fake_source_to_transaction_upsert_mongodb.conf");
- Assertions.assertEquals(0, upsertResult.getExitCode(),
upsertResult.getStderr());
-
- Container.ExecResult assertUpsertResult =
- container.executeJob(
-
"/transactionIT/mongodb_source_transaction_upsert_to_assert.conf");
- Assertions.assertEquals(
- 0, assertUpsertResult.getExitCode(),
assertUpsertResult.getStderr());
-
- clearData(MONGODB_TRANSACTION_SINK_TABLE);
- clearData(MONGODB_TRANSACTION_UPSERT_TABLE);
+ runTransactionSinkFlow(MONGODB_TRANSACTION_SINK_TABLE, false);
+ runTransactionSinkFlow(MONGODB_TRANSACTION_UPSERT_TABLE, true);
}
@TestTemplate
@@ -432,7 +420,7 @@ public class MongodbIT extends AbstractMongodbIT {
}
private TableSchema getTableSchema() {
- return new TableSchema(getColumns(), null, null);
+ return TableSchema.builder().columns(getColumns()).build();
}
private List<Column> getColumns() {
@@ -442,4 +430,73 @@ public class MongodbIT extends AbstractMongodbIT {
columns.add(new PhysicalColumn("score", BasicType.INT_TYPE, 32L, 0,
true, "", ""));
return columns;
}
+
+ private void runTransactionSinkFlow(String collection, boolean upsert)
throws IOException {
+ clearData(collection);
+ List<SeaTunnelRow> rows = createTransactionRows(upsert);
+ SinkFlowTestUtils.runBatchWithCheckpointEnabled(
+ getCatalogTable(collection),
+ getTransactionSinkOptions(collection, upsert),
+ new MongodbSinkFactory(),
+ rows,
+ PeriodicCheckpointOptions.builder()
+ .recordsPerCheckpoint(2)
+ .maxCheckpointCount(5)
+ .triggerOnFinish(true)
+ .build());
+ assertTransactionSinkResult(collection, upsert);
+ clearData(collection);
+ }
+
+ private List<SeaTunnelRow> createTransactionRows(boolean upsert) {
+ List<SeaTunnelRow> rows = new ArrayList<>();
+ rows.add(createRow(RowKind.INSERT, 1L, "alpha", 10));
+ rows.add(createRow(RowKind.INSERT, 2L, "beta", 20));
+ rows.add(createRow(RowKind.INSERT, 3L, "gamma", 30));
+ if (upsert) {
+ rows.add(createRow(RowKind.UPDATE_AFTER, 2L, "beta-updated", 200));
+ }
+ return rows;
+ }
+
+ private SeaTunnelRow createRow(RowKind kind, long id, String name, int
score) {
+ SeaTunnelRow row = new SeaTunnelRow(new Object[] {id, name, score});
+ row.setRowKind(kind);
+ return row;
+ }
+
+ private ReadonlyConfig getTransactionSinkOptions(String collection,
boolean upsert) {
+ String host = mongodbContainer.getHost();
+ int port = mongodbContainer.getFirstMappedPort();
+ String uri = String.format("mongodb://%s:%d", host, port);
+ HashMap<String, Object> config = new HashMap<>();
+ config.put(MongodbSinkOptions.URI.key(), uri);
+ config.put(MongodbSinkOptions.DATABASE.key(), MONGODB_DATABASE);
+ config.put(MongodbSinkOptions.COLLECTION.key(), collection);
+ config.put(MongodbSinkOptions.TRANSACTION.key(), true);
+ config.put(MongodbSinkOptions.DATA_SAVE_MODE.key(),
DataSaveMode.APPEND_DATA);
+ config.put(MongodbSinkOptions.BUFFER_FLUSH_MAX_ROWS.key(), 2);
+ if (upsert) {
+ config.put(MongodbSinkOptions.UPSERT_ENABLE.key(), true);
+ config.put(MongodbSinkOptions.PRIMARY_KEY.key(),
Arrays.asList("c_int"));
+ }
+ return ReadonlyConfig.fromMap(config);
+ }
+
+ private void assertTransactionSinkResult(String collection, boolean
upsert) {
+ MongoCollection<Document> mongoCollection =
+ client.getDatabase(MONGODB_DATABASE).getCollection(collection);
+ List<Document> documents =
+ mongoCollection.find().sort(Sorts.ascending("c_int")).into(new
ArrayList<>());
+ Assertions.assertEquals(3, documents.size());
+ Assertions.assertEquals("alpha", documents.get(0).getString("name"));
+ if (upsert) {
+ Assertions.assertEquals("beta-updated",
documents.get(1).getString("name"));
+ Assertions.assertEquals(200, documents.get(1).getInteger("score"));
+ } else {
+ Assertions.assertEquals("beta",
documents.get(1).getString("name"));
+ Assertions.assertEquals(20, documents.get(1).getInteger("score"));
+ }
+ Assertions.assertEquals("gamma", documents.get(2).getString("name"));
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/fake_source_to_transaction_sink_mongodb.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/fake_source_to_transaction_sink_mongodb.conf
deleted file mode 100644
index 8aaf669f9b..0000000000
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/fake_source_to_transaction_sink_mongodb.conf
+++ /dev/null
@@ -1,74 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
- parallelism = 1
- job.mode = "BATCH"
- #spark config
- spark.app.name = "SeaTunnel"
- spark.executor.instances = 1
- spark.executor.cores = 1
- spark.executor.memory = "1g"
- spark.master = local
-}
-
-source {
- FakeSource {
- row.num = 50
- int.template = [3]
- split.num = 5
- split.read-interval = 100
- plugin_output = "mongodb_table"
- schema = {
- fields {
- c_map = "map<string, string>"
- c_array = "array<int>"
- c_string = string
- c_boolean = boolean
- c_int = int
- c_bigint = bigint
- c_double = double
- c_bytes = bytes
- c_date = date
- c_decimal = "decimal(33, 18)"
- c_timestamp = timestamp
- c_row = {
- c_map = "map<string, string>"
- c_array = "array<int>"
- c_string = string
- c_boolean = boolean
- c_int = int
- c_bigint = bigint
- c_double = double
- c_bytes = bytes
- c_date = date
- c_decimal = "decimal(33, 18)"
- c_timestamp = timestamp
- }
- }
- }
- }
-}
-
-sink {
- MongoDB {
- uri = "mongodb://e2e_mongodb:27017"
- database = "test_db"
- collection = "test_source_transaction_sink_table"
- transaction = true
- }
-}
\ No newline at end of file