This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 70c7040746 [Improve][Common] Improve SinkFlowTestUtils Checkpoint 
Logic (#10071)
70c7040746 is described below

commit 70c7040746f89f72a2b21797eb280b79c55318b4
Author: Jast <[email protected]>
AuthorDate: Thu Jan 29 20:02:40 2026 +0800

    [Improve][Common] Improve SinkFlowTestUtils Checkpoint Logic (#10071)
---
 .github/workflows/backend.yml                      |   4 +-
 .../seatunnel/sink/SinkFlowTestUtils.java          | 338 +++++++++++++++++++--
 .../connector-mongodb-e2e/pom.xml                  |   7 +
 .../e2e/connector/v2/mongodb/MongodbIT.java        | 103 +++++--
 .../fake_source_to_transaction_sink_mongodb.conf   |  74 -----
 5 files changed, 403 insertions(+), 123 deletions(-)

diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 364e03ab6f..7389fe43f8 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -470,7 +470,7 @@ jobs:
       matrix:
         java: [ '8', '11' ]
         os: [ 'ubuntu-latest' ]
-    timeout-minutes: 180
+    timeout-minutes: 210
     steps:
       - uses: actions/checkout@v2
       - name: Set up JDK ${{ matrix.java }}
@@ -561,7 +561,7 @@ jobs:
       matrix:
         java: [ '8', '11' ]
         os: [ 'ubuntu-latest' ]
-    timeout-minutes: 180
+    timeout-minutes: 210
     steps:
       - uses: actions/checkout@v2
       - name: Set up JDK ${{ matrix.java }}
diff --git 
a/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java
 
b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java
index 982ad50f67..2bd65c8e29 100644
--- 
a/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java
+++ 
b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java
@@ -35,8 +35,11 @@ import org.apache.seatunnel.common.constants.JobMode;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 public class SinkFlowTestUtils {
 
@@ -58,11 +61,25 @@ public class SinkFlowTestUtils {
             TableSinkFactory<SeaTunnelRow, ?, ?, ?> factory,
             List<SeaTunnelRow> rows)
             throws IOException {
+        runBatchWithCheckpointEnabled(
+                catalogTable,
+                options,
+                factory,
+                rows,
+                PeriodicCheckpointOptions.defaultSingleCheckpoint());
+    }
+
+    public static void runBatchWithCheckpointEnabled(
+            CatalogTable catalogTable,
+            ReadonlyConfig options,
+            TableSinkFactory<SeaTunnelRow, ?, ?, ?> factory,
+            List<SeaTunnelRow> rows,
+            PeriodicCheckpointOptions checkpointOptions)
+            throws IOException {
         JobContext context = new JobContext(System.currentTimeMillis());
         context.setJobMode(JobMode.BATCH);
         context.setEnableCheckpoint(true);
-        // TODO trigger checkpoint with interval
-        runWithContext(catalogTable, options, factory, rows, context, 1);
+        runWithContext(catalogTable, options, factory, rows, context, 1, 
checkpointOptions);
     }
 
     public static void runParallelSubtasksBatchWithCheckpointDisabled(
@@ -85,10 +102,35 @@ public class SinkFlowTestUtils {
             boolean checkpointEnabled,
             int parallelism)
             throws IOException {
+        runBatchWithMultiTableSink(
+                factory,
+                tableSinkFactoryContext,
+                rows,
+                checkpointEnabled,
+                parallelism,
+                checkpointEnabled
+                        ? PeriodicCheckpointOptions.defaultSingleCheckpoint()
+                        : PeriodicCheckpointOptions.neverTrigger());
+    }
+
+    public static void runBatchWithMultiTableSink(
+            TableSinkFactory<SeaTunnelRow, ?, ?, ?> factory,
+            TableSinkFactoryContext tableSinkFactoryContext,
+            List<SeaTunnelRow> rows,
+            boolean checkpointEnabled,
+            int parallelism,
+            PeriodicCheckpointOptions checkpointOptions)
+            throws IOException {
         JobContext context = new JobContext(System.currentTimeMillis());
         context.setJobMode(JobMode.BATCH);
         context.setEnableCheckpoint(checkpointEnabled);
-        runWithContext(factory, tableSinkFactoryContext, rows, context, 
parallelism);
+        runWithContext(
+                factory,
+                tableSinkFactoryContext,
+                rows,
+                context,
+                parallelism,
+                checkpointEnabled ? checkpointOptions : 
PeriodicCheckpointOptions.neverTrigger());
     }
 
     private static void runWithContext(
@@ -104,7 +146,33 @@ public class SinkFlowTestUtils {
                 new TableSinkFactoryContext(
                         catalogTable, options, 
Thread.currentThread().getContextClassLoader());
 
-        runWithContext(factory, tableSinkFactoryContext, rows, context, 
parallelism);
+        runWithContext(
+                factory,
+                tableSinkFactoryContext,
+                rows,
+                context,
+                parallelism,
+                context.isEnableCheckpoint()
+                        ? PeriodicCheckpointOptions.defaultSingleCheckpoint()
+                        : PeriodicCheckpointOptions.neverTrigger());
+    }
+
+    private static void runWithContext(
+            CatalogTable catalogTable,
+            ReadonlyConfig options,
+            TableSinkFactory<SeaTunnelRow, ?, ?, ?> factory,
+            List<SeaTunnelRow> rows,
+            JobContext context,
+            int parallelism,
+            PeriodicCheckpointOptions checkpointOptions)
+            throws IOException {
+
+        TableSinkFactoryContext tableSinkFactoryContext =
+                new TableSinkFactoryContext(
+                        catalogTable, options, 
Thread.currentThread().getContextClassLoader());
+
+        runWithContext(
+                factory, tableSinkFactoryContext, rows, context, parallelism, 
checkpointOptions);
     }
 
     private static void runWithContext(
@@ -112,31 +180,49 @@ public class SinkFlowTestUtils {
             TableSinkFactoryContext tableSinkFactoryContext,
             List<SeaTunnelRow> rows,
             JobContext context,
-            int parallelism)
+            int parallelism,
+            PeriodicCheckpointOptions checkpointOptions)
             throws IOException {
         SeaTunnelSink<SeaTunnelRow, ?, ?, ?> sink =
                 factory.createSink(tableSinkFactoryContext).createSink();
         sink.setJobContext(context);
-        List<Object> commitInfos = new ArrayList<>();
-        for (int i = 0; i < parallelism; i++) {
-            SinkWriter<SeaTunnelRow, ?, ?> sinkWriter =
-                    sink.createWriter(new DefaultSinkWriterContext(i, 
parallelism));
-            for (SeaTunnelRow row : rows) {
-                sinkWriter.write(row);
-            }
-            Optional<?> commitInfo = sinkWriter.prepareCommit(1);
-            sinkWriter.snapshotState(1);
-            sinkWriter.close();
-            if (commitInfo.isPresent()) {
-                commitInfos.add(commitInfo.get());
-            }
+        List<List<Object>> writerCheckpointInfos =
+                IntStream.range(0, parallelism)
+                        .mapToObj(i -> Collections.synchronizedList(new 
ArrayList<>()))
+                        .collect(Collectors.toList());
+
+        List<Throwable> asyncErrors = Collections.synchronizedList(new 
ArrayList<>());
+        IntStream.range(0, parallelism)
+                .parallel()
+                .forEach(
+                        writerIndex -> {
+                            try {
+                                runWriter(
+                                        sink,
+                                        rows,
+                                        checkpointOptions,
+                                        writerIndex,
+                                        parallelism,
+                                        
writerCheckpointInfos.get(writerIndex));
+                            } catch (Throwable t) {
+                                t.addSuppressed(
+                                        new RuntimeException("Writer " + 
writerIndex + " failed"));
+                                asyncErrors.add(t);
+                            }
+                        });
+
+        if (!asyncErrors.isEmpty()) {
+            rethrow(asyncErrors.get(0));
         }
 
+        LinkedHashMap<Long, List<Object>> checkpointCommitInfos =
+                buildCheckpointMap(writerCheckpointInfos);
+
         Optional<? extends SinkCommitter<?>> sinkCommitter = 
sink.createCommitter();
         Optional<? extends SinkAggregatedCommitter<?, ?>> 
aggregatedCommitterOptional =
                 sink.createAggregatedCommitter();
 
-        if (!commitInfos.isEmpty()) {
+        if (!checkpointCommitInfos.isEmpty()) {
             if (aggregatedCommitterOptional.isPresent()) {
                 SinkAggregatedCommitter<?, ?> aggregatedCommitter =
                         aggregatedCommitterOptional.get();
@@ -152,16 +238,220 @@ public class SinkFlowTestUtils {
                             .setMultiTableResourceManager(resourceManager, 0);
                 }
 
-                Object aggregatedCommitInfoT =
-                        ((SinkAggregatedCommitter) 
aggregatedCommitter).combine(commitInfos);
-                ((SinkAggregatedCommitter) aggregatedCommitter)
-                        
.commit(Collections.singletonList(aggregatedCommitInfoT));
+                for (List<Object> commitInfos : 
checkpointCommitInfos.values()) {
+                    Object aggregatedCommitInfoT =
+                            ((SinkAggregatedCommitter) 
aggregatedCommitter).combine(commitInfos);
+                    ((SinkAggregatedCommitter) aggregatedCommitter)
+                            
.commit(Collections.singletonList(aggregatedCommitInfoT));
+                }
                 aggregatedCommitter.close();
             } else if (sinkCommitter.isPresent()) {
-                ((SinkCommitter) sinkCommitter.get()).commit(commitInfos);
+                SinkCommitter sinkCommitterInstance = (SinkCommitter) 
sinkCommitter.get();
+                for (List<Object> commitInfos : 
checkpointCommitInfos.values()) {
+                    sinkCommitterInstance.commit(commitInfos);
+                }
             } else {
                 throw new RuntimeException("No committer found");
             }
         }
     }
+
+    private static void runWriter(
+            SeaTunnelSink<SeaTunnelRow, ?, ?, ?> sink,
+            List<SeaTunnelRow> rows,
+            PeriodicCheckpointOptions checkpointOptions,
+            int writerIndex,
+            int parallelism,
+            List<Object> currentWriterCommits)
+            throws IOException {
+        SinkWriter<SeaTunnelRow, ?, ?> sinkWriter =
+                sink.createWriter(new DefaultSinkWriterContext(writerIndex, 
parallelism));
+        long lastCheckpointTs = System.currentTimeMillis();
+        int recordsSinceLastCheckpoint = 0;
+        CheckpointState checkpointState = new CheckpointState();
+        for (SeaTunnelRow row : rows) {
+            sinkWriter.write(row);
+            recordsSinceLastCheckpoint++;
+            if (shouldTriggerCheckpoint(
+                            checkpointOptions, recordsSinceLastCheckpoint, 
lastCheckpointTs)
+                    && triggerCheckpoint(
+                            sinkWriter,
+                            checkpointOptions,
+                            checkpointState,
+                            currentWriterCommits,
+                            false)) {
+                recordsSinceLastCheckpoint = 0;
+                lastCheckpointTs = System.currentTimeMillis();
+            }
+        }
+        boolean needsFinalCheckpoint =
+                recordsSinceLastCheckpoint > 0
+                        || checkpointState.triggeredCount == 0
+                        || checkpointOptions.isTriggerOnFinish();
+        if (needsFinalCheckpoint) {
+            triggerCheckpoint(
+                    sinkWriter, checkpointOptions, checkpointState, 
currentWriterCommits, true);
+        }
+        sinkWriter.close();
+    }
+
+    private static boolean shouldTriggerCheckpoint(
+            PeriodicCheckpointOptions options,
+            int recordsSinceLastCheckpoint,
+            long lastCheckpointTs) {
+        if (!options.enablePeriodicTrigger()) {
+            return false;
+        }
+        boolean triggerByRecord =
+                options.getRecordsPerCheckpoint() > 0
+                        && recordsSinceLastCheckpoint >= 
options.getRecordsPerCheckpoint();
+        boolean triggerByInterval =
+                options.getIntervalMillis() > 0
+                        && (System.currentTimeMillis() - lastCheckpointTs)
+                                >= options.getIntervalMillis();
+        return triggerByRecord || triggerByInterval;
+    }
+
+    private static boolean triggerCheckpoint(
+            SinkWriter<SeaTunnelRow, ?, ?> sinkWriter,
+            PeriodicCheckpointOptions options,
+            CheckpointState checkpointState,
+            List<Object> writerCheckpointInfos,
+            boolean force)
+            throws IOException {
+        if (!force && !options.canTrigger(checkpointState.triggeredCount)) {
+            return false;
+        }
+        long checkpointId = checkpointState.nextCheckpointId();
+        Optional<?> commitInfo = sinkWriter.prepareCommit(checkpointId);
+        sinkWriter.snapshotState(checkpointId);
+        if (commitInfo.isPresent()) {
+            writerCheckpointInfos.add(commitInfo.get());
+        }
+        checkpointState.incrementTriggeredCount();
+        return true;
+    }
+
+    private static LinkedHashMap<Long, List<Object>> buildCheckpointMap(
+            List<List<Object>> writerCheckpointInfos) {
+        LinkedHashMap<Long, List<Object>> checkpointCommitInfos = new 
LinkedHashMap<>();
+        int rounds = 0;
+        for (List<Object> infos : writerCheckpointInfos) {
+            rounds = Math.max(rounds, infos.size());
+        }
+        long checkpointId = 1L;
+        for (int round = 0; round < rounds; round++) {
+            List<Object> aggregatedInfos = new ArrayList<>();
+            for (List<Object> writerInfos : writerCheckpointInfos) {
+                if (round < writerInfos.size()) {
+                    aggregatedInfos.add(writerInfos.get(round));
+                }
+            }
+            if (!aggregatedInfos.isEmpty()) {
+                checkpointCommitInfos.put(checkpointId++, aggregatedInfos);
+            }
+        }
+        return checkpointCommitInfos;
+    }
+
+    private static class CheckpointState {
+        private long checkpointId = 1L;
+        private int triggeredCount = 0;
+
+        private long nextCheckpointId() {
+            return checkpointId++;
+        }
+
+        private void incrementTriggeredCount() {
+            triggeredCount++;
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <E extends Throwable> void rethrow(Throwable throwable) 
throws E {
+        throw (E) throwable;
+    }
+
+    public static final class PeriodicCheckpointOptions {
+        private final int recordsPerCheckpoint;
+        private final long intervalMillis;
+        private final int maxCheckpointCount;
+        private final boolean triggerOnFinish;
+
+        private PeriodicCheckpointOptions(Builder builder) {
+            this.recordsPerCheckpoint = builder.recordsPerCheckpoint;
+            this.intervalMillis = builder.intervalMillis;
+            this.maxCheckpointCount = builder.maxCheckpointCount;
+            this.triggerOnFinish = builder.triggerOnFinish;
+        }
+
+        public static Builder builder() {
+            return new Builder();
+        }
+
+        public static PeriodicCheckpointOptions defaultSingleCheckpoint() {
+            return 
builder().maxCheckpointCount(1).triggerOnFinish(true).build();
+        }
+
+        public static PeriodicCheckpointOptions neverTrigger() {
+            return 
builder().maxCheckpointCount(0).triggerOnFinish(false).build();
+        }
+
+        public int getRecordsPerCheckpoint() {
+            return recordsPerCheckpoint;
+        }
+
+        public long getIntervalMillis() {
+            return intervalMillis;
+        }
+
+        public boolean isTriggerOnFinish() {
+            return triggerOnFinish;
+        }
+
+        private boolean enablePeriodicTrigger() {
+            return recordsPerCheckpoint > 0 || intervalMillis > 0;
+        }
+
+        private boolean canTrigger(int triggeredCount) {
+            return maxCheckpointCount <= 0 || triggeredCount < 
maxCheckpointCount;
+        }
+
+        public static final class Builder {
+            private int recordsPerCheckpoint = 0;
+            private long intervalMillis = 0L;
+            private int maxCheckpointCount = 1;
+            private boolean triggerOnFinish = true;
+
+            public Builder recordsPerCheckpoint(int recordsPerCheckpoint) {
+                if (recordsPerCheckpoint < 0) {
+                    throw new IllegalArgumentException("recordsPerCheckpoint 
must be >= 0");
+                }
+                this.recordsPerCheckpoint = recordsPerCheckpoint;
+                return this;
+            }
+
+            public Builder intervalMillis(long intervalMillis) {
+                if (intervalMillis < 0) {
+                    throw new IllegalArgumentException("intervalMillis must be 
>= 0");
+                }
+                this.intervalMillis = intervalMillis;
+                return this;
+            }
+
+            public Builder maxCheckpointCount(int maxCheckpointCount) {
+                this.maxCheckpointCount = maxCheckpointCount;
+                return this;
+            }
+
+            public Builder triggerOnFinish(boolean triggerOnFinish) {
+                this.triggerOnFinish = triggerOnFinish;
+                return this;
+            }
+
+            public PeriodicCheckpointOptions build() {
+                return new PeriodicCheckpointOptions(this);
+            }
+        }
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/pom.xml
index 96c097c7fa..35439ba63c 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/pom.xml
@@ -44,5 +44,12 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
index 711d6203bb..070b04c301 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.e2e.connector.v2.mongodb;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SaveModeHandler;
 import org.apache.seatunnel.api.sink.SinkWriter;
@@ -26,16 +27,21 @@ import 
org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbBaseOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbSinkOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataDocumentSerializer;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataToBsonConverters;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongoKeyExtractor;
 import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongodbSink;
+import 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongodbSinkFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongodbWriterOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.DocumentBulk;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.sink.SinkFlowTestUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.sink.SinkFlowTestUtils.PeriodicCheckpointOptions;
 import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
@@ -47,12 +53,14 @@ import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.Container;
 
 import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.Sorts;
 import com.mongodb.client.model.WriteModel;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Optional;
@@ -225,28 +233,8 @@ public class MongodbIT extends AbstractMongodbIT {
     @TestTemplate
     public void testTransactionSinkAndUpsert(TestContainer container)
             throws IOException, InterruptedException {
-        Container.ExecResult insertResult =
-                
container.executeJob("/transactionIT/fake_source_to_transaction_sink_mongodb.conf");
-        Assertions.assertEquals(0, insertResult.getExitCode(), 
insertResult.getStderr());
-
-        Container.ExecResult assertSinkResult =
-                container.executeJob(
-                        
"/transactionIT/mongodb_source_transaction_sink_to_assert.conf");
-        Assertions.assertEquals(0, assertSinkResult.getExitCode(), 
assertSinkResult.getStderr());
-
-        Container.ExecResult upsertResult =
-                container.executeJob(
-                        
"/transactionIT/fake_source_to_transaction_upsert_mongodb.conf");
-        Assertions.assertEquals(0, upsertResult.getExitCode(), 
upsertResult.getStderr());
-
-        Container.ExecResult assertUpsertResult =
-                container.executeJob(
-                        
"/transactionIT/mongodb_source_transaction_upsert_to_assert.conf");
-        Assertions.assertEquals(
-                0, assertUpsertResult.getExitCode(), 
assertUpsertResult.getStderr());
-
-        clearData(MONGODB_TRANSACTION_SINK_TABLE);
-        clearData(MONGODB_TRANSACTION_UPSERT_TABLE);
+        runTransactionSinkFlow(MONGODB_TRANSACTION_SINK_TABLE, false);
+        runTransactionSinkFlow(MONGODB_TRANSACTION_UPSERT_TABLE, true);
     }
 
     @TestTemplate
@@ -432,7 +420,7 @@ public class MongodbIT extends AbstractMongodbIT {
     }
 
     private TableSchema getTableSchema() {
-        return new TableSchema(getColumns(), null, null);
+        return TableSchema.builder().columns(getColumns()).build();
     }
 
     private List<Column> getColumns() {
@@ -442,4 +430,73 @@ public class MongodbIT extends AbstractMongodbIT {
         columns.add(new PhysicalColumn("score", BasicType.INT_TYPE, 32L, 0, 
true, "", ""));
         return columns;
     }
+
+    private void runTransactionSinkFlow(String collection, boolean upsert) 
throws IOException {
+        clearData(collection);
+        List<SeaTunnelRow> rows = createTransactionRows(upsert);
+        SinkFlowTestUtils.runBatchWithCheckpointEnabled(
+                getCatalogTable(collection),
+                getTransactionSinkOptions(collection, upsert),
+                new MongodbSinkFactory(),
+                rows,
+                PeriodicCheckpointOptions.builder()
+                        .recordsPerCheckpoint(2)
+                        .maxCheckpointCount(5)
+                        .triggerOnFinish(true)
+                        .build());
+        assertTransactionSinkResult(collection, upsert);
+        clearData(collection);
+    }
+
+    private List<SeaTunnelRow> createTransactionRows(boolean upsert) {
+        List<SeaTunnelRow> rows = new ArrayList<>();
+        rows.add(createRow(RowKind.INSERT, 1L, "alpha", 10));
+        rows.add(createRow(RowKind.INSERT, 2L, "beta", 20));
+        rows.add(createRow(RowKind.INSERT, 3L, "gamma", 30));
+        if (upsert) {
+            rows.add(createRow(RowKind.UPDATE_AFTER, 2L, "beta-updated", 200));
+        }
+        return rows;
+    }
+
+    private SeaTunnelRow createRow(RowKind kind, long id, String name, int 
score) {
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {id, name, score});
+        row.setRowKind(kind);
+        return row;
+    }
+
+    private ReadonlyConfig getTransactionSinkOptions(String collection, 
boolean upsert) {
+        String host = mongodbContainer.getHost();
+        int port = mongodbContainer.getFirstMappedPort();
+        String uri = String.format("mongodb://%s:%d", host, port);
+        HashMap<String, Object> config = new HashMap<>();
+        config.put(MongodbSinkOptions.URI.key(), uri);
+        config.put(MongodbSinkOptions.DATABASE.key(), MONGODB_DATABASE);
+        config.put(MongodbSinkOptions.COLLECTION.key(), collection);
+        config.put(MongodbSinkOptions.TRANSACTION.key(), true);
+        config.put(MongodbSinkOptions.DATA_SAVE_MODE.key(), 
DataSaveMode.APPEND_DATA);
+        config.put(MongodbSinkOptions.BUFFER_FLUSH_MAX_ROWS.key(), 2);
+        if (upsert) {
+            config.put(MongodbSinkOptions.UPSERT_ENABLE.key(), true);
+            config.put(MongodbSinkOptions.PRIMARY_KEY.key(), 
Arrays.asList("c_int"));
+        }
+        return ReadonlyConfig.fromMap(config);
+    }
+
+    private void assertTransactionSinkResult(String collection, boolean 
upsert) {
+        MongoCollection<Document> mongoCollection =
+                client.getDatabase(MONGODB_DATABASE).getCollection(collection);
+        List<Document> documents =
+                mongoCollection.find().sort(Sorts.ascending("c_int")).into(new 
ArrayList<>());
+        Assertions.assertEquals(3, documents.size());
+        Assertions.assertEquals("alpha", documents.get(0).getString("name"));
+        if (upsert) {
+            Assertions.assertEquals("beta-updated", 
documents.get(1).getString("name"));
+            Assertions.assertEquals(200, documents.get(1).getInteger("score"));
+        } else {
+            Assertions.assertEquals("beta", 
documents.get(1).getString("name"));
+            Assertions.assertEquals(20, documents.get(1).getInteger("score"));
+        }
+        Assertions.assertEquals("gamma", documents.get(2).getString("name"));
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/fake_source_to_transaction_sink_mongodb.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/fake_source_to_transaction_sink_mongodb.conf
deleted file mode 100644
index 8aaf669f9b..0000000000
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/fake_source_to_transaction_sink_mongodb.conf
+++ /dev/null
@@ -1,74 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
-  parallelism = 1
-  job.mode = "BATCH"
-  #spark config
-  spark.app.name = "SeaTunnel"
-  spark.executor.instances = 1
-  spark.executor.cores = 1
-  spark.executor.memory = "1g"
-  spark.master = local
-}
-
-source {
-  FakeSource {
-    row.num = 50
-    int.template = [3]
-    split.num = 5
-    split.read-interval = 100
-    plugin_output = "mongodb_table"
-    schema = {
-      fields {
-        c_map = "map<string, string>"
-        c_array = "array<int>"
-        c_string = string
-        c_boolean = boolean
-        c_int = int
-        c_bigint = bigint
-        c_double = double
-        c_bytes = bytes
-        c_date = date
-        c_decimal = "decimal(33, 18)"
-        c_timestamp = timestamp
-        c_row = {
-          c_map = "map<string, string>"
-          c_array = "array<int>"
-          c_string = string
-          c_boolean = boolean
-          c_int = int
-          c_bigint = bigint
-          c_double = double
-          c_bytes = bytes
-          c_date = date
-          c_decimal = "decimal(33, 18)"
-          c_timestamp = timestamp
-        }
-      }
-    }
-  }
-}
-
-sink {
-  MongoDB {
-    uri = "mongodb://e2e_mongodb:27017"
-    database = "test_db"
-    collection = "test_source_transaction_sink_table"
-    transaction = true
-  }
-}
\ No newline at end of file

Reply via email to