This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 316a0f956 [flink] support consumer-id for AlignedContinuousFileSplitEnumerator (#1794)
316a0f956 is described below

commit 316a0f956aca022941c1836209cb8f1e43036344
Author: liming.1018 <[email protected]>
AuthorDate: Mon Aug 14 17:23:55 2023 +0800

    [flink] support consumer-id for AlignedContinuousFileSplitEnumerator (#1794)
---
 .../paimon/flink/source/FlinkSourceBuilder.java    | 27 +++----
 .../AlignedContinuousFileSplitEnumerator.java      |  6 ++
 .../source/assigners/AlignedSplitAssigner.java     |  5 ++
 .../AlignedContinuousFileSplitEnumeratorTest.java  | 83 ++++++++++++++++++++++
 4 files changed, 109 insertions(+), 12 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 52e510eb5..0b7c835ff 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -140,17 +140,6 @@ public class FlinkSourceBuilder {
     }
 
     private DataStream<RowData> buildContinuousFileSource() {
-        if (conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)) {
-            assertStreamingConfigurationForAlignMode(env);
-            return toDataStream(
-                    new AlignedContinuousFileStoreSource(
-                            createReadBuilder(),
-                            table.options(),
-                            limit,
-                            table instanceof FileStoreTable
-                                    ? ((FileStoreTable) table).bucketMode()
-                                    : BucketMode.FIXED));
-        }
         return toDataStream(
                 new ContinuousFileStoreSource(
                         createReadBuilder(),
@@ -161,6 +150,18 @@ public class FlinkSourceBuilder {
                                 : BucketMode.FIXED));
     }
 
+    private DataStream<RowData> buildAlignedContinuousFileSource() {
+        assertStreamingConfigurationForAlignMode(env);
+        return toDataStream(
+                new AlignedContinuousFileStoreSource(
+                        createReadBuilder(),
+                        table.options(),
+                        limit,
+                        table instanceof FileStoreTable
+                                ? ((FileStoreTable) table).bucketMode()
+                                : BucketMode.FIXED));
+    }
+
     private DataStream<RowData> toDataStream(Source<RowData, ?, ?> source) {
         DataStreamSource<RowData> dataStream =
                 env.fromSource(
@@ -212,7 +213,9 @@ public class FlinkSourceBuilder {
                                     .build());
                 }
             } else {
-                if (conf.contains(CoreOptions.CONSUMER_ID)) {
+                if (conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)) {
+                    return buildAlignedContinuousFileSource();
+                } else if (conf.contains(CoreOptions.CONSUMER_ID)) {
                     return buildContinuousStreamOperator();
                 } else {
                     return buildContinuousFileSource();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
index cdb313be3..379a3ffda 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
@@ -81,6 +81,8 @@ public class AlignedContinuousFileSplitEnumerator extends ContinuousFileSplitEnu
 
     private long currentCheckpointId;
 
+    private Long lastConsumedSnapshotId;
+
     private boolean closed;
 
     public AlignedContinuousFileSplitEnumerator(
@@ -98,6 +100,7 @@ public class AlignedContinuousFileSplitEnumerator extends ContinuousFileSplitEnu
         this.alignTimeout = alignTimeout;
         this.lock = new Object();
         this.currentCheckpointId = Long.MIN_VALUE;
+        this.lastConsumedSnapshotId = null;
         this.closed = false;
     }
 
@@ -161,6 +164,7 @@ public class AlignedContinuousFileSplitEnumerator extends ContinuousFileSplitEnu
             assignSplits();
         }
         Preconditions.checkArgument(alignedAssigner.isAligned());
+        lastConsumedSnapshotId = alignedAssigner.minRemainingSnapshotId();
         alignedAssigner.removeFirst();
         currentCheckpointId = checkpointId;
 
@@ -182,6 +186,8 @@ public class AlignedContinuousFileSplitEnumerator extends ContinuousFileSplitEnu
     @Override
     public void notifyCheckpointComplete(long checkpointId) {
         currentCheckpointId = Long.MIN_VALUE;
+        Long nextSnapshot = lastConsumedSnapshotId == null ? null : lastConsumedSnapshotId + 1;
+        scan.notifyCheckpointComplete(nextSnapshot);
     }
 
     // ------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java
index e1892ec19..bb03fb454 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java
@@ -114,6 +114,11 @@ public class AlignedSplitAssigner implements SplitAssigner {
                 "The head pending splits is not empty. This is a bug, please file an issue.");
     }
 
+    public Long minRemainingSnapshotId() {
+        PendingSnapshot head = pendingSplitAssignment.peek();
+        return head != null ? head.snapshotId : null;
+    }
+
     private static class PendingSnapshot {
         private final long snapshotId;
         private final boolean isPlaceHolder;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
index 9cf56de7e..d64a35038 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
@@ -18,17 +18,36 @@
 
 package org.apache.paimon.flink.source.align;
 
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.flink.source.FileStoreSourceSplit;
 import org.apache.paimon.flink.source.PendingSplitsCheckpoint;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileIOFinder;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.testutils.assertj.AssertionUtils;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.TraceableFileIO;
 
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
+import org.assertj.core.api.Condition;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -41,6 +60,33 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 /** Unit tests for the {@link AlignedContinuousFileSplitEnumerator}. */
 public class AlignedContinuousFileSplitEnumeratorTest {
 
+    private static final RowType ROW_TYPE =
+            RowType.of(
+                    new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()},
+                    new String[] {"pt", "a", "b"});
+
+    private static final String CONSUMER_ID = "consumer";
+
+    private @TempDir java.nio.file.Path tempDir;
+    private FileStoreTable table;
+
+    @BeforeEach
+    public void before() throws Exception {
+        Path tablePath = new Path(TraceableFileIO.SCHEME + "://" + tempDir.toString());
+        FileIO fileIO = FileIOFinder.find(tablePath);
+        SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
+        TableSchema tableSchema =
+                schemaManager.createTable(
+                        new Schema(
+                                ROW_TYPE.getFields(),
+                                Collections.singletonList("pt"),
+                                Arrays.asList("pt", "a"),
+                                Collections.singletonMap(
+                                        CoreOptions.CONSUMER_ID.key(), CONSUMER_ID),
+                                ""));
+        table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath, tableSchema);
+    }
+
     @Test
     public void testSplitsAssignedBySnapshot() throws Exception {
         final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
@@ -124,6 +170,43 @@ public class AlignedContinuousFileSplitEnumeratorTest {
         assertThat(checkpoint.splits()).containsExactly(splits.get(1));
     }
 
+    @Test
+    public void testScanWithConsumerId() throws Exception {
+        final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+                new TestingSplitEnumeratorContext<>(1);
+        context.registerReader(0, "test-host");
+
+        final AlignedContinuousFileSplitEnumerator enumerator =
+                new Builder()
+                        .setSplitEnumeratorContext(context)
+                        .setInitialSplits(Collections.emptyList())
+                        .setScan(table.newStreamScan())
+                        .build();
+
+        List<FileStoreSourceSplit> splits = new ArrayList<>();
+        for (int i = 1; i <= 2; i++) {
+            splits.add(createSnapshotSplit(i, 0, Collections.emptyList()));
+        }
+        enumerator.addSplits(splits);
+
+        ConsumerManager consumerManager = new ConsumerManager(table.fileIO(), table.location());
+        assertThat(consumerManager.consumer(CONSUMER_ID)).isEmpty();
+
+        enumerator.handleSplitRequest(0, "test-host");
+        enumerator.snapshotState(1L);
+        enumerator.notifyCheckpointComplete(1L);
+        assertThat(consumerManager.consumer(CONSUMER_ID))
+                .hasValueSatisfying(
+                        new Condition<>(consumer -> consumer.nextSnapshot() == 2L, "condition"));
+
+        enumerator.handleSplitRequest(0, "test-host");
+        enumerator.snapshotState(2L);
+        enumerator.notifyCheckpointComplete(2L);
+        assertThat(consumerManager.consumer(CONSUMER_ID))
+                .hasValueSatisfying(
+                        new Condition<>(consumer -> consumer.nextSnapshot() == 3L, "condition"));
+    }
+
     private static class Builder {
         private SplitEnumeratorContext<FileStoreSourceSplit> context;
         private Collection<FileStoreSourceSplit> initialSplits = Collections.emptyList();

Reply via email to