This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b0b63757b3 [core] Introduce IncrementalSplit to simplify DataSplit (#7093)
b0b63757b3 is described below
commit b0b63757b3ed67ef48b0496035c146b5a22d4744
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jan 21 20:25:11 2026 +0800
[core] Introduce IncrementalSplit to simplify DataSplit (#7093)
---
.../org/apache/paimon/manifest/PartitionEntry.java | 25 --
.../paimon/operation/MergeFileSplitRead.java | 4 -
.../apache/paimon/operation/RawFileSplitRead.java | 35 ++-
.../org/apache/paimon/table/source/DataSplit.java | 77 +------
.../paimon/table/source/IncrementalSplit.java | 254 +++++++++++++++++++++
.../source/snapshot/AbstractStartingScanner.java | 39 +++-
.../snapshot/IncrementalDiffStartingScanner.java | 7 +
.../table/source/snapshot/SnapshotReaderImpl.java | 91 +++++---
.../table/source/snapshot/StartingScanner.java | 6 +-
.../AppendTableRawFileSplitReadProvider.java | 3 +-
.../IncrementalChangelogReadProvider.java | 27 +--
.../splitread/IncrementalDiffReadProvider.java | 8 +-
.../source/splitread/IncrementalDiffSplitRead.java | 10 +-
.../splitread/MergeFileSplitReadProvider.java | 9 +-
.../paimon/table/system/FileMonitorTable.java | 35 ++-
.../cluster/BucketedAppendClusterManagerTest.java | 3 +-
.../apache/paimon/table/SimpleTableTestBase.java | 5 +-
.../snapshot/CompactedStartingScannerTest.java | 2 +-
.../source/snapshot/FullStartingScannerTest.java | 2 +-
.../snapshot/StaticFromTagStartingScannerTest.java | 2 +-
.../lookup/IncrementalCompactDiffSplitRead.java | 40 ++--
.../paimon/flink/lookup/LookupCompactDiffRead.java | 5 +-
.../paimon/flink/lookup/LookupStreamingReader.java | 9 +-
.../flink/lookup/PrimaryKeyPartialLookupTable.java | 36 ++-
.../source/ContinuousFileSplitEnumerator.java | 30 ++-
.../flink/source/align/PlaceholderSplit.java | 6 -
.../paimon/hive/utils/HiveSplitGenerator.java | 3 +-
.../paimon/spark/scan/BinPackingSplits.scala | 2 +-
28 files changed, 526 insertions(+), 249 deletions(-)
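For context: before this change, incremental reads reused DataSplit and carried the previous snapshot's files in beforeFiles(); with this patch, batch splits stay in DataSplit while incremental diff/changelog splits move to the new IncrementalSplit, so readers dispatch on the split type. A minimal sketch of that dispatch follows (SplitFiles and filesToRead are hypothetical helpers for illustration, not part of this patch; only Split, DataSplit and IncrementalSplit come from the patch itself):

    import org.apache.paimon.io.DataFileMeta;
    import org.apache.paimon.table.source.DataSplit;
    import org.apache.paimon.table.source.IncrementalSplit;
    import org.apache.paimon.table.source.Split;

    import java.util.List;

    class SplitFiles {
        // Files on the "after" side: dataFiles() for a DataSplit, afterFiles() for an IncrementalSplit.
        static List<DataFileMeta> filesToRead(Split split) {
            if (split instanceof DataSplit) {
                return ((DataSplit) split).dataFiles();
            } else if (split instanceof IncrementalSplit) {
                return ((IncrementalSplit) split).afterFiles();
            }
            throw new IllegalArgumentException("Unsupported split: " + split.getClass());
        }
    }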
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
index aaee93602a..d162234cd5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
@@ -23,16 +23,13 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.partition.PartitionStatistics;
-import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
-import static org.apache.paimon.manifest.FileKind.ADD;
import static org.apache.paimon.manifest.FileKind.DELETE;
/** Entry representing a partition. */
@@ -150,28 +147,6 @@ public class PartitionEntry {
return partitions.values();
}
- public static Collection<PartitionEntry> mergeSplits(Collection<DataSplit> splits) {
- Map<BinaryRow, PartitionEntry> partitions = new HashMap<>();
- for (DataSplit split : splits) {
- BinaryRow partition = split.partition();
- for (DataFileMeta file : split.dataFiles()) {
- PartitionEntry partitionEntry =
- fromDataFile(
- partition,
- ADD,
- file,
- Optional.ofNullable(split.totalBuckets()).orElse(0));
- partitions.compute(
- partition,
- (part, old) -> old == null ? partitionEntry : old.merge(partitionEntry));
- }
-
- // Ignore before files, because we don't know how to merge them
- // Ignore deletion files, because it is costly to read from it
- }
- return partitions.values();
- }
-
public static void merge(Collection<PartitionEntry> from, Map<BinaryRow, PartitionEntry> to) {
for (PartitionEntry entry : from) {
to.compute(entry.partition(), (part, old) -> old == null ? entry : old.merge(entry));
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
index c41c9c3777..689117f1d2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
@@ -229,10 +229,6 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
}
public RecordReader<KeyValue> createReader(DataSplit split) throws IOException {
- if (!split.beforeFiles().isEmpty()) {
- throw new IllegalArgumentException("This read cannot accept split with before files.");
- }
-
if (split.isStreaming() || split.bucket() == BucketMode.POSTPONE_BUCKET) {
return createNoMergeReader(
split.partition(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index deda6e682d..4b4beeaf31 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -45,6 +45,8 @@ import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.DeletionFile;
+import org.apache.paimon.table.source.IncrementalSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
@@ -144,19 +146,38 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
@Override
public RecordReader<InternalRow> createReader(Split s) throws IOException {
- DataSplit split = (DataSplit) s;
- if (!split.beforeFiles().isEmpty()) {
- LOG.info("Ignore split before files: {}", split.beforeFiles());
+ if (s instanceof DataSplit) {
+ DataSplit split = (DataSplit) s;
+ return createReader(
+ split.partition(),
+ split.bucket(),
+ split.dataFiles(),
+ split.deletionFiles().orElse(null));
+ } else {
+ IncrementalSplit split = (IncrementalSplit) s;
+ if (!split.beforeFiles().isEmpty()) {
+ LOG.info("Ignore split before files: {}", split.beforeFiles());
+ }
+ return createReader(
+ split.partition(),
+ split.bucket(),
+ split.afterFiles(),
+ split.afterDeletionFiles());
}
+ }
- List<DataFileMeta> files = split.dataFiles();
- DeletionVector.Factory dvFactory =
- DeletionVector.factory(fileIO, files, split.deletionFiles().orElse(null));
+ public RecordReader<InternalRow> createReader(
+ BinaryRow partition,
+ int bucket,
+ List<DataFileMeta> files,
+ List<DeletionFile> deletionFiles)
+ throws IOException {
+ DeletionVector.Factory dvFactory = DeletionVector.factory(fileIO, files, deletionFiles);
Map<String, IOExceptionSupplier<DeletionVector>> dvFactories = new HashMap<>();
for (DataFileMeta file : files) {
dvFactories.put(file.fileName(), () -> dvFactory.create(file.fileName()).orElse(null));
}
- return createReader(split.partition(), split.bucket(), split.dataFiles(), dvFactories);
+ return createReader(partition, bucket, files, dvFactories);
}
public RecordReader<InternalRow> createReader(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 956b67c157..cbd2e3086f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -70,9 +70,6 @@ public class DataSplit implements Split {
private String bucketPath;
@Nullable private Integer totalBuckets;
- private List<DataFileMeta> beforeFiles = new ArrayList<>();
- @Nullable private List<DeletionFile> beforeDeletionFiles;
-
private List<DataFileMeta> dataFiles;
@Nullable private List<DeletionFile> dataDeletionFiles;
@@ -101,14 +98,6 @@ public class DataSplit implements Split {
return totalBuckets;
}
- public List<DataFileMeta> beforeFiles() {
- return beforeFiles;
- }
-
- public Optional<List<DeletionFile>> beforeDeletionFiles() {
- return Optional.ofNullable(beforeDeletionFiles);
- }
-
public List<DataFileMeta> dataFiles() {
return dataFiles;
}
@@ -126,10 +115,6 @@ public class DataSplit implements Split {
return rawConvertible;
}
- public OptionalLong latestFileCreationEpochMillis() {
- return this.dataFiles.stream().mapToLong(DataFileMeta::creationTimeEpochMillis).max();
- }
-
public OptionalLong earliestFileCreationEpochMillis() {
return this.dataFiles.stream().mapToLong(DataFileMeta::creationTimeEpochMillis).min();
}
@@ -260,32 +245,6 @@ public class DataSplit implements Split {
return sum;
}
- /**
- * Obtain merged row count as much as possible. There are two scenarios where accurate row count
- * can be calculated:
- *
- * <p>1. raw file and no deletion file.
- *
- * <p>2. raw file + deletion file with cardinality.
- */
- public long partialMergedRowCount() {
- long sum = 0L;
- if (rawConvertible) {
- List<RawFile> rawFiles = convertToRawFiles().orElse(null);
- if (rawFiles != null) {
- for (int i = 0; i < rawFiles.size(); i++) {
- RawFile rawFile = rawFiles.get(i);
- if (dataDeletionFiles == null || dataDeletionFiles.get(i) == null) {
- sum += rawFile.rowCount();
- } else if (dataDeletionFiles.get(i).cardinality() != null) {
- sum += rawFile.rowCount() - dataDeletionFiles.get(i).cardinality();
- }
- }
- }
- }
- return sum;
- }
-
@Override
public Optional<List<RawFile>> convertToRawFiles() {
if (rawConvertible) {
@@ -352,8 +311,6 @@ public class DataSplit implements Split {
&& Objects.equals(partition, dataSplit.partition)
&& Objects.equals(bucketPath, dataSplit.bucketPath)
&& Objects.equals(totalBuckets, dataSplit.totalBuckets)
- && Objects.equals(beforeFiles, dataSplit.beforeFiles)
- && Objects.equals(beforeDeletionFiles, dataSplit.beforeDeletionFiles)
&& Objects.equals(dataFiles, dataSplit.dataFiles)
&& Objects.equals(dataDeletionFiles, dataSplit.dataDeletionFiles);
}
@@ -366,8 +323,6 @@ public class DataSplit implements Split {
bucket,
bucketPath,
totalBuckets,
- beforeFiles,
- beforeDeletionFiles,
dataFiles,
dataDeletionFiles,
isStreaming,
@@ -404,8 +359,6 @@ public class DataSplit implements Split {
this.bucket = other.bucket;
this.bucketPath = other.bucketPath;
this.totalBuckets = other.totalBuckets;
- this.beforeFiles = other.beforeFiles;
- this.beforeDeletionFiles = other.beforeDeletionFiles;
this.dataFiles = other.dataFiles;
this.dataDeletionFiles = other.dataDeletionFiles;
this.isStreaming = other.isStreaming;
@@ -427,12 +380,10 @@ public class DataSplit implements Split {
}
DataFileMetaSerializer dataFileSer = new DataFileMetaSerializer();
- out.writeInt(beforeFiles.size());
- for (DataFileMeta file : beforeFiles) {
- dataFileSer.serialize(file, out);
- }
- DeletionFile.serializeList(out, beforeDeletionFiles);
+ // compatible with old beforeFiles
+ out.writeInt(0);
+ DeletionFile.serializeList(out, null);
out.writeInt(dataFiles.size());
for (DataFileMeta file : dataFiles) {
@@ -461,13 +412,15 @@ public class DataSplit implements Split {
FunctionWithIOException<DataInputView, DeletionFile> deletionFileSerde =
getDeletionFileSerde(version);
int beforeNumber = in.readInt();
- List<DataFileMeta> beforeFiles = new ArrayList<>(beforeNumber);
- for (int i = 0; i < beforeNumber; i++) {
- beforeFiles.add(dataFileSer.apply(in));
+ if (beforeNumber > 0) {
+ throw new RuntimeException("Cannot deserialize data split with before files.");
}
List<DeletionFile> beforeDeletionFiles =
DeletionFile.deserializeList(in, deletionFileSerde);
+ if (beforeDeletionFiles != null) {
+ throw new RuntimeException("Cannot deserialize data split with before deletion files.");
+ }
int fileNumber = in.readInt();
List<DataFileMeta> dataFiles = new ArrayList<>(fileNumber);
@@ -487,14 +440,10 @@ public class DataSplit implements Split {
.withBucket(bucket)
.withBucketPath(bucketPath)
.withTotalBuckets(totalBuckets)
- .withBeforeFiles(beforeFiles)
.withDataFiles(dataFiles)
.isStreaming(isStreaming)
.rawConvertible(rawConvertible);
- if (beforeDeletionFiles != null) {
- builder.withBeforeDeletionFiles(beforeDeletionFiles);
- }
if (dataDeletionFiles != null) {
builder.withDataDeletionFiles(dataDeletionFiles);
}
@@ -572,16 +521,6 @@ public class DataSplit implements Split {
return this;
}
- public Builder withBeforeFiles(List<DataFileMeta> beforeFiles) {
- this.split.beforeFiles = new ArrayList<>(beforeFiles);
- return this;
- }
-
- public Builder withBeforeDeletionFiles(List<DeletionFile> beforeDeletionFiles) {
- this.split.beforeDeletionFiles = new ArrayList<>(beforeDeletionFiles);
- return this;
- }
-
public Builder withDataFiles(List<DataFileMeta> dataFiles) {
this.split.dataFiles = new ArrayList<>(dataFiles);
return this;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/IncrementalSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/IncrementalSplit.java
new file mode 100644
index 0000000000..5b672a4fef
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/IncrementalSplit.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFileMetaSerializer;
+import org.apache.paimon.io.DataInputView;
+import org.apache.paimon.io.DataInputViewStreamWrapper;
+import org.apache.paimon.io.DataOutputViewStreamWrapper;
+import org.apache.paimon.utils.FunctionWithIOException;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
+import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
+
+/** Incremental split for batch and streaming. */
+public class IncrementalSplit implements Split {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final int VERSION = 1;
+
+ private long snapshotId;
+ private BinaryRow partition;
+ private int bucket;
+ private int totalBuckets;
+
+ private List<DataFileMeta> beforeFiles;
+ private @Nullable List<DeletionFile> beforeDeletionFiles;
+
+ private List<DataFileMeta> afterFiles;
+ private @Nullable List<DeletionFile> afterDeletionFiles;
+
+ private boolean isStreaming;
+
+ public IncrementalSplit(
+ long snapshotId,
+ BinaryRow partition,
+ int bucket,
+ int totalBuckets,
+ List<DataFileMeta> beforeFiles,
+ @Nullable List<DeletionFile> beforeDeletionFiles,
+ List<DataFileMeta> afterFiles,
+ @Nullable List<DeletionFile> afterDeletionFiles,
+ boolean isStreaming) {
+ this.snapshotId = snapshotId;
+ this.partition = partition;
+ this.bucket = bucket;
+ this.totalBuckets = totalBuckets;
+ this.beforeFiles = beforeFiles;
+ this.beforeDeletionFiles = beforeDeletionFiles;
+ this.afterFiles = afterFiles;
+ this.afterDeletionFiles = afterDeletionFiles;
+ this.isStreaming = isStreaming;
+ }
+
+ public long snapshotId() {
+ return snapshotId;
+ }
+
+ public BinaryRow partition() {
+ return partition;
+ }
+
+ public int bucket() {
+ return bucket;
+ }
+
+ public int totalBuckets() {
+ return totalBuckets;
+ }
+
+ public List<DataFileMeta> beforeFiles() {
+ return beforeFiles;
+ }
+
+ @Nullable
+ public List<DeletionFile> beforeDeletionFiles() {
+ return beforeDeletionFiles;
+ }
+
+ public List<DataFileMeta> afterFiles() {
+ return afterFiles;
+ }
+
+ @Nullable
+ public List<DeletionFile> afterDeletionFiles() {
+ return afterDeletionFiles;
+ }
+
+ public boolean isStreaming() {
+ return isStreaming;
+ }
+
+ @Override
+ public long rowCount() {
+ long rowCount = 0;
+ for (DataFileMeta file : beforeFiles) {
+ rowCount += file.rowCount();
+ }
+ for (DataFileMeta file : afterFiles) {
+ rowCount += file.rowCount();
+ }
+ return rowCount;
+ }
+
+ @Override
+ public OptionalLong mergedRowCount() {
+ return OptionalLong.empty();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ IncrementalSplit that = (IncrementalSplit) o;
+ return snapshotId == that.snapshotId
+ && bucket == that.bucket
+ && totalBuckets == that.totalBuckets
+ && isStreaming == that.isStreaming
+ && Objects.equals(partition, that.partition)
+ && Objects.equals(beforeFiles, that.beforeFiles)
+ && Objects.equals(beforeDeletionFiles, that.beforeDeletionFiles)
+ && Objects.equals(afterFiles, that.afterFiles)
+ && Objects.equals(afterDeletionFiles, that.afterDeletionFiles);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ snapshotId,
+ partition,
+ bucket,
+ totalBuckets,
+ beforeFiles,
+ beforeDeletionFiles,
+ afterFiles,
+ afterDeletionFiles,
+ isStreaming);
+ }
+
+ @Override
+ public String toString() {
+ return "IncrementalSplit{"
+ + "snapshotId="
+ + snapshotId
+ + ", partition="
+ + partition
+ + ", bucket="
+ + bucket
+ + ", totalBuckets="
+ + totalBuckets
+ + ", beforeFiles="
+ + beforeFiles
+ + ", beforeDeletionFiles="
+ + beforeDeletionFiles
+ + ", afterFiles="
+ + afterFiles
+ + ", afterDeletionFiles="
+ + afterDeletionFiles
+ + ", isStreaming="
+ + isStreaming
+ + '}';
+ }
+
+ private void writeObject(ObjectOutputStream objectOutputStream) throws IOException {
+ DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(objectOutputStream);
+ out.writeInt(VERSION);
+ out.writeLong(snapshotId);
+ serializeBinaryRow(partition, out);
+ out.writeInt(bucket);
+ out.writeInt(totalBuckets);
+
+ DataFileMetaSerializer dataFileSerializer = new DataFileMetaSerializer();
+ out.writeInt(beforeFiles.size());
+ for (DataFileMeta file : beforeFiles) {
+ dataFileSerializer.serialize(file, out);
+ }
+
+ DeletionFile.serializeList(out, beforeDeletionFiles);
+
+ out.writeInt(afterFiles.size());
+ for (DataFileMeta file : afterFiles) {
+ dataFileSerializer.serialize(file, out);
+ }
+
+ DeletionFile.serializeList(out, afterDeletionFiles);
+
+ out.writeBoolean(isStreaming);
+ }
+
+ private void readObject(ObjectInputStream objectInputStream)
+ throws IOException, ClassNotFoundException {
+ DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(objectInputStream);
+ int version = in.readInt();
+ if (version != VERSION) {
+ throw new UnsupportedOperationException("Unsupported version: " + version);
+ }
+
+ snapshotId = in.readLong();
+ partition = deserializeBinaryRow(in);
+ bucket = in.readInt();
+ totalBuckets = in.readInt();
+
+ DataFileMetaSerializer dataFileMetaSerializer = new DataFileMetaSerializer();
+ FunctionWithIOException<DataInputView, DeletionFile> deletionFileSerializer =
+ DeletionFile::deserialize;
+
+ int beforeNumber = in.readInt();
+ beforeFiles = new ArrayList<>(beforeNumber);
+ for (int i = 0; i < beforeNumber; i++) {
+ beforeFiles.add(dataFileMetaSerializer.deserialize(in));
+ }
+
+ beforeDeletionFiles = DeletionFile.deserializeList(in, deletionFileSerializer);
+
+ int fileNumber = in.readInt();
+ afterFiles = new ArrayList<>(fileNumber);
+ for (int i = 0; i < fileNumber; i++) {
+ afterFiles.add(dataFileMetaSerializer.deserialize(in));
+ }
+
+ afterDeletionFiles = DeletionFile.deserializeList(in, deletionFileSerializer);
+
+ isStreaming = in.readBoolean();
+ }
+}
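The constructor of the new class takes the before/after file lists plus optional deletion files directly. A rough usage sketch, with illustrative values only (IncrementalSplitExample and diffSplit are hypothetical names; an unpartitioned, single-bucket table is assumed):

    import org.apache.paimon.data.BinaryRow;
    import org.apache.paimon.io.DataFileMeta;
    import org.apache.paimon.table.source.IncrementalSplit;

    import java.util.List;

    class IncrementalSplitExample {
        static IncrementalSplit diffSplit(
                long snapshotId, List<DataFileMeta> before, List<DataFileMeta> after) {
            return new IncrementalSplit(
                    snapshotId,
                    BinaryRow.EMPTY_ROW, // unpartitioned table
                    0,                   // bucket
                    1,                   // totalBuckets
                    before,
                    null,                // no before deletion files
                    after,
                    null,                // no after deletion files
                    false);              // batch, not streaming
        }
    }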
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/AbstractStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/AbstractStartingScanner.java
index c1b2f985a4..84212a3cc4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/AbstractStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/AbstractStartingScanner.java
@@ -18,13 +18,23 @@
package org.apache.paimon.table.source.snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.SnapshotManager;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.paimon.manifest.FileKind.ADD;
/** The abstract class for StartingScanner. */
public abstract class AbstractStartingScanner implements StartingScanner {
@@ -54,8 +64,35 @@ public abstract class AbstractStartingScanner implements StartingScanner {
public List<PartitionEntry> scanPartitions(SnapshotReader snapshotReader) {
Result result = scan(snapshotReader);
if (result instanceof ScannedResult) {
- return new ArrayList<>(PartitionEntry.mergeSplits(((ScannedResult) result).splits()));
+ return mergeDataSplitsToPartitionEntries(((ScannedResult) result).splits());
}
return Collections.emptyList();
}
+
+ private static List<PartitionEntry> mergeDataSplitsToPartitionEntries(
+ Collection<Split> splits) {
+ Map<BinaryRow, PartitionEntry> partitions = new HashMap<>();
+ for (Split s : splits) {
+ if (!(s instanceof DataSplit)) {
+ throw new UnsupportedOperationException();
+ }
+ DataSplit split = (DataSplit) s;
+ BinaryRow partition = split.partition();
+ for (DataFileMeta file : split.dataFiles()) {
+ PartitionEntry partitionEntry =
+ PartitionEntry.fromDataFile(
+ partition,
+ ADD,
+ file,
+ Optional.ofNullable(split.totalBuckets()).orElse(0));
+ partitions.compute(
+ partition,
+ (part, old) -> old == null ? partitionEntry : old.merge(partitionEntry));
+ }
+
+ // Ignore before files, because we don't know how to merge them
+ // Ignore deletion files, because it is costly to read from it
+ }
+ return new ArrayList<>(partitions.values());
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDiffStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDiffStartingScanner.java
index 37c6498e95..3a42f16e2c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDiffStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDiffStartingScanner.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.tag.Tag;
import org.apache.paimon.tag.TagPeriodHandler;
@@ -67,6 +68,12 @@ public class IncrementalDiffStartingScanner extends AbstractStartingScanner {
return StartingScanner.fromPlan(reader.withSnapshot(end).readIncrementalDiff(start));
}
+ @Override
+ public List<PartitionEntry> scanPartitions(SnapshotReader reader) {
+ // ignore start, just use end to read partition entries
+ return reader.withSnapshot(end).partitionEntries();
+ }
+
public static StartingScanner betweenTags(
Tag startTag,
Tag endTag,
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 3021eeda3e..c839a5dbd1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -46,8 +46,10 @@ import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DeletionFile;
+import org.apache.paimon.table.source.IncrementalSplit;
import org.apache.paimon.table.source.PlanImpl;
import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BiFilter;
@@ -459,41 +461,42 @@ public class SnapshotReaderImpl implements SnapshotReader {
Map<BinaryRow, Map<Integer, List<ManifestEntry>>> beforeFiles =
groupByPartFiles(plan.files(FileKind.DELETE));
- Map<BinaryRow, Map<Integer, List<ManifestEntry>>> dataFiles =
+ Map<BinaryRow, Map<Integer, List<ManifestEntry>>> afterFiles =
groupByPartFiles(plan.files(FileKind.ADD));
LazyField<Snapshot> beforeSnapshot =
new LazyField<>(() -> snapshotManager.snapshot(plan.snapshot().id() - 1));
- return toChangesPlan(true, plan, beforeSnapshot, beforeFiles, dataFiles);
+ return toIncrementalPlan(
+ true, beforeSnapshot, beforeFiles, plan.snapshot(), plan.watermark(), afterFiles);
}
- private Plan toChangesPlan(
+ private Plan toIncrementalPlan(
boolean isStreaming,
- FileStoreScan.Plan plan,
LazyField<Snapshot> beforeSnapshot,
Map<BinaryRow, Map<Integer, List<ManifestEntry>>> beforeFiles,
- Map<BinaryRow, Map<Integer, List<ManifestEntry>>> dataFiles) {
- Snapshot snapshot = plan.snapshot();
- List<DataSplit> splits = new ArrayList<>();
+ @Nullable Snapshot afterSnapshot,
+ @Nullable Long afterWatermark,
+ Map<BinaryRow, Map<Integer, List<ManifestEntry>>> afterFiles) {
+ List<Split> splits = new ArrayList<>();
Map<BinaryRow, Set<Integer>> buckets = new HashMap<>();
beforeFiles.forEach(
(part, bucketMap) ->
buckets.computeIfAbsent(part, k -> new HashSet<>())
.addAll(bucketMap.keySet()));
- dataFiles.forEach(
+ afterFiles.forEach(
(part, bucketMap) ->
buckets.computeIfAbsent(part, k -> new HashSet<>())
.addAll(bucketMap.keySet()));
// Read deletion indexes at once to reduce file IO
Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> beforeDeletionFilesMap = null;
- Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> deletionFilesMap = null;
+ Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> afterDeletionFilesMap = null;
if (!isStreaming && deletionVectors) {
beforeDeletionFilesMap =
beforeSnapshot.get() != null
? scanDvIndex(beforeSnapshot.get(), toPartBuckets(beforeFiles))
: Collections.emptyMap();
- deletionFilesMap =
- snapshot != null
- ? scanDvIndex(snapshot, toPartBuckets(dataFiles))
+ afterDeletionFilesMap =
+ afterSnapshot != null
+ ? scanDvIndex(afterSnapshot, toPartBuckets(afterFiles))
: Collections.emptyMap();
}
@@ -505,13 +508,10 @@ public class SnapshotReaderImpl implements SnapshotReader {
.getOrDefault(part, Collections.emptyMap())
.getOrDefault(bucket, Collections.emptyList());
List<ManifestEntry> dataEntries =
- dataFiles
+ afterFiles
.getOrDefault(part, Collections.emptyMap())
.getOrDefault(bucket, Collections.emptyList());
- // deduplicate
- beforeEntries.removeIf(dataEntries::remove);
-
Integer totalBuckets = null;
if (!dataEntries.isEmpty()) {
totalBuckets = dataEntries.get(0).totalBuckets();
@@ -519,54 +519,69 @@ public class SnapshotReaderImpl implements SnapshotReader {
totalBuckets = beforeEntries.get(0).totalBuckets();
}
+ // deduplicate
+ beforeEntries.removeIf(dataEntries::remove);
+
List<DataFileMeta> before =
beforeEntries.stream()
.map(ManifestEntry::file)
.collect(Collectors.toList());
- List<DataFileMeta> data =
+ List<DataFileMeta> after =
dataEntries.stream().map(ManifestEntry::file).collect(Collectors.toList());
- DataSplit.Builder builder =
- DataSplit.builder()
- .withSnapshot(snapshot.id())
- .withPartition(part)
- .withBucket(bucket)
- .withTotalBuckets(totalBuckets)
- .withBeforeFiles(before)
- .withDataFiles(data)
- .isStreaming(isStreaming)
- .withBucketPath(pathFactory.bucketPath(part, bucket).toString());
+ List<DeletionFile> beforeDeletionFiles = null;
if (deletionVectors && beforeDeletionFilesMap != null) {
- builder.withBeforeDeletionFiles(
+ beforeDeletionFiles =
getDeletionFiles(
before,
beforeDeletionFilesMap.getOrDefault(
- Pair.of(part, bucket), Collections.emptyMap())));
+ Pair.of(part, bucket), Collections.emptyMap()));
}
- if (deletionVectors && deletionFilesMap != null) {
- builder.withDataDeletionFiles(
+
+ List<DeletionFile> afterDeletionFiles = null;
+ if (deletionVectors && afterDeletionFilesMap != null) {
+ afterDeletionFiles =
getDeletionFiles(
- data,
- deletionFilesMap.getOrDefault(
- Pair.of(part, bucket), Collections.emptyMap())));
+ after,
+ afterDeletionFilesMap.getOrDefault(
+ Pair.of(part, bucket), Collections.emptyMap()));
}
- splits.add(builder.build());
+
+ IncrementalSplit split =
+ new IncrementalSplit(
+ afterSnapshot.id(),
+ part,
+ bucket,
+ totalBuckets,
+ before,
+ beforeDeletionFiles,
+ after,
+ afterDeletionFiles,
+ isStreaming);
+
+ splits.add(split);
}
}
return new PlanImpl(
- plan.watermark(), snapshot == null ? null : snapshot.id(), (List) splits);
+ afterWatermark, afterSnapshot == null ? null : afterSnapshot.id(), splits);
}
@Override
public Plan readIncrementalDiff(Snapshot before) {
withMode(ScanMode.ALL);
FileStoreScan.Plan plan = scan.plan();
- Map<BinaryRow, Map<Integer, List<ManifestEntry>>> dataFiles =
+ Map<BinaryRow, Map<Integer, List<ManifestEntry>>> afterFiles =
groupByPartFiles(plan.files(FileKind.ADD));
Map<BinaryRow, Map<Integer, List<ManifestEntry>>> beforeFiles =
groupByPartFiles(scan.withSnapshot(before).plan().files(FileKind.ADD));
- return toChangesPlan(false, plan, new LazyField<>(() -> before), beforeFiles, dataFiles);
+ return toIncrementalPlan(
+ false,
+ new LazyField<>(() -> before),
+ beforeFiles,
+ plan.snapshot(),
+ plan.watermark(),
+ afterFiles);
}
private RecordComparator partitionComparator() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
index 6d9daa70a9..07b317515a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
@@ -19,7 +19,7 @@
package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.manifest.PartitionEntry;
-import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableScan;
import javax.annotation.Nullable;
@@ -63,8 +63,8 @@ public interface StartingScanner {
return plan.watermark();
}
- public List<DataSplit> splits() {
- return (List) plan.splits();
+ public List<Split> splits() {
+ return plan.splits();
}
public SnapshotReader.Plan plan() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/AppendTableRawFileSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/AppendTableRawFileSplitReadProvider.java
index 5a2d4635bb..5bb72578ab 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/AppendTableRawFileSplitReadProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/AppendTableRawFileSplitReadProvider.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.source.splitread;
import org.apache.paimon.operation.RawFileSplitRead;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.IncrementalSplit;
import org.apache.paimon.table.source.Split;
import java.util.function.Supplier;
@@ -34,6 +35,6 @@ public class AppendTableRawFileSplitReadProvider extends RawFileSplitReadProvide
@Override
public boolean match(Split split, Context context) {
- return split instanceof DataSplit;
+ return split instanceof DataSplit || split instanceof IncrementalSplit;
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
index ee3d84b954..d7d9866a9c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
@@ -25,7 +25,7 @@ import org.apache.paimon.operation.MergeFileSplitRead;
import org.apache.paimon.operation.ReverseReader;
import org.apache.paimon.operation.SplitRead;
import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.IncrementalSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.IOFunction;
@@ -55,25 +55,23 @@ public class IncrementalChangelogReadProvider implements SplitReadProvider {
final MergeFileSplitRead read =
supplier.get().withReadKeyType(RowType.of());
IOFunction<Split, RecordReader<InternalRow>> convertedFactory =
split -> {
- DataSplit dataSplit = (DataSplit) split;
+ IncrementalSplit incrementalSplit = (IncrementalSplit) split;
RecordReader<KeyValue> reader =
ConcatRecordReader.create(
() ->
new ReverseReader(
read.createMergeReader(
- dataSplit.partition(),
- dataSplit.bucket(),
- dataSplit.beforeFiles(),
- dataSplit
- .beforeDeletionFiles()
- .orElse(null),
+ incrementalSplit.partition(),
+ incrementalSplit.bucket(),
+ incrementalSplit.beforeFiles(),
+ incrementalSplit.beforeDeletionFiles(),
false)),
() ->
read.createMergeReader(
- dataSplit.partition(),
- dataSplit.bucket(),
- dataSplit.dataFiles(),
- dataSplit.deletionFiles().orElse(null),
+ incrementalSplit.partition(),
+ incrementalSplit.bucket(),
+ incrementalSplit.afterFiles(),
+ incrementalSplit.afterDeletionFiles(),
false));
return unwrap(reader);
};
@@ -83,11 +81,10 @@ public class IncrementalChangelogReadProvider implements SplitReadProvider {
@Override
public boolean match(Split split, Context context) {
- if (!(split instanceof DataSplit)) {
+ if (!(split instanceof IncrementalSplit)) {
return false;
}
- DataSplit dataSplit = (DataSplit) split;
- return !dataSplit.beforeFiles().isEmpty() && dataSplit.isStreaming();
+ return ((IncrementalSplit) split).isStreaming();
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java
index 6a381c333c..858e07e5d0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java
@@ -21,7 +21,7 @@ package org.apache.paimon.table.source.splitread;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.operation.MergeFileSplitRead;
import org.apache.paimon.operation.SplitRead;
-import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.IncrementalSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.LazyField;
@@ -49,11 +49,11 @@ public class IncrementalDiffReadProvider implements SplitReadProvider {
@Override
public boolean match(Split split, Context context) {
- if (!(split instanceof DataSplit)) {
+ if (!(split instanceof IncrementalSplit)) {
return false;
}
- DataSplit dataSplit = (DataSplit) split;
- return !dataSplit.beforeFiles().isEmpty() && !dataSplit.isStreaming();
+ IncrementalSplit incrementalSplit = (IncrementalSplit) split;
+ return !incrementalSplit.isStreaming();
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
index c22416a26a..d00a8a4fd9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
@@ -29,7 +29,7 @@ import org.apache.paimon.operation.MergeFileSplitRead;
import org.apache.paimon.operation.SplitRead;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.IncrementalSplit;
import org.apache.paimon.table.source.KeyValueTableRead;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.RowKind;
@@ -86,20 +86,20 @@ public class IncrementalDiffSplitRead implements SplitRead<InternalRow> {
@Override
public RecordReader<InternalRow> createReader(Split s) throws IOException {
- DataSplit split = (DataSplit) s;
+ IncrementalSplit split = (IncrementalSplit) s;
RecordReader<KeyValue> reader =
readDiff(
mergeRead.createMergeReader(
split.partition(),
split.bucket(),
split.beforeFiles(),
- split.beforeDeletionFiles().orElse(null),
+ split.beforeDeletionFiles(),
forceKeepDelete),
mergeRead.createMergeReader(
split.partition(),
split.bucket(),
- split.dataFiles(),
- split.deletionFiles().orElse(null),
+ split.afterFiles(),
+ split.afterDeletionFiles(),
forceKeepDelete),
mergeRead.keyComparator(),
mergeRead.createUdsComparator(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
index d69209ef4e..fb3b850812 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
@@ -54,14 +54,7 @@ public class MergeFileSplitReadProvider implements SplitReadProvider {
@Override
public boolean match(Split split, Context context) {
- if (split instanceof DataSplit) {
- DataSplit dataSplit = (DataSplit) split;
- return dataSplit.beforeFiles().isEmpty();
- } else if (split instanceof ChainSplit) {
- return true;
- } else {
- return false;
- }
+ return split instanceof DataSplit || split instanceof ChainSplit;
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
index 060570406b..97903d881e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
@@ -41,6 +41,7 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.IncrementalSplit;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamDataTableScan;
@@ -64,6 +65,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
+import static java.util.Collections.emptyList;
import static org.apache.paimon.CoreOptions.SCAN_BOUNDED_WATERMARK;
import static org.apache.paimon.CoreOptions.STREAM_SCAN_MODE;
import static org.apache.paimon.CoreOptions.StreamScanMode.FILE_MONITOR;
@@ -181,7 +183,7 @@ public class FileMonitorTable implements DataTable, ReadonlyTable {
@Override
public List<String> primaryKeys() {
- return Collections.emptyList();
+ return emptyList();
}
@Override
@@ -238,20 +240,29 @@ public class FileMonitorTable implements DataTable, ReadonlyTable {
@Override
public RecordReader<InternalRow> createReader(Split split) throws IOException {
- if (!(split instanceof DataSplit)) {
+ FileChange change;
+ if (split instanceof DataSplit) {
+ DataSplit dataSplit = (DataSplit) split;
+ change =
+ new FileChange(
+ dataSplit.snapshotId(),
+ dataSplit.partition(),
+ dataSplit.bucket(),
+ emptyList(),
+ dataSplit.dataFiles());
+ } else if (split instanceof IncrementalSplit) {
+ IncrementalSplit incrementalSplit = (IncrementalSplit) split;
+ change =
+ new FileChange(
+ incrementalSplit.snapshotId(),
+ incrementalSplit.partition(),
+ incrementalSplit.bucket(),
+ incrementalSplit.beforeFiles(),
+ incrementalSplit.afterFiles());
+ } else {
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
}
- DataSplit dataSplit = (DataSplit) split;
-
- FileChange change =
- new FileChange(
- dataSplit.snapshotId(),
- dataSplit.partition(),
- dataSplit.bucket(),
- dataSplit.beforeFiles(),
- dataSplit.dataFiles());
-
return new IteratorRecordReader<>(Collections.singletonList(toRow(change)).iterator());
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/BucketedAppendClusterManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/BucketedAppendClusterManagerTest.java
index 0d2d60e699..d7a3d5d59e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/cluster/BucketedAppendClusterManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/BucketedAppendClusterManagerTest.java
@@ -94,7 +94,8 @@ public class BucketedAppendClusterManagerTest {
((AppendOnlyFileStoreTable) table)
.store()
.newRead()
- .createReader(BinaryRow.EMPTY_ROW, 0, result.after(), null))) {
+ .createReader(
+ BinaryRow.EMPTY_ROW, 0, result.after(), (List) null))) {
while (clusterRows.hasNext()) {
InternalRow row = clusterRows.next();
rows.add(String.format("%d,%d", row.getInt(1), row.getInt(2)));
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
index 69cf5441f5..a310404f53 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
@@ -57,6 +57,7 @@ import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.IncrementalSplit;
import org.apache.paimon.table.source.OutOfRangeException;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.ScanMode;
@@ -1641,8 +1642,8 @@ public abstract class SimpleTableTestBase {
.dataSplits());
for (Split split : splits) {
- DataSplit dataSplit = (DataSplit) split;
- Assertions.assertThat(dataSplit.deletionFiles().isPresent()).isFalse();
+ IncrementalSplit incrementalSplit = (IncrementalSplit) split;
+ Assertions.assertThat(incrementalSplit.afterDeletionFiles()).isNull();
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
index 60c9573004..f6049674bb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
@@ -59,7 +59,7 @@ public class CompactedStartingScannerTest extends ScannerTestBase {
StartingScanner.ScannedResult result =
(StartingScanner.ScannedResult) scanner.scan(snapshotReader);
assertThat(result.currentSnapshotId()).isEqualTo(3);
- assertThat(getResult(table.newRead(), toSplits(result.splits())))
+ assertThat(getResult(table.newRead(), result.splits()))
.hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
write.close();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
index 20a65a04b7..d6b30adb11 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
@@ -54,7 +54,7 @@ public class FullStartingScannerTest extends ScannerTestBase {
StartingScanner.ScannedResult result =
(StartingScanner.ScannedResult) scanner.scan(snapshotReader);
assertThat(result.currentSnapshotId()).isEqualTo(2);
- assertThat(getResult(table.newRead(), toSplits(result.splits())))
+ assertThat(getResult(table.newRead(), result.splits()))
.hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
write.close();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScannerTest.java
index 357bb33863..c595359ab9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScannerTest.java
@@ -58,7 +58,7 @@ public class StaticFromTagStartingScannerTest extends ScannerTestBase {
StartingScanner.ScannedResult result =
(StartingScanner.ScannedResult) scanner.scan(snapshotReader);
assertThat(result.currentSnapshotId()).isEqualTo(2);
- assertThat(getResult(table.newRead(), toSplits(result.splits())))
+ assertThat(getResult(table.newRead(), result.splits()))
.hasSameElementsAs(
Arrays.asList("+I 1|10|100", "+I 1|20|200", "+I 2|30|101", "+I 2|40|201"));
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/IncrementalCompactDiffSplitRead.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/IncrementalCompactDiffSplitRead.java
index 6583e615d9..8c9b5a6cdc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/IncrementalCompactDiffSplitRead.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/IncrementalCompactDiffSplitRead.java
@@ -24,7 +24,7 @@ import org.apache.paimon.operation.MergeFileSplitRead;
import org.apache.paimon.operation.SplitRead;
import org.apache.paimon.reader.EmptyRecordReader;
import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.IncrementalSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.splitread.IncrementalDiffSplitRead;
@@ -41,39 +41,31 @@ public class IncrementalCompactDiffSplitRead extends IncrementalDiffSplitRead {
@Override
public RecordReader<InternalRow> createReader(Split split) throws IOException {
- DataSplit dataSplit = (DataSplit) split;
- if (dataSplit.beforeFiles().stream().noneMatch(file -> file.level() == 0)) {
+ IncrementalSplit incrementalSplit = (IncrementalSplit) split;
+ if (incrementalSplit.beforeFiles().stream().noneMatch(file -> file.level() == 0)) {
return new EmptyRecordReader<>();
}
- return super.createReader(filterLevel0Files(dataSplit));
+ return super.createReader(filterLevel0Files(incrementalSplit));
}
- private DataSplit filterLevel0Files(DataSplit split) {
+ private IncrementalSplit filterLevel0Files(IncrementalSplit split) {
List<DataFileMeta> beforeFiles =
split.beforeFiles().stream()
.filter(file -> file.level() > 0)
.collect(Collectors.toList());
List<DataFileMeta> afterFiles =
- split.dataFiles().stream()
+ split.afterFiles().stream()
.filter(file -> file.level() > 0)
.collect(Collectors.toList());
- DataSplit.Builder builder =
- new DataSplit.Builder()
- .withSnapshot(split.snapshotId())
- .withPartition(split.partition())
- .withBucket(split.bucket())
- .withBucketPath(split.bucketPath())
- .withBeforeFiles(beforeFiles)
- .withDataFiles(afterFiles)
- .isStreaming(split.isStreaming())
- .rawConvertible(split.rawConvertible());
-
- if (split.beforeDeletionFiles().isPresent()) {
- builder.withBeforeDeletionFiles(split.beforeDeletionFiles().get());
- }
- if (split.deletionFiles().isPresent()) {
- builder.withDataDeletionFiles(split.deletionFiles().get());
- }
- return builder.build();
+ return new IncrementalSplit(
+ split.snapshotId(),
+ split.partition(),
+ split.bucket(),
+ split.totalBuckets(),
+ beforeFiles,
+ null,
+ afterFiles,
+ null,
+ split.isStreaming());
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
index c4be923472..dfdd9dd207 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
@@ -57,9 +57,8 @@ public class LookupCompactDiffRead extends AbstractDataTableRead {
@Override
public RecordReader<InternalRow> reader(Split split) throws IOException {
- DataSplit dataSplit = (DataSplit) split;
- if (dataSplit.beforeFiles().isEmpty()) {
- return fullPhaseMergeRead.createReader(dataSplit); // full reading phase
+ if (split instanceof DataSplit) {
+ return fullPhaseMergeRead.createReader(split); // full reading phase
} else {
return incrementalDiffRead.createReader(split);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
index ac0f00d895..df326054b1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
@@ -28,6 +28,7 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.IncrementalSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.RowType;
@@ -155,11 +156,15 @@ public class LookupStreamingReader {
return;
}
- DataSplit dataSplit = (DataSplit) splits.get(0);
+ Split split = splits.get(0);
+ long snapshotId =
+ split instanceof DataSplit
+ ? ((DataSplit) split).snapshotId()
+ : ((IncrementalSplit) split).snapshotId();
LOG.info(
"LookupStreamingReader get splits from {} with snapshotId {}.",
table.name(),
- dataSplit.snapshotId());
+ snapshotId);
}
@Nullable
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
index 03016bbccf..b00ac7e083 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
@@ -34,6 +34,7 @@ import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.IncrementalSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.utils.Filter;
@@ -53,6 +54,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static java.util.Collections.emptyList;
import static org.apache.paimon.table.BucketMode.POSTPONE_BUCKET;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -165,7 +167,7 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
Integer numBuckets = queryExecutor.numBuckets(partition);
if (numBuckets == null) {
// no data, just return none
- return Collections.emptyList();
+ return emptyList();
}
int bucket = bucket(numBuckets, adjustedKey);
@@ -176,7 +178,7 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
InternalRow kv = queryExecutor.lookup(partition, bucket, trimmedKey);
if (kv == null) {
- return Collections.emptyList();
+ return emptyList();
} else {
return Collections.singletonList(kv);
}
@@ -319,19 +321,26 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
}
for (Split split : splits) {
- refreshSplit((DataSplit) split);
+ refreshSplit(split);
}
}
}
@VisibleForTesting
- void refreshSplit(DataSplit split) {
+ void refreshSplit(Split split) {
+ if (split instanceof DataSplit) {
+ refreshSplit((DataSplit) split);
+ } else {
+ refreshSplit((IncrementalSplit) split);
+ }
+ }
+
+ private void refreshSplit(DataSplit split) {
BinaryRow partition = split.partition();
int bucket = split.bucket();
- List<DataFileMeta> before = split.beforeFiles();
List<DataFileMeta> after = split.dataFiles();
- tableQuery.refreshFiles(partition, bucket, before, after);
+ tableQuery.refreshFiles(partition, bucket, emptyList(), after);
Integer totalBuckets = split.totalBuckets();
if (totalBuckets == null) {
// Just for compatibility with older versions
@@ -343,6 +352,13 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
numBuckets.put(partition, totalBuckets);
}
+ private void refreshSplit(IncrementalSplit split) {
+ BinaryRow partition = split.partition();
+ tableQuery.refreshFiles(
+ partition, split.bucket(), split.beforeFiles(), split.afterFiles());
+ numBuckets.put(partition, split.totalBuckets());
+ }
+
@Override
public Long nextSnapshotId() {
return this.scan.checkpoint();
@@ -359,11 +375,15 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
return;
}
- DataSplit dataSplit = (DataSplit) splits.get(0);
+ Split split = splits.get(0);
+ long snapshotId =
+ split instanceof DataSplit
+ ? ((DataSplit) split).snapshotId()
+ : ((IncrementalSplit) split).snapshotId();
LOG.info(
"LocalQueryExecutor get splits from {} with snapshotId {}.",
tableName,
- dataSplit.snapshotId());
+ snapshotId);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 668bd7d847..87b3f77d5a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -27,6 +27,7 @@ import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.IncrementalSplit;
import org.apache.paimon.table.source.SnapshotNotExistPlan;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableScan;
@@ -305,20 +306,39 @@ public class ContinuousFileSplitEnumerator
}
protected int assignSuggestedTask(FileStoreSourceSplit split) {
- DataSplit dataSplit = ((DataSplit) split.split());
+ if (split.split() instanceof DataSplit) {
+ return assignSuggestedTask((DataSplit) split.split());
+ } else {
+ return assignSuggestedTask((IncrementalSplit) split.split());
+ }
+ }
+
+ protected int assignSuggestedTask(DataSplit split) {
int parallelism = context.currentParallelism();
int bucketId;
- if (dataSplit.bucket() == BucketMode.POSTPONE_BUCKET) {
+ if (split.bucket() == BucketMode.POSTPONE_BUCKET) {
bucketId =
- PostponeBucketFileStoreWrite.getWriteId(dataSplit.dataFiles().get(0).fileName())
+ PostponeBucketFileStoreWrite.getWriteId(split.dataFiles().get(0).fileName())
% parallelism;
} else {
- bucketId = dataSplit.bucket();
+ bucketId = split.bucket();
+ }
+
+ if (shuffleBucketWithPartition) {
+ return ChannelComputer.select(split.partition(), bucketId, parallelism);
+ } else {
+ return ChannelComputer.select(bucketId, parallelism);
}
+ }
+
+ protected int assignSuggestedTask(IncrementalSplit split) {
+ int parallelism = context.currentParallelism();
+ // TODO how to deal with postpone bucket?
+ int bucketId = split.bucket();
if (shuffleBucketWithPartition) {
- return ChannelComputer.select(dataSplit.partition(), bucketId, parallelism);
+ return ChannelComputer.select(split.partition(), bucketId, parallelism);
} else {
return ChannelComputer.select(bucketId, parallelism);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/PlaceholderSplit.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/PlaceholderSplit.java
index 070b6f6b8f..42044860c6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/PlaceholderSplit.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/PlaceholderSplit.java
@@ -44,7 +44,6 @@ public class PlaceholderSplit extends DataSplit {
dataSplit =
DataSplit.builder()
.withSnapshot(snapshotId)
- .withBeforeFiles(Collections.emptyList())
.withBucket(0)
.withDataFiles(Collections.emptyList())
.withPartition(BinaryRow.EMPTY_ROW)
@@ -73,11 +72,6 @@ public class PlaceholderSplit extends DataSplit {
return dataSplit.bucket();
}
- @Override
- public List<DataFileMeta> beforeFiles() {
- return dataSplit.beforeFiles();
- }
-
@Override
public List<DataFileMeta> dataFiles() {
return dataSplit.dataFiles();
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
index fb811841ac..57186709e5 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
@@ -55,7 +55,6 @@ import static java.util.Collections.singletonMap;
import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME;
import static org.apache.paimon.hive.utils.HiveUtils.createPredicate;
import static org.apache.paimon.hive.utils.HiveUtils.extractTagName;
-import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
/** Generator to generate hive input splits. */
public class HiveSplitGenerator {
@@ -174,7 +173,7 @@ public class HiveSplitGenerator {
for (DataSplit split : splits) {
if (split instanceof FallbackReadFileStoreTable.FallbackSplit) {
dataSplits.add(split);
- } else if (split.beforeFiles().isEmpty() && split.rawConvertible()) {
+ } else if (split.rawConvertible()) {
numFiles += split.dataFiles().size();
toPack.add(split);
} else {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala
index 679234bc74..8ed80644a8 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala
@@ -78,7 +78,7 @@ case class BinPackingSplits(coreOptions: CoreOptions, readRowSizeRatio: Double =
def pack(splits: Array[Split]): Seq[PaimonInputPartition] = {
val (toReshuffle, reserved) = splits.partition {
case _: FallbackSplit => false
- case split: DataSplit => split.beforeFiles().isEmpty && split.rawConvertible()
+ case split: DataSplit => split.rawConvertible()
// Currently, format table reader only supports reading one file.
case _: FormatDataSplit => false
case _ => false