This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 800a0f857 [flink] align snapshot and checkpoint in split enumerator 
and source. (#1640)
800a0f857 is described below

commit 800a0f8577b5d56d9f6c5f9f832a86149ea917a2
Author: liming.1018 <[email protected]>
AuthorDate: Thu Jul 27 14:01:25 2023 +0800

    [flink] align snapshot and checkpoint in split enumerator and source. 
(#1640)
---
 .../generated/flink_connector_configuration.html   |  12 +
 .../table/source/InnerStreamTableScanImpl.java     |   7 +-
 .../paimon/table/source/SnapshotNotExistPlan.java  |  36 +++
 .../apache/paimon/flink/FlinkConnectorOptions.java |  14 ++
 .../source/ContinuousFileSplitEnumerator.java      |  58 +++--
 .../flink/source/ContinuousFileStoreSource.java    |  15 +-
 .../paimon/flink/source/FileStoreSourceReader.java |  18 +-
 .../paimon/flink/source/FlinkSourceBuilder.java    |  48 ++++
 .../AlignedContinuousFileSplitEnumerator.java      | 246 +++++++++++++++++++++
 .../align/AlignedContinuousFileStoreSource.java    |  91 ++++++++
 .../flink/source/align/AlignedSourceReader.java    |  87 ++++++++
 .../paimon/flink/source/align/CheckpointEvent.java |  67 ++++++
 .../flink/source/align/PlaceholderSplit.java       | 120 ++++++++++
 .../source/assigners/AlignedSplitAssigner.java     | 158 +++++++++++++
 .../source/ContinuousFileSplitEnumeratorTest.java  |  15 +-
 .../flink/source/FileStoreSourceReaderTest.java    |   6 +-
 .../AlignedContinuousFileSplitEnumeratorTest.java  | 173 +++++++++++++++
 .../source/align/AlignedSourceReaderTest.java      |  71 ++++++
 18 files changed, 1206 insertions(+), 36 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 60a4629e2..f8f15a34c 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -122,5 +122,17 @@ under the License.
             <td>Integer</td>
             <td>Defines a custom parallelism for the unaware-bucket table 
compaction job. By default, if this option is not defined, the planner will 
derive the parallelism for each statement individually by also considering the 
global configuration.</td>
         </tr>
+        <tr>
+            <td><h5>source.checkpoint-align.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to align the flink checkpoint with the snapshot of the 
paimon table, If true, a checkpoint will only be made if a snapshot is 
consumed.</td>
+        </tr>
+        <tr>
+            <td><h5>source.checkpoint-align.timeout</h5></td>
+            <td style="word-wrap: break-word;">30 s</td>
+            <td>Duration</td>
+            <td>If the new snapshot has not been generated when the checkpoint 
starts to trigger, the enumerator will block the checkpoint and wait for the 
new snapshot. Set the maximum waiting time to avoid infinite waiting, if 
timeout, the checkpoint will fail. Note that it should be set smaller than the 
checkpoint timeout.</td>
+        </tr>
     </tbody>
 </table>
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
index c30871378..3cf3d9b10 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
@@ -40,8 +40,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.util.Collections;
-
 /** {@link StreamTableScan} implementation for streaming planning. */
 public class InnerStreamTableScanImpl extends AbstractInnerTableScan
         implements InnerStreamTableScan {
@@ -107,6 +105,7 @@ public class InnerStreamTableScanImpl extends 
AbstractInnerTableScan
             nextSnapshotId = currentSnapshotId + 1;
             isFullPhaseEnd =
                     
boundedChecker.shouldEndInput(snapshotManager.snapshot(currentSnapshotId));
+            return DataFilePlan.fromResult(result);
         } else if (result instanceof StartingScanner.NextSnapshot) {
             nextSnapshotId = ((StartingScanner.NextSnapshot) 
result).nextSnapshotId();
             isFullPhaseEnd =
@@ -114,7 +113,7 @@ public class InnerStreamTableScanImpl extends 
AbstractInnerTableScan
                             && boundedChecker.shouldEndInput(
                                     snapshotManager.snapshot(nextSnapshotId - 
1));
         }
-        return DataFilePlan.fromResult(result);
+        return SnapshotNotExistPlan.INSTANCE;
     }
 
     private Plan nextPlan() {
@@ -136,7 +135,7 @@ public class InnerStreamTableScanImpl extends 
AbstractInnerTableScan
                 LOG.debug(
                         "Next snapshot id {} does not exist, wait for the 
snapshot generation.",
                         nextSnapshotId);
-                return new DataFilePlan(Collections.emptyList());
+                return SnapshotNotExistPlan.INSTANCE;
             }
 
             Snapshot snapshot = snapshotManager.snapshot(nextSnapshotId);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/SnapshotNotExistPlan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/SnapshotNotExistPlan.java
new file mode 100644
index 000000000..df58c8067
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/SnapshotNotExistPlan.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import java.util.Collections;
+import java.util.List;
+
+/** Used to distinguish a snapshot that does not exist from a plan that is merely empty. */
+public class SnapshotNotExistPlan implements TableScan.Plan {
+    public static final SnapshotNotExistPlan INSTANCE = new 
SnapshotNotExistPlan();
+
+    private SnapshotNotExistPlan() {
+        // private
+    }
+
+    @Override
+    public List<Split> splits() {
+        return Collections.emptyList();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index e4030760e..c56369fb3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -210,6 +210,20 @@ public class FlinkConnectorOptions {
                             "If true, flink will push down projection, 
filters, limit to the source. "
                                     + "The cost is that it is difficult to 
reuse the source in a job.");
 
+    public static final ConfigOption<Boolean> SOURCE_CHECKPOINT_ALIGN_ENABLED =
+            ConfigOptions.key("source.checkpoint-align.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to align the flink checkpoint with the 
snapshot of the paimon table, If true, a checkpoint will only be made if a 
snapshot is consumed.");
+
+    public static final ConfigOption<Duration> SOURCE_CHECKPOINT_ALIGN_TIMEOUT 
=
+            ConfigOptions.key("source.checkpoint-align.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "If the new snapshot has not been generated when 
the checkpoint starts to trigger, the enumerator will block the checkpoint and 
wait for the new snapshot. Set the maximum waiting time to avoid infinite 
waiting, if timeout, the checkpoint will fail. Note that it should be set 
smaller than the checkpoint timeout.");
+
     public static List<ConfigOption<?>> getOptions() {
         final Field[] fields = FlinkConnectorOptions.class.getFields();
         final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 0c695ebff..46bc71e5c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -56,23 +56,23 @@ public class ContinuousFileSplitEnumerator
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ContinuousFileSplitEnumerator.class);
 
-    private final SplitEnumeratorContext<FileStoreSourceSplit> context;
+    protected final SplitEnumeratorContext<FileStoreSourceSplit> context;
 
-    private final long discoveryInterval;
+    protected final long discoveryInterval;
 
-    private final Set<Integer> readersAwaitingSplit;
+    protected final Set<Integer> readersAwaitingSplit;
 
-    private final FileStoreSourceSplitGenerator splitGenerator;
+    protected final FileStoreSourceSplitGenerator splitGenerator;
 
-    private final StreamTableScan scan;
+    protected final StreamTableScan scan;
 
-    private final SplitAssigner splitAssigner;
+    protected final SplitAssigner splitAssigner;
 
-    private final BucketMode bucketMode;
+    protected final BucketMode bucketMode;
 
-    @Nullable private Long nextSnapshotId;
+    @Nullable protected Long nextSnapshotId;
 
-    private boolean finished = false;
+    protected boolean finished = false;
 
     public ContinuousFileSplitEnumerator(
             SplitEnumeratorContext<FileStoreSourceSplit> context,
@@ -89,14 +89,11 @@ public class ContinuousFileSplitEnumerator
         this.splitGenerator = new FileStoreSourceSplitGenerator();
         this.scan = scan;
         this.bucketMode = bucketMode;
-        this.splitAssigner =
-                bucketMode == BucketMode.UNAWARE
-                        ? new FIFOSplitAssigner(Collections.emptyList())
-                        : new PreAssignSplitAssigner(1, context, 
Collections.emptyList());
+        this.splitAssigner = createSplitAssigner();
         addSplits(remainSplits);
     }
 
-    private void addSplits(Collection<FileStoreSourceSplit> splits) {
+    protected void addSplits(Collection<FileStoreSourceSplit> splits) {
         splits.forEach(this::addSplit);
     }
 
@@ -138,7 +135,7 @@ public class ContinuousFileSplitEnumerator
     }
 
     @Override
-    public PendingSplitsCheckpoint snapshotState(long checkpointId) {
+    public PendingSplitsCheckpoint snapshotState(long checkpointId) throws 
Exception {
         List<FileStoreSourceSplit> splits = new 
ArrayList<>(splitAssigner.remainingSplits());
         final PendingSplitsCheckpoint checkpoint =
                 new PendingSplitsCheckpoint(splits, nextSnapshotId);
@@ -149,13 +146,13 @@ public class ContinuousFileSplitEnumerator
 
     // ------------------------------------------------------------------------
 
-    private PlanWithNextSnapshotId scanNextSnapshot() {
+    protected PlanWithNextSnapshotId scanNextSnapshot() {
         TableScan.Plan plan = scan.plan();
         Long nextSnapshotId = scan.checkpoint();
         return new PlanWithNextSnapshotId(plan, nextSnapshotId);
     }
 
-    private void processDiscoveredSplits(
+    protected void processDiscoveredSplits(
             PlanWithNextSnapshotId planWithNextSnapshotId, Throwable error) {
         if (error != null) {
             if (error instanceof EndOfScanException) {
@@ -184,9 +181,9 @@ public class ContinuousFileSplitEnumerator
      * Method should be synchronized because {@link #handleSplitRequest} and 
{@link
      * #processDiscoveredSplits} have thread conflicts.
      */
-    private synchronized void assignSplits() {
+    protected synchronized void assignSplits() {
         Map<Integer, List<FileStoreSourceSplit>> assignment = 
createAssignment();
-        if (finished) {
+        if (noMoreSplits()) {
             Iterator<Integer> iterator = readersAwaitingSplit.iterator();
             while (iterator.hasNext()) {
                 Integer reader = iterator.next();
@@ -217,7 +214,7 @@ public class ContinuousFileSplitEnumerator
         return assignment;
     }
 
-    private int assignTask(int bucket) {
+    protected int assignTask(int bucket) {
         if (bucketMode == BucketMode.UNAWARE) {
             // we just assign task 0 when bucket unaware
             return 0;
@@ -228,7 +225,18 @@ public class ContinuousFileSplitEnumerator
         }
     }
 
-    private static class PlanWithNextSnapshotId {
+    protected SplitAssigner createSplitAssigner() {
+        return bucketMode == BucketMode.UNAWARE
+                ? new FIFOSplitAssigner(Collections.emptyList())
+                : new PreAssignSplitAssigner(1, context, 
Collections.emptyList());
+    }
+
+    protected boolean noMoreSplits() {
+        return finished;
+    }
+
+    /** The result of scan. */
+    protected static class PlanWithNextSnapshotId {
         private final TableScan.Plan plan;
         private final Long nextSnapshotId;
 
@@ -236,5 +244,13 @@ public class ContinuousFileSplitEnumerator
             this.plan = plan;
             this.nextSnapshotId = nextSnapshotId;
         }
+
+        public TableScan.Plan plan() {
+            return plan;
+        }
+
+        public Long nextSnapshotId() {
+            return nextSnapshotId;
+        }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index 4a2be4a58..aa08a1dfd 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -38,8 +38,8 @@ public class ContinuousFileStoreSource extends FlinkSource {
 
     private static final long serialVersionUID = 4L;
 
-    private final Map<String, String> options;
-    private final BucketMode bucketMode;
+    protected final Map<String, String> options;
+    protected final BucketMode bucketMode;
 
     public ContinuousFileStoreSource(
             ReadBuilder readBuilder, Map<String, String> options, @Nullable 
Long limit) {
@@ -72,14 +72,21 @@ public class ContinuousFileStoreSource extends FlinkSource {
             nextSnapshotId = checkpoint.currentSnapshotId();
             splits = checkpoint.splits();
         }
-        CoreOptions coreOptions = CoreOptions.fromMap(options);
         StreamTableScan scan = readBuilder.newStreamScan();
         scan.restore(nextSnapshotId);
+        return buildEnumerator(context, splits, nextSnapshotId, scan);
+    }
+
+    protected SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> 
buildEnumerator(
+            SplitEnumeratorContext<FileStoreSourceSplit> context,
+            Collection<FileStoreSourceSplit> splits,
+            @Nullable Long nextSnapshotId,
+            StreamTableScan scan) {
         return new ContinuousFileSplitEnumerator(
                 context,
                 splits,
                 nextSnapshotId,
-                coreOptions.continuousDiscoveryInterval().toMillis(),
+                
CoreOptions.fromMap(options).continuousDiscoveryInterval().toMillis(),
                 scan,
                 bucketMode);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
index fea81f932..a9664347a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
@@ -22,7 +22,9 @@ import org.apache.paimon.table.source.TableRead;
 
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.connector.file.src.reader.BulkFormat.RecordIterator;
 import org.apache.flink.table.data.RowData;
 
@@ -31,7 +33,7 @@ import javax.annotation.Nullable;
 import java.util.Map;
 
 /** A {@link SourceReader} that read records from {@link 
FileStoreSourceSplit}. */
-public final class FileStoreSourceReader
+public class FileStoreSourceReader
         extends SingleThreadMultiplexSourceReaderBase<
                 RecordIterator<RowData>, RowData, FileStoreSourceSplit, 
FileStoreSourceSplitState> {
 
@@ -45,6 +47,20 @@ public final class FileStoreSourceReader
                 readerContext);
     }
 
+    public FileStoreSourceReader(
+            SourceReaderContext readerContext,
+            TableRead tableRead,
+            @Nullable Long limit,
+            
FutureCompletingBlockingQueue<RecordsWithSplitIds<RecordIterator<RowData>>>
+                    elementsQueue) {
+        super(
+                elementsQueue,
+                () -> new FileStoreSourceSplitReader(tableRead, 
RecordLimiter.create(limit)),
+                FlinkRecordsWithSplitIds::emitRecord,
+                readerContext.getConfiguration(),
+                readerContext);
+    }
+
     @Override
     public void start() {
         // we request a split only if we did not get splits during the 
checkpoint restore
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index bab49ca4c..52e510eb5 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -24,6 +24,7 @@ import org.apache.paimon.CoreOptions.StreamingReadMode;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.Projection;
 import org.apache.paimon.flink.log.LogSourceProvider;
+import org.apache.paimon.flink.source.align.AlignedContinuousFileStoreSource;
 import org.apache.paimon.flink.source.operator.MonitorFunction;
 import org.apache.paimon.flink.utils.TableScanUtils;
 import org.apache.paimon.options.Options;
@@ -38,8 +39,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.connector.base.source.hybrid.HybridSource;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
@@ -53,6 +57,7 @@ import java.util.Optional;
 
 import static org.apache.paimon.CoreOptions.StreamingReadMode.FILE;
 import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
  * Source builder to build a Flink {@link StaticFileStoreSource} or {@link
@@ -135,6 +140,17 @@ public class FlinkSourceBuilder {
     }
 
     private DataStream<RowData> buildContinuousFileSource() {
+        if (conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)) {
+            assertStreamingConfigurationForAlignMode(env);
+            return toDataStream(
+                    new AlignedContinuousFileStoreSource(
+                            createReadBuilder(),
+                            table.options(),
+                            limit,
+                            table instanceof FileStoreTable
+                                    ? ((FileStoreTable) table).bucketMode()
+                                    : BucketMode.FIXED));
+        }
         return toDataStream(
                 new ContinuousFileStoreSource(
                         createReadBuilder(),
@@ -229,4 +245,36 @@ public class FlinkSourceBuilder {
         }
         return dataStream;
     }
+
+    public void 
assertStreamingConfigurationForAlignMode(StreamExecutionEnvironment env) {
+        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
+        checkArgument(
+                checkpointConfig.isCheckpointingEnabled(),
+                "The align mode of paimon source is only supported when 
checkpoint enabled. Please set "
+                        + 
ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL.key()
+                        + " larger than 0");
+        checkArgument(
+                checkpointConfig.getMaxConcurrentCheckpoints() == 1,
+                "The align mode of paimon source supports at most one ongoing 
checkpoint at the same time. Please set "
+                        + 
ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS.key()
+                        + " to 1");
+        checkArgument(
+                checkpointConfig.getCheckpointTimeout()
+                        > 
conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT)
+                                .toMillis(),
+                "The align mode of paimon source requires that the timeout of 
checkpoint is greater than the timeout of the source's snapshot alignment. 
Please increase "
+                        + 
ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.key()
+                        + " or decrease "
+                        + 
FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT.key());
+        checkArgument(
+                !env.getCheckpointConfig().isUnalignedCheckpointsEnabled(),
+                "The align mode of paimon source currently does not support 
unaligned checkpoints. Please set "
+                        + ExecutionCheckpointingOptions.ENABLE_UNALIGNED.key()
+                        + " to false.");
+        checkArgument(
+                env.getCheckpointConfig().getCheckpointingMode() == 
CheckpointingMode.EXACTLY_ONCE,
+                "The align mode of paimon source currently only supports 
EXACTLY_ONCE checkpoint mode. Please set "
+                        + 
ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key()
+                        + " to exactly-once");
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
new file mode 100644
index 000000000..4ffe36b5e
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.align;
+
+import org.apache.paimon.flink.source.ContinuousFileSplitEnumerator;
+import org.apache.paimon.flink.source.FileStoreSourceSplit;
+import org.apache.paimon.flink.source.PendingSplitsCheckpoint;
+import org.apache.paimon.flink.source.assigners.AlignedSplitAssigner;
+import org.apache.paimon.flink.source.assigners.SplitAssigner;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.SnapshotNotExistPlan;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ * An aligned continuously monitoring enumerator. It requires checkpoint to be 
aligned with
+ * snapshot.
+ *
+ * <p>There are two alignment cases here:
+ *
+ * <ul>
+ *   <li>The snapshot has been consumed, but the checkpoint has not yet been 
triggered: {@link
+ *       AlignedSourceReader} will not request splits until checkpoint is 
triggered.
+ *   <li>The checkpoint has been triggered, but the snapshot has not yet been 
produced: The
+ *       checkpoint will block until the snapshot of the upstream table is 
generated or times out.
+ * </ul>
+ */
+public class AlignedContinuousFileSplitEnumerator extends 
ContinuousFileSplitEnumerator {
+
+    private static final Logger LOG =
+            
LoggerFactory.getLogger(AlignedContinuousFileSplitEnumerator.class);
+
+    private static final String PLACEHOLDER_SPLIT = "placeholder";
+
+    private static final int MAX_PENDING_PLAN = 10;
+
+    private final ArrayBlockingQueue<PlanWithNextSnapshotId> pendingPlans;
+
+    private final AlignedSplitAssigner alignedAssigner;
+
+    private final long alignTimeout;
+
+    private final Object lock;
+
+    private long currentCheckpointId;
+
+    private boolean closed;
+
+    public AlignedContinuousFileSplitEnumerator(
+            SplitEnumeratorContext<FileStoreSourceSplit> context,
+            Collection<FileStoreSourceSplit> remainSplits,
+            @Nullable Long nextSnapshotId,
+            long discoveryInterval,
+            StreamTableScan scan,
+            BucketMode bucketMode,
+            long alignTimeout) {
+        super(context, remainSplits, nextSnapshotId, discoveryInterval, scan, 
bucketMode);
+        this.pendingPlans = new ArrayBlockingQueue<>(MAX_PENDING_PLAN);
+        this.alignedAssigner = (AlignedSplitAssigner) super.splitAssigner;
+        this.nextSnapshotId = nextSnapshotId;
+        this.alignTimeout = alignTimeout;
+        this.lock = new Object();
+        this.currentCheckpointId = Long.MIN_VALUE;
+        this.closed = false;
+    }
+
+    @Override
+    protected void addSplits(Collection<FileStoreSourceSplit> splits) {
+        Map<Long, List<FileStoreSourceSplit>> splitsBySnapshot = new 
TreeMap<>();
+
+        for (FileStoreSourceSplit split : splits) {
+            long snapshotId = ((DataSplit) split.split()).snapshotId();
+            splitsBySnapshot.computeIfAbsent(snapshotId, snapshot -> new 
ArrayList<>()).add(split);
+        }
+
+        for (List<FileStoreSourceSplit> previousSplits : 
splitsBySnapshot.values()) {
+            Map<Integer, List<FileStoreSourceSplit>> subtaskSplits =
+                    computeForBucket(previousSplits);
+            subtaskSplits.forEach(
+                    (subtask, taskSplits) ->
+                            taskSplits.forEach(split -> 
splitAssigner.addSplit(subtask, split)));
+        }
+    }
+
+    private Map<Integer, List<FileStoreSourceSplit>> computeForBucket(
+            Collection<FileStoreSourceSplit> splits) {
+        Map<Integer, List<FileStoreSourceSplit>> subtaskSplits = new 
HashMap<>();
+        for (FileStoreSourceSplit split : splits) {
+            int taskId = assignTask(((DataSplit) split.split()).bucket());
+            subtaskSplits.computeIfAbsent(taskId, subtask -> new 
ArrayList<>()).add(split);
+        }
+        return subtaskSplits;
+    }
+
+    @Override
+    public void close() throws IOException {
+        closed = true;
+        synchronized (lock) {
+            lock.notifyAll();
+        }
+    }
+
+    @Override
+    public void addSplitsBack(List<FileStoreSourceSplit> splits, int 
subtaskId) {
+        super.addSplitsBack(splits, subtaskId);
+    }
+
+    @Override
+    public PendingSplitsCheckpoint snapshotState(long checkpointId) throws 
Exception {
+        if (!alignedAssigner.isAligned() && !closed) {
+            synchronized (lock) {
+                if (pendingPlans.isEmpty()) {
+                    lock.wait(alignTimeout);
+                    Preconditions.checkArgument(!closed, "Enumerator has been 
closed.");
+                    Preconditions.checkArgument(
+                            !pendingPlans.isEmpty(),
+                            "Timeout while waiting for snapshot from paimon 
source.");
+                }
+            }
+            PlanWithNextSnapshotId pendingPlan = pendingPlans.poll();
+            addSplits(splitGenerator.createSplits(pendingPlan.plan()));
+            nextSnapshotId = pendingPlan.nextSnapshotId();
+            assignSplits();
+        }
+        Preconditions.checkArgument(alignedAssigner.isAligned());
+        alignedAssigner.removeFirst();
+        currentCheckpointId = checkpointId;
+
+        // send checkpoint event to the source reader
+        CheckpointEvent event = new CheckpointEvent(checkpointId);
+        for (int i = 0; i < context.currentParallelism(); i++) {
+            context.sendEventToSourceReader(i, event);
+        }
+        return new PendingSplitsCheckpoint(alignedAssigner.remainingSplits(), 
nextSnapshotId);
+    }
+
+    @Override
+    public void notifyCheckpointAborted(long checkpointId) {
+        if (currentCheckpointId == checkpointId) {
+            throw new FlinkRuntimeException("Checkpoint failure is not allowed 
in aligned mode.");
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        currentCheckpointId = Long.MIN_VALUE;
+    }
+
+    // ------------------------------------------------------------------------
+
+    @Override
+    protected PlanWithNextSnapshotId scanNextSnapshot() {
+        if (pendingPlans.remainingCapacity() > 0) {
+            PlanWithNextSnapshotId scannedPlan = super.scanNextSnapshot();
+            if (!(scannedPlan.plan() instanceof SnapshotNotExistPlan)) {
+                synchronized (lock) {
+                    pendingPlans.add(scannedPlan);
+                    lock.notifyAll();
+                }
+            }
+        }
+        return null;
+    }
+
+    @Override
+    protected void processDiscoveredSplits(PlanWithNextSnapshotId ignore, 
Throwable error) {
+        if (error != null) {
+            if (error instanceof EndOfScanException) {
+                // finished
+                LOG.debug("Catching EndOfScanException, the stream is finished.");
+                finished = true;
+            } else {
+                LOG.error("Failed to enumerate files", error);
+            }
+        }
+
+        if (alignedAssigner.remainingSnapshots() >= MAX_PENDING_PLAN) {
+            assignSplits();
+            return;
+        }
+
+        PlanWithNextSnapshotId nextPlan = pendingPlans.poll();
+        if (nextPlan != null) {
+            nextSnapshotId = nextPlan.nextSnapshotId();
+            TableScan.Plan plan = nextPlan.plan();
+            if (plan.splits().isEmpty()) {
+                addSplits(
+                        Collections.singletonList(
+                                new FileStoreSourceSplit(
+                                        PLACEHOLDER_SPLIT,
+                                        new PlaceholderSplit(nextSnapshotId - 
1))));
+            } else {
+                addSplits(splitGenerator.createSplits(plan));
+            }
+        }
+        assignSplits();
+    }
+
+    @Override
+    protected boolean noMoreSplits() {
+        return super.noMoreSplits()
+                && alignedAssigner.remainingSnapshots() == 0
+                && pendingPlans.isEmpty();
+    }
+
+    @Override
+    protected SplitAssigner createSplitAssigner() {
+        return new AlignedSplitAssigner();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
new file mode 100644
index 000000000..fefa26710
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.align;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.source.ContinuousFileStoreSource;
+import org.apache.paimon.flink.source.FileStoreSourceSplit;
+import org.apache.paimon.flink.source.PendingSplitsCheckpoint;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.StreamTableScan;
+
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Map;
+
+import static org.apache.paimon.disk.IOManagerImpl.splitPaths;
+
+/**
+ * A {@link ContinuousFileStoreSource} whose reader is an {@link AlignedSourceReader} and whose
+ * enumerator is an {@link AlignedContinuousFileSplitEnumerator}.
+ */
+public class AlignedContinuousFileStoreSource extends ContinuousFileStoreSource {
+
+    public AlignedContinuousFileStoreSource(
+            ReadBuilder readBuilder,
+            Map<String, String> options,
+            @Nullable Long limit,
+            BucketMode bucketMode) {
+        super(readBuilder, options, limit, bucketMode);
+    }
+
+    /**
+     * Builds an {@link AlignedSourceReader} whose table read uses an {@link IOManager} rooted at
+     * the directories configured under Flink's {@code TMP_DIRS} option.
+     */
+    @Override
+    public SourceReader<RowData, FileStoreSourceSplit> createReader(SourceReaderContext context) {
+        String tmpDirs =
+                context.getConfiguration().get(org.apache.flink.configuration.CoreOptions.TMP_DIRS);
+        IOManager ioManager = new IOManagerImpl(splitPaths(tmpDirs));
+        return new AlignedSourceReader(
+                context,
+                readBuilder.newRead().withIOManager(ioManager),
+                limit,
+                new FutureCompletingBlockingQueue<>(
+                        context.getConfiguration()
+                                .getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)));
+    }
+
+    /**
+     * Builds an {@link AlignedContinuousFileSplitEnumerator}, reading the discovery interval and
+     * the checkpoint align timeout (both converted to milliseconds) from this source's options.
+     */
+    @Override
+    protected SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> buildEnumerator(
+            SplitEnumeratorContext<FileStoreSourceSplit> context,
+            Collection<FileStoreSourceSplit> splits,
+            @Nullable Long nextSnapshotId,
+            StreamTableScan scan) {
+        Options tableOptions = Options.fromMap(this.options);
+        long discoveryIntervalMs =
+                tableOptions.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis();
+        long alignTimeoutMs =
+                tableOptions.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT).toMillis();
+        return new AlignedContinuousFileSplitEnumerator(
+                context,
+                splits,
+                nextSnapshotId,
+                discoveryIntervalMs,
+                scan,
+                bucketMode,
+                alignTimeoutMs);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
new file mode 100644
index 000000000..99b840a09
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.align;
+
+import org.apache.paimon.flink.source.FileStoreSourceReader;
+import org.apache.paimon.flink.source.FileStoreSourceSplit;
+import org.apache.paimon.flink.source.FileStoreSourceSplitState;
+import org.apache.paimon.table.source.TableRead;
+
+import org.apache.flink.api.connector.source.ExternallyInducedSourceReader;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A {@link FileStoreSourceReader} that decides itself when a checkpoint is taken: a checkpoint
+ * announced through a {@link CheckpointEvent} is triggered only once this reader has no assigned
+ * split left, and the next batch of splits is requested at that same moment.
+ */
+public class AlignedSourceReader extends FileStoreSourceReader
+        implements ExternallyInducedSourceReader<RowData, FileStoreSourceSplit> {
+
+    private final FutureCompletingBlockingQueue<
+                    RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>>>
+            elementsQueue;
+
+    /** Id of the announced but not yet triggered checkpoint; {@code null} if none is pending. */
+    private Long nextCheckpointId;
+
+    public AlignedSourceReader(
+            SourceReaderContext readerContext,
+            TableRead tableRead,
+            @Nullable Long limit,
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>>>
+                    elementsQueue) {
+        super(readerContext, tableRead, limit, elementsQueue);
+        this.elementsQueue = elementsQueue;
+        // nextCheckpointId keeps its default null: no checkpoint has been announced yet.
+    }
+
+    /** Records the checkpoint id of a {@link CheckpointEvent}; other events go to the parent. */
+    @Override
+    public void handleSourceEvents(SourceEvent sourceEvent) {
+        if (!(sourceEvent instanceof CheckpointEvent)) {
+            super.handleSourceEvents(sourceEvent);
+            return;
+        }
+        nextCheckpointId = ((CheckpointEvent) sourceEvent).getCheckpointId();
+        // NOTE(review): presumably wakes the reader so the pending checkpoint is noticed even
+        // when no records are queued - confirm against FutureCompletingBlockingQueue.
+        elementsQueue.notifyAvailable();
+    }
+
+    @Override
+    protected void onSplitFinished(Map<String, FileStoreSourceSplitState> finishedSplitIds) {
+        // Deliberately a no-op: this reader sends its split request from
+        // shouldTriggerCheckpoint() instead.
+    }
+
+    /**
+     * Returns the pending checkpoint id once no split is assigned to this reader, clearing it and
+     * sending a split request at the same time; returns an empty result otherwise.
+     */
+    @Override
+    public Optional<Long> shouldTriggerCheckpoint() {
+        if (getNumberOfCurrentlyAssignedSplits() != 0 || nextCheckpointId == null) {
+            return Optional.empty();
+        }
+        long checkpointId = nextCheckpointId;
+        nextCheckpointId = null;
+        context.sendSplitRequest();
+        return Optional.of(checkpointId);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/CheckpointEvent.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/CheckpointEvent.java
new file mode 100644
index 000000000..eddce4f08
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/CheckpointEvent.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.align;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+
+import javax.annotation.Nonnull;
+
+import java.util.Objects;
+
+/**
+ * A {@link SourceEvent} carrying the id of the checkpoint that is about to be triggered. It is
+ * sent by the {@link AlignedContinuousFileSplitEnumerator} and received by the {@link
+ * AlignedSourceReader}.
+ */
+public class CheckpointEvent implements SourceEvent {
+
+    private static final long serialVersionUID = 1L;
+
+    @Nonnull private final long checkpointId;
+
+    public CheckpointEvent(long checkpointId) {
+        this.checkpointId = checkpointId;
+    }
+
+    /** Returns the id of the announced checkpoint. */
+    @Nonnull
+    public long getCheckpointId() {
+        return checkpointId;
+    }
+
+    /** Two events are equal when they have the same exact class and the same checkpoint id. */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        return o != null
+                && getClass() == o.getClass()
+                && checkpointId == ((CheckpointEvent) o).checkpointId;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(checkpointId);
+    }
+
+    @Override
+    public String toString() {
+        return "CheckpointEvent{checkpointId=" + checkpointId + '}';
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/PlaceholderSplit.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/PlaceholderSplit.java
new file mode 100644
index 000000000..d6c172fed
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/PlaceholderSplit.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.align;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataInputView;
+import org.apache.paimon.io.DataOutputView;
+import org.apache.paimon.table.source.DataSplit;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Stand-in split for a snapshot that contains no {@link org.apache.paimon.table.source.Split}.
+ * Every accessor delegates to a wrapped {@link DataSplit}.
+ */
+public class PlaceholderSplit extends DataSplit {
+    private static final long serialVersionUID = 3L;
+
+    private final DataSplit dataSplit;
+
+    /** Creates a streaming placeholder for {@code snapshotId} with no files at all. */
+    public PlaceholderSplit(long snapshotId) {
+        this(
+                DataSplit.builder()
+                        .withSnapshot(snapshotId)
+                        .withBeforeFiles(Collections.emptyList())
+                        .withBucket(0)
+                        .withDataFiles(Collections.emptyList())
+                        .withPartition(BinaryRow.EMPTY_ROW)
+                        .isStreaming(true)
+                        .build());
+    }
+
+    /** Wraps an existing split, e.g. one obtained from {@link #deserialize(DataInputView)}. */
+    public PlaceholderSplit(DataSplit dataSplit) {
+        this.dataSplit = dataSplit;
+    }
+
+    @Override
+    public long snapshotId() {
+        return dataSplit.snapshotId();
+    }
+
+    @Override
+    public BinaryRow partition() {
+        return dataSplit.partition();
+    }
+
+    @Override
+    public int bucket() {
+        return dataSplit.bucket();
+    }
+
+    @Override
+    public List<DataFileMeta> beforeFiles() {
+        return dataSplit.beforeFiles();
+    }
+
+    @Override
+    public List<DataFileMeta> dataFiles() {
+        return dataSplit.dataFiles();
+    }
+
+    @Override
+    public boolean isStreaming() {
+        return dataSplit.isStreaming();
+    }
+
+    @Override
+    public long rowCount() {
+        return dataSplit.rowCount();
+    }
+
+    /** Equal when the exact class matches, the parent's check passes and the wrapped splits match. */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass() || !super.equals(o)) {
+            return false;
+        }
+        return Objects.equals(dataSplit, ((PlaceholderSplit) o).dataSplit);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(dataSplit);
+    }
+
+    /** Writes the wrapped split to {@code out}. */
+    public void serialize(DataOutputView out) throws IOException {
+        dataSplit.serialize(out);
+    }
+
+    /** Reads a {@link DataSplit} from {@code in} and wraps it in a placeholder. */
+    public static PlaceholderSplit deserialize(DataInputView in) throws IOException {
+        return new PlaceholderSplit(DataSplit.deserialize(in));
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java
new file mode 100644
index 000000000..41d6f097b
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.assigners;
+
+import org.apache.paimon.flink.source.FileStoreSourceSplit;
+import org.apache.paimon.flink.source.align.PlaceholderSplit;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Splits are allocated at the granularity of snapshots. As long as the splits of the current
+ * snapshot are not fully allocated and a checkpoint has not been triggered, the next snapshot
+ * will not be allocated.
+ */
+public class AlignedSplitAssigner implements SplitAssigner {
+
+    /** Pending snapshots; splits are only ever handed out from the head entry. */
+    private final Deque<PendingSnapshot> pendingSplitAssignment;
+
+    public AlignedSplitAssigner() {
+        this.pendingSplitAssignment = new LinkedList<>();
+    }
+
+    /**
+     * Hands out all splits of the head snapshot that belong to {@code subtask}.
+     *
+     * @return the subtask's splits, or an empty list when there is no head snapshot, the head is
+     *     a placeholder, or the head holds nothing for this subtask
+     */
+    @Override
+    public List<FileStoreSourceSplit> getNext(int subtask, @Nullable String hostname) {
+        PendingSnapshot head = pendingSplitAssignment.peek();
+        if (head == null || head.isPlaceHolder) {
+            return Collections.emptyList();
+        }
+        List<FileStoreSourceSplit> subtaskSplits = head.remove(subtask);
+        return subtaskSplits != null ? subtaskSplits : Collections.emptyList();
+    }
+
+    /**
+     * Appends a split for {@code subtask}. The split joins the last pending snapshot when the
+     * snapshot ids match; otherwise a new pending snapshot is appended at the tail.
+     */
+    @Override
+    public void addSplit(int subtask, FileStoreSourceSplit splits) {
+        long snapshotId = ((DataSplit) splits.split()).snapshotId();
+        PendingSnapshot last = pendingSplitAssignment.peekLast();
+        boolean isPlaceholder = splits.split() instanceof PlaceholderSplit;
+        if (last == null || last.snapshotId != snapshotId) {
+            last = new PendingSnapshot(snapshotId, isPlaceholder, new HashMap<>());
+            last.add(subtask, splits);
+            pendingSplitAssignment.addLast(last);
+        } else {
+            last.add(subtask, splits);
+        }
+    }
+
+    /**
+     * Returns splits to the front of the queue. They join the head snapshot when its snapshot id
+     * equals that of the first returned split; otherwise a new head snapshot is created. An empty
+     * list is ignored.
+     */
+    @Override
+    public void addSplitsBack(int subtask, List<FileStoreSourceSplit> splits) {
+        if (splits.isEmpty()) {
+            return;
+        }
+
+        long snapshotId = ((DataSplit) splits.get(0).split()).snapshotId();
+        boolean isPlaceholder = splits.get(0).split() instanceof PlaceholderSplit;
+        PendingSnapshot head = pendingSplitAssignment.peek();
+        if (head != null && head.snapshotId == snapshotId) {
+            head.addAll(subtask, splits);
+            return;
+        }
+        // Validate and fill the new entry before it becomes visible in the queue.
+        PendingSnapshot restored = new PendingSnapshot(snapshotId, isPlaceholder, new HashMap<>());
+        restored.addAll(subtask, splits);
+        pendingSplitAssignment.addFirst(restored);
+    }
+
+    /** Returns every split still held, in queue order of the pending snapshots. */
+    @Override
+    public Collection<FileStoreSourceSplit> remainingSplits() {
+        List<FileStoreSourceSplit> remainingSplits = new ArrayList<>();
+        for (PendingSnapshot pendingSnapshot : pendingSplitAssignment) {
+            pendingSnapshot.subtaskSplits.values().forEach(remainingSplits::addAll);
+        }
+        return remainingSplits;
+    }
+
+    /** Returns true when a head snapshot exists and it is {@link PendingSnapshot#empty()}. */
+    public boolean isAligned() {
+        PendingSnapshot head = pendingSplitAssignment.peek();
+        return head != null && head.empty();
+    }
+
+    /** Returns the number of pending snapshots, the head included. */
+    public int remainingSnapshots() {
+        return pendingSplitAssignment.size();
+    }
+
+    /**
+     * Drops the head snapshot, which must exist and be empty.
+     *
+     * @throws IllegalArgumentException if there is no head or it still holds splits; the queue is
+     *     left untouched in that case
+     */
+    public void removeFirst() {
+        // Check before removing, so that a violated precondition does not lose the head entry.
+        PendingSnapshot head = pendingSplitAssignment.peek();
+        Preconditions.checkArgument(
+                head != null && head.empty(),
+                "The head pending splits is not empty. This is a bug, please file an issue.");
+        pendingSplitAssignment.poll();
+    }
+
+    /** The not yet assigned splits of one snapshot, grouped by the subtask that should read them. */
+    private static class PendingSnapshot {
+        private final long snapshotId;
+        private final boolean isPlaceHolder;
+        private final Map<Integer, List<FileStoreSourceSplit>> subtaskSplits;
+
+        public PendingSnapshot(
+                long snapshotId,
+                boolean isPlaceHolder,
+                Map<Integer, List<FileStoreSourceSplit>> subtaskSplits) {
+            this.snapshotId = snapshotId;
+            this.isPlaceHolder = isPlaceHolder;
+            this.subtaskSplits = subtaskSplits;
+        }
+
+        /** Removes and returns the splits of {@code subtask}, or {@code null} if it has none. */
+        public List<FileStoreSourceSplit> remove(int subtask) {
+            return subtaskSplits.remove(subtask);
+        }
+
+        /** Adds one split for {@code subtask}; its snapshot id must match this snapshot. */
+        public void add(int subtask, FileStoreSourceSplit split) {
+            Preconditions.checkArgument(
+                    ((DataSplit) split.split()).snapshotId() == snapshotId,
+                    "SnapshotId not equal. This is a bug, please file an issue.");
+            subtaskSplits.computeIfAbsent(subtask, id -> new ArrayList<>()).add(split);
+        }
+
+        /**
+         * Registers the splits of a subtask that holds none so far; every split must belong to
+         * this snapshot.
+         */
+        public void addAll(int subtask, List<FileStoreSourceSplit> splits) {
+            Preconditions.checkArgument(
+                    !subtaskSplits.containsKey(subtask),
+                    "Encountered a non-empty list of subtask pending splits. This is a bug, please file an issue.");
+            splits.forEach(
+                    split ->
+                            Preconditions.checkArgument(
+                                    ((DataSplit) split.split()).snapshotId() == snapshotId,
+                                    "SnapshotId not equal"));
+            // Store a private mutable copy: add() appends to this list later, which must neither
+            // alter the caller's list nor fail when the caller passed an unmodifiable one.
+            subtaskSplits.put(subtask, new ArrayList<>(splits));
+        }
+
+        /** A snapshot counts as empty when it holds no splits or is a placeholder. */
+        public boolean empty() {
+            return subtaskSplits.isEmpty() || isPlaceHolder;
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index a35e309f0..ce4acc286 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -532,7 +532,7 @@ public class ContinuousFileSplitEnumeratorTest {
 
         // empty plan
         context.runInCoordinatorThread(
-                () -> checkpoint.set(enumerator.snapshotState(1L))); // 
checkpoint
+                () -> checkpoint.set(checkpointWithoutException(enumerator, 
1L))); // checkpoint
         context.triggerAlCoordinatorAction();
         state = checkpoint.getAndSet(null);
         assertThat(state).isNotNull();
@@ -543,7 +543,7 @@ public class ContinuousFileSplitEnumeratorTest {
         context.triggerAllWorkerAction(); // scan next plan
         context.triggerAlCoordinatorAction(); // processDiscoveredSplits
         context.runInCoordinatorThread(
-                () -> checkpoint.set(enumerator.snapshotState(1L))); // 
snapshotState
+                () -> checkpoint.set(checkpointWithoutException(enumerator, 
2L))); // snapshotState
         context.triggerAlCoordinatorAction();
         state = checkpoint.getAndSet(null);
         assertThat(state).isNotNull();
@@ -557,7 +557,7 @@ public class ContinuousFileSplitEnumeratorTest {
         // multiple plans happen before processDiscoveredSplits
         context.triggerAllWorkerAction(); // scan next plan
         context.runInCoordinatorThread(
-                () -> checkpoint.set(enumerator.snapshotState(2L))); // 
snapshotState
+                () -> checkpoint.set(checkpointWithoutException(enumerator, 
3L))); // snapshotState
         context.triggerAllWorkerAction(); // scan next plan
         context.triggerNextCoordinatorAction(); // process first discovered 
splits
         context.triggerNextCoordinatorAction(); // checkpoint
@@ -567,6 +567,15 @@ public class ContinuousFileSplitEnumeratorTest {
         
assertThat(toDataSplits(state.splits())).containsExactlyElementsOf(expectedResults.get(2L));
     }
 
+    /** Takes an enumerator checkpoint and maps any failure to a {@code null} result. */
+    private static PendingSplitsCheckpoint checkpointWithoutException(
+            ContinuousFileSplitEnumerator enumerator, long checkpointId) {
+        PendingSplitsCheckpoint result;
+        try {
+            result = enumerator.snapshotState(checkpointId);
+        } catch (Exception ignored) {
+            // Deliberately swallowed: the caller receives null instead of the exception.
+            result = null;
+        }
+        return result;
+    }
+
     private static List<DataSplit> 
toDataSplits(Collection<FileStoreSourceSplit> splits) {
         return splits.stream()
                 .map(FileStoreSourceSplit::split)
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
index a825cf331..b2f217acf 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
@@ -44,7 +44,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Unit tests for the {@link FileStoreSourceReader}. */
 public class FileStoreSourceReaderTest {
 
-    @TempDir java.nio.file.Path tempDir;
+    @TempDir protected java.nio.file.Path tempDir;
 
     @BeforeEach
     public void beforeEach() throws Exception {
@@ -104,14 +104,14 @@ public class FileStoreSourceReaderTest {
         assertThat(context.getNumSplitRequests()).isEqualTo(2);
     }
 
-    private FileStoreSourceReader createReader(TestingReaderContext context) {
+    protected FileStoreSourceReader createReader(TestingReaderContext context) 
{
         return new FileStoreSourceReader(
                 context,
                 new 
TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey(),
                 null);
     }
 
-    private static FileStoreSourceSplit createTestFileSplit(String id) {
+    protected static FileStoreSourceSplit createTestFileSplit(String id) {
         return newSourceSplit(id, row(1), 0, Collections.emptyList());
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
new file mode 100644
index 000000000..9cf56de7e
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.align;
+
+import org.apache.paimon.flink.source.FileStoreSourceSplit;
+import org.apache.paimon.flink.source.PendingSplitsCheckpoint;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.testutils.assertj.AssertionUtils;
+
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import 
org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.paimon.flink.source.ContinuousFileSplitEnumeratorTest.createSnapshotSplit;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Unit tests for the {@link AlignedContinuousFileSplitEnumerator}. */
+public class AlignedContinuousFileSplitEnumeratorTest {
+
+    /** Splits of the next snapshot must not be handed out before a checkpoint has been taken. */
+    @Test
+    public void testSplitsAssignedBySnapshot() throws Exception {
+        final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+                new TestingSplitEnumeratorContext<>(2);
+        context.registerReader(0, "test-host");
+        context.registerReader(1, "test-host");
+
+        // Snapshot 1 has one split (bucket 0); snapshot 2 has two splits (buckets 0 and 1).
+        List<FileStoreSourceSplit> initialSplits = new ArrayList<>();
+        initialSplits.add(createSnapshotSplit(1, 0, Collections.emptyList()));
+        initialSplits.add(createSnapshotSplit(2, 0, Collections.emptyList()));
+        initialSplits.add(createSnapshotSplit(2, 1, Collections.emptyList()));
+        List<FileStoreSourceSplit> expectedSplits = new ArrayList<>(initialSplits);
+
+        final AlignedContinuousFileSplitEnumerator enumerator =
+                new Builder()
+                        .setSplitEnumeratorContext(context)
+                        .setInitialSplits(initialSplits)
+                        .setDiscoveryInterval(3)
+                        .build();
+
+        // first request
+        enumerator.handleSplitRequest(0, "test-host");
+        enumerator.handleSplitRequest(1, "test-host");
+        Map<Integer, TestingSplitEnumeratorContext.SplitAssignmentState<FileStoreSourceSplit>>
+                assignments = context.getSplitAssignments();
+        // Only subtask-0 is allocated.
+        assertThat(assignments).containsOnlyKeys(0);
+        assertThat(assignments.get(0).getAssignedSplits()).containsExactly(expectedSplits.get(0));
+
+        // second request
+        context.getSplitAssignments().clear();
+        enumerator.handleSplitRequest(0, "test-host");
+        enumerator.handleSplitRequest(1, "test-host");
+        assertThat(context.getSplitAssignments()).isEmpty();
+
+        // snapshot state
+        enumerator.snapshotState(1L);
+        assertThat(context.getSplitAssignments()).isEmpty();
+
+        // third request
+        enumerator.handleSplitRequest(0, "test-host");
+        enumerator.handleSplitRequest(1, "test-host");
+        assignments = context.getSplitAssignments();
+        assertThat(assignments).containsOnlyKeys(0, 1);
+        assertThat(assignments.get(0).getAssignedSplits()).containsExactly(expectedSplits.get(1));
+        assertThat(assignments.get(1).getAssignedSplits()).containsExactly(expectedSplits.get(2));
+    }
+
+    /** A checkpoint fails with a timeout without splits; later it keeps the unassigned split. */
+    @Test
+    public void testEnumeratorSnapshotState() throws Exception {
+        final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+                new TestingSplitEnumeratorContext<>(1);
+        context.registerReader(0, "test-host");
+
+        final AlignedContinuousFileSplitEnumerator enumerator =
+                new Builder()
+                        .setSplitEnumeratorContext(context)
+                        .setInitialSplits(Collections.emptyList())
+                        .setDiscoveryInterval(3)
+                        .setAlignedTimeout(10L)
+                        .build();
+        assertThatThrownBy(() -> enumerator.snapshotState(1L))
+                .satisfies(
+                        AssertionUtils.anyCauseMatches(
+                                "Timeout while waiting for snapshot from paimon source."));
+
+        // One split each for snapshots 1 and 2, both in bucket 0.
+        List<FileStoreSourceSplit> splits = new ArrayList<>();
+        splits.add(createSnapshotSplit(1, 0, Collections.emptyList()));
+        splits.add(createSnapshotSplit(2, 0, Collections.emptyList()));
+        enumerator.addSplits(splits);
+        enumerator.handleSplitRequest(0, "test-host");
+
+        Map<Integer, TestingSplitEnumeratorContext.SplitAssignmentState<FileStoreSourceSplit>>
+                assignments = context.getSplitAssignments();
+        assertThat(assignments).containsOnlyKeys(0);
+        assertThat(assignments.get(0).getAssignedSplits()).containsExactly(splits.get(0));
+        PendingSplitsCheckpoint checkpoint = enumerator.snapshotState(1L);
+        assertThat(checkpoint.splits()).containsExactly(splits.get(1));
+    }
+
+    /** Fluent factory for the enumerator under test; unset values keep the defaults below. */
+    private static class Builder {
+        private SplitEnumeratorContext<FileStoreSourceSplit> context;
+        private Collection<FileStoreSourceSplit> initialSplits = Collections.emptyList();
+        private long discoveryInterval = Long.MAX_VALUE;
+
+        private StreamTableScan scan;
+        private BucketMode bucketMode = BucketMode.FIXED;
+
+        private long timeout = 30000L;
+
+        public Builder setSplitEnumeratorContext(
+                SplitEnumeratorContext<FileStoreSourceSplit> context) {
+            this.context = context;
+            return this;
+        }
+
+        public Builder setInitialSplits(Collection<FileStoreSourceSplit> initialSplits) {
+            this.initialSplits = initialSplits;
+            return this;
+        }
+
+        public Builder setDiscoveryInterval(long discoveryInterval) {
+            this.discoveryInterval = discoveryInterval;
+            return this;
+        }
+
+        public Builder setScan(StreamTableScan scan) {
+            this.scan = scan;
+            return this;
+        }
+
+        public Builder withBucketMode(BucketMode bucketMode) {
+            this.bucketMode = bucketMode;
+            return this;
+        }
+
+        public Builder setAlignedTimeout(long timeout) {
+            this.timeout = timeout;
+            return this;
+        }
+
+        /** Creates the enumerator; the next snapshot id is always passed as {@code null}. */
+        public AlignedContinuousFileSplitEnumerator build() {
+            return new AlignedContinuousFileSplitEnumerator(
+                    context, initialSplits, null, discoveryInterval, scan, bucketMode, timeout);
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
new file mode 100644
index 000000000..95db401c8
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.align;
+
+import org.apache.paimon.flink.source.FileStoreSourceReader;
+import org.apache.paimon.flink.source.FileStoreSourceReaderTest;
+import org.apache.paimon.flink.source.TestChangelogDataReadWrite;
+
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
+import org.apache.flink.table.data.RowData;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for the {@link AlignedSourceReader}. */
+public class AlignedSourceReaderTest extends FileStoreSourceReaderTest {
+
+    @Override
+    @Test
+    public void testAddMultipleSplits() throws Exception {
+        final TestingReaderContext context = new TestingReaderContext();
+        final AlignedSourceReader reader = (AlignedSourceReader) 
createReader(context);
+
+        reader.start();
+        assertThat(context.getNumSplitRequests()).isEqualTo(1);
+
+        reader.addSplits(Arrays.asList(createTestFileSplit("id1"), 
createTestFileSplit("id2")));
+        TestingReaderOutput<RowData> output = new TestingReaderOutput<>();
+        while (reader.getNumberOfCurrentlyAssignedSplits() > 0) {
+            reader.pollNext(output);
+            Thread.sleep(10);
+        }
+        // splits are only requested when a checkpoint is ready to be triggered
+        assertThat(context.getNumSplitRequests()).isEqualTo(1);
+
+        // prepare to trigger checkpoint
+        reader.handleSourceEvents(new CheckpointEvent(1L));
+        
assertThat(reader.shouldTriggerCheckpoint()).isEqualTo(Optional.of(1L));
+        assertThat(context.getNumSplitRequests()).isEqualTo(2);
+    }
+
+    @Override
+    protected FileStoreSourceReader createReader(TestingReaderContext context) 
{
+        return new AlignedSourceReader(
+                context,
+                new 
TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey(),
+                null,
+                new FutureCompletingBlockingQueue<>(2));
+    }
+}

Reply via email to