This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 316a0f956 [flink] support consumer-id for AlignedContinuousFileSplitEnumerator (#1794)
316a0f956 is described below
commit 316a0f956aca022941c1836209cb8f1e43036344
Author: liming.1018 <[email protected]>
AuthorDate: Mon Aug 14 17:23:55 2023 +0800
[flink] support consumer-id for AlignedContinuousFileSplitEnumerator (#1794)
---
.../paimon/flink/source/FlinkSourceBuilder.java | 27 +++----
.../AlignedContinuousFileSplitEnumerator.java | 6 ++
.../source/assigners/AlignedSplitAssigner.java | 5 ++
.../AlignedContinuousFileSplitEnumeratorTest.java | 83 ++++++++++++++++++++++
4 files changed, 109 insertions(+), 12 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 52e510eb5..0b7c835ff 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -140,17 +140,6 @@ public class FlinkSourceBuilder {
}
private DataStream<RowData> buildContinuousFileSource() {
- if (conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)) {
- assertStreamingConfigurationForAlignMode(env);
- return toDataStream(
- new AlignedContinuousFileStoreSource(
- createReadBuilder(),
- table.options(),
- limit,
- table instanceof FileStoreTable
- ? ((FileStoreTable) table).bucketMode()
- : BucketMode.FIXED));
- }
return toDataStream(
new ContinuousFileStoreSource(
createReadBuilder(),
@@ -161,6 +150,18 @@ public class FlinkSourceBuilder {
: BucketMode.FIXED));
}
+ private DataStream<RowData> buildAlignedContinuousFileSource() {
+ assertStreamingConfigurationForAlignMode(env);
+ return toDataStream(
+ new AlignedContinuousFileStoreSource(
+ createReadBuilder(),
+ table.options(),
+ limit,
+ table instanceof FileStoreTable
+ ? ((FileStoreTable) table).bucketMode()
+ : BucketMode.FIXED));
+ }
+
private DataStream<RowData> toDataStream(Source<RowData, ?, ?> source) {
DataStreamSource<RowData> dataStream =
env.fromSource(
@@ -212,7 +213,9 @@ public class FlinkSourceBuilder {
.build());
}
} else {
- if (conf.contains(CoreOptions.CONSUMER_ID)) {
+ if
(conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)) {
+ return buildAlignedContinuousFileSource();
+ } else if (conf.contains(CoreOptions.CONSUMER_ID)) {
return buildContinuousStreamOperator();
} else {
return buildContinuousFileSource();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
index cdb313be3..379a3ffda 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
@@ -81,6 +81,8 @@ public class AlignedContinuousFileSplitEnumerator extends ContinuousFileSplitEnu
private long currentCheckpointId;
+ private Long lastConsumedSnapshotId;
+
private boolean closed;
public AlignedContinuousFileSplitEnumerator(
@@ -98,6 +100,7 @@ public class AlignedContinuousFileSplitEnumerator extends ContinuousFileSplitEnu
this.alignTimeout = alignTimeout;
this.lock = new Object();
this.currentCheckpointId = Long.MIN_VALUE;
+ this.lastConsumedSnapshotId = null;
this.closed = false;
}
@@ -161,6 +164,7 @@ public class AlignedContinuousFileSplitEnumerator extends ContinuousFileSplitEnu
assignSplits();
}
Preconditions.checkArgument(alignedAssigner.isAligned());
+ lastConsumedSnapshotId = alignedAssigner.minRemainingSnapshotId();
alignedAssigner.removeFirst();
currentCheckpointId = checkpointId;
@@ -182,6 +186,8 @@ public class AlignedContinuousFileSplitEnumerator extends ContinuousFileSplitEnu
@Override
public void notifyCheckpointComplete(long checkpointId) {
currentCheckpointId = Long.MIN_VALUE;
+ Long nextSnapshot = lastConsumedSnapshotId == null ? null :
lastConsumedSnapshotId + 1;
+ scan.notifyCheckpointComplete(nextSnapshot);
}
// ------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java
index e1892ec19..bb03fb454 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java
@@ -114,6 +114,11 @@ public class AlignedSplitAssigner implements SplitAssigner {
"The head pending splits is not empty. This is a bug, please
file an issue.");
}
+ public Long minRemainingSnapshotId() {
+ PendingSnapshot head = pendingSplitAssignment.peek();
+ return head != null ? head.snapshotId : null;
+ }
+
private static class PendingSnapshot {
private final long snapshotId;
private final boolean isPlaceHolder;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
index 9cf56de7e..d64a35038 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
@@ -18,17 +18,36 @@
package org.apache.paimon.flink.source.align;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.PendingSplitsCheckpoint;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileIOFinder;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.testutils.assertj.AssertionUtils;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.TraceableFileIO;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import
org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
+import org.assertj.core.api.Condition;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -41,6 +60,33 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Unit tests for the {@link AlignedContinuousFileSplitEnumerator}. */
public class AlignedContinuousFileSplitEnumeratorTest {
+ private static final RowType ROW_TYPE =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT(),
DataTypes.BIGINT()},
+ new String[] {"pt", "a", "b"});
+
+ private static final String CONSUMER_ID = "consumer";
+
+ private @TempDir java.nio.file.Path tempDir;
+ private FileStoreTable table;
+
+ @BeforeEach
+ public void before() throws Exception {
+ Path tablePath = new Path(TraceableFileIO.SCHEME + "://" +
tempDir.toString());
+ FileIO fileIO = FileIOFinder.find(tablePath);
+ SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
+ TableSchema tableSchema =
+ schemaManager.createTable(
+ new Schema(
+ ROW_TYPE.getFields(),
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "a"),
+ Collections.singletonMap(
+ CoreOptions.CONSUMER_ID.key(),
CONSUMER_ID),
+ ""));
+ table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath,
tableSchema);
+ }
+
@Test
public void testSplitsAssignedBySnapshot() throws Exception {
final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
@@ -124,6 +170,43 @@ public class AlignedContinuousFileSplitEnumeratorTest {
assertThat(checkpoint.splits()).containsExactly(splits.get(1));
}
+ @Test
+ public void testScanWithConsumerId() throws Exception {
+ final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+ new TestingSplitEnumeratorContext<>(1);
+ context.registerReader(0, "test-host");
+
+ final AlignedContinuousFileSplitEnumerator enumerator =
+ new Builder()
+ .setSplitEnumeratorContext(context)
+ .setInitialSplits(Collections.emptyList())
+ .setScan(table.newStreamScan())
+ .build();
+
+ List<FileStoreSourceSplit> splits = new ArrayList<>();
+ for (int i = 1; i <= 2; i++) {
+ splits.add(createSnapshotSplit(i, 0, Collections.emptyList()));
+ }
+ enumerator.addSplits(splits);
+
+ ConsumerManager consumerManager = new ConsumerManager(table.fileIO(),
table.location());
+ assertThat(consumerManager.consumer(CONSUMER_ID)).isEmpty();
+
+ enumerator.handleSplitRequest(0, "test-host");
+ enumerator.snapshotState(1L);
+ enumerator.notifyCheckpointComplete(1L);
+ assertThat(consumerManager.consumer(CONSUMER_ID))
+ .hasValueSatisfying(
+ new Condition<>(consumer -> consumer.nextSnapshot() ==
2L, "condition"));
+
+ enumerator.handleSplitRequest(0, "test-host");
+ enumerator.snapshotState(2L);
+ enumerator.notifyCheckpointComplete(2L);
+ assertThat(consumerManager.consumer(CONSUMER_ID))
+ .hasValueSatisfying(
+ new Condition<>(consumer -> consumer.nextSnapshot() ==
3L, "condition"));
+ }
+
private static class Builder {
private SplitEnumeratorContext<FileStoreSourceSplit> context;
private Collection<FileStoreSourceSplit> initialSplits =
Collections.emptyList();