This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7e580bda8 [core] Clean TableScan related codes (#805)
7e580bda8 is described below
commit 7e580bda8c786b1037d983b8d523e73f6e6e6dc7
Author: yuzelin <[email protected]>
AuthorDate: Tue Apr 4 13:24:06 2023 +0800
[core] Clean TableScan related codes (#805)
---
.../main/java/org/apache/paimon/CoreOptions.java | 9 ++
.../paimon/table/AbstractFileStoreTable.java | 33 ++++---
.../java/org/apache/paimon/table/DataTable.java | 8 --
...aTableScan.java => AbstractInnerTableScan.java} | 13 ++-
.../{BatchDataTableScan.java => DataFilePlan.java} | 30 +++---
.../apache/paimon/table/source/DataTableScan.java | 65 -------------
...ScanImpl.java => InnerStreamTableScanImpl.java} | 83 ++++------------
.../apache/paimon/table/source/InnerTableScan.java | 10 --
...aTableScanImpl.java => InnerTableScanImpl.java} | 34 +------
.../paimon/table/source/StreamDataTableScan.java | 52 -----------
.../CompactionChangelogFollowUpScanner.java | 8 +-
.../ContinuousCompactorFollowUpScanner.java | 8 +-
.../source/snapshot/DeltaFollowUpScanner.java | 8 +-
.../table/source/snapshot/FollowUpScanner.java | 14 +--
.../snapshot/InputChangelogFollowUpScanner.java | 8 +-
.../table/source/snapshot/SnapshotSplitReader.java | 46 ---------
.../source/snapshot/SnapshotSplitReaderImpl.java | 44 ++++++++-
.../table/source/snapshot/StartingScanner.java | 8 +-
.../StaticFromSnapshotStartingScanner.java | 3 +-
.../apache/paimon/table/system/AuditLogTable.java | 98 +++----------------
.../apache/paimon/table/system/BucketsTable.java | 11 +--
.../org/apache/paimon/table/system/FilesTable.java | 24 ++---
.../table/ChangelogWithKeyFileStoreTableTest.java | 9 +-
.../paimon/table/source/StartupModeTest.java | 104 ++++++++++-----------
...TableScanTest.java => StreamTableScanTest.java} | 20 ++--
...chDataTableScanTest.java => TableScanTest.java} | 8 +-
.../CompactionChangelogFollowUpScannerTest.java | 4 +-
.../ContinuousCompactorFollowUpScannerTest.java | 4 +-
.../source/snapshot/DeltaFollowUpScannerTest.java | 4 +-
.../InputChangelogFollowUpScannerTest.java | 4 +-
.../flink/lookup/FileStoreLookupFunction.java | 1 -
.../paimon/flink/lookup}/TableStreamingReader.java | 30 ++++--
.../flink/source/CompactorSourceBuilder.java | 36 +++++--
.../flink/source/ContinuousFileStoreSource.java | 16 +---
.../paimon/flink/source/StaticFileStoreSource.java | 14 +--
.../apache/paimon/flink/utils/TableScanUtils.java | 56 +----------
.../paimon/flink/action/CompactActionITCase.java | 17 ++--
.../flink/action/DropPartitionActionITCase.java | 6 +-
.../flink/sink/CommitterOperatorTestBase.java | 2 +-
.../paimon/flink/sink/CompactorSinkITCase.java | 14 +--
.../paimon/flink/sink/cdc/FlinkCdcSinkITCase.java | 4 +-
.../paimon/flink/source/CompactorSourceITCase.java | 27 +++++-
.../source/ContinuousFileSplitEnumeratorTest.java | 74 +++------------
.../source/FileStoreSourceSplitGeneratorTest.java | 8 +-
.../paimon/hive/mapred/PaimonInputFormat.java | 9 +-
45 files changed, 376 insertions(+), 712 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index 865ba1b3a..ae188173b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -559,6 +559,15 @@ public class CoreOptions implements Serializable {
.withDescription(
"Full compaction will be constantly triggered
after delta commits.");
+ @ExcludeFromDocumentation("Internal use only")
+ public static final ConfigOption<Boolean> STREAMING_COMPACT =
+ key("streaming-compact")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Only used to force TableScan to construct
'ContinuousCompactorStartingScanner' and "
+ + "'ContinuousCompactorFollowUpScanner'
for dedicated streaming compaction job.");
+
private final Options options;
public CoreOptions(Map<String, String> options) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index aea57e2c8..7881d113a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -31,16 +31,17 @@ import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaValidation;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.sink.TableCommitImpl;
-import org.apache.paimon.table.source.BatchDataTableScan;
-import org.apache.paimon.table.source.BatchDataTableScanImpl;
+import org.apache.paimon.table.source.InnerStreamTableScan;
+import org.apache.paimon.table.source.InnerStreamTableScanImpl;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.InnerTableScanImpl;
import org.apache.paimon.table.source.SplitGenerator;
-import org.apache.paimon.table.source.StreamDataTableScan;
-import org.apache.paimon.table.source.StreamDataTableScanImpl;
import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
import org.apache.paimon.table.source.snapshot.SnapshotSplitReaderImpl;
import
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
import org.apache.paimon.utils.SnapshotManager;
+import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -78,14 +79,13 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
}
@Override
- public BatchDataTableScan newScan() {
- return new BatchDataTableScanImpl(
- coreOptions(), newSnapshotSplitReader(), snapshotManager());
+ public InnerTableScan newScan() {
+ return new InnerTableScanImpl(coreOptions(), newSnapshotSplitReader(),
snapshotManager());
}
@Override
- public StreamDataTableScan newStreamScan() {
- return new StreamDataTableScanImpl(
+ public InnerStreamTableScan newStreamScan() {
+ return new InnerStreamTableScanImpl(
coreOptions(),
newSnapshotSplitReader(),
snapshotManager(),
@@ -103,7 +103,7 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
@Override
public FileStoreTable copy(Map<String, String> dynamicOptions) {
// check option is not immutable
- Map<String, String> options = tableSchema.options();
+ Map<String, String> options = new HashMap<>(tableSchema.options());
dynamicOptions.forEach(
(k, v) -> {
if (!Objects.equals(v, options.get(k))) {
@@ -111,10 +111,17 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
}
});
- Options newOptions = Options.fromMap(options);
+ // merge non-null dynamic options into schema.options
+ dynamicOptions.forEach(
+ (k, v) -> {
+ if (v == null) {
+ options.remove(k);
+ } else {
+ options.put(k, v);
+ }
+ });
- // merge dynamic options into schema.options
- dynamicOptions.forEach(newOptions::setString);
+ Options newOptions = Options.fromMap(options);
// set path always
newOptions.set(PATH, path.toString());
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
index f3b7f92af..0ab7d97cc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
@@ -21,8 +21,6 @@ package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.table.source.BatchDataTableScan;
-import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
import org.apache.paimon.utils.SnapshotManager;
@@ -31,12 +29,6 @@ public interface DataTable extends InnerTable {
SnapshotSplitReader newSnapshotSplitReader();
- @Override
- BatchDataTableScan newScan();
-
- @Override
- StreamDataTableScan newStreamScan();
-
CoreOptions coreOptions();
SnapshotManager snapshotManager();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
similarity index 89%
rename from
paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
rename to
paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
index 345535d7e..14066239f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
@@ -23,6 +23,7 @@ import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.table.source.snapshot.CompactedStartingScanner;
+import
org.apache.paimon.table.source.snapshot.ContinuousCompactorStartingScanner;
import
org.apache.paimon.table.source.snapshot.ContinuousFromSnapshotStartingScanner;
import
org.apache.paimon.table.source.snapshot.ContinuousFromTimestampStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousLatestStartingScanner;
@@ -37,18 +38,18 @@ import org.apache.paimon.utils.Preconditions;
import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
/** An abstraction layer above {@link FileStoreScan} to provide input split
generation. */
-public abstract class AbstractDataTableScan implements DataTableScan {
+public abstract class AbstractInnerTableScan implements InnerTableScan {
private final CoreOptions options;
protected final SnapshotSplitReader snapshotSplitReader;
- protected AbstractDataTableScan(CoreOptions options, SnapshotSplitReader
snapshotSplitReader) {
+ protected AbstractInnerTableScan(CoreOptions options, SnapshotSplitReader
snapshotSplitReader) {
this.options = options;
this.snapshotSplitReader = snapshotSplitReader;
}
@VisibleForTesting
- public AbstractDataTableScan withBucket(int bucket) {
+ public AbstractInnerTableScan withBucket(int bucket) {
snapshotSplitReader.withBucket(bucket);
return this;
}
@@ -58,6 +59,12 @@ public abstract class AbstractDataTableScan implements
DataTableScan {
}
protected StartingScanner createStartingScanner(boolean isStreaming) {
+ if (options.toConfiguration().get(CoreOptions.STREAMING_COMPACT)) {
+ Preconditions.checkArgument(
+ isStreaming, "Set 'streaming-compact' in batch mode. This
is unexpected.");
+ return new ContinuousCompactorStartingScanner();
+ }
+
CoreOptions.StartupMode startupMode = options.startupMode();
switch (startupMode) {
case LATEST_FULL:
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java
similarity index 59%
rename from
paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScan.java
rename to
paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java
index a25f2dfae..380325b9d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java
@@ -18,25 +18,29 @@
package org.apache.paimon.table.source;
-import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.snapshot.StartingScanner;
-import org.apache.paimon.utils.Filter;
-/** {@link DataTableScan} for batch planning. */
-public interface BatchDataTableScan extends DataTableScan {
+import javax.annotation.Nullable;
- @Override
- BatchDataTableScan withSnapshot(long snapshotId);
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
- @Override
- BatchDataTableScan withFilter(Predicate predicate);
+/** Scanning plan containing snapshot ID and input splits. */
+public class DataFilePlan implements TableScan.Plan {
- @Override
- BatchDataTableScan withKind(ScanKind scanKind);
+ private final List<DataSplit> splits;
+
+ public DataFilePlan(List<DataSplit> splits) {
+ this.splits = splits;
+ }
@Override
- BatchDataTableScan withLevelFilter(Filter<Integer> levelFilter);
+ public List<Split> splits() {
+ return new ArrayList<>(splits);
+ }
- BatchDataTableScan withStartingScanner(StartingScanner startingScanner);
+ public static DataFilePlan fromResult(@Nullable StartingScanner.Result
result) {
+ return new DataFilePlan(result == null ? Collections.emptyList() :
result.splits());
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableScan.java
deleted file mode 100644
index c5249c5bc..000000000
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableScan.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source;
-
-import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.source.snapshot.StartingScanner;
-import org.apache.paimon.utils.Filter;
-
-import javax.annotation.Nullable;
-
-import java.util.Collections;
-import java.util.List;
-
-/** A {@link TableScan} for reading data. */
-public interface DataTableScan extends InnerTableScan {
-
- DataTableScan withKind(ScanKind kind);
-
- DataTableScan withSnapshot(long snapshotId);
-
- DataTableScan withLevelFilter(Filter<Integer> levelFilter);
-
- DataTableScan withFilter(Predicate predicate);
-
- DataTableScan.DataFilePlan plan();
-
- /** Scanning plan containing snapshot ID and input splits. */
- class DataFilePlan implements Plan {
-
- public final List<DataSplit> splits;
-
- @VisibleForTesting
- public DataFilePlan(List<DataSplit> splits) {
- this.splits = splits;
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- @Override
- public List<Split> splits() {
- return (List) splits;
- }
-
- public static DataFilePlan fromResult(@Nullable StartingScanner.Result
result) {
- return new DataFilePlan(result == null ? Collections.emptyList() :
result.splits());
- }
- }
-}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScanImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
similarity index 72%
rename from
paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScanImpl.java
rename to
paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
index afa4a302b..46c01b219 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScanImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
@@ -20,18 +20,15 @@ package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
-import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.snapshot.BoundedChecker;
-import org.apache.paimon.table.source.snapshot.CompactedStartingScanner;
import
org.apache.paimon.table.source.snapshot.CompactionChangelogFollowUpScanner;
+import
org.apache.paimon.table.source.snapshot.ContinuousCompactorFollowUpScanner;
import org.apache.paimon.table.source.snapshot.DeltaFollowUpScanner;
import org.apache.paimon.table.source.snapshot.FollowUpScanner;
-import org.apache.paimon.table.source.snapshot.FullStartingScanner;
import org.apache.paimon.table.source.snapshot.InputChangelogFollowUpScanner;
import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
-import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
@@ -41,10 +38,11 @@ import javax.annotation.Nullable;
import java.util.Collections;
-/** {@link DataTableScan} for streaming planning. */
-public class StreamDataTableScanImpl extends AbstractDataTableScan implements
StreamDataTableScan {
+/** {@link StreamTableScan} implementation for streaming planning. */
+public class InnerStreamTableScanImpl extends AbstractInnerTableScan
+ implements InnerStreamTableScan {
- private static final Logger LOG =
LoggerFactory.getLogger(StreamDataTableScan.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(InnerStreamTableScanImpl.class);
private final CoreOptions options;
private final SnapshotManager snapshotManager;
@@ -56,7 +54,7 @@ public class StreamDataTableScanImpl extends
AbstractDataTableScan implements St
private boolean isEnd = false;
@Nullable private Long nextSnapshotId;
- public StreamDataTableScanImpl(
+ public InnerStreamTableScanImpl(
CoreOptions options,
SnapshotSplitReader snapshotSplitReader,
SnapshotManager snapshotManager,
@@ -68,63 +66,13 @@ public class StreamDataTableScanImpl extends
AbstractDataTableScan implements St
}
@Override
- public StreamDataTableScanImpl withSnapshot(long snapshotId) {
- snapshotSplitReader.withSnapshot(snapshotId);
- return this;
- }
-
- @Override
- public StreamDataTableScanImpl withFilter(Predicate predicate) {
+ public InnerStreamTableScanImpl withFilter(Predicate predicate) {
snapshotSplitReader.withFilter(predicate);
return this;
}
@Override
- public StreamDataTableScanImpl withKind(ScanKind scanKind) {
- snapshotSplitReader.withKind(scanKind);
- return this;
- }
-
- @Override
- public StreamDataTableScanImpl withLevelFilter(Filter<Integer>
levelFilter) {
- snapshotSplitReader.withLevelFilter(levelFilter);
- return this;
- }
-
- @Override
- public boolean supportStreamingReadOverwrite() {
- return supportStreamingReadOverwrite;
- }
-
- @Override
- public StreamDataTableScan withStartingScanner(StartingScanner
startingScanner) {
- this.startingScanner = startingScanner;
- return this;
- }
-
- @Override
- public StreamDataTableScan withFollowUpScanner(FollowUpScanner
followUpScanner) {
- this.followUpScanner = followUpScanner;
- return this;
- }
-
- @Override
- public StreamDataTableScan withBoundedChecker(BoundedChecker
boundedChecker) {
- this.boundedChecker = boundedChecker;
- return this;
- }
-
- @Override
- public StreamDataTableScan withSnapshotStarting() {
- startingScanner =
- options.startupMode() == CoreOptions.StartupMode.COMPACTED_FULL
- ? new CompactedStartingScanner()
- : new FullStartingScanner();
- return this;
- }
-
- @Override
- public DataFilePlan plan() {
+ public Plan plan() {
if (startingScanner == null) {
startingScanner = createStartingScanner(true);
}
@@ -142,7 +90,7 @@ public class StreamDataTableScanImpl extends
AbstractDataTableScan implements St
}
}
- private DataFilePlan tryFirstPlan() {
+ private Plan tryFirstPlan() {
StartingScanner.Result result = startingScanner.scan(snapshotManager,
snapshotSplitReader);
if (result != null) {
long snapshotId = result.snapshotId();
@@ -154,7 +102,7 @@ public class StreamDataTableScanImpl extends
AbstractDataTableScan implements St
return DataFilePlan.fromResult(result);
}
- private DataFilePlan nextPlan() {
+ private Plan nextPlan() {
while (true) {
if (isEnd) {
throw new EndOfScanException();
@@ -175,17 +123,16 @@ public class StreamDataTableScanImpl extends
AbstractDataTableScan implements St
// first check changes of overwrite
if (snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE
- && supportStreamingReadOverwrite()) {
+ && supportStreamingReadOverwrite) {
LOG.debug("Find overwrite snapshot id {}.", nextSnapshotId);
- DataTableScan.DataFilePlan overwritePlan =
+ Plan overwritePlan =
followUpScanner.getOverwriteChangesPlan(
nextSnapshotId, snapshotSplitReader);
nextSnapshotId++;
return overwritePlan;
} else if (followUpScanner.shouldScanSnapshot(snapshot)) {
LOG.debug("Find snapshot id {}.", nextSnapshotId);
- DataTableScan.DataFilePlan plan =
- followUpScanner.scan(nextSnapshotId,
snapshotSplitReader);
+ Plan plan = followUpScanner.scan(nextSnapshotId,
snapshotSplitReader);
nextSnapshotId++;
return plan;
} else {
@@ -195,6 +142,10 @@ public class StreamDataTableScanImpl extends
AbstractDataTableScan implements St
}
private FollowUpScanner createFollowUpScanner() {
+ if (options.toConfiguration().get(CoreOptions.STREAMING_COMPACT)) {
+ return new ContinuousCompactorFollowUpScanner();
+ }
+
CoreOptions.ChangelogProducer changelogProducer =
options.changelogProducer();
FollowUpScanner followUpScanner;
switch (changelogProducer) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index 8c485743e..5d78e6e2a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -19,19 +19,9 @@
package org.apache.paimon.table.source;
import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
-
-import java.util.List;
/** Inner {@link TableScan} contains filter push down. */
public interface InnerTableScan extends TableScan {
- default InnerTableScan withFilter(List<Predicate> predicates) {
- if (predicates == null || predicates.isEmpty()) {
- return this;
- }
- return withFilter(PredicateBuilder.and(predicates));
- }
-
InnerTableScan withFilter(Predicate predicate);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScanImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
similarity index 67%
rename from
paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScanImpl.java
rename to
paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
index d0dead613..e47c22ed5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScanImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
@@ -19,15 +19,13 @@
package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
-import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.SnapshotManager;
-/** {@link DataTableScan} for batch planning. */
-public class BatchDataTableScanImpl extends AbstractDataTableScan implements
BatchDataTableScan {
+/** {@link TableScan} implementation for batch planning. */
+public class InnerTableScanImpl extends AbstractInnerTableScan {
private final SnapshotManager snapshotManager;
@@ -35,7 +33,7 @@ public class BatchDataTableScanImpl extends
AbstractDataTableScan implements Bat
private boolean hasNext;
- public BatchDataTableScanImpl(
+ public InnerTableScanImpl(
CoreOptions options,
SnapshotSplitReader snapshotSplitReader,
SnapshotManager snapshotManager) {
@@ -45,35 +43,11 @@ public class BatchDataTableScanImpl extends
AbstractDataTableScan implements Bat
}
@Override
- public BatchDataTableScan withSnapshot(long snapshotId) {
- snapshotSplitReader.withSnapshot(snapshotId);
- return this;
- }
-
- @Override
- public BatchDataTableScan withFilter(Predicate predicate) {
+ public InnerTableScan withFilter(Predicate predicate) {
snapshotSplitReader.withFilter(predicate);
return this;
}
- @Override
- public BatchDataTableScan withKind(ScanKind scanKind) {
- snapshotSplitReader.withKind(scanKind);
- return this;
- }
-
- @Override
- public BatchDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
- snapshotSplitReader.withLevelFilter(levelFilter);
- return this;
- }
-
- @Override
- public BatchDataTableScan withStartingScanner(StartingScanner
startingScanner) {
- this.startingScanner = startingScanner;
- return this;
- }
-
@Override
public DataFilePlan plan() {
if (startingScanner == null) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
deleted file mode 100644
index 4d60c4402..000000000
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source;
-
-import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.source.snapshot.BoundedChecker;
-import org.apache.paimon.table.source.snapshot.FollowUpScanner;
-import org.apache.paimon.table.source.snapshot.StartingScanner;
-import org.apache.paimon.utils.Filter;
-
-/** {@link DataTableScan} for streaming planning. */
-public interface StreamDataTableScan extends DataTableScan,
InnerStreamTableScan {
-
- @Override
- StreamDataTableScan withSnapshot(long snapshotId);
-
- @Override
- StreamDataTableScan withFilter(Predicate predicate);
-
- @Override
- StreamDataTableScan withKind(ScanKind scanKind);
-
- @Override
- StreamDataTableScan withLevelFilter(Filter<Integer> levelFilter);
-
- boolean supportStreamingReadOverwrite();
-
- StreamDataTableScan withStartingScanner(StartingScanner startingScanner);
-
- StreamDataTableScan withFollowUpScanner(FollowUpScanner followUpScanner);
-
- StreamDataTableScan withBoundedChecker(BoundedChecker boundedChecker);
-
- StreamDataTableScan withSnapshotStarting();
-}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
index ac957587a..0f737e7e7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
@@ -21,7 +21,8 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.DataFilePlan;
+import org.apache.paimon.table.source.TableScan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,9 +50,8 @@ public class CompactionChangelogFollowUpScanner implements
FollowUpScanner {
}
@Override
- public DataTableScan.DataFilePlan scan(
- long snapshotId, SnapshotSplitReader snapshotSplitReader) {
- return new DataTableScan.DataFilePlan(
+ public TableScan.Plan scan(long snapshotId, SnapshotSplitReader
snapshotSplitReader) {
+ return new DataFilePlan(
snapshotSplitReader.withKind(ScanKind.CHANGELOG).withSnapshot(snapshotId).splits());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
index aa9e83e26..2ea87b325 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
@@ -20,7 +20,8 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.Snapshot;
import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.DataFilePlan;
+import org.apache.paimon.table.source.TableScan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,9 +46,8 @@ public class ContinuousCompactorFollowUpScanner implements
FollowUpScanner {
}
@Override
- public DataTableScan.DataFilePlan scan(
- long snapshotId, SnapshotSplitReader snapshotSplitReader) {
- return new DataTableScan.DataFilePlan(
+ public TableScan.Plan scan(long snapshotId, SnapshotSplitReader
snapshotSplitReader) {
+ return new DataFilePlan(
snapshotSplitReader.withKind(ScanKind.DELTA).withSnapshot(snapshotId).splits());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScanner.java
index 97e30d141..0ba63f71a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScanner.java
@@ -21,7 +21,8 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.DataFilePlan;
+import org.apache.paimon.table.source.TableScan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,9 +46,8 @@ public class DeltaFollowUpScanner implements FollowUpScanner {
}
@Override
- public DataTableScan.DataFilePlan scan(
- long snapshotId, SnapshotSplitReader snapshotSplitReader) {
- return new DataTableScan.DataFilePlan(
+ public TableScan.Plan scan(long snapshotId, SnapshotSplitReader
snapshotSplitReader) {
+ return new DataFilePlan(
snapshotSplitReader.withKind(ScanKind.DELTA).withSnapshot(snapshotId).splits());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
index 33886e5a5..543a06ca9 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
@@ -19,19 +19,19 @@
package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.Snapshot;
-import org.apache.paimon.table.source.DataTableScan;
-import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.DataFilePlan;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableScan;
-/** Helper class for the follow-up planning of {@link StreamDataTableScan}. */
+/** Helper class for the follow-up planning of {@link StreamTableScan}. */
public interface FollowUpScanner {
boolean shouldScanSnapshot(Snapshot snapshot);
- DataTableScan.DataFilePlan scan(long snapshotId, SnapshotSplitReader snapshotSplitReader);
+ TableScan.Plan scan(long snapshotId, SnapshotSplitReader snapshotSplitReader);
- default DataTableScan.DataFilePlan getOverwriteChangesPlan(
+ default TableScan.Plan getOverwriteChangesPlan(
long snapshotId, SnapshotSplitReader snapshotSplitReader) {
- return new DataTableScan.DataFilePlan(
- snapshotSplitReader.withSnapshot(snapshotId).overwriteSplits());
+ return new DataFilePlan(snapshotSplitReader.withSnapshot(snapshotId).overwriteSplits());
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java
index c901ef528..1e0ea0919 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java
@@ -21,7 +21,8 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.DataFilePlan;
+import org.apache.paimon.table.source.TableScan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,9 +46,8 @@ public class InputChangelogFollowUpScanner implements FollowUpScanner {
}
@Override
- public DataTableScan.DataFilePlan scan(
- long snapshotId, SnapshotSplitReader snapshotSplitReader) {
- return new DataTableScan.DataFilePlan(
+ public TableScan.Plan scan(long snapshotId, SnapshotSplitReader snapshotSplitReader) {
+ return new DataFilePlan(
snapshotSplitReader.withKind(ScanKind.CHANGELOG).withSnapshot(snapshotId).splits());
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReader.java
index 814ff32e3..d813ffcc7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReader.java
@@ -19,17 +19,12 @@
package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.Snapshot;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.utils.Filter;
-import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
/** Read splits from specified {@link Snapshot} with given configuration. */
public interface SnapshotSplitReader {
@@ -49,45 +44,4 @@ public interface SnapshotSplitReader {
/** Get splits from an overwrite snapshot. */
List<DataSplit> overwriteSplits();
-
- static List<DataSplit> generateSplits(
- long snapshotId,
- boolean isIncremental,
- boolean reverseRowKind,
- SplitGenerator splitGenerator,
- Map<BinaryRow, Map<Integer, List<DataFileMeta>>> groupedDataFiles) {
- List<DataSplit> splits = new ArrayList<>();
- for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> entry :
- groupedDataFiles.entrySet()) {
- BinaryRow partition = entry.getKey();
- Map<Integer, List<DataFileMeta>> buckets = entry.getValue();
- for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry : buckets.entrySet()) {
- int bucket = bucketEntry.getKey();
- if (isIncremental) {
- // Don't split when incremental
- splits.add(
- new DataSplit(
- snapshotId,
- partition,
- bucket,
- bucketEntry.getValue(),
- true,
- reverseRowKind));
- } else {
- splitGenerator.split(bucketEntry.getValue()).stream()
- .map(
- files ->
- new DataSplit(
- snapshotId,
- partition,
- bucket,
- files,
- false,
- reverseRowKind))
- .forEach(splits::add);
- }
- }
- }
- return splits;
- }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReaderImpl.java
index eac6893ca..5b3176e50 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReaderImpl.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.RecordComparator;
import org.apache.paimon.data.BinaryRow;
@@ -43,7 +44,6 @@ import java.util.Optional;
import java.util.function.BiConsumer;
import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
-import static org.apache.paimon.table.source.snapshot.SnapshotSplitReader.generateSplits;
/** Implementation of {@link SnapshotSplitReader}. */
public class SnapshotSplitReaderImpl implements SnapshotSplitReader {
@@ -194,4 +194,46 @@ public class SnapshotSplitReaderImpl implements SnapshotSplitReader {
}
return lazyPartitionComparator;
}
+
+ @VisibleForTesting
+ public static List<DataSplit> generateSplits(
+ long snapshotId,
+ boolean isIncremental,
+ boolean reverseRowKind,
+ SplitGenerator splitGenerator,
+ Map<BinaryRow, Map<Integer, List<DataFileMeta>>> groupedDataFiles) {
+ List<DataSplit> splits = new ArrayList<>();
+ for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> entry :
+ groupedDataFiles.entrySet()) {
+ BinaryRow partition = entry.getKey();
+ Map<Integer, List<DataFileMeta>> buckets = entry.getValue();
+ for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry : buckets.entrySet()) {
+ int bucket = bucketEntry.getKey();
+ if (isIncremental) {
+ // Don't split when incremental
+ splits.add(
+ new DataSplit(
+ snapshotId,
+ partition,
+ bucket,
+ bucketEntry.getValue(),
+ true,
+ reverseRowKind));
+ } else {
+ splitGenerator.split(bucketEntry.getValue()).stream()
+ .map(
+ files ->
+ new DataSplit(
+ snapshotId,
+ partition,
+ bucket,
+ files,
+ false,
+ reverseRowKind))
+ .forEach(splits::add);
+ }
+ }
+ }
+ return splits;
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
index e0be16c6e..3e850de3c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
@@ -18,9 +18,8 @@
package org.apache.paimon.table.source.snapshot;
-import org.apache.paimon.table.source.BatchDataTableScan;
import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.utils.SnapshotManager;
import javax.annotation.Nullable;
@@ -28,10 +27,7 @@ import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
-/**
- * Helper class for the first planning of {@link BatchDataTableScan} and {@link
- * StreamDataTableScan}.
- */
+/** Helper class for the first planning of {@link TableScan}. */
public interface StartingScanner {
@Nullable
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
index 4fee6fafd..5ccf82563 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
@@ -26,8 +26,7 @@ import javax.annotation.Nullable;
/**
* {@link StartingScanner} for the {@link CoreOptions.StartupMode#FROM_SNAPSHOT} or {@link
- * org.apache.flink.table.store.CoreOptions.StartupMode#FROM_SNAPSHOT_FULL} startup mode of a batch
- * read.
+ * CoreOptions.StartupMode#FROM_SNAPSHOT_FULL} startup mode of a batch read.
*/
public class StaticFromSnapshotStartingScanner implements StartingScanner {
private final long snapshotId;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 78536c951..a12cd0862 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -33,16 +33,12 @@ import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.BatchDataTableScan;
import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.InnerStreamTableScan;
import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.StreamDataTableScan;
-import org.apache.paimon.table.source.snapshot.BoundedChecker;
-import org.apache.paimon.table.source.snapshot.FollowUpScanner;
import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
-import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
@@ -127,12 +123,12 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
}
@Override
- public BatchDataTableScan newScan() {
+ public InnerTableScan newScan() {
return new AuditLogBatchScan(dataTable.newScan());
}
@Override
- public StreamDataTableScan newStreamScan() {
+ public InnerStreamTableScan newStreamScan() {
return new AuditLogStreamScan(dataTable.newStreamScan());
}
@@ -222,111 +218,45 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
}
}
- private class AuditLogBatchScan implements BatchDataTableScan {
+ private class AuditLogBatchScan implements InnerTableScan {
- private final BatchDataTableScan batchScan;
+ private final InnerTableScan batchScan;
- private AuditLogBatchScan(BatchDataTableScan batchScan) {
+ private AuditLogBatchScan(InnerTableScan batchScan) {
this.batchScan = batchScan;
}
@Override
- public BatchDataTableScan withFilter(Predicate predicate) {
+ public InnerTableScan withFilter(Predicate predicate) {
convert(predicate).ifPresent(batchScan::withFilter);
return this;
}
@Override
- public BatchDataTableScan withKind(ScanKind kind) {
- batchScan.withKind(kind);
- return this;
- }
-
- @Override
- public BatchDataTableScan withSnapshot(long snapshotId) {
- batchScan.withSnapshot(snapshotId);
- return this;
- }
-
- @Override
- public BatchDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
- batchScan.withLevelFilter(levelFilter);
- return this;
- }
-
- @Override
- public DataTableScan.DataFilePlan plan() {
+ public Plan plan() {
return batchScan.plan();
}
-
- @Override
- public BatchDataTableScan withStartingScanner(StartingScanner startingScanner) {
- return batchScan.withStartingScanner(startingScanner);
- }
}
- private class AuditLogStreamScan implements StreamDataTableScan {
+ private class AuditLogStreamScan implements InnerStreamTableScan {
- private final StreamDataTableScan streamScan;
+ private final InnerStreamTableScan streamScan;
- private AuditLogStreamScan(StreamDataTableScan streamScan) {
+ private AuditLogStreamScan(InnerStreamTableScan streamScan) {
this.streamScan = streamScan;
}
@Override
- public StreamDataTableScan withFilter(Predicate predicate) {
+ public InnerStreamTableScan withFilter(Predicate predicate) {
convert(predicate).ifPresent(streamScan::withFilter);
return this;
}
@Override
- public StreamDataTableScan withKind(ScanKind kind) {
- streamScan.withKind(kind);
- return this;
- }
-
- @Override
- public StreamDataTableScan withSnapshot(long snapshotId) {
- streamScan.withSnapshot(snapshotId);
- return this;
- }
-
- @Override
- public StreamDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
- streamScan.withLevelFilter(levelFilter);
- return this;
- }
-
- @Override
- public DataTableScan.DataFilePlan plan() {
+ public Plan plan() {
return streamScan.plan();
}
- @Override
- public boolean supportStreamingReadOverwrite() {
- return streamScan.supportStreamingReadOverwrite();
- }
-
- @Override
- public StreamDataTableScan withStartingScanner(StartingScanner startingScanner) {
- return streamScan.withStartingScanner(startingScanner);
- }
-
- @Override
- public StreamDataTableScan withFollowUpScanner(FollowUpScanner followUpScanner) {
- return streamScan.withFollowUpScanner(followUpScanner);
- }
-
- @Override
- public StreamDataTableScan withBoundedChecker(BoundedChecker boundedChecker) {
- return streamScan.withBoundedChecker(boundedChecker);
- }
-
- @Override
- public StreamDataTableScan withSnapshotStarting() {
- return streamScan.withSnapshotStarting();
- }
-
@Nullable
@Override
public Long checkpoint() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
index 819454e78..d1ed08ab4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
@@ -31,12 +31,11 @@ import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
-import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.BatchDataTableScan;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.InnerStreamTableScan;
import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
@@ -120,12 +119,12 @@ public class BucketsTable implements DataTable, ReadonlyTable {
}
@Override
- public BatchDataTableScan newScan() {
+ public InnerTableScan newScan() {
return wrapped.newScan();
}
@Override
- public StreamDataTableScan newStreamScan() {
+ public InnerStreamTableScan newStreamScan() {
return wrapped.newStreamScan();
}
@@ -140,7 +139,7 @@ public class BucketsTable implements DataTable, ReadonlyTable {
}
@Override
- public Table copy(Map<String, String> dynamicOptions) {
+ public BucketsTable copy(Map<String, String> dynamicOptions) {
return new BucketsTable(wrapped.copy(dynamicOptions), isContinuous);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index 3c213b04c..5974925eb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -35,11 +35,11 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadOnceTableScan;
import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
@@ -166,14 +166,14 @@ public class FilesTable implements ReadonlyTable {
@Override
public long rowCount() {
- DataTableScan.DataFilePlan plan = dataFilePlan();
- if (plan == null) {
- return 0;
- }
- return plan.splits.stream().mapToLong(s -> s.files().size()).sum();
+ TableScan.Plan plan = plan();
+ return plan.splits().stream()
+ .map(s -> (DataSplit) s)
+ .mapToLong(s -> s.files().size())
+ .sum();
}
- private DataTableScan.DataFilePlan dataFilePlan() {
+ private TableScan.Plan plan() {
return storeTable.newScan().plan();
}
@@ -224,8 +224,8 @@ public class FilesTable implements ReadonlyTable {
}
FilesSplit filesSplit = (FilesSplit) split;
FileStoreTable table = filesSplit.storeTable;
- DataTableScan.DataFilePlan dataFilePlan = filesSplit.dataFilePlan();
- if (dataFilePlan == null) {
+ TableScan.Plan plan = filesSplit.plan();
+ if (plan.splits().isEmpty()) {
return new IteratorRecordReader<>(Collections.emptyIterator());
}
@@ -260,13 +260,13 @@ public class FilesTable implements ReadonlyTable {
});
}
};
- for (DataSplit dataSplit : dataFilePlan.splits) {
+ for (Split dataSplit : plan.splits()) {
iteratorList.add(
Iterators.transform(
- dataSplit.files().iterator(),
+ ((DataSplit) dataSplit).files().iterator(),
file ->
toRow(
- dataSplit,
+ (DataSplit) dataSplit,
partitionConverter,
keyConverters,
file,
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
index c957ddf28..04ae89aa9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
@@ -44,11 +44,9 @@ import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.TableScan;
-import org.apache.paimon.table.source.snapshot.FullStartingScanner;
-import org.apache.paimon.table.source.snapshot.InputChangelogFollowUpScanner;
import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
import org.apache.paimon.table.system.AuditLogTable;
import org.apache.paimon.types.DataType;
@@ -423,10 +421,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
// partition 2
Collections.singletonList("-D 2|10|301|binary|varbinary")));
- StreamDataTableScan scan =
- table.newStreamScan()
- .withStartingScanner(new FullStartingScanner())
- .withFollowUpScanner(new InputChangelogFollowUpScanner());
+ StreamTableScan scan = table.newStreamScan();
scan.restore(1L);
Function<Integer, Void> assertNextSnapshot =
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
index ace15b7c3..92853ac9d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
@@ -65,23 +65,23 @@ public class StartupModeTest extends ScannerTestBase {
initializeTestData(); // initialize 3 commits
// streaming Mode
- StreamDataTableScan dataTableScan = table.newStreamScan();
- DataTableScan.DataFilePlan firstPlan = dataTableScan.plan();
- DataTableScan.DataFilePlan secondPlan = dataTableScan.plan();
+ StreamTableScan dataTableScan = table.newStreamScan();
+ TableScan.Plan firstPlan = dataTableScan.plan();
+ TableScan.Plan secondPlan = dataTableScan.plan();
- assertThat(firstPlan.splits).isEmpty();
- assertThat(secondPlan.splits).isEmpty();
+ assertThat(firstPlan.splits()).isEmpty();
+ assertThat(secondPlan.splits()).isEmpty();
// write next data
writeAndCommit(4, rowData(1, 10, 103L));
- DataTableScan.DataFilePlan thirdPlan = dataTableScan.plan();
- assertThat(thirdPlan.splits)
+ TableScan.Plan thirdPlan = dataTableScan.plan();
+ assertThat(thirdPlan.splits())
.isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.DELTA).splits());
// batch mode
- BatchDataTableScan batchScan = table.newScan();
- DataTableScan.DataFilePlan plan = batchScan.plan();
- assertThat(plan.splits)
+ TableScan batchScan = table.newScan();
+ TableScan.Plan plan = batchScan.plan();
+ assertThat(plan.splits())
.isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.ALL).splits());
}
@@ -91,24 +91,24 @@ public class StartupModeTest extends ScannerTestBase {
initializeTestData(); // initialize 3 commits
// streaming Mode
- StreamDataTableScan dataTableScan = table.newStreamScan();
- DataTableScan.DataFilePlan firstPlan = dataTableScan.plan();
- DataTableScan.DataFilePlan secondPlan = dataTableScan.plan();
+ StreamTableScan dataTableScan = table.newStreamScan();
+ TableScan.Plan firstPlan = dataTableScan.plan();
+ TableScan.Plan secondPlan = dataTableScan.plan();
- assertThat(firstPlan.splits)
+ assertThat(firstPlan.splits())
.isEqualTo(snapshotSplitReader.withSnapshot(3).withKind(ScanKind.ALL).splits());
- assertThat(secondPlan.splits).isEmpty();
+ assertThat(secondPlan.splits()).isEmpty();
// write next data
writeAndCommit(4, rowData(1, 10, 103L));
- DataTableScan.DataFilePlan thirdPlan = dataTableScan.plan();
- assertThat(thirdPlan.splits)
+ TableScan.Plan thirdPlan = dataTableScan.plan();
+ assertThat(thirdPlan.splits())
.isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.DELTA).splits());
// batch mode
- BatchDataTableScan batchScan = table.newScan();
- DataTableScan.DataFilePlan plan = batchScan.plan();
- assertThat(plan.splits)
+ TableScan batchScan = table.newScan();
+ TableScan.Plan plan = batchScan.plan();
+ assertThat(plan.splits())
.isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.ALL).splits());
}
@@ -129,18 +129,18 @@ public class StartupModeTest extends ScannerTestBase {
FileStoreTable readTable = table.copy(properties);
// streaming Mode
- StreamDataTableScan dataTableScan = readTable.newStreamScan();
- DataTableScan.DataFilePlan firstPlan = dataTableScan.plan();
- DataTableScan.DataFilePlan secondPlan = dataTableScan.plan();
+ StreamTableScan dataTableScan = readTable.newStreamScan();
+ TableScan.Plan firstPlan = dataTableScan.plan();
+ TableScan.Plan secondPlan = dataTableScan.plan();
- assertThat(firstPlan.splits).isEmpty();
- assertThat(secondPlan.splits)
+ assertThat(firstPlan.splits()).isEmpty();
+ assertThat(secondPlan.splits())
.isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.DELTA).splits());
// batch mode
- BatchDataTableScan batchScan = readTable.newScan();
- DataTableScan.DataFilePlan plan = batchScan.plan();
- assertThat(plan.splits)
+ TableScan batchScan = readTable.newScan();
+ TableScan.Plan plan = batchScan.plan();
+ assertThat(plan.splits())
.isEqualTo(snapshotSplitReader.withSnapshot(3).withKind(ScanKind.ALL).splits());
}
@@ -154,19 +154,19 @@ public class StartupModeTest extends ScannerTestBase {
writeAndCommit(5, rowData(1, 10, 103L));
// streaming Mode
- StreamDataTableScan dataTableScan = table.newStreamScan();
- DataTableScan.DataFilePlan firstPlan = dataTableScan.plan();
- DataTableScan.DataFilePlan secondPlan = dataTableScan.plan();
+ StreamTableScan dataTableScan = table.newStreamScan();
+ TableScan.Plan firstPlan = dataTableScan.plan();
+ TableScan.Plan secondPlan = dataTableScan.plan();
- assertThat(firstPlan.splits)
+ assertThat(firstPlan.splits())
.isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.ALL).splits());
- assertThat(secondPlan.splits)
+ assertThat(secondPlan.splits())
.isEqualTo(snapshotSplitReader.withSnapshot(5).withKind(ScanKind.DELTA).splits());
// batch mode
- BatchDataTableScan batchScan = table.newScan();
- DataTableScan.DataFilePlan plan = batchScan.plan();
- assertThat(plan.splits)
+ TableScan batchScan = table.newScan();
+ TableScan.Plan plan = batchScan.plan();
+ assertThat(plan.splits())
.isEqualTo(snapshotSplitReader.withSnapshot(4).withKind(ScanKind.ALL).splits());
}
@@ -178,18 +178,18 @@ public class StartupModeTest extends ScannerTestBase {
initializeTestData(); // initialize 3 commits
// streaming Mode
- StreamDataTableScan dataTableScan = table.newStreamScan();
- DataTableScan.DataFilePlan firstPlan = dataTableScan.plan();
- DataTableScan.DataFilePlan secondPlan = dataTableScan.plan();
+ StreamTableScan dataTableScan = table.newStreamScan();
+ TableScan.Plan firstPlan = dataTableScan.plan();
+ TableScan.Plan secondPlan = dataTableScan.plan();
- assertThat(firstPlan.splits).isEmpty();
- assertThat(secondPlan.splits)
+ assertThat(firstPlan.splits()).isEmpty();
+ assertThat(secondPlan.splits())
.isEqualTo(snapshotSplitReader.withSnapshot(2).withKind(ScanKind.DELTA).splits());
// batch mode
- BatchDataTableScan batchScan = table.newScan();
- DataTableScan.DataFilePlan plan = batchScan.plan();
- assertThat(plan.splits)
+ TableScan batchScan = table.newScan();
+ TableScan.Plan plan = batchScan.plan();
+ assertThat(plan.splits())
.isEqualTo(snapshotSplitReader.withSnapshot(2).withKind(ScanKind.ALL).splits());
}
@@ -200,19 +200,19 @@ public class StartupModeTest extends ScannerTestBase {
initializeTable(StartupMode.FROM_SNAPSHOT_FULL, properties);
initializeTestData(); // initialize 3 commits
- StreamDataTableScan dataTableScan = table.newStreamScan();
- DataTableScan.DataFilePlan firstPlan = dataTableScan.plan();
- DataTableScan.DataFilePlan secondPlan = dataTableScan.plan();
+ StreamTableScan dataTableScan = table.newStreamScan();
+ TableScan.Plan firstPlan = dataTableScan.plan();
+ TableScan.Plan secondPlan = dataTableScan.plan();
- assertThat(firstPlan.splits)
+ assertThat(firstPlan.splits())
.isEqualTo(snapshotSplitReader.withSnapshot(2).withKind(ScanKind.ALL).splits());
- assertThat(secondPlan.splits)
+ assertThat(secondPlan.splits())
.isEqualTo(snapshotSplitReader.withSnapshot(3).withKind(ScanKind.DELTA).splits());
// batch mode
- BatchDataTableScan batchScan = table.newScan();
- DataTableScan.DataFilePlan plan = batchScan.plan();
- assertThat(plan.splits)
+ TableScan batchScan = table.newScan();
+ TableScan.Plan plan = batchScan.plan();
+ assertThat(plan.splits())
.isEqualTo(snapshotSplitReader.withSnapshot(2).withKind(ScanKind.ALL).splits());
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/StreamDataTableScanTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/StreamTableScanTest.java
similarity index 94%
rename from paimon-core/src/test/java/org/apache/paimon/table/source/StreamDataTableScanTest.java
rename to paimon-core/src/test/java/org/apache/paimon/table/source/StreamTableScanTest.java
index 062a50856..a249b2b2e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/StreamDataTableScanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/StreamTableScanTest.java
@@ -38,15 +38,15 @@ import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-/** Tests for {@link StreamDataTableScan}. */
-public class StreamDataTableScanTest extends ScannerTestBase {
+/** Tests for {@link StreamTableScan}. */
+public class StreamTableScanTest extends ScannerTestBase {
@Test
public void testPlan() throws Exception {
TableRead read = table.newRead();
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
- StreamDataTableScan scan = table.newStreamScan();
+ StreamTableScan scan = table.newStreamScan();
// first call without any snapshot, should return empty plan
assertThat(scan.plan().splits()).isEmpty();
@@ -63,7 +63,7 @@ public class StreamDataTableScanTest extends ScannerTestBase {
commit.commit(1, write.prepareCommit(true, 1));
// first call with snapshot, should return complete records from 2nd commit
- DataTableScan.DataFilePlan plan = scan.plan();
+ TableScan.Plan plan = scan.plan();
assertThat(getResult(read, plan.splits()))
.hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
@@ -108,7 +108,7 @@ public class StreamDataTableScanTest extends ScannerTestBase {
TableRead read = table.newRead();
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
- StreamDataTableScan scan = table.newStreamScan();
+ StreamTableScan scan = table.newStreamScan();
// first call without any snapshot, should return empty plan
assertThat(scan.plan().splits()).isEmpty();
@@ -133,7 +133,7 @@ public class StreamDataTableScanTest extends ScannerTestBase {
commit.commit(2, write.prepareCommit(true, 2));
// first call with snapshot, should return full compacted records from 3rd commit
- DataTableScan.DataFilePlan plan = scan.plan();
+ TableScan.Plan plan = scan.plan();
assertThat(getResult(read, plan.splits()))
.hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
@@ -178,14 +178,14 @@ public class StreamDataTableScanTest extends ScannerTestBase {
TableRead read = table.newRead();
StreamTableWrite write = table.newWrite(commitUser);
TableCommitImpl commit = table.newCommit(commitUser);
- StreamDataTableScan scan = table.newStreamScan();
+ StreamTableScan scan = table.newStreamScan();
write.write(rowData(1, 10, 100L));
ManifestCommittable committable = new ManifestCommittable(0, 5L);
write.prepareCommit(true, 0).forEach(committable::addFileCommittable);
commit.commit(committable);
- DataTableScan.DataFilePlan plan = scan.plan();
+ TableScan.Plan plan = scan.plan();
assertThat(getResult(read, plan.splits()))
.hasSameElementsAs(Collections.singletonList("+I 1|10|100"));
@@ -203,14 +203,14 @@ public class StreamDataTableScanTest extends ScannerTestBase {
TableRead read = table.newRead();
StreamTableWrite write = table.newWrite(commitUser);
TableCommitImpl commit = table.newCommit(commitUser);
- StreamDataTableScan scan = table.newStreamScan();
+ StreamTableScan scan = table.newStreamScan();
write.write(rowData(1, 10, 100L));
ManifestCommittable committable = new ManifestCommittable(0, 5L);
write.prepareCommit(true, 0).forEach(committable::addFileCommittable);
commit.commit(committable);
- DataTableScan.DataFilePlan plan = scan.plan();
+ TableScan.Plan plan = scan.plan();
assertThat(getResult(read, plan.splits()))
.hasSameElementsAs(Collections.singletonList("+I 1|10|100"));
assertThat(scan.plan().splits()).isEmpty();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/BatchDataTableScanTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java
similarity index 92%
rename from paimon-core/src/test/java/org/apache/paimon/table/source/BatchDataTableScanTest.java
rename to paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java
index 61e9236ca..0109125f5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/BatchDataTableScanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java
@@ -31,15 +31,15 @@ import java.util.Arrays;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-/** Tests for {@link BatchDataTableScan}. */
-public class BatchDataTableScanTest extends ScannerTestBase {
+/** Tests for {@link TableScan}. */
+public class TableScanTest extends ScannerTestBase {
@Test
public void testPlan() throws Exception {
SnapshotManager snapshotManager = table.snapshotManager();
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
- BatchDataTableScan scan = table.newScan();
+ TableScan scan = table.newScan();
write.write(rowData(1, 10, 100L));
write.write(rowData(1, 20, 200L));
@@ -53,7 +53,7 @@ public class BatchDataTableScanTest extends ScannerTestBase {
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(2);
- DataTableScan.DataFilePlan plan = scan.plan();
+ TableScan.Plan plan = scan.plan();
assertThat(getResult(table.newRead(), plan.splits()))
.hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200",
"+I 1|30|300"));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
index bc3ed34d3..406be8279 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
@@ -24,8 +24,8 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.SnapshotManager;
@@ -79,7 +79,7 @@ public class CompactionChangelogFollowUpScannerTest extends
ScannerTestBase {
snapshot = snapshotManager.snapshot(3);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
- DataTableScan.DataFilePlan plan = scanner.scan(3, snapshotSplitReader);
+ TableScan.Plan plan = scanner.scan(3, snapshotSplitReader);
assertThat(getResult(read, plan.splits()))
.hasSameElementsAs(Arrays.asList("+I 1|10|102", "+I 1|20|200",
"+I 1|30|300"));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
index 3ee8d0d67..72c0e62f8 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
@@ -23,8 +23,8 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.table.system.BucketsTable;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.SnapshotManager;
@@ -84,7 +84,7 @@ public class ContinuousCompactorFollowUpScannerTest extends
ScannerTestBase {
Snapshot snapshot = snapshotManager.snapshot(1);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
- DataTableScan.DataFilePlan plan = scanner.scan(1, snapshotSplitReader);
+ TableScan.Plan plan = scanner.scan(1, snapshotSplitReader);
assertThat(getResult(read, plan.splits()))
.hasSameElementsAs(Arrays.asList("+I 1|1|0|1", "+I 1|2|0|1"));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScannerTest.java
index 116b17702..a70d8f01e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScannerTest.java
@@ -21,8 +21,8 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.Snapshot;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.SnapshotManager;
@@ -61,7 +61,7 @@ public class DeltaFollowUpScannerTest extends ScannerTestBase
{
Snapshot snapshot = snapshotManager.snapshot(1);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
- DataTableScan.DataFilePlan plan = scanner.scan(1, snapshotSplitReader);
+ TableScan.Plan plan = scanner.scan(1, snapshotSplitReader);
assertThat(getResult(read, plan.splits()))
.hasSameElementsAs(Arrays.asList("+I 1|10|100", "+I 1|20|200",
"+I 1|40|400"));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScannerTest.java
index 98a0bd20b..a22d3a16f 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScannerTest.java
@@ -24,8 +24,8 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.SnapshotManager;
@@ -64,7 +64,7 @@ public class InputChangelogFollowUpScannerTest extends
ScannerTestBase {
Snapshot snapshot = snapshotManager.snapshot(1);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
- DataTableScan.DataFilePlan plan = scanner.scan(1, snapshotSplitReader);
+ TableScan.Plan plan = scanner.scan(1, snapshotSplitReader);
assertThat(getResult(read, plan.splits()))
.hasSameElementsAs(Arrays.asList("+I 1|10|100", "+I 1|20|200",
"+I 1|40|400"));
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 2beddf707..306869692 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -27,7 +27,6 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateFilter;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.TableStreamingReader;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.TypeUtils;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/TableStreamingReader.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java
similarity index 79%
rename from
paimon-core/src/main/java/org/apache/paimon/table/source/TableStreamingReader.java
rename to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java
index d1f2e1dff..75e5fadb1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/TableStreamingReader.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java
@@ -16,15 +16,21 @@
* limitations under the License.
*/
-package org.apache.paimon.table.source;
+package org.apache.paimon.flink.lookup;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateFilter;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.DataTableScan.DataFilePlan;
+import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.utils.TypeUtils;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
@@ -34,6 +40,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.IntUnaryOperator;
@@ -46,10 +53,20 @@ public class TableStreamingReader {
private final ReadBuilder readBuilder;
@Nullable private final PredicateFilter recordFilter;
- private final StreamDataTableScan scan;
+ private final StreamTableScan scan;
public TableStreamingReader(Table table, int[] projection, @Nullable
Predicate predicate) {
+ if (CoreOptions.fromMap(table.options()).startupMode()
+ != CoreOptions.StartupMode.COMPACTED_FULL) {
+ table =
+ table.copy(
+ Collections.singletonMap(
+ CoreOptions.SCAN_MODE.key(),
+
CoreOptions.StartupMode.LATEST_FULL.toString()));
+ }
+
this.readBuilder =
table.newReadBuilder().withProjection(projection).withFilter(predicate);
+ scan = readBuilder.newStreamScan();
if (predicate != null) {
List<String> fieldNames = table.rowType().getFieldNames();
@@ -75,9 +92,6 @@ public class TableStreamingReader {
} else {
recordFilter = null;
}
-
- scan = (StreamDataTableScan) readBuilder.newStreamScan();
- scan.withSnapshotStarting();
}
public Iterator<InternalRow> nextBatch() throws Exception {
@@ -89,11 +103,11 @@ public class TableStreamingReader {
}
}
- private Iterator<InternalRow> read(DataFilePlan plan) throws IOException {
+ private Iterator<InternalRow> read(TableScan.Plan plan) throws IOException
{
TableRead read = readBuilder.newRead();
List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers = new
ArrayList<>();
- for (DataSplit split : plan.splits) {
+ for (Split split : plan.splits()) {
readers.add(() -> read.createReader(split));
}
Iterator<InternalRow> iterator =
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
index f49b03309..349c7d5cd 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
@@ -18,9 +18,9 @@
package org.apache.paimon.flink.source;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.LogicalTypeConversion;
-import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.FileStoreTable;
@@ -39,6 +39,7 @@ import
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import javax.annotation.Nullable;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -92,14 +93,15 @@ public class CompactorSourceBuilder {
.toArray(Predicate[]::new));
}
- ReadBuilder readBuilder =
bucketsTable.newReadBuilder().withFilter(partitionPredicate);
if (isContinuous) {
+ bucketsTable = bucketsTable.copy(streamingCompactOptions());
return new ContinuousFileStoreSource(
- readBuilder,
+
bucketsTable.newReadBuilder().withFilter(partitionPredicate),
bucketsTable.options(),
- null,
- TableScanUtils.compactStreamScanFactory());
+ null);
} else {
+ bucketsTable = bucketsTable.copy(batchCompactOptions());
+ ReadBuilder readBuilder =
bucketsTable.newReadBuilder().withFilter(partitionPredicate);
List<Split> splits = readBuilder.newScan().plan().splits();
return new StaticFileStoreSource(
readBuilder,
@@ -108,8 +110,7 @@ public class CompactorSourceBuilder {
.coreOptions()
.toConfiguration()
.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
- splits,
- TableScanUtils.compactBatchScanFactory());
+ splits);
}
}
@@ -126,4 +127,25 @@ public class CompactorSourceBuilder {
tableIdentifier + "-compact-source",
InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(produceType)));
}
+
+ private Map<String, String> streamingCompactOptions() {
+ // set 'streaming-compact' and remove 'scan.bounded.watermark'
+ return new HashMap<String, String>() {
+ {
+ put(CoreOptions.STREAMING_COMPACT.key(), "true");
+ put(CoreOptions.SCAN_BOUNDED_WATERMARK.key(), null);
+ }
+ };
+ }
+
+ private Map<String, String> batchCompactOptions() {
+ // batch compactor source will compact all current files
+ return new HashMap<String, String>() {
+ {
+ put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), null);
+ put(CoreOptions.SCAN_SNAPSHOT_ID.key(), null);
+ put(CoreOptions.SCAN_MODE.key(),
CoreOptions.StartupMode.LATEST_FULL.toString());
+ }
+ };
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index e9c14ca15..54f69ca60 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -20,9 +20,9 @@ package org.apache.paimon.flink.source;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableRead;
import org.apache.flink.api.connector.source.Boundedness;
@@ -44,21 +44,11 @@ public class ContinuousFileStoreSource extends FlinkSource {
private static final long serialVersionUID = 3L;
private final Map<String, String> options;
- private final TableScanUtils.StreamTableScanFactory scanFactory;
public ContinuousFileStoreSource(
ReadBuilder readBuilder, Map<String, String> options, @Nullable
Long limit) {
- this(readBuilder, options, limit,
TableScanUtils.defaultStreamScanFactory());
- }
-
- public ContinuousFileStoreSource(
- ReadBuilder readBuilder,
- Map<String, String> options,
- @Nullable Long limit,
- TableScanUtils.StreamTableScanFactory scanFactory) {
super(readBuilder, limit);
this.options = options;
- this.scanFactory = scanFactory;
}
@Override
@@ -77,6 +67,8 @@ public class ContinuousFileStoreSource extends FlinkSource {
splits = checkpoint.splits();
}
CoreOptions coreOptions = CoreOptions.fromMap(options);
+ StreamTableScan scan = readBuilder.newStreamScan();
+ scan.restore(nextSnapshotId);
return new ContinuousFileSplitEnumerator(
context,
splits,
@@ -85,7 +77,7 @@ public class ContinuousFileStoreSource extends FlinkSource {
coreOptions
.toConfiguration()
.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
- scanFactory.create(readBuilder, nextSnapshotId));
+ scan);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
index c0fa8c9e0..5018130d4 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.source;
-import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
@@ -37,7 +36,6 @@ public class StaticFileStoreSource extends FlinkSource {
private static final long serialVersionUID = 3L;
private final int splitBatchSize;
- private final TableScanUtils.TableScanFactory scanFactory;
private final List<Split> splitList;
public StaticFileStoreSource(
@@ -45,19 +43,9 @@ public class StaticFileStoreSource extends FlinkSource {
@Nullable Long limit,
int splitBatchSize,
List<Split> splitList) {
- this(readBuilder, limit, splitBatchSize, splitList,
ReadBuilder::newScan);
- }
-
- public StaticFileStoreSource(
- ReadBuilder readBuilder,
- @Nullable Long limit,
- int splitBatchSize,
- List<Split> splitList,
- TableScanUtils.TableScanFactory scanFactory) {
super(readBuilder, limit);
this.splitBatchSize = splitBatchSize;
this.splitList = splitList;
- this.scanFactory = scanFactory;
}
@Override
@@ -79,7 +67,7 @@ public class StaticFileStoreSource extends FlinkSource {
if (null != splitList) {
return splitGenerator.createSplits(splitList);
} else {
- return
splitGenerator.createSplits(scanFactory.create(readBuilder).plan());
+ return splitGenerator.createSplits(readBuilder.newScan().plan());
}
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
index 9ab22f00c..7d4911d3e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
@@ -20,22 +20,11 @@ package org.apache.paimon.flink.utils;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.BatchDataTableScan;
-import org.apache.paimon.table.source.ReadBuilder;
-import org.apache.paimon.table.source.StreamDataTableScan;
-import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableScan;
-import org.apache.paimon.table.source.snapshot.BoundedChecker;
-import
org.apache.paimon.table.source.snapshot.ContinuousCompactorFollowUpScanner;
-import
org.apache.paimon.table.source.snapshot.ContinuousCompactorStartingScanner;
-import org.apache.paimon.table.source.snapshot.FullStartingScanner;
-import javax.annotation.Nullable;
-
-import java.io.Serializable;
import java.util.HashMap;
-/** Utility methods for {@link TableScan}, such as validating and creating. */
+/** Utility methods for {@link TableScan}, such as validating. */
public class TableScanUtils {
public static void streamingReadingValidate(Table table) {
@@ -60,47 +49,4 @@ public class TableScanUtils {
}
}
}
-
- // ------------------------------------------------------------------------
- // TableScan factories
- // ------------------------------------------------------------------------
-
- /** Factory to create batch {@link TableScan}. */
- public interface TableScanFactory extends Serializable {
- TableScan create(ReadBuilder readBuilder);
- }
-
- /** Factory to create {@link StreamTableScan}. */
- public interface StreamTableScanFactory extends Serializable {
-
- StreamTableScan create(ReadBuilder readBuilder, @Nullable Long
nextSnapshotId);
- }
-
- public static StreamTableScanFactory defaultStreamScanFactory() {
- return (builder, nextSnapshotId) -> {
- StreamTableScan scan = builder.newStreamScan();
- scan.restore(nextSnapshotId);
- return scan;
- };
- }
-
- public static TableScanFactory compactBatchScanFactory() {
- return readBuilder -> {
- BatchDataTableScan scan = (BatchDataTableScan)
readBuilder.newScan();
- // static compactor source will compact all current files
- scan.withStartingScanner(new FullStartingScanner());
- return scan;
- };
- }
-
- public static StreamTableScanFactory compactStreamScanFactory() {
- return (readBuilder, nextSnapshotId) -> {
- StreamDataTableScan scan = (StreamDataTableScan)
readBuilder.newStreamScan();
- scan.withStartingScanner(new ContinuousCompactorStartingScanner())
- .withFollowUpScanner(new
ContinuousCompactorFollowUpScanner())
- .withBoundedChecker(BoundedChecker.neverEnd());
- scan.restore(nextSnapshotId);
- return scan;
- };
- }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index e6f2e85a9..db4e107fd 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -24,8 +24,8 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.DataTableScan;
-import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -146,8 +146,8 @@ public class CompactActionITCase extends ActionITCaseBase {
Assertions.assertEquals(Snapshot.CommitKind.APPEND,
snapshot.commitKind());
// no full compaction has happened, so plan should be empty
- StreamDataTableScan scan = table.newStreamScan();
- DataTableScan.DataFilePlan plan = scan.plan();
+ StreamTableScan scan = table.newStreamScan();
+ TableScan.Plan plan = scan.plan();
Assertions.assertTrue(plan.splits().isEmpty());
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -209,15 +209,14 @@ public class CompactActionITCase extends ActionITCaseBase
{
}
private void validateResult(
- FileStoreTable table, StreamDataTableScan scan, List<String>
expected, long timeout)
+ FileStoreTable table, StreamTableScan scan, List<String> expected,
long timeout)
throws Exception {
List<String> actual = new ArrayList<>();
long start = System.currentTimeMillis();
while (actual.size() != expected.size()) {
- DataTableScan.DataFilePlan plan = scan.plan();
- if (plan != null) {
- actual.addAll(getResult(table.newRead(), plan.splits(),
ROW_TYPE));
- }
+ TableScan.Plan plan = scan.plan();
+ actual.addAll(getResult(table.newRead(), plan.splits(), ROW_TYPE));
+
if (System.currentTimeMillis() - start > timeout) {
break;
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
index 6587b5663..65bcf6ba7 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
@@ -21,7 +21,7 @@ package org.apache.paimon.flink.action;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -62,7 +62,7 @@ public class DropPartitionActionITCase extends
ActionITCaseBase {
assertThat(snapshot.id()).isEqualTo(5);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
- DataTableScan.DataFilePlan plan = table.newScan().plan();
+ TableScan.Plan plan = table.newScan().plan();
assertThat(plan.splits().size()).isEqualTo(2);
List<String> actual = getResult(table.newRead(), plan.splits(),
ROW_TYPE);
@@ -109,7 +109,7 @@ public class DropPartitionActionITCase extends
ActionITCaseBase {
assertThat(snapshot.id()).isEqualTo(5);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
- DataTableScan.DataFilePlan plan = table.newScan().plan();
+ TableScan.Plan plan = table.newScan().plan();
assertThat(plan.splits().size()).isEqualTo(2);
List<String> actual = getResult(table.newRead(), plan.splits(),
ROW_TYPE);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java
index 8950cc377..054ab27fe 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java
@@ -65,7 +65,7 @@ public abstract class CommitterOperatorTestBase {
List<String> actual = new ArrayList<>();
table.newScan()
.plan()
- .splits
+ .splits()
.forEach(
s -> {
try {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
index fda4cdd1e..3739f7bc0 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
@@ -33,7 +33,8 @@ import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -116,15 +117,16 @@ public class CompactorSinkITCase extends AbstractTestBase
{
assertEquals(3, snapshot.id());
assertEquals(Snapshot.CommitKind.COMPACT, snapshot.commitKind());
- DataTableScan.DataFilePlan plan = table.newScan().plan();
+ TableScan.Plan plan = table.newScan().plan();
assertEquals(3, plan.splits().size());
- for (DataSplit split : plan.splits) {
- if (split.partition().getInt(1) == 15) {
+ for (Split split : plan.splits()) {
+ DataSplit dataSplit = (DataSplit) split;
+ if (dataSplit.partition().getInt(1) == 15) {
// compacted
- assertEquals(1, split.files().size());
+ assertEquals(1, dataSplit.files().size());
} else {
// not compacted
- assertEquals(2, split.files().size());
+ assertEquals(2, dataSplit.files().size());
}
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
index 2abeb87e4..bc600a86f 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
@@ -35,7 +35,7 @@ import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
@@ -191,7 +191,7 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
TableSchema schema = schemaManager.latest().get();
Map<Integer, Map<String, String>> actual = new HashMap<>();
- DataTableScan.DataFilePlan plan = table.newScan().plan();
+ TableScan.Plan plan = table.newScan().plan();
try (RecordReaderIterator<InternalRow> it =
new
RecordReaderIterator<>(table.newRead().createReader(plan))) {
while (it.hasNext()) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
index d070a7ab4..05237690e 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.source;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
@@ -43,6 +44,8 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -77,9 +80,14 @@ public class CompactorSourceITCase extends AbstractTestBase {
commitUser = UUID.randomUUID().toString();
}
- @Test
- public void testBatchRead() throws Exception {
+ @ParameterizedTest(name = "defaultOptions = {0}")
+ @ValueSource(booleans = {true, false})
+ public void testBatchRead(boolean defaultOptions) throws Exception {
FileStoreTable table = createFileStoreTable();
+ if (!defaultOptions) {
+ // change options to test whether CompactorSourceBuilder work
normally
+ table =
table.copy(Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), "2"));
+ }
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
@@ -115,9 +123,20 @@ public class CompactorSourceITCase extends
AbstractTestBase {
it.close();
}
- @Test
- public void testStreamingRead() throws Exception {
+ @ParameterizedTest(name = "defaultOptions = {0}")
+ @ValueSource(booleans = {true, false})
+ public void testStreamingRead(boolean defaultOptions) throws Exception {
FileStoreTable table = createFileStoreTable();
+ if (!defaultOptions) {
+ // change options to test whether CompactorSourceBuilder work
normally
+ Map<String, String> dynamicOptions = new HashMap<>();
+ dynamicOptions.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), "2");
+ dynamicOptions.put(
+ CoreOptions.CHANGELOG_PRODUCER.key(),
+ CoreOptions.ChangelogProducer.NONE.toString());
+ dynamicOptions.put(CoreOptions.SCAN_BOUNDED_WATERMARK.key(), "0");
+ table = table.copy(dynamicOptions);
+ }
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index 1c9ea6b71..2ecd7f54e 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -19,16 +19,11 @@
package org.apache.paimon.flink.source;
import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.source.DataFilePlan;
import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.DataTableScan.DataFilePlan;
import org.apache.paimon.table.source.EndOfScanException;
-import org.apache.paimon.table.source.StreamDataTableScan;
-import org.apache.paimon.table.source.snapshot.BoundedChecker;
-import org.apache.paimon.table.source.snapshot.FollowUpScanner;
-import org.apache.paimon.table.source.snapshot.StartingScanner;
-import org.apache.paimon.utils.Filter;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableScan;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import
org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
@@ -193,8 +188,8 @@ public class ContinuousFileSplitEnumeratorTest {
context.registerReader(0, "test-host");
context.registerReader(1, "test-host");
- Queue<DataFilePlan> results = new LinkedBlockingQueue<>();
- StreamDataTableScan scan = new MockScan(results);
+ Queue<TableScan.Plan> results = new LinkedBlockingQueue<>();
+ StreamTableScan scan = new MockScan(results);
ContinuousFileSplitEnumerator enumerator =
new Builder()
.setSplitEnumeratorContext(context)
@@ -268,7 +263,7 @@ public class ContinuousFileSplitEnumeratorTest {
private long discoveryInterval = Long.MAX_VALUE;
private int splitBatchSize = 10;
- private StreamDataTableScan scan;
+ private StreamTableScan scan;
public Builder setSplitEnumeratorContext(
SplitEnumeratorContext<FileStoreSourceSplit> context) {
@@ -291,7 +286,7 @@ public class ContinuousFileSplitEnumeratorTest {
return this;
}
- public Builder setScan(StreamDataTableScan scan) {
+ public Builder setScan(StreamTableScan scan) {
this.scan = scan;
return this;
}
@@ -302,67 +297,22 @@ public class ContinuousFileSplitEnumeratorTest {
}
}
- private static class MockScan implements StreamDataTableScan {
- private final Queue<DataFilePlan> results;
+ private static class MockScan implements StreamTableScan {
+ private final Queue<Plan> results;
- public MockScan(Queue<DataFilePlan> results) {
+ public MockScan(Queue<Plan> results) {
this.results = results;
}
@Override
- public StreamDataTableScan withKind(ScanKind kind) {
- return null;
- }
-
- @Override
- public StreamDataTableScan withSnapshot(long snapshotId) {
- return null;
- }
-
- @Override
- public StreamDataTableScan withLevelFilter(Filter<Integer>
levelFilter) {
- return null;
- }
-
- @Override
- public StreamDataTableScan withFilter(Predicate predicate) {
- return null;
- }
-
- @Override
- public DataFilePlan plan() {
- DataFilePlan plan = results.poll();
+ public Plan plan() {
+ Plan plan = results.poll();
if (plan == null) {
throw new EndOfScanException();
}
return plan;
}
- @Override
- public boolean supportStreamingReadOverwrite() {
- return false;
- }
-
- @Override
- public StreamDataTableScan withStartingScanner(StartingScanner
startingScanner) {
- return null;
- }
-
- @Override
- public StreamDataTableScan withFollowUpScanner(FollowUpScanner
followUpScanner) {
- return null;
- }
-
- @Override
- public StreamDataTableScan withBoundedChecker(BoundedChecker
boundedChecker) {
- return null;
- }
-
- @Override
- public StreamDataTableScan withSnapshotStarting() {
- return null;
- }
-
@Override
public Long checkpoint() {
return null;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
index 45c53d18c..9837f1469 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
@@ -23,9 +23,9 @@ import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.stats.StatsTestUtils;
+import org.apache.paimon.table.source.DataFilePlan;
import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.DataTableScan;
-import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
+import org.apache.paimon.table.source.snapshot.SnapshotSplitReaderImpl;
import org.junit.jupiter.api.Test;
@@ -74,13 +74,13 @@ public class FileStoreSourceSplitGeneratorTest {
}
};
List<DataSplit> scanSplits =
- SnapshotSplitReader.generateSplits(
+ SnapshotSplitReaderImpl.generateSplits(
1L,
false,
false,
Collections::singletonList,
FileStoreScan.Plan.groupByPartFiles(plan.files(FileKind.ADD)));
- DataTableScan.DataFilePlan tableScanPlan = new
DataTableScan.DataFilePlan(scanSplits);
+ DataFilePlan tableScanPlan = new DataFilePlan(scanSplits);
List<FileStoreSourceSplit> splits =
new
FileStoreSourceSplitGenerator().createSplits(tableScanPlan);
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
index 91e778b47..411ea0d03 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
@@ -28,7 +28,8 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
@@ -53,10 +54,10 @@ public class PaimonInputFormat implements InputFormat<Void,
RowDataContainer> {
@Override
public InputSplit[] getSplits(JobConf jobConf, int numSplits) {
FileStoreTable table = createFileStoreTable(jobConf);
- DataTableScan scan = table.newScan();
+ InnerTableScan scan = table.newScan();
createPredicate(table.schema(), jobConf).ifPresent(scan::withFilter);
- return scan.plan().splits.stream()
- .map(split -> new
PaimonInputSplit(table.location().toString(), split))
+ return scan.plan().splits().stream()
+ .map(split -> new
PaimonInputSplit(table.location().toString(), (DataSplit) split))
.toArray(PaimonInputSplit[]::new);
}