This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new aee8fbebe [core] Refactor DataSplit to contain before files
aee8fbebe is described below
commit aee8fbebe77dfd19db8fc747708089a5bb31a62d
Author: Jingsong <[email protected]>
AuthorDate: Tue Jul 11 10:16:02 2023 +0800
[core] Refactor DataSplit to contain before files
This closes #1527
---
.../AppendOnlyTableCompactionCoordinator.java | 2 +-
.../paimon/operation/AppendOnlyFileStoreRead.java | 2 +-
.../paimon/operation/AppendOnlyFileStoreWrite.java | 11 +-
.../paimon/operation/KeyValueFileStoreRead.java | 47 +++---
.../paimon/table/AppendOnlyFileStoreTable.java | 3 -
.../table/ChangelogValueCountFileStoreTable.java | 3 -
.../org/apache/paimon/table/source/DataSplit.java | 184 +++++++++++++--------
.../snapshot/IncrementalStartingScanner.java | 10 +-
.../table/source/snapshot/SnapshotReaderImpl.java | 110 ++++++------
.../apache/paimon/table/system/BucketsTable.java | 2 +-
.../org/apache/paimon/table/system/FilesTable.java | 4 +-
.../test/java/org/apache/paimon/TestFileStore.java | 14 +-
.../operation/KeyValueFileStoreReadTest.java | 17 +-
.../table/ChangelogWithKeyFileMetaFilterTest.java | 6 +-
.../table/ChangelogWithKeyFileStoreTableTest.java | 8 +-
...hangelogWithKeyTableColumnTypeFileMetaTest.java | 2 +-
.../paimon/table/ColumnTypeFileMetaTestBase.java | 12 +-
.../paimon/table/FileMetaFilterTestBase.java | 18 +-
.../paimon/table/FileStoreTableTestBase.java | 2 +-
.../paimon/table/SchemaEvolutionTableTestBase.java | 2 +-
.../org/apache/paimon/table/source/SplitTest.java | 12 +-
.../paimon/flink/action/CompactActionITCase.java | 4 +-
.../paimon/flink/sink/CompactorSinkITCase.java | 4 +-
.../source/ContinuousFileSplitEnumeratorTest.java | 16 +-
.../source/FileStoreSourceSplitGeneratorTest.java | 3 +-
.../source/FileStoreSourceSplitSerializerTest.java | 11 +-
.../paimon/hive/mapred/PaimonInputSplit.java | 2 +-
.../paimon/hive/mapred/PaimonInputSplitTest.java | 18 +-
28 files changed, 305 insertions(+), 224 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java
index e89cff30e..7277d5e26 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java
@@ -116,7 +116,7 @@ public class AppendOnlyTableCompactionCoordinator {
splits.forEach(
split -> {
DataSplit dataSplit = (DataSplit) split;
- notifyNewFiles(dataSplit.partition(),
dataSplit.files());
+ notifyNewFiles(dataSplit.partition(),
dataSplit.dataFiles());
});
// batch mode, we don't do continuous scanning
if (!streamingMode) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
index 6bcfc27fd..2a90a0bac 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
@@ -98,7 +98,7 @@ public class AppendOnlyFileStoreRead implements
FileStoreRead<InternalRow> {
DataFilePathFactory dataFilePathFactory =
pathFactory.createDataFilePathFactory(split.partition(),
split.bucket());
List<ConcatRecordReader.ReaderSupplier<InternalRow>> suppliers = new
ArrayList<>();
- for (DataFileMeta file : split.files()) {
+ for (DataFileMeta file : split.dataFiles()) {
String formatIdentifier =
DataFilePathFactory.formatIdentifier(file.fileName());
BulkFormatMapping bulkFormatMapping =
bulkFormatMappings.computeIfAbsent(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index 8019df5c5..abe350c8c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -158,12 +158,11 @@ public class AppendOnlyFileStoreWrite extends
AbstractFileStoreWrite<InternalRow
rewriter.write(
new RecordReaderIterator<>(
read.createReader(
- new DataSplit(
- 0L /* unused */,
- partition,
- bucket,
- toCompact,
- false))));
+ DataSplit.builder()
+ .withPartition(partition)
+ .withBucket(bucket)
+ .withDataFiles(toCompact)
+ .build())));
rewriter.close();
return rewriter.result();
};
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
index 9fce1cb85..b3aa93d53 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
@@ -32,6 +32,7 @@ import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.MergeTreeReaders;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
+import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
import org.apache.paimon.mergetree.compact.IntervalPartition;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import
org.apache.paimon.mergetree.compact.MergeFunctionFactory.AdjustedProjection;
@@ -51,6 +52,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
@@ -185,26 +187,17 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
private RecordReader<KeyValue>
createReaderWithoutOuterProjection(DataSplit split)
throws IOException {
- if (split.isIncremental()) {
+ if (split.isStreaming()) {
KeyValueFileReaderFactory readerFactory =
readerFactoryBuilder.build(
split.partition(), split.bucket(), true,
filtersForOverlappedSection);
- // Return the raw file contents without merging
- List<ConcatRecordReader.ReaderSupplier<KeyValue>> suppliers = new
ArrayList<>();
- for (DataFileMeta file : split.files()) {
- suppliers.add(
- () -> {
- // We need to check extraFiles to be compatible
with Paimon 0.2.
- // See comments on DataFileMeta#extraFiles.
- String fileName =
changelogFile(file).orElse(file.fileName());
- return readerFactory.createRecordReader(
- file.schemaId(), fileName, file.level());
- });
- }
- RecordReader<KeyValue> concatRecordReader =
ConcatRecordReader.create(suppliers);
- return split.reverseRowKind()
- ? new ReverseReader(concatRecordReader)
- : concatRecordReader;
+ ReaderSupplier<KeyValue> beforeSupplier =
+ () -> new
ReverseReader(streamingConcat(split.beforeFiles(), readerFactory));
+ ReaderSupplier<KeyValue> dataSupplier =
+ () -> streamingConcat(split.dataFiles(), readerFactory);
+ return split.beforeFiles().isEmpty()
+ ? dataSupplier.get()
+ : ConcatRecordReader.create(Arrays.asList(beforeSupplier,
dataSupplier));
} else {
// Sections are read by SortMergeReader, which sorts and merges
records by keys.
// So we cannot project keys or else the sorting will be incorrect.
@@ -218,11 +211,11 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
false,
filtersForNonOverlappedSection);
- List<ConcatRecordReader.ReaderSupplier<KeyValue>> sectionReaders =
new ArrayList<>();
+ List<ReaderSupplier<KeyValue>> sectionReaders = new ArrayList<>();
MergeFunctionWrapper<KeyValue> mergeFuncWrapper =
new
ReducerMergeFunctionWrapper(mfFactory.create(pushdownProjection));
for (List<SortedRun> section :
- new IntervalPartition(split.files(),
keyComparator).partition()) {
+ new IntervalPartition(split.dataFiles(),
keyComparator).partition()) {
sectionReaders.add(
() ->
MergeTreeReaders.readerForSection(
@@ -245,6 +238,22 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
}
}
+ private RecordReader<KeyValue> streamingConcat(
+ List<DataFileMeta> files, KeyValueFileReaderFactory readerFactory)
throws IOException {
+ List<ReaderSupplier<KeyValue>> suppliers = new ArrayList<>();
+ for (DataFileMeta file : files) {
+ suppliers.add(
+ () -> {
+ // We need to check extraFiles to be compatible with
Paimon 0.2.
+ // See comments on DataFileMeta#extraFiles.
+ String fileName =
changelogFile(file).orElse(file.fileName());
+ return readerFactory.createRecordReader(
+ file.schemaId(), fileName, file.level());
+ });
+ }
+ return ConcatRecordReader.create(suppliers);
+ }
+
private Optional<String> changelogFile(DataFileMeta fileMeta) {
for (String file : fileMeta.extraFiles()) {
if (file.startsWith(CHANGELOG_FILE_PREFIX)) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 68f274465..beb703b84 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -30,7 +30,6 @@ import org.apache.paimon.operation.AppendOnlyFileStoreScan;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.Lock;
-import org.apache.paimon.operation.ReverseReader;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
@@ -94,8 +93,6 @@ public class AppendOnlyFileStoreTable extends
AbstractFileStoreTable {
/**
* Currently, the streaming read of overwrite is implemented by reversing
the {@link RowKind} of
* overwrote records to {@link RowKind#DELETE}, so only tables that have
primary key support it.
- *
- * @see ReverseReader
*/
@Override
public boolean supportStreamingReadOverwrite() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
index 5fa99529d..42b5fa18a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
@@ -32,7 +32,6 @@ import
org.apache.paimon.mergetree.compact.ValueCountMergeFunction;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.KeyValueFileStoreScan;
import org.apache.paimon.operation.Lock;
-import org.apache.paimon.operation.ReverseReader;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
@@ -107,8 +106,6 @@ public class ChangelogValueCountFileStoreTable extends
AbstractFileStoreTable {
/**
* Currently, the streaming read of overwrite is implemented by reversing
the {@link RowKind} of
* overwrote records to {@link RowKind#DELETE}, so only tables that have
primary key support it.
- *
- * @see ReverseReader
*/
@Override
public boolean supportStreamingReadOverwrite() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 8cd768b30..8fb76347c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -34,54 +34,22 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
/** Input splits. Needed by most batch computation engines. */
public class DataSplit implements Split {
- private static final long serialVersionUID = 2L;
+ private static final long serialVersionUID = 3L;
+
+ private long snapshotId = 0;
+ private boolean isStreaming = false;
+ private List<DataFileMeta> beforeFiles = new ArrayList<>();
- private long snapshotId;
private BinaryRow partition;
- private int bucket;
- private List<DataFileMeta> files;
- private boolean isIncremental;
-
- // when reverseRowKind is true, the RowKind of records from this split
should be reversed to
- // DELETE
- private boolean reverseRowKind;
-
- public DataSplit(
- long snapshotId,
- BinaryRow partition,
- int bucket,
- List<DataFileMeta> files,
- boolean isIncremental) {
- init(snapshotId, partition, bucket, files, isIncremental, false);
- }
-
- public DataSplit(
- long snapshotId,
- BinaryRow partition,
- int bucket,
- List<DataFileMeta> files,
- boolean isIncremental,
- boolean reverseRowKind) {
- init(snapshotId, partition, bucket, files, isIncremental,
reverseRowKind);
- }
-
- private void init(
- long snapshotId,
- BinaryRow partition,
- int bucket,
- List<DataFileMeta> files,
- boolean isIncremental,
- boolean reverseRowKind) {
- this.snapshotId = snapshotId;
- this.partition = partition;
- this.bucket = bucket;
- this.files = files;
- this.isIncremental = isIncremental;
- this.reverseRowKind = reverseRowKind;
- }
+ private int bucket = -1;
+ private List<DataFileMeta> dataFiles;
+
+ public DataSplit() {}
public long snapshotId() {
return snapshotId;
@@ -95,22 +63,22 @@ public class DataSplit implements Split {
return bucket;
}
- public List<DataFileMeta> files() {
- return files;
+ public List<DataFileMeta> beforeFiles() {
+ return beforeFiles;
}
- public boolean isIncremental() {
- return isIncremental;
+ public List<DataFileMeta> dataFiles() {
+ return dataFiles;
}
- public boolean reverseRowKind() {
- return reverseRowKind;
+ public boolean isStreaming() {
+ return isStreaming;
}
@Override
public long rowCount() {
long rowCount = 0;
- for (DataFileMeta file : files) {
+ for (DataFileMeta file : dataFiles) {
rowCount += file.rowCount();
}
return rowCount;
@@ -127,14 +95,14 @@ public class DataSplit implements Split {
DataSplit split = (DataSplit) o;
return bucket == split.bucket
&& Objects.equals(partition, split.partition)
- && Objects.equals(files, split.files)
- && isIncremental == split.isIncremental
- && reverseRowKind == split.reverseRowKind;
+ && Objects.equals(beforeFiles, split.beforeFiles)
+ && Objects.equals(dataFiles, split.dataFiles)
+ && isStreaming == split.isStreaming;
}
@Override
public int hashCode() {
- return Objects.hash(partition, bucket, files, isIncremental,
reverseRowKind);
+ return Objects.hash(partition, bucket, beforeFiles, dataFiles,
isStreaming);
}
private void writeObject(ObjectOutputStream out) throws IOException {
@@ -142,40 +110,114 @@ public class DataSplit implements Split {
}
private void readObject(ObjectInputStream in) throws IOException,
ClassNotFoundException {
- DataSplit split = DataSplit.deserialize(new
DataInputViewStreamWrapper(in));
- init(
- split.snapshotId,
- split.partition,
- split.bucket,
- split.files,
- split.isIncremental,
- split.reverseRowKind);
+ assign(deserialize(new DataInputViewStreamWrapper(in)));
+ }
+
+ private void assign(DataSplit other) {
+ this.snapshotId = other.snapshotId;
+ this.partition = other.partition;
+ this.bucket = other.bucket;
+ this.beforeFiles = other.beforeFiles;
+ this.dataFiles = other.dataFiles;
+ this.isStreaming = other.isStreaming;
}
public void serialize(DataOutputView out) throws IOException {
out.writeLong(snapshotId);
SerializationUtils.serializeBinaryRow(partition, out);
out.writeInt(bucket);
- out.writeInt(files.size());
+
DataFileMetaSerializer dataFileSer = new DataFileMetaSerializer();
- for (DataFileMeta file : files) {
+ out.writeInt(beforeFiles.size());
+ for (DataFileMeta file : beforeFiles) {
+ dataFileSer.serialize(file, out);
+ }
+
+ out.writeInt(dataFiles.size());
+ for (DataFileMeta file : dataFiles) {
dataFileSer.serialize(file, out);
}
- out.writeBoolean(isIncremental);
- out.writeBoolean(reverseRowKind);
+
+ out.writeBoolean(isStreaming);
}
public static DataSplit deserialize(DataInputView in) throws IOException {
long snapshotId = in.readLong();
BinaryRow partition = SerializationUtils.deserializeBinaryRow(in);
int bucket = in.readInt();
- int fileNumber = in.readInt();
- List<DataFileMeta> files = new ArrayList<>(fileNumber);
+
DataFileMetaSerializer dataFileSer = new DataFileMetaSerializer();
+ int beforeNumber = in.readInt();
+ List<DataFileMeta> beforeFiles = new ArrayList<>(beforeNumber);
+ for (int i = 0; i < beforeNumber; i++) {
+ beforeFiles.add(dataFileSer.deserialize(in));
+ }
+
+ int fileNumber = in.readInt();
+ List<DataFileMeta> dataFiles = new ArrayList<>(fileNumber);
for (int i = 0; i < fileNumber; i++) {
- files.add(dataFileSer.deserialize(in));
+ dataFiles.add(dataFileSer.deserialize(in));
+ }
+
+ boolean isStreaming = in.readBoolean();
+
+ return builder()
+ .withSnapshot(snapshotId)
+ .withPartition(partition)
+ .withBucket(bucket)
+ .withBeforeFiles(beforeFiles)
+ .withDataFiles(dataFiles)
+ .isStreaming(isStreaming)
+ .build();
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /** Builder for {@link DataSplit}. */
+ public static class Builder {
+
+ private final DataSplit split = new DataSplit();
+
+ public Builder withSnapshot(long snapshot) {
+ this.split.snapshotId = snapshot;
+ return this;
+ }
+
+ public Builder withPartition(BinaryRow partition) {
+ this.split.partition = partition;
+ return this;
+ }
+
+ public Builder withBucket(int bucket) {
+ this.split.bucket = bucket;
+ return this;
+ }
+
+ public Builder withBeforeFiles(List<DataFileMeta> beforeFiles) {
+ this.split.beforeFiles = beforeFiles;
+ return this;
+ }
+
+ public Builder withDataFiles(List<DataFileMeta> dataFiles) {
+ this.split.dataFiles = dataFiles;
+ return this;
+ }
+
+ public Builder isStreaming(boolean isStreaming) {
+ this.split.isStreaming = isStreaming;
+ return this;
+ }
+
+ public DataSplit build() {
+ checkArgument(split.partition != null);
+ checkArgument(split.bucket != -1);
+ checkArgument(split.dataFiles != null);
+
+ DataSplit split = new DataSplit();
+ split.assign(this.split);
+ return split;
}
- return new DataSplit(
- snapshotId, partition, bucket, files, in.readBoolean(),
in.readBoolean());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
index 40d40c830..e3ba10691 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
@@ -57,7 +57,7 @@ public class IncrementalStartingScanner implements
StartingScanner {
for (DataSplit split : splits) {
grouped.computeIfAbsent(
Pair.of(split.partition(), split.bucket()), k
-> new ArrayList<>())
- .addAll(split.files());
+ .addAll(split.dataFiles());
}
}
@@ -67,7 +67,13 @@ public class IncrementalStartingScanner implements
StartingScanner {
int bucket = entry.getKey().getRight();
for (List<DataFileMeta> files :
reader.splitGenerator().splitForBatch(entry.getValue())) {
- result.add(new DataSplit(end, partition, bucket, files,
false));
+ result.add(
+ DataSplit.builder()
+ .withSnapshot(end)
+ .withPartition(partition)
+ .withBucket(bucket)
+ .withDataFiles(files)
+ .build());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 19b4097be..768eb3f0e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -42,13 +42,18 @@ import org.apache.paimon.utils.SnapshotManager;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
+import static org.apache.paimon.operation.FileStoreScan.Plan.groupByPartFiles;
import static
org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
/** Implementation of {@link SnapshotReader}. */
@@ -164,7 +169,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
Long snapshotId = plan.snapshotId();
Map<BinaryRow, Map<Integer, List<DataFileMeta>>> files =
- FileStoreScan.Plan.groupByPartFiles(plan.files(FileKind.ADD));
+ groupByPartFiles(plan.files(FileKind.ADD));
if (options.scanPlanSortPartition()) {
Map<BinaryRow, Map<Integer, List<DataFileMeta>>> newFiles = new
LinkedHashMap<>();
files.entrySet().stream()
@@ -176,7 +181,6 @@ public class SnapshotReaderImpl implements SnapshotReader {
generateSplits(
snapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID - 1 :
snapshotId,
scanKind != ScanKind.ALL,
- false,
splitGenerator,
files);
return new Plan() {
@@ -216,10 +220,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
.collect(Collectors.toList());
}
- /**
- * Get splits from an overwrite snapshot files. The {@link
FileKind#DELETE} part will be marked
- * with reverseRowKind = true (see {@link DataSplit}).
- */
+ /** Get splits from an overwritten snapshot files. */
@Override
public Plan readOverwrittenChanges() {
withKind(ScanKind.DELTA);
@@ -234,21 +235,44 @@ public class SnapshotReaderImpl implements SnapshotReader
{
List<DataSplit> splits = new ArrayList<>();
- splits.addAll(
- generateSplits(
- snapshotId,
- true,
- true,
- splitGenerator,
-
FileStoreScan.Plan.groupByPartFiles(plan.files(FileKind.DELETE))));
-
- splits.addAll(
- generateSplits(
- snapshotId,
- true,
- false,
- splitGenerator,
-
FileStoreScan.Plan.groupByPartFiles(plan.files(FileKind.ADD))));
+ Map<BinaryRow, Map<Integer, List<DataFileMeta>>> beforeFiles =
+ groupByPartFiles(plan.files(FileKind.DELETE));
+ Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles =
+ groupByPartFiles(plan.files(FileKind.ADD));
+
+ Map<BinaryRow, Set<Integer>> buckets = new HashMap<>();
+ beforeFiles.forEach(
+ (part, bucketMap) ->
+ buckets.computeIfAbsent(part, k -> new HashSet<>())
+ .addAll(bucketMap.keySet()));
+ dataFiles.forEach(
+ (part, bucketMap) ->
+ buckets.computeIfAbsent(part, k -> new HashSet<>())
+ .addAll(bucketMap.keySet()));
+
+ for (Map.Entry<BinaryRow, Set<Integer>> entry : buckets.entrySet()) {
+ BinaryRow part = entry.getKey();
+ for (Integer bucket : entry.getValue()) {
+ List<DataFileMeta> before =
+ beforeFiles
+ .getOrDefault(part, Collections.emptyMap())
+ .getOrDefault(bucket, Collections.emptyList());
+ List<DataFileMeta> data =
+ dataFiles
+ .getOrDefault(part, Collections.emptyMap())
+ .getOrDefault(bucket, Collections.emptyList());
+ DataSplit split =
+ DataSplit.builder()
+ .withSnapshot(snapshotId)
+ .withPartition(part)
+ .withBucket(bucket)
+ .withBeforeFiles(before)
+ .withDataFiles(data)
+ .isStreaming(true)
+ .build();
+ splits.add(split);
+ }
+ }
return new Plan() {
@Nullable
@@ -283,8 +307,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
@VisibleForTesting
public static List<DataSplit> generateSplits(
long snapshotId,
- boolean isIncremental,
- boolean reverseRowKind,
+ boolean isStreaming,
SplitGenerator splitGenerator,
Map<BinaryRow, Map<Integer, List<DataFileMeta>>> groupedDataFiles)
{
List<DataSplit> splits = new ArrayList<>();
@@ -294,32 +317,21 @@ public class SnapshotReaderImpl implements SnapshotReader
{
Map<Integer, List<DataFileMeta>> buckets = entry.getValue();
for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
buckets.entrySet()) {
int bucket = bucketEntry.getKey();
- if (isIncremental) {
- // streaming splits incremental data files
-
splitGenerator.splitForStreaming(bucketEntry.getValue()).stream()
- .map(
- files ->
- new DataSplit(
- snapshotId,
- partition,
- bucket,
- files,
- true,
- reverseRowKind))
- .forEach(splits::add);
- } else {
-
splitGenerator.splitForBatch(bucketEntry.getValue()).stream()
- .map(
- files ->
- new DataSplit(
- snapshotId,
- partition,
- bucket,
- files,
- false,
- reverseRowKind))
- .forEach(splits::add);
- }
+ List<DataFileMeta> bucketFiles = bucketEntry.getValue();
+ DataSplit.Builder builder =
+ DataSplit.builder()
+ .withSnapshot(snapshotId)
+ .withPartition(partition)
+ .withBucket(bucket)
+ .isStreaming(isStreaming);
+ List<List<DataFileMeta>> splitGroups =
+ isStreaming
+ ? splitGenerator.splitForStreaming(bucketFiles)
+ : splitGenerator.splitForBatch(bucketFiles);
+ splitGroups.stream()
+ .map(builder::withDataFiles)
+ .map(DataSplit.Builder::build)
+ .forEach(splits::add);
}
}
return splits;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
index b19f64cd3..c8aea53f5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
@@ -167,7 +167,7 @@ public class BucketsTable implements DataTable,
ReadonlyTable {
// Serialized files are only useful in streaming jobs.
// Batch compact jobs only run once, so they only need to know
what buckets should
// be compacted and don't need to concern incremental new
files.
- files = dataSplit.files();
+ files = dataSplit.dataFiles();
}
InternalRow row =
GenericRow.of(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index daf491cc1..94858164f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -169,7 +169,7 @@ public class FilesTable implements ReadonlyTable {
TableScan.Plan plan = plan();
return plan.splits().stream()
.map(s -> (DataSplit) s)
- .mapToLong(s -> s.files().size())
+ .mapToLong(s -> s.dataFiles().size())
.sum();
}
@@ -263,7 +263,7 @@ public class FilesTable implements ReadonlyTable {
for (Split dataSplit : plan.splits()) {
iteratorList.add(
Iterators.transform(
- ((DataSplit) dataSplit).files().iterator(),
+ ((DataSplit) dataSplit).dataFiles().iterator(),
file ->
toRow(
(DataSplit) dataSplit,
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 6b88a838b..2d8b98a60 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -349,7 +349,7 @@ public class TestFileStore extends KeyValueFileStore {
}
public List<KeyValue> readKvsFromManifestEntries(
- List<ManifestEntry> entries, boolean isIncremental) throws
Exception {
+ List<ManifestEntry> entries, boolean isStreaming) throws Exception
{
if (LOG.isDebugEnabled()) {
for (ManifestEntry entry : entries) {
LOG.debug("reading from " + entry.toString());
@@ -374,12 +374,12 @@ public class TestFileStore extends KeyValueFileStore {
RecordReaderIterator<KeyValue> iterator =
new RecordReaderIterator<>(
read.createReader(
- new DataSplit(
- 0L /* unused */,
- entryWithPartition.getKey(),
- entryWithBucket.getKey(),
- entryWithBucket.getValue(),
- isIncremental)));
+ DataSplit.builder()
+
.withPartition(entryWithPartition.getKey())
+
.withBucket(entryWithBucket.getKey())
+
.withDataFiles(entryWithBucket.getValue())
+ .isStreaming(isStreaming)
+ .build()));
while (iterator.hasNext()) {
kvs.add(iterator.next().copy(keySerializer,
valueSerializer));
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java
index 6225b7ef8..8a2d118e0 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java
@@ -234,14 +234,15 @@ public class KeyValueFileStoreReadTest {
for (Map.Entry<BinaryRow, List<ManifestEntry>> entry :
filesGroupedByPartition.entrySet()) {
RecordReader<KeyValue> reader =
read.createReader(
- new DataSplit(
- snapshotId,
- entry.getKey(),
- 0,
- entry.getValue().stream()
- .map(ManifestEntry::file)
- .collect(Collectors.toList()),
- false));
+ DataSplit.builder()
+ .withSnapshot(snapshotId)
+ .withPartition(entry.getKey())
+ .withBucket(0)
+ .withDataFiles(
+ entry.getValue().stream()
+ .map(ManifestEntry::file)
+
.collect(Collectors.toList()))
+ .build());
RecordReaderIterator<KeyValue> actualIterator = new
RecordReaderIterator<>(reader);
while (actualIterator.hasNext()) {
result.add(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileMetaFilterTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileMetaFilterTest.java
index 86362cb60..3c7b168f1 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileMetaFilterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileMetaFilterTest.java
@@ -53,7 +53,7 @@ public class ChangelogWithKeyFileMetaFilterTest extends
FileMetaFilterTestBase {
List<DataSplit> splits =
table.newSnapshotReader().read().dataSplits();
checkFilterRowCount(toDataFileMetas(splits), 6L);
return splits.stream()
- .flatMap(s -> s.files().stream())
+ .flatMap(s -> s.dataFiles().stream())
.collect(Collectors.toList());
},
(files, schemas) -> {
@@ -87,7 +87,7 @@ public class ChangelogWithKeyFileMetaFilterTest extends
FileMetaFilterTestBase {
table.newSnapshotReader().withFilter(predicate).read().dataSplits();
checkFilterRowCount(toDataFileMetas(splits), 6L);
return splits.stream()
- .flatMap(s -> s.files().stream())
+ .flatMap(s -> s.dataFiles().stream())
.collect(Collectors.toList());
},
(files, schemas) -> {
@@ -120,7 +120,7 @@ public class ChangelogWithKeyFileMetaFilterTest extends
FileMetaFilterTestBase {
List<DataSplit> splits =
table.newSnapshotReader().read().dataSplits();
checkFilterRowCount(toDataFileMetas(splits), 6L);
return splits.stream()
- .flatMap(s -> s.files().stream())
+ .flatMap(s -> s.dataFiles().stream())
.collect(Collectors.toList());
},
(files, schemas) -> {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
index c76a6e15d..703f82167 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
@@ -710,7 +710,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
SnapshotReader snapshotReader =
table.newSnapshotReader().withKind(ScanKind.DELTA);
List<DataSplit> splits0 = snapshotReader.read().dataSplits();
assertThat(splits0).hasSize(1);
- assertThat(splits0.get(0).files()).hasSize(1);
+ assertThat(splits0.get(0).dataFiles()).hasSize(1);
write.write(rowData(1, 10, 1000L));
write.write(rowData(1, 20, 2000L));
@@ -721,9 +721,9 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
List<DataSplit> splits1 = snapshotReader.read().dataSplits();
assertThat(splits1).hasSize(1);
- assertThat(splits1.get(0).files()).hasSize(1);
- assertThat(splits1.get(0).files().get(0).fileName())
- .isNotEqualTo(splits0.get(0).files().get(0).fileName());
+ assertThat(splits1.get(0).dataFiles()).hasSize(1);
+ assertThat(splits1.get(0).dataFiles().get(0).fileName())
+ .isNotEqualTo(splits0.get(0).dataFiles().get(0).fileName());
}
@Test
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyTableColumnTypeFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyTableColumnTypeFileMetaTest.java
index 869dc8bca..5d6875ed1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyTableColumnTypeFileMetaTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyTableColumnTypeFileMetaTest.java
@@ -94,7 +94,7 @@ public class ChangelogWithKeyTableColumnTypeFileMetaTest extends ColumnTypeFileM
table.newSnapshotReader().withFilter(predicate).read().dataSplits();
checkFilterRowCount(toDataFileMetas(splits), 3L);
return splits.stream()
- .flatMap(s -> s.files().stream())
+ .flatMap(s -> s.dataFiles().stream())
.collect(Collectors.toList());
},
(files, schemas) -> {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java
index 8b576ecb9..a95bd63b6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java
@@ -52,7 +52,7 @@ public abstract class ColumnTypeFileMetaTestBase extends SchemaEvolutionTableTes
List<DataSplit> splits =
table.newSnapshotReader().read().dataSplits();
checkFilterRowCount(toDataFileMetas(splits), 3L);
return splits.stream()
- .flatMap(s -> s.files().stream())
+ .flatMap(s -> s.dataFiles().stream())
.collect(Collectors.toList());
},
(files, schemas) -> {
@@ -74,7 +74,7 @@ public abstract class ColumnTypeFileMetaTestBase extends SchemaEvolutionTableTes
List<DataFileMeta> fileMetaList =
splits.stream()
- .flatMap(s -> s.files().stream())
+ .flatMap(s -> s.dataFiles().stream())
.collect(Collectors.toList());
assertThat(
fileMetaList.stream()
@@ -126,7 +126,7 @@ public abstract class ColumnTypeFileMetaTestBase extends SchemaEvolutionTableTes
table.newSnapshotReader().withFilter(predicate).read().dataSplits();
checkFilterRowCount(toDataFileMetas(splits), 2L);
return splits.stream()
- .flatMap(s -> s.files().stream())
+ .flatMap(s -> s.dataFiles().stream())
.collect(Collectors.toList());
},
(files, schemas) -> {
@@ -157,7 +157,7 @@ public abstract class ColumnTypeFileMetaTestBase extends SchemaEvolutionTableTes
List<DataFileMeta> fileMetaList =
splits.stream()
- .flatMap(s -> s.files().stream())
+ .flatMap(s -> s.dataFiles().stream())
.collect(Collectors.toList());
assertThat(
fileMetaList.stream()
@@ -185,7 +185,7 @@ public abstract class ColumnTypeFileMetaTestBase extends SchemaEvolutionTableTes
table.newSnapshotReader().withFilter(predicate).read().dataSplits();
checkFilterRowCount(toDataFileMetas(splits), 2L);
return splits.stream()
- .flatMap(s -> s.files().stream())
+ .flatMap(s -> s.dataFiles().stream())
.collect(Collectors.toList());
},
(files, schemas) -> {
@@ -207,7 +207,7 @@ public abstract class ColumnTypeFileMetaTestBase extends SchemaEvolutionTableTes
List<DataFileMeta> fileMetaList =
splits.stream()
- .flatMap(s -> s.files().stream())
+ .flatMap(s -> s.dataFiles().stream())
.collect(Collectors.toList());
assertThat(
fileMetaList.stream()
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileMetaFilterTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileMetaFilterTestBase.java
index 14ccd403a..f2ab5b9a9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/FileMetaFilterTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/FileMetaFilterTestBase.java
@@ -45,7 +45,7 @@ public abstract class FileMetaFilterTestBase extends SchemaEvolutionTableTestBas
List<DataSplit> splits =
table.newSnapshotReader().read().dataSplits();
checkFilterRowCount(toDataFileMetas(splits), 6L);
return splits.stream()
- .flatMap(s -> s.files().stream())
+ .flatMap(s -> s.dataFiles().stream())
.collect(Collectors.toList());
},
(files, schemas) -> {
@@ -65,7 +65,7 @@ public abstract class FileMetaFilterTestBase extends SchemaEvolutionTableTestBas
List<DataFileMeta> fileMetaList =
splits.stream()
- .flatMap(s -> s.files().stream())
+ .flatMap(s -> s.dataFiles().stream())
.collect(Collectors.toList());
assertThat(
fileMetaList.stream()
@@ -124,7 +124,7 @@ public abstract class FileMetaFilterTestBase extends SchemaEvolutionTableTestBas
List<DataFileMeta> files =
table.newSnapshotReader().withFilter(predicate).read().dataSplits()
.stream()
- .flatMap(s -> s.files().stream())
+ .flatMap(s -> s.dataFiles().stream())
.collect(Collectors.toList());
assertThat(files.size()).isGreaterThan(0);
checkFilterRowCount(files, 3L);
@@ -140,7 +140,7 @@ public abstract class FileMetaFilterTestBase extends SchemaEvolutionTableTestBas
table.newSnapshotReader().withFilter(predicate).read().dataSplits();
List<DataFileMeta> filterFileMetas =
filterSplits.stream()
- .flatMap(s -> s.files().stream())
+ .flatMap(s -> s.dataFiles().stream())
.collect(Collectors.toList());
checkFilterRowCount(filterFileMetas, 6L);
@@ -165,7 +165,7 @@ public abstract class FileMetaFilterTestBase extends SchemaEvolutionTableTestBas
filterAllSplits.stream()
.flatMap(
s ->
- s.files().stream()
+ s.dataFiles().stream()
.map(DataFileMeta::fileName))
.collect(Collectors.toList()))
.containsAll(
@@ -179,7 +179,7 @@ public abstract class FileMetaFilterTestBase extends SchemaEvolutionTableTestBas
Set<String> filterFileNames = new HashSet<>();
for (DataSplit dataSplit : filterAllSplits) {
- for (DataFileMeta dataFileMeta : dataSplit.files()) {
+ for (DataFileMeta dataFileMeta : dataSplit.dataFiles()) {
FieldStats[] fieldStats =
getTableValueStats(dataFileMeta).fields(null);
int minValue = (Integer) fieldStats[1].minValue();
int maxValue = (Integer) fieldStats[1].maxValue();
@@ -205,7 +205,7 @@ public abstract class FileMetaFilterTestBase extends SchemaEvolutionTableTestBas
FileStoreTable table = createFileStoreTable(schemas);
List<DataFileMeta> files =
table.newSnapshotReader().read().dataSplits().stream()
- .flatMap(s -> s.files().stream())
+ .flatMap(s -> s.dataFiles().stream())
.collect(Collectors.toList());
assertThat(files.size()).isGreaterThan(0);
checkFilterRowCount(files, 6L);
@@ -225,7 +225,7 @@ public abstract class FileMetaFilterTestBase extends SchemaEvolutionTableTestBas
List<DataFileMeta> filterFileMetas =
filterSplits.stream()
- .flatMap(s -> s.files().stream())
+ .flatMap(s -> s.dataFiles().stream())
.collect(Collectors.toList());
List<String> fileNameList =
filterFileMetas.stream()
@@ -250,7 +250,7 @@ public abstract class FileMetaFilterTestBase extends SchemaEvolutionTableTestBas
Set<String> filterFileNames = new HashSet<>();
for (DataSplit dataSplit : allSplits) {
- for (DataFileMeta dataFileMeta : dataSplit.files()) {
+ for (DataFileMeta dataFileMeta : dataSplit.dataFiles()) {
FieldStats[] fieldStats =
getTableValueStats(dataFileMeta).fields(null);
Integer minValue = (Integer) fieldStats[3].minValue();
Integer maxValue = (Integer) fieldStats[3].maxValue();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index e3966f850..8d7f94140 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -522,7 +522,7 @@ public abstract class FileStoreTableTestBase {
List<DataFileMeta> files =
table.newSnapshotReader().read().dataSplits().stream()
- .flatMap(split -> split.files().stream())
+ .flatMap(split -> split.dataFiles().stream())
.collect(Collectors.toList());
for (DataFileMeta file : files) {
assertThat(file.level()).isEqualTo(0);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
index ad76e557a..66f55eb38 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
@@ -429,7 +429,7 @@ public abstract class SchemaEvolutionTableTestBase {
}
protected static List<DataFileMeta> toDataFileMetas(List<DataSplit> splits) {
- return splits.stream().flatMap(s -> s.files().stream()).collect(Collectors.toList());
+ return splits.stream().flatMap(s -> s.dataFiles().stream()).collect(Collectors.toList());
}
protected static void checkFilterRowCount(
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
index 000ba6b2d..7fbd8f96d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
@@ -45,12 +45,12 @@ public class SplitTest {
files.add(gen.next().meta);
}
DataSplit split =
- new DataSplit(
- ThreadLocalRandom.current().nextLong(100),
- data.partition,
- data.bucket,
- files,
- false);
+ DataSplit.builder()
+ .withSnapshot(ThreadLocalRandom.current().nextLong(100))
+ .withPartition(data.partition)
+ .withBucket(data.bucket)
+ .withDataFiles(files)
+ .build();
ByteArrayOutputStream out = new ByteArrayOutputStream();
split.serialize(new DataOutputViewStreamWrapper(out));
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index f420a264b..e5201a1e8 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -113,10 +113,10 @@ public class CompactActionITCase extends ActionITCaseBase {
for (DataSplit split : splits) {
if (split.partition().getInt(1) == 15) {
// compacted
- Assertions.assertEquals(1, split.files().size());
+ Assertions.assertEquals(1, split.dataFiles().size());
} else {
// not compacted
- Assertions.assertEquals(2, split.files().size());
+ Assertions.assertEquals(2, split.dataFiles().size());
}
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
index 75d6e9b52..257f844c7 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
@@ -126,10 +126,10 @@ public class CompactorSinkITCase extends AbstractTestBase {
DataSplit dataSplit = (DataSplit) split;
if (dataSplit.partition().getInt(1) == 15) {
// compacted
- assertEquals(1, dataSplit.files().size());
+ assertEquals(1, dataSplit.dataFiles().size());
} else {
// not compacted
- assertEquals(2, dataSplit.files().size());
+ assertEquals(2, dataSplit.dataFiles().size());
}
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index bce90c3a9..f511ee2af 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -506,13 +506,25 @@ public class ContinuousFileSplitEnumeratorTest {
int snapshotId, int bucket, List<DataFileMeta> files) {
return new FileStoreSourceSplit(
UUID.randomUUID().toString(),
- new DataSplit(snapshotId, row(1), bucket, files, true),
+ DataSplit.builder()
+ .withSnapshot(snapshotId)
+ .withPartition(row(1))
+ .withBucket(bucket)
+ .withDataFiles(files)
+ .isStreaming(true)
+ .build(),
0);
}
private static DataSplit createDataSplit(
long snapshotId, int bucket, List<DataFileMeta> files) {
- return new DataSplit(snapshotId, row(1), bucket, files, true);
+ return DataSplit.builder()
+ .withSnapshot(snapshotId)
+ .withPartition(row(1))
+ .withBucket(bucket)
+ .withDataFiles(files)
+ .isStreaming(true)
+ .build();
}
private static class Builder {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
index 87b082011..2fb1f090a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
@@ -83,7 +83,6 @@ public class FileStoreSourceSplitGeneratorTest {
SnapshotReaderImpl.generateSplits(
1L,
false,
- false,
new SplitGenerator() {
@Override
public List<List<DataFileMeta>> splitForBatch(
@@ -134,7 +133,7 @@ public class FileStoreSourceSplitGeneratorTest {
assertThat(((DataSplit) split.split()).bucket()).isEqualTo(bucket);
assertThat(
((DataSplit) split.split())
- .files().stream()
+ .dataFiles().stream()
.map(DataFileMeta::fileName)
.collect(Collectors.toList()))
.isEqualTo(files);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
index f27bb281d..d0cafbb95 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
@@ -117,8 +117,15 @@ public class FileStoreSourceSplitSerializerTest {
List<DataFileMeta> files,
boolean isIncremental,
long recordsToSkip) {
- return new FileStoreSourceSplit(
- id, new DataSplit(1L, partition, bucket, files, isIncremental), recordsToSkip);
+ DataSplit split =
+ DataSplit.builder()
+ .withSnapshot(1)
+ .withPartition(partition)
+ .withBucket(bucket)
+ .withDataFiles(files)
+ .isStreaming(isIncremental)
+ .build();
+ return new FileStoreSourceSplit(id, split, recordsToSkip);
}
private static FileStoreSourceSplit
serializeAndDeserialize(FileStoreSourceSplit split)
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputSplit.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputSplit.java
index ac99902c9..c1c0efdc9 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputSplit.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputSplit.java
@@ -65,7 +65,7 @@ public class PaimonInputSplit extends FileSplit {
@Override
public long getLength() {
- return split.files().stream().mapToLong(DataFileMeta::fileSize).sum();
+ return split.dataFiles().stream().mapToLong(DataFileMeta::fileSize).sum();
}
@Override
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
index ecf683e46..908fce7c1 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
@@ -51,18 +51,18 @@ public class PaimonInputSplitTest {
}
BinaryRow wantedPartition = generated.get(0).partition;
- PaimonInputSplit split =
- new PaimonInputSplit(
- tempDir.toString(),
- new DataSplit(
- ThreadLocalRandom.current().nextLong(100),
- wantedPartition,
- 0,
+ DataSplit dataSplit =
+ DataSplit.builder()
+ .withSnapshot(ThreadLocalRandom.current().nextLong(100))
+ .withPartition(wantedPartition)
+ .withBucket(0)
+ .withDataFiles(
generated.stream()
.filter(d ->
d.partition.equals(wantedPartition))
.map(d -> d.meta)
- .collect(Collectors.toList()),
- false));
+ .collect(Collectors.toList()))
+ .build();
+ PaimonInputSplit split = new PaimonInputSplit(tempDir.toString(), dataSplit);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream output = new DataOutputStream(baos);