This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 23dad789 [FLINK-30207] Move split initialization and discovery logic
fully into SnapshotEnumerator in Table Store
23dad789 is described below
commit 23dad789879735249651992ab9fe23b986ef1564
Author: tsreaper <[email protected]>
AuthorDate: Mon Dec 5 10:24:00 2022 +0800
[FLINK-30207] Move split initialization and discovery logic fully into
SnapshotEnumerator in Table Store
This closes #411
---
.../connector/lookup/FileStoreLookupFunction.java | 6 +-
.../source/ContinuousFileSplitEnumerator.java | 38 ++-
...eSource.java => ContinuousFileStoreSource.java} | 60 ++---
.../store/connector/source/FlinkSourceBuilder.java | 16 +-
.../connector/source/PendingSplitsCheckpoint.java | 10 +-
.../source/PendingSplitsCheckpointSerializer.java | 13 +-
...StoreSource.java => StaticFileStoreSource.java} | 43 +---
.../source/StaticFileStoreSplitEnumerator.java | 6 +-
.../store/connector/source/TableStoreSource.java | 9 +-
.../table/store/connector/FileStoreITCase.java | 8 +-
.../PendingSplitsCheckpointSerializerTest.java | 8 +-
.../flink/table/store/file/KeyValueFileStore.java | 1 -
.../file/operation/AbstractFileStoreScan.java | 42 +---
.../file/operation/AppendOnlyFileStoreScan.java | 2 -
.../table/store/file/operation/FileStoreScan.java | 2 +-
.../file/operation/KeyValueFileStoreScan.java | 3 -
.../flink/table/store/file/operation/ScanKind.java | 29 +++
.../table/store/table/source/DataTableScan.java | 13 +-
.../store/table/source/SnapshotEnumerator.java | 150 ------------
.../store/table/source/TableStreamingReader.java | 43 ++--
.../snapshot/DataFileSnapshotEnumerator.java | 159 +++++++++++++
.../source/snapshot/DeltaSnapshotEnumerator.java | 66 ++++++
.../FullCompactionChangelogSnapshotEnumerator.java | 72 ++++++
.../snapshot/InputChangelogSnapshotEnumerator.java | 70 ++++++
.../table/source/snapshot/SnapshotEnumerator.java | 50 ++--
.../flink/table/store/file/TestFileStore.java | 11 +-
.../store/table/AppendOnlyFileStoreTableTest.java | 7 +-
.../ChangelogValueCountFileStoreTableTest.java | 9 +-
.../table/ChangelogWithKeyFileDataTableTest.java | 9 +-
.../table/ChangelogWithKeyFileStoreTableTest.java | 39 ++--
.../table/store/table/FileDataFilterTestBase.java | 13 +-
.../DataFileSnapshotEnumeratorTestBase.java | 132 +++++++++++
.../snapshot/DeltaSnapshotEnumeratorTest.java | 234 +++++++++++++++++++
...lCompactionChangelogSnapshotEnumeratorTest.java | 256 +++++++++++++++++++++
.../InputChangelogSnapshotEnumeratorTest.java | 239 +++++++++++++++++++
35 files changed, 1447 insertions(+), 421 deletions(-)
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java
index 92e663e9..4f565fe0 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java
@@ -156,7 +156,7 @@ public class FileStoreLookupFunction extends
TableFunction<RowData> {
/** Used by code generation. */
@SuppressWarnings("unused")
- public void eval(Object... values) throws IOException {
+ public void eval(Object... values) throws Exception {
checkRefresh();
List<RowData> results = lookupTable.get(GenericRowData.of(values));
for (RowData matchedRow : results) {
@@ -164,7 +164,7 @@ public class FileStoreLookupFunction extends
TableFunction<RowData> {
}
}
- private void checkRefresh() throws IOException {
+ private void checkRefresh() throws Exception {
if (nextLoadTime > System.currentTimeMillis()) {
return;
}
@@ -179,7 +179,7 @@ public class FileStoreLookupFunction extends
TableFunction<RowData> {
nextLoadTime = System.currentTimeMillis() + refreshInterval.toMillis();
}
- private void refresh() throws IOException {
+ private void refresh() throws Exception {
while (true) {
Iterator<RowData> batch = streamingReader.nextBatch();
if (batch == null) {
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
index e2614bc7..2a6102a4 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
@@ -21,12 +21,9 @@ package org.apache.flink.table.store.connector.source;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.table.source.DataSplit;
import org.apache.flink.table.store.table.source.DataTableScan;
-import org.apache.flink.table.store.table.source.SnapshotEnumerator;
-import
org.apache.flink.table.store.table.source.SnapshotEnumerator.EnumeratorResult;
+import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +41,6 @@ import java.util.Map;
import java.util.Queue;
import java.util.Set;
-import static
org.apache.flink.table.store.connector.source.PendingSplitsCheckpoint.INVALID_SNAPSHOT;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -58,6 +54,8 @@ public class ContinuousFileSplitEnumerator
private final Map<Integer, Queue<FileStoreSourceSplit>> bucketSplits;
+ private Long nextSnapshotId;
+
private final long discoveryInterval;
private final Set<Integer> readersAwaitingSplit;
@@ -66,26 +64,21 @@ public class ContinuousFileSplitEnumerator
private final SnapshotEnumerator snapshotEnumerator;
- private Long currentSnapshotId;
-
public ContinuousFileSplitEnumerator(
SplitEnumeratorContext<FileStoreSourceSplit> context,
- Path location,
- DataTableScan scan,
- CoreOptions.ChangelogProducer changelogProducer,
Collection<FileStoreSourceSplit> remainSplits,
- long currentSnapshotId,
- long discoveryInterval) {
+ Long nextSnapshotId,
+ long discoveryInterval,
+ SnapshotEnumerator snapshotEnumerator) {
checkArgument(discoveryInterval > 0L);
this.context = checkNotNull(context);
this.bucketSplits = new HashMap<>();
addSplits(remainSplits);
- this.currentSnapshotId = currentSnapshotId;
+ this.nextSnapshotId = nextSnapshotId;
this.discoveryInterval = discoveryInterval;
this.readersAwaitingSplit = new HashSet<>();
this.splitGenerator = new FileStoreSourceSplitGenerator();
- this.snapshotEnumerator =
- new SnapshotEnumerator(location, scan, changelogProducer,
currentSnapshotId);
+ this.snapshotEnumerator = snapshotEnumerator;
}
private void addSplits(Collection<FileStoreSourceSplit> splits) {
@@ -101,10 +94,7 @@ public class ContinuousFileSplitEnumerator
@Override
public void start() {
context.callAsync(
- snapshotEnumerator,
- this::processDiscoveredSplits,
- discoveryInterval,
- discoveryInterval);
+ snapshotEnumerator::enumerate, this::processDiscoveredSplits,
0, discoveryInterval);
}
@Override
@@ -139,8 +129,7 @@ public class ContinuousFileSplitEnumerator
List<FileStoreSourceSplit> splits = new ArrayList<>();
bucketSplits.values().forEach(splits::addAll);
final PendingSplitsCheckpoint checkpoint =
- new PendingSplitsCheckpoint(
- splits, currentSnapshotId == null ? INVALID_SNAPSHOT :
currentSnapshotId);
+ new PendingSplitsCheckpoint(splits, nextSnapshotId);
LOG.debug("Source Checkpoint is {}", checkpoint);
return checkpoint;
@@ -148,7 +137,8 @@ public class ContinuousFileSplitEnumerator
// ------------------------------------------------------------------------
- private void processDiscoveredSplits(@Nullable EnumeratorResult result,
Throwable error) {
+ private void processDiscoveredSplits(
+ @Nullable DataTableScan.DataFilePlan result, Throwable error) {
if (error != null) {
LOG.error("Failed to enumerate files", error);
return;
@@ -158,8 +148,8 @@ public class ContinuousFileSplitEnumerator
return;
}
- currentSnapshotId = result.snapshotId;
- addSplits(splitGenerator.createSplits(result.plan));
+ nextSnapshotId = result.snapshotId + 1;
+ addSplits(splitGenerator.createSplits(result));
assignSplits();
}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java
similarity index 51%
copy from
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
copy to
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java
index 573e5f98..581d746c 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java
@@ -19,91 +19,63 @@
package org.apache.flink.table.store.connector.source;
import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.source.DataTableScan;
-import org.apache.flink.table.store.table.source.DataTableScan.DataFilePlan;
-import org.apache.flink.table.store.table.source.SnapshotEnumerator;
+import
org.apache.flink.table.store.table.source.snapshot.DataFileSnapshotEnumerator;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.Collection;
-import static
org.apache.flink.table.store.connector.source.PendingSplitsCheckpoint.INVALID_SNAPSHOT;
-
-/** {@link Source} of file store. */
-public class FileStoreSource extends FlinkSource {
+/** Unbounded {@link FlinkSource} for reading records. It continuously
monitors new snapshots. */
+public class ContinuousFileStoreSource extends FlinkSource {
private static final long serialVersionUID = 1L;
private final FileStoreTable table;
-
- private final boolean isContinuous;
-
private final long discoveryInterval;
- public FileStoreSource(
+ public ContinuousFileStoreSource(
FileStoreTable table,
- boolean isContinuous,
long discoveryInterval,
@Nullable int[][] projectedFields,
@Nullable Predicate predicate,
@Nullable Long limit) {
super(table, projectedFields, predicate, limit);
this.table = table;
- this.isContinuous = isContinuous;
this.discoveryInterval = discoveryInterval;
}
@Override
public Boundedness getBoundedness() {
- return isContinuous ? Boundedness.CONTINUOUS_UNBOUNDED :
Boundedness.BOUNDED;
+ return Boundedness.CONTINUOUS_UNBOUNDED;
}
@Override
public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint>
restoreEnumerator(
SplitEnumeratorContext<FileStoreSourceSplit> context,
PendingSplitsCheckpoint checkpoint) {
- SnapshotManager snapshotManager = table.snapshotManager();
DataTableScan scan = table.newScan();
if (predicate != null) {
scan.withFilter(predicate);
}
- Long snapshotId;
- Collection<FileStoreSourceSplit> splits;
- if (checkpoint == null) {
- DataFilePlan plan = isContinuous ?
SnapshotEnumerator.startup(scan) : scan.plan();
- snapshotId = plan.snapshotId;
- splits = new FileStoreSourceSplitGenerator().createSplits(plan);
- } else {
- // restore from checkpoint
- snapshotId = checkpoint.currentSnapshotId();
- if (snapshotId == INVALID_SNAPSHOT) {
- snapshotId = null;
- }
+ Long nextSnapshotId = null;
+ Collection<FileStoreSourceSplit> splits = new ArrayList<>();
+ if (checkpoint != null) {
+ nextSnapshotId = checkpoint.currentSnapshotId();
splits = checkpoint.splits();
}
- // create enumerator from snapshotId and splits
- if (isContinuous) {
- long currentSnapshot = snapshotId == null ?
Snapshot.FIRST_SNAPSHOT_ID - 1 : snapshotId;
- return new ContinuousFileSplitEnumerator(
- context,
- table.location(),
- scan.withIncremental(true), // the subsequent planning is
all incremental
- table.options().changelogProducer(),
- splits,
- currentSnapshot,
- discoveryInterval);
- } else {
- Snapshot snapshot = snapshotId == null ? null :
snapshotManager.snapshot(snapshotId);
- return new StaticFileStoreSplitEnumerator(context, snapshot,
splits);
- }
+ return new ContinuousFileSplitEnumerator(
+ context,
+ splits,
+ nextSnapshotId,
+ discoveryInterval,
+ DataFileSnapshotEnumerator.create(table, scan,
nextSnapshotId));
}
}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
index d5197542..57b544e4 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
@@ -117,9 +117,13 @@ public class FlinkSourceBuilder {
return conf.get(CONTINUOUS_DISCOVERY_INTERVAL).toMillis();
}
- private FileStoreSource buildFileSource(boolean isContinuous) {
- return new FileStoreSource(
- table, isContinuous, discoveryIntervalMills(),
projectedFields, predicate, limit);
+ private StaticFileStoreSource buildStaticFileSource() {
+ return new StaticFileStoreSource(table, projectedFields, predicate,
limit);
+ }
+
+ private ContinuousFileStoreSource buildContinuousFileSource() {
+ return new ContinuousFileStoreSource(
+ table, discoveryIntervalMills(), projectedFields, predicate,
limit);
}
private Source<RowData, ?, ?> buildSource() {
@@ -144,20 +148,20 @@ public class FlinkSourceBuilder {
LogStartupMode startupMode = conf.get(LOG_SCAN);
if (logSourceProvider == null) {
- return buildFileSource(true);
+ return buildContinuousFileSource();
} else {
if (startupMode != LogStartupMode.FULL) {
return logSourceProvider.createSource(null);
}
return HybridSource.<RowData,
StaticFileStoreSplitEnumerator>builder(
- buildFileSource(false))
+ buildStaticFileSource())
.addSource(
new LogHybridSourceFactory(logSourceProvider),
Boundedness.CONTINUOUS_UNBOUNDED)
.build();
}
} else {
- return buildFileSource(false);
+ return buildStaticFileSource();
}
}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpoint.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpoint.java
index 10c2b23e..789a1a77 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpoint.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpoint.java
@@ -18,6 +18,8 @@
package org.apache.flink.table.store.connector.source;
+import javax.annotation.Nullable;
+
import java.util.Collection;
/**
@@ -26,15 +28,13 @@ import java.util.Collection;
*/
public class PendingSplitsCheckpoint {
- public static final long INVALID_SNAPSHOT = -1L;
-
/** The splits in the checkpoint. */
private final Collection<FileStoreSourceSplit> splits;
- private final long currentSnapshotId;
+ private final @Nullable Long currentSnapshotId;
public PendingSplitsCheckpoint(
- Collection<FileStoreSourceSplit> splits, long currentSnapshotId) {
+ Collection<FileStoreSourceSplit> splits, @Nullable Long
currentSnapshotId) {
this.splits = splits;
this.currentSnapshotId = currentSnapshotId;
}
@@ -43,7 +43,7 @@ public class PendingSplitsCheckpoint {
return splits;
}
- public long currentSnapshotId() {
+ public @Nullable Long currentSnapshotId() {
return currentSnapshotId;
}
}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializer.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializer.java
index b3df0aca..2f129ea2 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializer.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializer.java
@@ -31,6 +31,8 @@ import java.util.List;
public class PendingSplitsCheckpointSerializer
implements SimpleVersionedSerializer<PendingSplitsCheckpoint> {
+ private static final long INVALID_SNAPSHOT = -1;
+
private final FileStoreSourceSplitSerializer splitSerializer;
public PendingSplitsCheckpointSerializer(FileStoreSourceSplitSerializer
splitSerializer) {
@@ -46,19 +48,24 @@ public class PendingSplitsCheckpointSerializer
public byte[] serialize(PendingSplitsCheckpoint pendingSplitsCheckpoint)
throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputViewStreamWrapper view = new
DataOutputViewStreamWrapper(out);
+
view.writeInt(pendingSplitsCheckpoint.splits().size());
for (FileStoreSourceSplit split : pendingSplitsCheckpoint.splits()) {
byte[] bytes = splitSerializer.serialize(split);
view.writeInt(bytes.length);
view.write(bytes);
}
- view.writeLong(pendingSplitsCheckpoint.currentSnapshotId());
+
+ Long currentSnapshotId = pendingSplitsCheckpoint.currentSnapshotId();
+ view.writeLong(currentSnapshotId == null ? INVALID_SNAPSHOT :
currentSnapshotId);
+
return out.toByteArray();
}
@Override
public PendingSplitsCheckpoint deserialize(int version, byte[] serialized)
throws IOException {
DataInputDeserializer view = new DataInputDeserializer(serialized);
+
int splitNumber = view.readInt();
List<FileStoreSourceSplit> splits = new ArrayList<>(splitNumber);
for (int i = 0; i < splitNumber; i++) {
@@ -67,7 +74,9 @@ public class PendingSplitsCheckpointSerializer
view.readFully(bytes);
splits.add(splitSerializer.deserialize(version, bytes));
}
+
long currentSnapshotId = view.readLong();
- return new PendingSplitsCheckpoint(splits, currentSnapshotId);
+ return new PendingSplitsCheckpoint(
+ splits, currentSnapshotId == INVALID_SNAPSHOT ? null :
currentSnapshotId);
}
}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSource.java
similarity index 60%
rename from
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
rename to
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSource.java
index 573e5f98..2223bbad 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSource.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.store.connector.source;
import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.table.store.file.Snapshot;
@@ -27,42 +26,30 @@ import
org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.source.DataTableScan;
-import org.apache.flink.table.store.table.source.DataTableScan.DataFilePlan;
-import org.apache.flink.table.store.table.source.SnapshotEnumerator;
import javax.annotation.Nullable;
import java.util.Collection;
-import static
org.apache.flink.table.store.connector.source.PendingSplitsCheckpoint.INVALID_SNAPSHOT;
-
-/** {@link Source} of file store. */
-public class FileStoreSource extends FlinkSource {
+/** Bounded {@link FlinkSource} for reading records. It does not monitor new
snapshots. */
+public class StaticFileStoreSource extends FlinkSource {
private static final long serialVersionUID = 1L;
private final FileStoreTable table;
- private final boolean isContinuous;
-
- private final long discoveryInterval;
-
- public FileStoreSource(
+ public StaticFileStoreSource(
FileStoreTable table,
- boolean isContinuous,
- long discoveryInterval,
@Nullable int[][] projectedFields,
@Nullable Predicate predicate,
@Nullable Long limit) {
super(table, projectedFields, predicate, limit);
this.table = table;
- this.isContinuous = isContinuous;
- this.discoveryInterval = discoveryInterval;
}
@Override
public Boundedness getBoundedness() {
- return isContinuous ? Boundedness.CONTINUOUS_UNBOUNDED :
Boundedness.BOUNDED;
+ return Boundedness.BOUNDED;
}
@Override
@@ -78,32 +65,16 @@ public class FileStoreSource extends FlinkSource {
Long snapshotId;
Collection<FileStoreSourceSplit> splits;
if (checkpoint == null) {
- DataFilePlan plan = isContinuous ?
SnapshotEnumerator.startup(scan) : scan.plan();
+ DataTableScan.DataFilePlan plan = scan.plan();
snapshotId = plan.snapshotId;
splits = new FileStoreSourceSplitGenerator().createSplits(plan);
} else {
// restore from checkpoint
snapshotId = checkpoint.currentSnapshotId();
- if (snapshotId == INVALID_SNAPSHOT) {
- snapshotId = null;
- }
splits = checkpoint.splits();
}
- // create enumerator from snapshotId and splits
- if (isContinuous) {
- long currentSnapshot = snapshotId == null ?
Snapshot.FIRST_SNAPSHOT_ID - 1 : snapshotId;
- return new ContinuousFileSplitEnumerator(
- context,
- table.location(),
- scan.withIncremental(true), // the subsequent planning is
all incremental
- table.options().changelogProducer(),
- splits,
- currentSnapshot,
- discoveryInterval);
- } else {
- Snapshot snapshot = snapshotId == null ? null :
snapshotManager.snapshot(snapshotId);
- return new StaticFileStoreSplitEnumerator(context, snapshot,
splits);
- }
+ Snapshot snapshot = snapshotId == null ? null :
snapshotManager.snapshot(snapshotId);
+ return new StaticFileStoreSplitEnumerator(context, snapshot, splits);
}
}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
index 599417f1..1712ebde 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
@@ -30,9 +30,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
-import static
org.apache.flink.table.store.connector.source.PendingSplitsCheckpoint.INVALID_SNAPSHOT;
-
-/** A SplitEnumerator implementation for bounded / batch {@link
FileStoreSource} input. */
+/** A {@link SplitEnumerator} implementation for {@link StaticFileStoreSource}
input. */
public class StaticFileStoreSplitEnumerator
implements SplitEnumerator<FileStoreSourceSplit,
PendingSplitsCheckpoint> {
@@ -84,7 +82,7 @@ public class StaticFileStoreSplitEnumerator
@Override
public PendingSplitsCheckpoint snapshotState(long checkpointId) {
return new PendingSplitsCheckpoint(
- new ArrayList<>(splits), snapshot == null ? INVALID_SNAPSHOT :
snapshot.id());
+ new ArrayList<>(splits), snapshot == null ? null :
snapshot.id());
}
@Override
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index 503f1009..77d7b460 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -53,10 +53,11 @@ import static
org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
import static
org.apache.flink.table.store.CoreOptions.LOG_SCAN_REMOVE_NORMALIZE;
/**
- * Table source to create {@link FileStoreSource} under batch mode or
change-tracking is disabled.
- * For streaming mode with change-tracking enabled and FULL scan mode, it will
create a {@link
- * org.apache.flink.connector.base.source.hybrid.HybridSource} of {@link
FileStoreSource} and kafka
- * log source created by {@link LogSourceProvider}.
+ * Table source to create {@link StaticFileStoreSource} or {@link
ContinuousFileStoreSource} under
+ * batch mode or change-tracking is disabled. For streaming mode with
change-tracking enabled and
+ * FULL scan mode, it will create a {@link
+ * org.apache.flink.connector.base.source.hybrid.HybridSource} of {@link
StaticFileStoreSource} and
+ * kafka log source created by {@link LogSourceProvider}.
*/
public class TableStoreSource extends FlinkTableSource
implements LookupTableSource, SupportsWatermarkPushDown {
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index 5b749d74..b4e8bb72 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -35,8 +35,9 @@ import
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.connector.sink.FlinkSinkBuilder;
import org.apache.flink.table.store.connector.sink.StoreSink;
-import org.apache.flink.table.store.connector.source.FileStoreSource;
+import org.apache.flink.table.store.connector.source.ContinuousFileStoreSource;
import org.apache.flink.table.store.connector.source.FlinkSourceBuilder;
+import org.apache.flink.table.store.connector.source.StaticFileStoreSource;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.file.utils.BlockingIterator;
@@ -77,7 +78,10 @@ import static org.apache.flink.table.store.CoreOptions.PATH;
import static
org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem.retryArtificialException;
import static org.assertj.core.api.Assertions.assertThat;
-/** ITCase for {@link FileStoreSource} and {@link StoreSink}. */
+/**
+ * ITCase for {@link StaticFileStoreSource}, {@link ContinuousFileStoreSource}
and {@link
+ * StoreSink}.
+ */
@RunWith(Parameterized.class)
public class FileStoreITCase extends AbstractTestBase {
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java
index 0a5a3862..7548cb61 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java
@@ -37,7 +37,7 @@ public class PendingSplitsCheckpointSerializerTest {
@Test
public void serializeEmptyCheckpoint() throws Exception {
final PendingSplitsCheckpoint checkpoint =
- new PendingSplitsCheckpoint(Collections.emptyList(), 5);
+ new PendingSplitsCheckpoint(Collections.emptyList(), 5L);
final PendingSplitsCheckpoint deSerialized =
serializeAndDeserialize(checkpoint);
@@ -48,7 +48,7 @@ public class PendingSplitsCheckpointSerializerTest {
public void serializeSomeSplits() throws Exception {
final PendingSplitsCheckpoint checkpoint =
new PendingSplitsCheckpoint(
- Arrays.asList(testSplit1(), testSplit2(),
testSplit3()), 3);
+ Arrays.asList(testSplit1(), testSplit2(),
testSplit3()), 3L);
final PendingSplitsCheckpoint deSerialized =
serializeAndDeserialize(checkpoint);
@@ -59,7 +59,7 @@ public class PendingSplitsCheckpointSerializerTest {
public void serializeSplitsAndContinuous() throws Exception {
final PendingSplitsCheckpoint checkpoint =
new PendingSplitsCheckpoint(
- Arrays.asList(testSplit1(), testSplit2(),
testSplit3()), 20);
+ Arrays.asList(testSplit1(), testSplit2(),
testSplit3()), 20L);
final PendingSplitsCheckpoint deSerialized =
serializeAndDeserialize(checkpoint);
@@ -69,7 +69,7 @@ public class PendingSplitsCheckpointSerializerTest {
@Test
public void repeatedSerialization() throws Exception {
final PendingSplitsCheckpoint checkpoint =
- new PendingSplitsCheckpoint(Arrays.asList(testSplit3(),
testSplit1()), 5);
+ new PendingSplitsCheckpoint(Arrays.asList(testSplit3(),
testSplit1()), 5L);
serializeAndDeserialize(checkpoint);
serializeAndDeserialize(checkpoint);
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
index 5332215f..802c1944 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
@@ -112,7 +112,6 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
manifestListFactory(),
options.bucket(),
checkNumOfBuckets,
- options.changelogProducer(),
options.readCompacted());
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
index b98e7211..e7e563e0 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store.file.operation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.manifest.ManifestFile;
@@ -61,7 +60,6 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private final ManifestList manifestList;
private final int numOfBuckets;
private final boolean checkNumOfBuckets;
- private final CoreOptions.ChangelogProducer changelogProducer;
private final boolean readCompacted;
private final ConcurrentMap<Long, TableSchema> tableSchemas;
@@ -74,7 +72,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private Long specifiedSnapshotId = null;
private Integer specifiedBucket = null;
private List<ManifestFileMeta> specifiedManifests = null;
- private boolean isIncremental = false;
+ private ScanKind scanKind = ScanKind.ALL;
private Integer specifiedLevel = null;
public AbstractFileStoreScan(
@@ -87,7 +85,6 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
ManifestList.Factory manifestListFactory,
int numOfBuckets,
boolean checkNumOfBuckets,
- CoreOptions.ChangelogProducer changelogProducer,
boolean readCompacted) {
this.partitionStatsConverter = new
FieldStatsArraySerializer(partitionType);
this.partitionConverter = new
RowDataToObjectArrayConverter(partitionType);
@@ -101,7 +98,6 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
this.manifestList = manifestListFactory.create();
this.numOfBuckets = numOfBuckets;
this.checkNumOfBuckets = checkNumOfBuckets;
- this.changelogProducer = changelogProducer;
this.readCompacted = readCompacted;
this.tableSchemas = new ConcurrentHashMap<>();
}
@@ -167,8 +163,8 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
}
@Override
- public FileStoreScan withIncremental(boolean isIncremental) {
- this.isIncremental = isIncremental;
+ public FileStoreScan withKind(ScanKind scanKind) {
+ this.scanKind = scanKind;
return this;
}
@@ -193,10 +189,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
manifests = Collections.emptyList();
} else {
Snapshot snapshot = snapshotManager.snapshot(snapshotId);
- manifests =
- isIncremental
- ? readIncremental(snapshot)
- : snapshot.readAllDataManifests(manifestList);
+ manifests = readManifests(snapshot);
 }
 }
@@ -263,9 +256,13 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
 };
 }
+ private List<ManifestFileMeta> readManifests(Snapshot snapshot) {
+ switch (scanKind) {
+ case ALL:
+ return snapshot.readAllDataManifests(manifestList);
+ case DELTA:
+ return manifestList.read(snapshot.deltaManifestList());
+ case CHANGELOG:
if (snapshot.version() >= 2) {
if (snapshot.changelogManifestList() == null) {
return Collections.emptyList();
@@ -283,23 +280,8 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
String.format(
"Incremental scan does not accept %s snapshot",
snapshot.commitKind()));
- case FULL_COMPACTION:
- if (snapshot.changelogManifestList() == null) {
- return Collections.emptyList();
- } else {
- return manifestList.read(snapshot.changelogManifestList());
- }
- case NONE:
- if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
- throw new IllegalStateException(
- String.format(
- "Incremental scan does not accept %s
snapshot",
- snapshot.commitKind()));
- }
- return manifestList.read(snapshot.deltaManifestList());
default:
- throw new UnsupportedOperationException(
- "Unknown changelog producer " +
changelogProducer.name());
+ throw new UnsupportedOperationException("Unknown scan kind " +
scanKind.name());
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
index b1930f3d..86bcca3b 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.store.file.operation;
-import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.manifest.ManifestFile;
import org.apache.flink.table.store.file.manifest.ManifestList;
@@ -68,7 +67,6 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
manifestListFactory,
numOfBuckets,
checkNumOfBuckets,
- CoreOptions.ChangelogProducer.NONE,
readCompacted);
this.schemaRowStatsConverters = new ConcurrentHashMap<>();
this.rowType = rowType;
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
index 0ce733b8..b9fe5f80 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
@@ -46,7 +46,7 @@ public interface FileStoreScan {
FileStoreScan withManifestList(List<ManifestFileMeta> manifests);
- FileStoreScan withIncremental(boolean isIncremental);
+ FileStoreScan withKind(ScanKind scanKind);
FileStoreScan withLevel(int level);
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
index 3cb707b2..68e44f0a 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.store.file.operation;
-import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.manifest.ManifestFile;
import org.apache.flink.table.store.file.manifest.ManifestList;
@@ -62,7 +61,6 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
ManifestList.Factory manifestListFactory,
int numOfBuckets,
boolean checkNumOfBuckets,
- CoreOptions.ChangelogProducer changelogProducer,
boolean readCompacted) {
super(
partitionType,
@@ -74,7 +72,6 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
manifestListFactory,
numOfBuckets,
checkNumOfBuckets,
- changelogProducer,
readCompacted);
this.keyValueFieldsExtractor = keyValueFieldsExtractor;
this.schemaKeyStatsConverters = new ConcurrentHashMap<>();
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/ScanKind.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/ScanKind.java
new file mode 100644
index 00000000..5c7fb78e
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/ScanKind.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+/** Specifies which part of a snapshot should be scanned. */
+public enum ScanKind {
+ /** Scan complete data files of a snapshot. */
+ ALL,
+ /** Only scan newly changed files of a snapshot. */
+ DELTA,
+ /** Only scan changelog files of a snapshot. */
+ CHANGELOG
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
index 2b8e288b..73e23986 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.manifest.FileKind;
import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.operation.ScanKind;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.schema.TableSchema;
@@ -47,7 +48,7 @@ public abstract class DataTableScan implements TableScan {
private final FileStorePathFactory pathFactory;
private final CoreOptions options;
- private boolean isIncremental = false;
+ private ScanKind scanKind = ScanKind.ALL;
protected DataTableScan(
FileStoreScan scan,
@@ -93,9 +94,9 @@ public abstract class DataTableScan implements TableScan {
return this;
}
- public DataTableScan withIncremental(boolean isIncremental) {
- this.isIncremental = isIncremental;
- scan.withIncremental(isIncremental);
+ public DataTableScan withKind(ScanKind scanKind) {
+ this.scanKind = scanKind;
+ scan.withKind(scanKind);
return this;
}
@@ -119,7 +120,8 @@ public abstract class DataTableScan implements TableScan {
private List<DataSplit> generateSplits(
Map<BinaryRowData, Map<Integer, List<DataFileMeta>>>
groupedDataFiles) {
- return generateSplits(isIncremental, splitGenerator(pathFactory),
groupedDataFiles);
+ return generateSplits(
+ scanKind != ScanKind.ALL, splitGenerator(pathFactory),
groupedDataFiles);
}
@VisibleForTesting
@@ -171,6 +173,7 @@ public abstract class DataTableScan implements TableScan {
this.splits = splits;
}
+ @SuppressWarnings({"unchecked", "rawtypes"})
@Override
public List<Split> splits() {
return (List) splits;
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java
deleted file mode 100644
index 067ef812..00000000
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.table.source;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.CoreOptions;
-import org.apache.flink.table.store.file.Snapshot;
-import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.table.source.DataTableScan.DataFilePlan;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.util.Collections;
-import java.util.concurrent.Callable;
-
-import static
org.apache.flink.table.store.CoreOptions.ChangelogProducer.FULL_COMPACTION;
-
-/** Enumerator to enumerate incremental snapshots. */
-public class SnapshotEnumerator implements
Callable<SnapshotEnumerator.EnumeratorResult> {
-
- private static final Logger LOG =
LoggerFactory.getLogger(SnapshotEnumerator.class);
-
- private final SnapshotManager snapshotManager;
- private final DataTableScan scan;
- private final CoreOptions.ChangelogProducer changelogProducer;
-
- private long nextSnapshotId;
-
- public SnapshotEnumerator(
- Path tablePath,
- DataTableScan scan,
- CoreOptions.ChangelogProducer changelogProducer,
- long currentSnapshot) {
- this.snapshotManager = new SnapshotManager(tablePath);
- this.scan = scan;
- this.changelogProducer = changelogProducer;
-
- this.nextSnapshotId = currentSnapshot + 1;
- }
-
- @Nullable
- @Override
- public EnumeratorResult call() {
- // TODO sync with processDiscoveredSplits to avoid too more splits in
memory
- while (true) {
- if (!snapshotManager.snapshotExists(nextSnapshotId)) {
- // TODO check latest snapshot id, expired?
- LOG.debug(
- "Next snapshot id {} does not exist, wait for the
snapshot generation.",
- nextSnapshotId);
- return null;
- }
-
- Snapshot snapshot = snapshotManager.snapshot(nextSnapshotId);
-
- if (snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE) {
- LOG.warn("Ignore overwrite snapshot id {}.", nextSnapshotId);
- nextSnapshotId++;
- continue;
- }
-
- if (changelogProducer == CoreOptions.ChangelogProducer.NONE
- && snapshot.commitKind() != Snapshot.CommitKind.APPEND) {
- LOG.debug(
- "ChangelogProducer is NONE. "
- + "Next snapshot id {} is not APPEND, but is
{}, check next one.",
- nextSnapshotId,
- snapshot.commitKind());
- nextSnapshotId++;
- continue;
- }
-
- DataFilePlan plan = scan.withSnapshot(nextSnapshotId).plan();
- EnumeratorResult result = new EnumeratorResult(nextSnapshotId,
plan);
- LOG.debug("Find snapshot id {}.", nextSnapshotId);
-
- nextSnapshotId++;
- return result;
- }
- }
-
- /** Enumerator result. */
- public static class EnumeratorResult {
-
- public final long snapshotId;
-
- public final DataFilePlan plan;
-
- private EnumeratorResult(long snapshotId, DataFilePlan plan) {
- this.snapshotId = snapshotId;
- this.plan = plan;
- }
- }
-
- /** Startup snapshot enumerator, this is the first plan for continuous
reading. */
- public static DataFilePlan startup(DataTableScan scan) {
- CoreOptions options = scan.options();
- SnapshotManager snapshotManager = scan.snapshotManager();
- CoreOptions.LogStartupMode startupMode = options.logStartupMode();
- switch (startupMode) {
- case FULL:
- DataFilePlan plan;
- if (options.changelogProducer() == FULL_COMPACTION) {
- // Read the results of the last full compaction.
- // Only full compaction results will appear on the max
level.
- plan = scan.withLevel(options.numLevels() - 1).plan();
- } else {
- plan = scan.plan();
- }
- return plan;
- case LATEST:
- return new DataFilePlan(
- snapshotManager.latestSnapshotId(),
Collections.emptyList());
- case FROM_TIMESTAMP:
- Long timestampMills = options.logScanTimestampMills();
- if (timestampMills == null) {
- throw new IllegalArgumentException(
- String.format(
- "%s can not be null when you use %s for
%s",
- CoreOptions.LOG_SCAN_TIMESTAMP_MILLS.key(),
- CoreOptions.LogStartupMode.FROM_TIMESTAMP,
- CoreOptions.LOG_SCAN.key()));
- }
- return new DataFilePlan(
- snapshotManager.earlierThanTimeMills(timestampMills),
- Collections.emptyList());
- default:
- throw new UnsupportedOperationException("Unsupported startup
mode: " + startupMode);
- }
- }
-}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
index a00bed89..be2d05e5 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
@@ -19,11 +19,14 @@
package org.apache.flink.table.store.table.source;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateFilter;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.table.FileStoreTable;
+import
org.apache.flink.table.store.table.source.snapshot.DeltaSnapshotEnumerator;
+import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator;
import org.apache.flink.table.store.utils.TypeUtils;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
@@ -47,14 +50,14 @@ public class TableStreamingReader {
private final int[] projection;
@Nullable private final Predicate predicate;
@Nullable private final PredicateFilter recordFilter;
-
- private SnapshotEnumerator enumerator;
+ private final SnapshotEnumerator enumerator;
public TableStreamingReader(
FileStoreTable table, int[] projection, @Nullable Predicate
predicate) {
this.table = table;
this.projection = projection;
this.predicate = predicate;
+
if (predicate != null) {
List<String> fieldNames = table.schema().fieldNames();
List<String> primaryKeys = table.schema().primaryKeys();
@@ -79,34 +82,20 @@ public class TableStreamingReader {
} else {
recordFilter = null;
}
+
+ DataTableScan scan = table.newScan();
+ if (predicate != null) {
+ scan.withFilter(predicate);
+ }
+ enumerator =
+ new DeltaSnapshotEnumerator(
+ table.location(), scan,
CoreOptions.LogStartupMode.FULL, null, null);
}
@Nullable
- public Iterator<RowData> nextBatch() throws IOException {
- if (enumerator == null) {
- DataTableScan scan = table.newScan();
- if (predicate != null) {
- scan.withFilter(predicate);
- }
- DataTableScan.DataFilePlan plan = scan.plan();
- if (plan.snapshotId == null) {
- return null;
- }
- long snapshotId = plan.snapshotId;
- enumerator =
- new SnapshotEnumerator(
- table.location(),
- scan.withIncremental(true),
- table.options().changelogProducer(),
- snapshotId);
- return read(plan);
- } else {
- SnapshotEnumerator.EnumeratorResult result = enumerator.call();
- if (result == null) {
- return null;
- }
- return read(result.plan);
- }
+ public Iterator<RowData> nextBatch() throws Exception {
+ DataTableScan.DataFilePlan plan = enumerator.enumerate();
+ return plan == null ? null : read(plan);
}
private Iterator<RowData> read(DataTableScan.DataFilePlan plan) throws
IOException {
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumerator.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumerator.java
new file mode 100644
index 00000000..c345eac5
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumerator.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.operation.ScanKind;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+
+/** Abstract class for all {@link SnapshotEnumerator}s which enumerate record-related data files. */
+public abstract class DataFileSnapshotEnumerator implements SnapshotEnumerator
{
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DataFileSnapshotEnumerator.class);
+
+ private final SnapshotManager snapshotManager;
+ private final DataTableScan scan;
+ private final CoreOptions.LogStartupMode startupMode;
+ private @Nullable final Long startupMillis;
+
+ private @Nullable Long nextSnapshotId;
+
+ public DataFileSnapshotEnumerator(
+ Path tablePath,
+ DataTableScan scan,
+ CoreOptions.LogStartupMode startupMode,
+ @Nullable Long startupMillis,
+ @Nullable Long nextSnapshotId) {
+ this.snapshotManager = new SnapshotManager(tablePath);
+ this.scan = scan;
+ this.startupMode = startupMode;
+ this.startupMillis = startupMillis;
+
+ this.nextSnapshotId = nextSnapshotId;
+ }
+
+ @Override
+ public DataTableScan.DataFilePlan enumerate() {
+ if (nextSnapshotId == null) {
+ return tryFirstEnumerate();
+ } else {
+ return nextEnumerate();
+ }
+ }
+
+ private DataTableScan.DataFilePlan tryFirstEnumerate() {
+ Long startingSnapshotId = snapshotManager.latestSnapshotId();
+ if (startingSnapshotId == null) {
+ LOG.debug("There is currently no snapshot. Wait for the snapshot
generation.");
+ return null;
+ }
+
+ DataTableScan.DataFilePlan plan;
+ switch (startupMode) {
+ case FULL:
+ plan =
scan.withKind(ScanKind.ALL).withSnapshot(startingSnapshotId).plan();
+ break;
+ case FROM_TIMESTAMP:
+ Preconditions.checkNotNull(
+ startupMillis,
+ String.format(
+ "%s can not be null when you use %s for %s",
+ CoreOptions.LOG_SCAN_TIMESTAMP_MILLS.key(),
+ CoreOptions.LogStartupMode.FROM_TIMESTAMP,
+ CoreOptions.LOG_SCAN.key()));
+ startingSnapshotId =
snapshotManager.earlierThanTimeMills(startupMillis);
+ plan = new DataTableScan.DataFilePlan(startingSnapshotId,
Collections.emptyList());
+ break;
+ case LATEST:
+ plan = new DataTableScan.DataFilePlan(startingSnapshotId,
Collections.emptyList());
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unknown log startup mode " + startupMode.name());
+ }
+
+ nextSnapshotId = startingSnapshotId + 1;
+ return plan;
+ }
+
+ private DataTableScan.DataFilePlan nextEnumerate() {
+ while (true) {
+ if (!snapshotManager.snapshotExists(nextSnapshotId)) {
+ LOG.debug(
+ "Next snapshot id {} does not exist, wait for the
snapshot generation.",
+ nextSnapshotId);
+ return null;
+ }
+
+ Snapshot snapshot = snapshotManager.snapshot(nextSnapshotId);
+
+ if (shouldReadSnapshot(snapshot)) {
+ LOG.debug("Find snapshot id {}.", nextSnapshotId);
+ DataTableScan.DataFilePlan plan =
getPlan(scan.withSnapshot(nextSnapshotId));
+ nextSnapshotId++;
+ return plan;
+ } else {
+ nextSnapshotId++;
+ }
+ }
+ }
+
+ protected abstract boolean shouldReadSnapshot(Snapshot snapshot);
+
+ protected abstract DataTableScan.DataFilePlan getPlan(DataTableScan scan);
+
+ public static DataFileSnapshotEnumerator create(
+ FileStoreTable table, DataTableScan scan, Long nextSnapshotId) {
+ Path location = table.location();
+ CoreOptions.LogStartupMode startupMode =
table.options().logStartupMode();
+ Long startupMillis = table.options().logScanTimestampMills();
+
+ switch (table.options().changelogProducer()) {
+ case NONE:
+ return new DeltaSnapshotEnumerator(
+ location, scan, startupMode, startupMillis,
nextSnapshotId);
+ case INPUT:
+ return new InputChangelogSnapshotEnumerator(
+ location, scan, startupMode, startupMillis,
nextSnapshotId);
+ case FULL_COMPACTION:
+ return new FullCompactionChangelogSnapshotEnumerator(
+ location,
+ scan,
+ table.options().numLevels() - 1,
+ startupMode,
+ startupMillis,
+ nextSnapshotId);
+ default:
+ throw new UnsupportedOperationException(
+ "Unknown changelog producer " +
table.options().changelogProducer().name());
+ }
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumerator.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumerator.java
new file mode 100644
index 00000000..857ef88d
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumerator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.operation.ScanKind;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link DataFileSnapshotEnumerator} which scans incremental changes in
{@link
+ * Snapshot#deltaManifestList()} for each newly created snapshot.
+ */
+public class DeltaSnapshotEnumerator extends DataFileSnapshotEnumerator {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DeltaSnapshotEnumerator.class);
+
+ public DeltaSnapshotEnumerator(
+ Path tablePath,
+ DataTableScan scan,
+ CoreOptions.LogStartupMode startupMode,
+ @Nullable Long startupMillis,
+ @Nullable Long nextSnapshotId) {
+ super(tablePath, scan, startupMode, startupMillis, nextSnapshotId);
+ }
+
+ @Override
+ protected boolean shouldReadSnapshot(Snapshot snapshot) {
+ if (snapshot.commitKind() == Snapshot.CommitKind.APPEND) {
+ return true;
+ }
+
+ LOG.debug(
+ "Next snapshot id {} is not APPEND, but is {}, check next
one.",
+ snapshot.id(),
+ snapshot.commitKind());
+ return false;
+ }
+
+ @Override
+ protected DataTableScan.DataFilePlan getPlan(DataTableScan scan) {
+ return scan.withKind(ScanKind.DELTA).plan();
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumerator.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumerator.java
new file mode 100644
index 00000000..85c90f72
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumerator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.operation.ScanKind;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link DataFileSnapshotEnumerator} which scans incremental changes in
{@link
+ * Snapshot#changelogManifestList()} for each newly created snapshot.
+ *
+ * <p>This enumerator looks for changelog files produced from full compaction.
Can only be used when
+ * {@link CoreOptions#CHANGELOG_PRODUCER} is set to {@link
+ * CoreOptions.ChangelogProducer#FULL_COMPACTION}.
+ */
+public class FullCompactionChangelogSnapshotEnumerator extends
DataFileSnapshotEnumerator {
+
+ private static final Logger LOG =
+
LoggerFactory.getLogger(FullCompactionChangelogSnapshotEnumerator.class);
+
+ public FullCompactionChangelogSnapshotEnumerator(
+ Path tablePath,
+ DataTableScan scan,
+ int maxLevel,
+ CoreOptions.LogStartupMode startupMode,
+ @Nullable Long startupMillis,
+ @Nullable Long nextSnapshotId) {
+ super(tablePath, scan.withLevel(maxLevel), startupMode, startupMillis,
nextSnapshotId);
+ }
+
+ @Override
+ protected boolean shouldReadSnapshot(Snapshot snapshot) {
+ if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+ return true;
+ }
+
+ LOG.debug(
+ "Next snapshot id {} is not COMPACT, but is {}, check next
one.",
+ snapshot.id(),
+ snapshot.commitKind());
+ return false;
+ }
+
+ @Override
+ protected DataTableScan.DataFilePlan getPlan(DataTableScan scan) {
+ return scan.withKind(ScanKind.CHANGELOG).plan();
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumerator.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumerator.java
new file mode 100644
index 00000000..03f221bf
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumerator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.operation.ScanKind;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link DataFileSnapshotEnumerator} which scans incremental changes in
{@link
+ * Snapshot#changelogManifestList()} for each newly created snapshot.
+ *
+ * <p>This enumerator looks for changelog files produced directly from input.
Can only be used when
+ * {@link CoreOptions#CHANGELOG_PRODUCER} is set to {@link
CoreOptions.ChangelogProducer#INPUT}.
+ */
+public class InputChangelogSnapshotEnumerator extends
DataFileSnapshotEnumerator {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(InputChangelogSnapshotEnumerator.class);
+
+ public InputChangelogSnapshotEnumerator(
+ Path tablePath,
+ DataTableScan scan,
+ CoreOptions.LogStartupMode startupMode,
+ @Nullable Long startupMillis,
+ @Nullable Long nextSnapshotId) {
+ super(tablePath, scan, startupMode, startupMillis, nextSnapshotId);
+ }
+
+ @Override
+ protected boolean shouldReadSnapshot(Snapshot snapshot) {
+ if (snapshot.commitKind() == Snapshot.CommitKind.APPEND) {
+ return true;
+ }
+
+ LOG.debug(
+ "Next snapshot id {} is not APPEND, but is {}, check next
one.",
+ snapshot.id(),
+ snapshot.commitKind());
+ return false;
+ }
+
+ @Override
+ protected DataTableScan.DataFilePlan getPlan(DataTableScan scan) {
+ return scan.withKind(ScanKind.CHANGELOG).plan();
+ }
+}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpoint.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/SnapshotEnumerator.java
similarity index 50%
copy from
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpoint.java
copy to
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/SnapshotEnumerator.java
index 10c2b23e..93c1cb08 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpoint.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/SnapshotEnumerator.java
@@ -16,34 +16,24 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.connector.source;
-
-import java.util.Collection;
-
-/**
- * A checkpoint of the current state of the containing the currently pending
splits that are not yet
- * assigned.
- */
-public class PendingSplitsCheckpoint {
-
- public static final long INVALID_SNAPSHOT = -1L;
-
- /** The splits in the checkpoint. */
- private final Collection<FileStoreSourceSplit> splits;
-
- private final long currentSnapshotId;
-
- public PendingSplitsCheckpoint(
- Collection<FileStoreSourceSplit> splits, long currentSnapshotId) {
- this.splits = splits;
- this.currentSnapshotId = currentSnapshotId;
- }
-
- public Collection<FileStoreSourceSplit> splits() {
- return splits;
- }
-
- public long currentSnapshotId() {
- return currentSnapshotId;
- }
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import javax.annotation.Nullable;
+
+/** Enumerate incremental changes from newly created snapshots. */
+public interface SnapshotEnumerator {
+
+ /**
+ * The first call to this method will produce a {@link
DataTableScan.DataFilePlan} containing
+ * the base files for the following incremental changes (or just return
null if there are no
+ * base files).
+ *
+ * <p>Following calls to this method will produce {@link
DataTableScan.DataFilePlan}s containing
+ * incremental changed files. If there is currently no newer snapshots,
null will be returned
+ * instead.
+ */
+ @Nullable
+ DataTableScan.DataFilePlan enumerate();
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index be0c1a9d..d8d46e03 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -38,6 +38,7 @@ import
org.apache.flink.table.store.file.operation.FileStoreCommitImpl;
import org.apache.flink.table.store.file.operation.FileStoreExpireImpl;
import org.apache.flink.table.store.file.operation.FileStoreRead;
import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.operation.ScanKind;
import org.apache.flink.table.store.file.schema.KeyValueFieldsExtractor;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -270,7 +271,15 @@ public class TestFileStore extends KeyValueFileStore {
snapshotId <= endInclusive;
snapshotId++) {
List<ManifestEntry> entries =
-
newScan().withIncremental(true).withSnapshot(snapshotId).plan().files();
+ newScan()
+ .withKind(
+ options.changelogProducer()
+ ==
CoreOptions.ChangelogProducer.NONE
+ ? ScanKind.DELTA
+ : ScanKind.CHANGELOG)
+ .withSnapshot(snapshotId)
+ .plan()
+ .files();
result.addAll(readKvsFromManifestEntries(entries, true));
}
return result;
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 7e4ea531..c8332af7 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.operation.ScanKind;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.schema.SchemaManager;
@@ -115,7 +116,7 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
writeData();
FileStoreTable table = createFileStoreTable();
- List<Split> splits =
table.newScan().withIncremental(true).plan().splits();
+ List<Split> splits =
table.newScan().withKind(ScanKind.DELTA).plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_ROW_TO_STRING))
.isEqualTo(
@@ -133,7 +134,7 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
writeData();
FileStoreTable table = createFileStoreTable();
- List<Split> splits =
table.newScan().withIncremental(true).plan().splits();
+ List<Split> splits =
table.newScan().withKind(ScanKind.DELTA).plan().splits();
TableRead read = table.newRead().withProjection(PROJECTION);
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_PROJECTED_ROW_TO_STRING))
.isEqualTo(Arrays.asList("+101|11", "+102|12"));
@@ -149,7 +150,7 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
Predicate predicate = builder.equal(2, 101L);
List<Split> splits =
-
table.newScan().withIncremental(true).withFilter(predicate).plan().splits();
+
table.newScan().withKind(ScanKind.DELTA).withFilter(predicate).plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_ROW_TO_STRING))
.isEqualTo(
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index 24067da5..be4a59aa 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.io.DataFilePathFactory;
+import org.apache.flink.table.store.file.operation.ScanKind;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.schema.SchemaManager;
@@ -103,7 +104,7 @@ public class ChangelogValueCountFileStoreTableTest extends
FileStoreTableTestBas
writeData();
FileStoreTable table = createFileStoreTable();
- List<Split> splits =
table.newScan().withIncremental(true).plan().splits();
+ List<Split> splits =
table.newScan().withKind(ScanKind.DELTA).plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_ROW_TO_STRING))
.isEqualTo(
@@ -123,7 +124,7 @@ public class ChangelogValueCountFileStoreTableTest extends
FileStoreTableTestBas
writeData();
FileStoreTable table = createFileStoreTable();
- List<Split> splits =
table.newScan().withIncremental(true).plan().splits();
+ List<Split> splits =
table.newScan().withKind(ScanKind.DELTA).plan().splits();
TableRead read = table.newRead().withProjection(PROJECTION);
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_PROJECTED_ROW_TO_STRING))
.isEqualTo(Arrays.asList("-100|10", "+101|11"));
@@ -139,7 +140,7 @@ public class ChangelogValueCountFileStoreTableTest extends
FileStoreTableTestBas
Predicate predicate = builder.equal(2, 201L);
List<Split> splits =
-
table.newScan().withIncremental(true).withFilter(predicate).plan().splits();
+
table.newScan().withKind(ScanKind.DELTA).withFilter(predicate).plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_ROW_TO_STRING)).isEmpty();
assertThat(getResult(read, splits, binaryRow(2), 0,
STREAMING_ROW_TO_STRING))
@@ -190,7 +191,7 @@ public class ChangelogValueCountFileStoreTableTest extends
FileStoreTableTestBas
write.close();
// check that no data file is produced
- List<Split> splits =
table.newScan().withIncremental(true).plan().splits();
+ List<Split> splits =
table.newScan().withKind(ScanKind.DELTA).plan().splits();
assertThat(splits).isEmpty();
// check that no changelog file is produced
Path bucketPath = DataFilePathFactory.bucketPath(table.location(),
"1", 0);
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileDataTableTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileDataTableTest.java
index 8c8dbfae..066e1516 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileDataTableTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileDataTableTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.table;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.operation.ScanKind;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.schema.RowDataType;
import org.apache.flink.table.store.file.schema.SchemaManager;
@@ -140,7 +141,7 @@ public class ChangelogWithKeyFileDataTableTest extends
FileDataFilterTestBase {
PredicateBuilder builder =
new PredicateBuilder(RowDataType.toRowType(false,
SCHEMA_0_FIELDS));
FileStoreTable table = createFileStoreTable(schemas);
- List<Split> splits =
table.newScan().withIncremental(true).plan().splits();
+ List<Split> splits =
table.newScan().withKind(ScanKind.DELTA).plan().splits();
// filter with "b" = 15 in schema0
TableRead read =
table.newRead().withFilter(builder.equal(2, 15));
@@ -157,7 +158,7 @@ public class ChangelogWithKeyFileDataTableTest extends
FileDataFilterTestBase {
PredicateBuilder builder =
new PredicateBuilder(RowDataType.toRowType(false,
SCHEMA_1_FIELDS));
FileStoreTable table = createFileStoreTable(schemas);
- List<Split> splits =
table.newScan().withIncremental(true).plan().splits();
+ List<Split> splits =
table.newScan().withKind(ScanKind.DELTA).plan().splits();
// filter with "d" = 15 in schema1 which should be mapped
to "b" = 15 in schema0
/// TODO: changelog with key only supports to filter on key
@@ -191,7 +192,7 @@ public class ChangelogWithKeyFileDataTableTest extends
FileDataFilterTestBase {
PredicateBuilder builder =
new PredicateBuilder(RowDataType.toRowType(false,
SCHEMA_0_FIELDS));
FileStoreTable table = createFileStoreTable(schemas);
- List<Split> splits =
table.newScan().withIncremental(true).plan().splits();
+ List<Split> splits =
table.newScan().withKind(ScanKind.DELTA).plan().splits();
// filter with "kt" = 116 in schema0
TableRead read =
table.newRead().withFilter(builder.equal(4, 116));
@@ -205,7 +206,7 @@ public class ChangelogWithKeyFileDataTableTest extends
FileDataFilterTestBase {
PredicateBuilder builder =
new PredicateBuilder(RowDataType.toRowType(false,
SCHEMA_1_FIELDS));
FileStoreTable table = createFileStoreTable(schemas);
- List<Split> splits =
table.newScan().withIncremental(true).plan().splits();
+ List<Split> splits =
table.newScan().withKind(ScanKind.DELTA).plan().splits();
// filter with "kt" = 120 in schema1
TableRead read =
table.newRead().withFilter(builder.equal(1, 120));
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 7dd1f502..00feaa6a 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -23,8 +23,8 @@ import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
-import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.operation.ScanKind;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.schema.SchemaManager;
@@ -35,9 +35,11 @@ import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.DataSplit;
import org.apache.flink.table.store.table.source.DataTableScan;
-import org.apache.flink.table.store.table.source.SnapshotEnumerator;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.table.source.TableScan;
+import
org.apache.flink.table.store.table.source.snapshot.InputChangelogSnapshotEnumerator;
+import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator;
import org.apache.flink.table.store.utils.CompatibilityTestUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
@@ -166,7 +168,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
writeData();
FileStoreTable table = createFileStoreTable();
- List<Split> splits =
table.newScan().withIncremental(true).plan().splits();
+ List<Split> splits =
table.newScan().withKind(ScanKind.DELTA).plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_ROW_TO_STRING))
.isEqualTo(
@@ -185,7 +187,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
writeData();
FileStoreTable table = createFileStoreTable();
- List<Split> splits =
table.newScan().withIncremental(true).plan().splits();
+ List<Split> splits =
table.newScan().withKind(ScanKind.DELTA).plan().splits();
TableRead read = table.newRead().withProjection(PROJECTION);
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_PROJECTED_ROW_TO_STRING))
@@ -202,7 +204,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
Predicate predicate = PredicateBuilder.and(builder.equal(2, 201L),
builder.equal(1, 21));
List<Split> splits =
-
table.newScan().withIncremental(true).withFilter(predicate).plan().splits();
+
table.newScan().withKind(ScanKind.DELTA).withFilter(predicate).plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_ROW_TO_STRING)).isEmpty();
assertThat(getResult(read, splits, binaryRow(2), 0,
STREAMING_ROW_TO_STRING))
@@ -233,7 +235,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
commit.commit(0, write.prepareCommit(true, 0));
write.close();
- List<Split> splits =
table.newScan().withIncremental(true).plan().splits();
+ List<Split> splits =
table.newScan().withKind(ScanKind.CHANGELOG).plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
CHANGELOG_ROW_TO_STRING))
.containsExactlyInAnyOrder(
@@ -267,7 +269,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
write.compact(binaryRow(2), 0, true);
commit.commit(0, write.prepareCommit(true, 0));
- List<Split> splits =
table.newScan().withIncremental(true).plan().splits();
+ List<Split> splits =
table.newScan().withKind(ScanKind.CHANGELOG).plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
CHANGELOG_ROW_TO_STRING))
.containsExactlyInAnyOrder(
@@ -288,7 +290,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
write.compact(binaryRow(2), 0, true);
commit.commit(2, write.prepareCommit(true, 2));
- splits = table.newScan().withIncremental(true).plan().splits();
+ splits = table.newScan().withKind(ScanKind.CHANGELOG).plan().splits();
assertThat(getResult(read, splits, binaryRow(1), 0,
CHANGELOG_ROW_TO_STRING))
.containsExactlyInAnyOrder("+I
1|30|130|binary|varbinary|mapKey:mapVal|multiset");
assertThat(getResult(read, splits, binaryRow(2), 0,
CHANGELOG_ROW_TO_STRING))
@@ -312,7 +314,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
write.compact(binaryRow(2), 0, true);
commit.commit(4, write.prepareCommit(true, 4));
- splits = table.newScan().withIncremental(true).plan().splits();
+ splits = table.newScan().withKind(ScanKind.CHANGELOG).plan().splits();
assertThat(getResult(read, splits, binaryRow(1), 0,
CHANGELOG_ROW_TO_STRING))
.containsExactlyInAnyOrder(
"-D 1|20|120|binary|varbinary|mapKey:mapVal|multiset",
@@ -371,18 +373,15 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
Collections.singletonList("-D
2|10|301|binary|varbinary")));
SnapshotEnumerator enumerator =
- new SnapshotEnumerator(
- tablePath,
- (DataTableScan) table.newScan().withIncremental(true),
- ChangelogProducer.INPUT,
- Snapshot.FIRST_SNAPSHOT_ID - 1);
+ new InputChangelogSnapshotEnumerator(
+ tablePath, table.newScan(),
CoreOptions.LogStartupMode.FULL, null, 1L);
FunctionWithException<Integer, Void, Exception> assertNextSnapshot =
i -> {
- SnapshotEnumerator.EnumeratorResult result =
enumerator.call();
- assertThat(result).isNotNull();
+ TableScan.Plan plan = enumerator.enumerate();
+ assertThat(plan).isNotNull();
- List<Split> splits = result.plan.splits();
+ List<Split> splits = plan.splits();
TableRead read = table.newRead();
for (int j = 0; j < 2; j++) {
assertThat(
@@ -403,7 +402,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
}
// no more changelog
- assertThat(enumerator.call()).isNull();
+ assertThat(enumerator.enumerate()).isNull();
// write another commit
TableWrite write = table.newWrite(commitUser);
@@ -418,7 +417,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
assertNextSnapshot.apply(2);
// no more changelog
- assertThat(enumerator.call()).isNull();
+ assertThat(enumerator.enumerate()).isNull();
}
private void writeData() throws Exception {
@@ -591,7 +590,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
write.write(rowData(1, 20, 200L));
commit.commit(0, write.prepareCommit(true, 0));
- DataTableScan scan = table.newScan().withIncremental(true);
+ DataTableScan scan = table.newScan().withKind(ScanKind.DELTA);
List<DataSplit> splits0 = scan.plan().splits;
assertThat(splits0).hasSize(1);
assertThat(splits0.get(0).files()).hasSize(1);
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileDataFilterTestBase.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileDataFilterTestBase.java
index 21321635..47dadf70 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileDataFilterTestBase.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileDataFilterTestBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.table;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
+import org.apache.flink.table.store.file.operation.ScanKind;
import org.apache.flink.table.store.file.predicate.Equal;
import org.apache.flink.table.store.file.predicate.IsNull;
import org.apache.flink.table.store.file.predicate.LeafPredicate;
@@ -340,7 +341,7 @@ public abstract class FileDataFilterTestBase extends
SchemaEvolutionTableTestBas
writeAndCheckFileResult(
schemas -> {
FileStoreTable table = createFileStoreTable(schemas);
- List<Split> splits =
table.newScan().withIncremental(true).plan().splits();
+ List<Split> splits =
table.newScan().withKind(ScanKind.DELTA).plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits,
STREAMING_SCHEMA_0_ROW_TO_STRING))
@@ -353,7 +354,7 @@ public abstract class FileDataFilterTestBase extends
SchemaEvolutionTableTestBas
},
(files, schemas) -> {
FileStoreTable table = createFileStoreTable(schemas);
- List<Split> splits =
table.newScan().withIncremental(true).plan().splits();
+ List<Split> splits =
table.newScan().withKind(ScanKind.DELTA).plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits,
STREAMING_SCHEMA_1_ROW_TO_STRING))
@@ -373,7 +374,7 @@ public abstract class FileDataFilterTestBase extends
SchemaEvolutionTableTestBas
writeAndCheckFileResult(
schemas -> {
FileStoreTable table = createFileStoreTable(schemas);
- List<Split> splits =
table.newScan().withIncremental(true).plan().splits();
+ List<Split> splits =
table.newScan().withKind(ScanKind.DELTA).plan().splits();
// project "c", "b", "pt" in schema0
TableRead read =
table.newRead().withProjection(PROJECTION);
@@ -384,7 +385,7 @@ public abstract class FileDataFilterTestBase extends
SchemaEvolutionTableTestBas
},
(files, schemas) -> {
FileStoreTable table = createFileStoreTable(schemas);
- List<Split> splits =
table.newScan().withIncremental(true).plan().splits();
+ List<Split> splits =
table.newScan().withKind(ScanKind.DELTA).plan().splits();
// project "a", "kt", "d" in schema1
TableRead read =
table.newRead().withProjection(PROJECTION);
@@ -404,7 +405,7 @@ public abstract class FileDataFilterTestBase extends
SchemaEvolutionTableTestBas
PredicateBuilder builder =
new PredicateBuilder(RowDataType.toRowType(false,
SCHEMA_0_FIELDS));
FileStoreTable table = createFileStoreTable(schemas);
- List<Split> splits =
table.newScan().withIncremental(true).plan().splits();
+ List<Split> splits =
table.newScan().withKind(ScanKind.DELTA).plan().splits();
// filter with "b" = 15 in schema0
TableRead read =
table.newRead().withFilter(builder.equal(2, 15));
@@ -418,7 +419,7 @@ public abstract class FileDataFilterTestBase extends
SchemaEvolutionTableTestBas
PredicateBuilder builder =
new PredicateBuilder(RowDataType.toRowType(false,
SCHEMA_1_FIELDS));
FileStoreTable table = createFileStoreTable(schemas);
- List<Split> splits =
table.newScan().withIncremental(true).plan().splits();
+ List<Split> splits =
table.newScan().withKind(ScanKind.DELTA).plan().splits();
// filter with "d" = 15 in schema1 which should be mapped
to "b" = 15 in schema0
TableRead read1 =
table.newRead().withFilter(builder.equal(1, 15));
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumeratorTestBase.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumeratorTestBase.java
new file mode 100644
index 00000000..916df0cc
--- /dev/null
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumeratorTestBase.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Base test class for {@link DataFileSnapshotEnumerator}.
+ *
+ * <p>TODO: merge this class with FileStoreTableTestBase.
+ */
+public abstract class DataFileSnapshotEnumeratorTestBase {
+
+ private static final RowType ROW_TYPE =
+ RowType.of(
+ new LogicalType[] {
+ DataTypes.INT().getLogicalType(),
+ DataTypes.INT().getLogicalType(),
+ DataTypes.BIGINT().getLogicalType()
+ },
+ new String[] {"pt", "a", "b"});
+
+ @TempDir java.nio.file.Path tempDir;
+
+ protected Path tablePath;
+ protected String commitUser;
+
+ @BeforeEach
+ public void before() {
+ tablePath = new Path(TestAtomicRenameFileSystem.SCHEME + "://" +
tempDir.toString());
+ commitUser = UUID.randomUUID().toString();
+ }
+
+ protected GenericRowData rowData(Object... values) {
+ return GenericRowData.of(values);
+ }
+
+ protected GenericRowData rowDataWithKind(RowKind rowKind, Object...
values) {
+ return GenericRowData.ofKind(rowKind, values);
+ }
+
+ protected BinaryRowData binaryRow(int a) {
+ BinaryRowData b = new BinaryRowData(1);
+ BinaryRowWriter writer = new BinaryRowWriter(b);
+ writer.writeInt(0, a);
+ writer.complete();
+ return b;
+ }
+
+ protected List<String> getResult(TableRead read, List<Split> splits)
throws Exception {
+ List<ConcatRecordReader.ReaderSupplier<RowData>> readers = new
ArrayList<>();
+ for (Split split : splits) {
+ readers.add(() -> read.createReader(split));
+ }
+ RecordReader<RowData> recordReader =
ConcatRecordReader.create(readers);
+ RecordReaderIterator<RowData> iterator = new
RecordReaderIterator<>(recordReader);
+ List<String> result = new ArrayList<>();
+ while (iterator.hasNext()) {
+ RowData rowData = iterator.next();
+ result.add(
+ String.format(
+ "%s %d|%d|%d",
+ rowData.getRowKind().shortString(),
+ rowData.getInt(0),
+ rowData.getInt(1),
+ rowData.getLong(2)));
+ }
+ iterator.close();
+ return result;
+ }
+
+ protected FileStoreTable createFileStoreTable() throws Exception {
+ Configuration conf = getConf();
+ SchemaManager schemaManager = new SchemaManager(tablePath);
+ TableSchema tableSchema =
+ schemaManager.commitNewVersion(
+ new UpdateSchema(
+ ROW_TYPE,
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "a"),
+ conf.toMap(),
+ ""));
+ return FileStoreTableFactory.create(tablePath, tableSchema, conf);
+ }
+
+ protected abstract Configuration getConf();
+}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumeratorTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumeratorTest.java
new file mode 100644
index 00000000..65fe9f53
--- /dev/null
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumeratorTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DeltaSnapshotEnumerator}. */
+public class DeltaSnapshotEnumeratorTest extends
DataFileSnapshotEnumeratorTestBase {
+
+ @Test
+ public void testFullStartupMode() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ TableRead read = table.newRead();
+ TableWrite write = table.newWrite(commitUser);
+ TableCommit commit = table.newCommit(commitUser);
+ SnapshotEnumerator enumerator =
+ new DeltaSnapshotEnumerator(
+ tablePath, table.newScan(),
CoreOptions.LogStartupMode.FULL, null, null);
+
+ // first call without any snapshot, should return null
+ assertThat(enumerator.enumerate()).isNull();
+
+ // write base data
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(1, 20, 200L));
+ write.write(rowData(1, 40, 400L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ write.write(rowData(1, 10, 101L));
+ write.write(rowData(1, 30, 300L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ // first call with snapshot, should return complete records from 2nd
commit
+ DataTableScan.DataFilePlan plan = enumerator.enumerate();
+ assertThat(plan.snapshotId).isEqualTo(2);
+ assertThat(getResult(read, plan.splits()))
+ .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200",
"+I 1|30|300"));
+
+ // incremental call without new snapshots, should return null
+ assertThat(enumerator.enumerate()).isNull();
+
+ // write incremental data
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
+ write.write(rowData(1, 20, 201L));
+ write.write(rowData(1, 10, 102L));
+ write.write(rowData(1, 40, 400L));
+ commit.commit(2, write.prepareCommit(true, 2));
+
+ write.write(rowData(1, 10, 103L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+ write.write(rowData(1, 50, 500L));
+ commit.commit(3, write.prepareCommit(true, 3));
+
+ // first incremental call, should return incremental records from 3rd
commit
+ plan = enumerator.enumerate();
+ assertThat(plan.snapshotId).isEqualTo(3);
+ assertThat(getResult(read, plan.splits()))
+ .hasSameElementsAs(Arrays.asList("+I 1|10|102", "+I 1|20|201",
"+I 1|40|400"));
+
+ // second incremental call, should return incremental records from 4th
commit
+ plan = enumerator.enumerate();
+ assertThat(plan.snapshotId).isEqualTo(4);
+ assertThat(getResult(read, plan.splits()))
+ .hasSameElementsAs(Arrays.asList("+I 1|10|103", "-D 1|40|400",
"+I 1|50|500"));
+
+ // no more new snapshots, should return null
+ assertThat(enumerator.enumerate()).isNull();
+
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testFromTimestampStartupMode() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ TableRead read = table.newRead();
+ TableWrite write = table.newWrite(commitUser);
+ TableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(1, 20, 200L));
+ write.write(rowData(1, 40, 400L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ // log current time millis, we'll start from here
+ Thread.sleep(50);
+ long startMillis = System.currentTimeMillis();
+ Thread.sleep(50);
+
+ write.write(rowData(1, 10, 101L));
+ write.write(rowData(1, 30, 300L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
+ write.write(rowData(1, 20, 201L));
+ write.write(rowData(1, 10, 102L));
+ write.write(rowData(1, 40, 400L));
+ commit.commit(2, write.prepareCommit(true, 2));
+
+ // first call with snapshot, should return empty plan
+ SnapshotEnumerator enumerator =
+ new DeltaSnapshotEnumerator(
+ tablePath,
+ table.newScan(),
+ CoreOptions.LogStartupMode.FROM_TIMESTAMP,
+ startMillis,
+ null);
+
+ DataTableScan.DataFilePlan plan = enumerator.enumerate();
+ assertThat(plan.snapshotId).isEqualTo(1);
+ assertThat(plan.splits()).isEmpty();
+
+ // first incremental call, should return incremental records from 2nd
commit
+ plan = enumerator.enumerate();
+ assertThat(plan.snapshotId).isEqualTo(2);
+ assertThat(getResult(read, plan.splits()))
+ .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|30|300",
"-D 1|40|400"));
+
+ // second incremental call, should return incremental records from 3rd
commit
+ plan = enumerator.enumerate();
+ assertThat(plan.snapshotId).isEqualTo(3);
+ assertThat(getResult(read, plan.splits()))
+ .hasSameElementsAs(Arrays.asList("+I 1|10|102", "+I 1|20|201",
"+I 1|40|400"));
+
+ // no new snapshots
+ assertThat(enumerator.enumerate()).isNull();
+
+ // more incremental records
+ write.write(rowData(1, 10, 103L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+ write.write(rowData(1, 50, 500L));
+ commit.commit(3, write.prepareCommit(true, 3));
+
+ plan = enumerator.enumerate();
+ assertThat(plan.snapshotId).isEqualTo(4);
+ assertThat(getResult(read, plan.splits()))
+ .hasSameElementsAs(Arrays.asList("+I 1|10|103", "-D 1|40|400",
"+I 1|50|500"));
+
+ assertThat(enumerator.enumerate()).isNull();
+
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testLatestStartupMode() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ TableRead read = table.newRead();
+ TableWrite write = table.newWrite(commitUser);
+ TableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(1, 20, 200L));
+ write.write(rowData(1, 40, 400L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ write.write(rowData(1, 10, 101L));
+ write.write(rowData(1, 30, 300L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ SnapshotEnumerator enumerator =
+ new DeltaSnapshotEnumerator(
+ tablePath, table.newScan(),
CoreOptions.LogStartupMode.LATEST, null, null);
+
+ DataTableScan.DataFilePlan plan = enumerator.enumerate();
+ assertThat(plan.snapshotId).isEqualTo(2);
+ assertThat(plan.splits()).isEmpty();
+
+ assertThat(enumerator.enumerate()).isNull();
+
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
+ write.write(rowData(1, 20, 201L));
+ write.write(rowData(1, 10, 102L));
+ write.write(rowData(1, 40, 400L));
+ commit.commit(2, write.prepareCommit(true, 2));
+
+ plan = enumerator.enumerate();
+ assertThat(plan.snapshotId).isEqualTo(3);
+ assertThat(getResult(read, plan.splits()))
+ .hasSameElementsAs(Arrays.asList("+I 1|10|102", "+I 1|20|201",
"+I 1|40|400"));
+ assertThat(enumerator.enumerate()).isNull();
+
+ write.write(rowData(1, 10, 103L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+ write.write(rowData(1, 50, 500L));
+ commit.commit(3, write.prepareCommit(true, 3));
+
+ plan = enumerator.enumerate();
+ assertThat(plan.snapshotId).isEqualTo(4);
+ assertThat(getResult(read, plan.splits()))
+ .hasSameElementsAs(Arrays.asList("+I 1|10|103", "-D 1|40|400",
"+I 1|50|500"));
+ assertThat(enumerator.enumerate()).isNull();
+
+ write.close();
+ commit.close();
+ }
+
+ @Override
+ protected Configuration getConf() {
+ return new Configuration();
+ }
+}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumeratorTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumeratorTest.java
new file mode 100644
index 00000000..72531f4f
--- /dev/null
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumeratorTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FullCompactionChangelogSnapshotEnumerator}. */
+public class FullCompactionChangelogSnapshotEnumeratorTest
+ extends DataFileSnapshotEnumeratorTestBase {
+
+ @Test
+ public void testFullStartupMode() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ TableRead read = table.newRead();
+ TableWrite write = table.newWrite(commitUser);
+ TableCommit commit = table.newCommit(commitUser);
+ SnapshotEnumerator enumerator =
+ new FullCompactionChangelogSnapshotEnumerator(
+ tablePath,
+ table.newScan(),
+ table.options().numLevels() - 1,
+ CoreOptions.LogStartupMode.FULL,
+ null,
+ null);
+
+ // first call without any snapshot, should return null
+ assertThat(enumerator.enumerate()).isNull();
+
+ // write base data
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(1, 20, 200L));
+ write.write(rowData(1, 40, 400L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ write.write(rowData(1, 10, 101L));
+ write.write(rowData(1, 30, 300L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+ write.compact(binaryRow(1), 0, true);
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ // some more records
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
+ write.write(rowData(1, 20, 201L));
+ write.write(rowData(1, 10, 102L));
+ write.write(rowData(1, 40, 400L));
+ commit.commit(2, write.prepareCommit(true, 2));
+
+ // first call with snapshot, should return full compacted records from
3rd commit
+ DataTableScan.DataFilePlan plan = enumerator.enumerate();
+ assertThat(plan.snapshotId).isEqualTo(4);
+ assertThat(getResult(read, plan.splits()))
+ .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200",
"+I 1|30|300"));
+
+ // incremental call without new snapshots, should return null
+ assertThat(enumerator.enumerate()).isNull();
+
+ // write incremental data
+ write.write(rowData(1, 10, 103L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+ write.write(rowData(1, 50, 500L));
+ commit.commit(3, write.prepareCommit(true, 3));
+
+ // no new compact snapshots, should return null
+ assertThat(enumerator.enumerate()).isNull();
+
+ write.compact(binaryRow(1), 0, true);
+ commit.commit(4, write.prepareCommit(true, 4));
+
+ // full compaction done, read new changelog
+ plan = enumerator.enumerate();
+ assertThat(plan.snapshotId).isEqualTo(6);
+ assertThat(getResult(read, plan.splits()))
+ .hasSameElementsAs(
+ Arrays.asList(
+ "-U 1|10|101",
+ "+U 1|10|103",
+ "-U 1|20|200",
+ "+U 1|20|201",
+ "+I 1|50|500"));
+
+ // no more new snapshots, should return null
+ assertThat(enumerator.enumerate()).isNull();
+
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testFromTimestampStartupMode() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ TableRead read = table.newRead();
+ TableWrite write = table.newWrite(commitUser);
+ TableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(1, 20, 200L));
+ write.write(rowData(1, 40, 400L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ write.write(rowData(1, 10, 101L));
+ write.write(rowData(1, 30, 300L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+ write.compact(binaryRow(1), 0, true);
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ // log current time millis, we'll start from here
+ Thread.sleep(50);
+ long startMillis = System.currentTimeMillis();
+ Thread.sleep(50);
+
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
+ write.write(rowData(1, 20, 201L));
+ write.write(rowData(1, 10, 102L));
+ write.write(rowData(1, 40, 400L));
+ commit.commit(2, write.prepareCommit(true, 2));
+
+ SnapshotEnumerator enumerator =
+ new FullCompactionChangelogSnapshotEnumerator(
+ tablePath,
+ table.newScan(),
+ table.options().numLevels() - 1,
+ CoreOptions.LogStartupMode.FROM_TIMESTAMP,
+ startMillis,
+ null);
+
+ // first call, should return empty plan
+ DataTableScan.DataFilePlan plan = enumerator.enumerate();
+ assertThat(plan.snapshotId).isEqualTo(3);
+ assertThat(plan.splits()).isEmpty();
+
+        // first incremental call, no new compact snapshot, should return null
+ assertThat(enumerator.enumerate()).isNull();
+
+ write.write(rowData(1, 10, 103L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+ write.write(rowData(1, 50, 500L));
+ write.compact(binaryRow(1), 0, true);
+ commit.commit(3, write.prepareCommit(true, 3));
+
+ plan = enumerator.enumerate();
+ assertThat(plan.snapshotId).isEqualTo(6);
+ assertThat(getResult(read, plan.splits()))
+ .hasSameElementsAs(
+ Arrays.asList(
+ "-U 1|10|101",
+ "+U 1|10|103",
+ "-U 1|20|200",
+ "+U 1|20|201",
+ "+I 1|50|500"));
+
+ assertThat(enumerator.enumerate()).isNull();
+
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testLatestStartupMode() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ TableRead read = table.newRead();
+ TableWrite write = table.newWrite(commitUser);
+ TableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(1, 20, 200L));
+ write.write(rowData(1, 40, 400L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ write.write(rowData(1, 10, 101L));
+ write.write(rowData(1, 30, 300L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+ write.compact(binaryRow(1), 0, true);
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ SnapshotEnumerator enumerator =
+ new FullCompactionChangelogSnapshotEnumerator(
+ tablePath,
+ table.newScan(),
+ table.options().numLevels() - 1,
+ CoreOptions.LogStartupMode.LATEST,
+ null,
+ null);
+
+        // first call, should return empty plan
+ DataTableScan.DataFilePlan plan = enumerator.enumerate();
+ assertThat(plan.snapshotId).isEqualTo(3);
+ assertThat(plan.splits()).isEmpty();
+
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
+ write.write(rowData(1, 20, 201L));
+ write.write(rowData(1, 10, 102L));
+ write.write(rowData(1, 40, 400L));
+ commit.commit(2, write.prepareCommit(true, 2));
+
+        // first incremental call, no new compact snapshot, should return null
+ assertThat(enumerator.enumerate()).isNull();
+
+ write.write(rowData(1, 10, 103L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+ write.write(rowData(1, 50, 500L));
+ write.compact(binaryRow(1), 0, true);
+ commit.commit(3, write.prepareCommit(true, 3));
+
+ plan = enumerator.enumerate();
+ assertThat(plan.snapshotId).isEqualTo(6);
+ assertThat(getResult(read, plan.splits()))
+ .hasSameElementsAs(
+ Arrays.asList(
+ "-U 1|10|101",
+ "+U 1|10|103",
+ "-U 1|20|200",
+ "+U 1|20|201",
+ "+I 1|50|500"));
+
+ assertThat(enumerator.enumerate()).isNull();
+
+ write.close();
+ commit.close();
+ }
+
+ @Override
+ protected Configuration getConf() {
+ Configuration conf = new Configuration();
+ conf.set(CoreOptions.CHANGELOG_PRODUCER,
CoreOptions.ChangelogProducer.FULL_COMPACTION);
+ return conf;
+ }
+}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumeratorTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumeratorTest.java
new file mode 100644
index 00000000..5760110c
--- /dev/null
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumeratorTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link InputChangelogSnapshotEnumerator}. */
+public class InputChangelogSnapshotEnumeratorTest extends
DataFileSnapshotEnumeratorTestBase {
+
+ @Test
+ public void testFullStartupMode() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ TableRead read = table.newRead();
+ TableWrite write = table.newWrite(commitUser);
+ TableCommit commit = table.newCommit(commitUser);
+ SnapshotEnumerator enumerator =
+ new InputChangelogSnapshotEnumerator(
+ tablePath, table.newScan(),
CoreOptions.LogStartupMode.FULL, null, null);
+
+ // first call without any snapshot, should return null
+ assertThat(enumerator.enumerate()).isNull();
+
+ // write base data
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(1, 20, 200L));
+ write.write(rowData(1, 40, 400L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ write.write(rowData(1, 10, 101L));
+ write.write(rowData(1, 30, 300L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ // first call with snapshot, should return complete records from 2nd
commit
+ DataTableScan.DataFilePlan plan = enumerator.enumerate();
+ assertThat(plan.snapshotId).isEqualTo(2);
+ assertThat(getResult(read, plan.splits()))
+ .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200",
"+I 1|30|300"));
+
+ // incremental call without new snapshots, should return null
+ assertThat(enumerator.enumerate()).isNull();
+
+ // write incremental data
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
+ write.write(rowData(1, 20, 201L));
+ write.write(rowData(1, 10, 102L));
+ write.write(rowData(1, 40, 400L));
+ commit.commit(2, write.prepareCommit(true, 2));
+
+ write.write(rowData(1, 10, 103L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+ write.write(rowData(1, 50, 500L));
+ commit.commit(3, write.prepareCommit(true, 3));
+
+ // first incremental call, should return incremental records from 3rd
commit
+ plan = enumerator.enumerate();
+ assertThat(plan.snapshotId).isEqualTo(3);
+ assertThat(getResult(read, plan.splits()))
+ .hasSameElementsAs(
+ Arrays.asList("-D 1|10|101", "+I 1|10|102", "+I
1|20|201", "+I 1|40|400"));
+
+ // second incremental call, should return incremental records from 4th
commit
+ plan = enumerator.enumerate();
+ assertThat(plan.snapshotId).isEqualTo(4);
+ assertThat(getResult(read, plan.splits()))
+ .hasSameElementsAs(Arrays.asList("+I 1|10|103", "-D 1|40|400",
"+I 1|50|500"));
+
+ // no more new snapshots, should return null
+ assertThat(enumerator.enumerate()).isNull();
+
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testFromTimestampStartupMode() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ TableRead read = table.newRead();
+ TableWrite write = table.newWrite(commitUser);
+ TableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(1, 20, 200L));
+ write.write(rowData(1, 40, 400L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ // log current time millis, we'll start from here
+ Thread.sleep(50);
+ long startMillis = System.currentTimeMillis();
+ Thread.sleep(50);
+
+ write.write(rowData(1, 10, 101L));
+ write.write(rowData(1, 30, 300L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
+ write.write(rowData(1, 20, 201L));
+ write.write(rowData(1, 10, 102L));
+ write.write(rowData(1, 40, 400L));
+ commit.commit(2, write.prepareCommit(true, 2));
+
+ // first call with snapshot, should return empty plan
+ SnapshotEnumerator enumerator =
+ new InputChangelogSnapshotEnumerator(
+ tablePath,
+ table.newScan(),
+ CoreOptions.LogStartupMode.FROM_TIMESTAMP,
+ startMillis,
+ null);
+
+ DataTableScan.DataFilePlan plan = enumerator.enumerate();
+ assertThat(plan.snapshotId).isEqualTo(1);
+ assertThat(plan.splits()).isEmpty();
+
+ // first incremental call, should return incremental records from 2nd
commit
+ plan = enumerator.enumerate();
+ assertThat(plan.snapshotId).isEqualTo(2);
+ assertThat(getResult(read, plan.splits()))
+ .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|30|300",
"-D 1|40|400"));
+
+ // second incremental call, should return incremental records from 3rd
commit
+ plan = enumerator.enumerate();
+ assertThat(plan.snapshotId).isEqualTo(3);
+ assertThat(getResult(read, plan.splits()))
+ .hasSameElementsAs(
+ Arrays.asList("-D 1|10|101", "+I 1|10|102", "+I
1|20|201", "+I 1|40|400"));
+
+ // no new snapshots
+ assertThat(enumerator.enumerate()).isNull();
+
+ // more incremental records
+ write.write(rowData(1, 10, 103L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+ write.write(rowData(1, 50, 500L));
+ commit.commit(3, write.prepareCommit(true, 3));
+
+ plan = enumerator.enumerate();
+ assertThat(plan.snapshotId).isEqualTo(4);
+ assertThat(getResult(read, plan.splits()))
+ .hasSameElementsAs(Arrays.asList("+I 1|10|103", "-D 1|40|400",
"+I 1|50|500"));
+
+ assertThat(enumerator.enumerate()).isNull();
+
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testLatestStartupMode() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ TableRead read = table.newRead();
+ TableWrite write = table.newWrite(commitUser);
+ TableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(1, 20, 200L));
+ write.write(rowData(1, 40, 400L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ write.write(rowData(1, 10, 101L));
+ write.write(rowData(1, 30, 300L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ SnapshotEnumerator enumerator =
+ new InputChangelogSnapshotEnumerator(
+ tablePath, table.newScan(),
CoreOptions.LogStartupMode.LATEST, null, null);
+
+ DataTableScan.DataFilePlan plan = enumerator.enumerate();
+ assertThat(plan.snapshotId).isEqualTo(2);
+ assertThat(plan.splits()).isEmpty();
+
+ assertThat(enumerator.enumerate()).isNull();
+
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
+ write.write(rowData(1, 20, 201L));
+ write.write(rowData(1, 10, 102L));
+ write.write(rowData(1, 40, 400L));
+ commit.commit(2, write.prepareCommit(true, 2));
+
+ plan = enumerator.enumerate();
+ assertThat(plan.snapshotId).isEqualTo(3);
+ assertThat(getResult(read, plan.splits()))
+ .hasSameElementsAs(
+ Arrays.asList("-D 1|10|101", "+I 1|10|102", "+I
1|20|201", "+I 1|40|400"));
+ assertThat(enumerator.enumerate()).isNull();
+
+ write.write(rowData(1, 10, 103L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+ write.write(rowData(1, 50, 500L));
+ commit.commit(3, write.prepareCommit(true, 3));
+
+ plan = enumerator.enumerate();
+ assertThat(plan.snapshotId).isEqualTo(4);
+ assertThat(getResult(read, plan.splits()))
+ .hasSameElementsAs(Arrays.asList("+I 1|10|103", "-D 1|40|400",
"+I 1|50|500"));
+ assertThat(enumerator.enumerate()).isNull();
+
+ write.close();
+ commit.close();
+ }
+
+ @Override
+ protected Configuration getConf() {
+ Configuration conf = new Configuration();
+ conf.set(CoreOptions.CHANGELOG_PRODUCER,
CoreOptions.ChangelogProducer.INPUT);
+ return conf;
+ }
+}