This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 19e9b1366 [core] Move convertToRawFiles method from TableScan to Split (#2361)
19e9b1366 is described below
commit 19e9b136682b285c40c2efc04decc2ccf8391878
Author: tsreaper <[email protected]>
AuthorDate: Wed Nov 22 11:47:53 2023 +0800
[core] Move convertToRawFiles method from TableScan to Split (#2361)
---
.../table/source/AbstractInnerTableScan.java | 9 --
.../org/apache/paimon/table/source/DataSplit.java | 38 +++++-
.../org/apache/paimon/table/source/RawFile.java | 21 +++
.../java/org/apache/paimon/table/source/Split.java | 11 ++
.../org/apache/paimon/table/source/TableScan.java | 9 --
.../table/source/snapshot/SnapshotReader.java | 8 --
.../table/source/snapshot/SnapshotReaderImpl.java | 143 ++++++++++-----------
.../apache/paimon/table/system/AuditLogTable.java | 9 --
.../table/source/snapshot/SnapshotReaderTest.java | 16 +--
.../source/FileStoreSourceSplitGeneratorTest.java | 121 ++++++-----------
10 files changed, 185 insertions(+), 200 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
index 277e01a2e..501630d80 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
@@ -197,13 +197,4 @@ public abstract class AbstractInnerTableScan implements
InnerTableScan {
public List<BinaryRow> listPartitions() {
return snapshotReader.partitions();
}
-
- @Override
- public Optional<List<RawFile>> convertToRawFiles(Split split) {
- if (split instanceof DataSplit) {
- return snapshotReader.convertToRawFiles((DataSplit) split);
- } else {
- return Optional.empty();
- }
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 87898154c..5226739df 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -31,8 +31,10 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.OptionalLong;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -40,7 +42,7 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
/** Input splits. Needed by most batch computation engines. */
public class DataSplit implements Split {
- private static final long serialVersionUID = 3L;
+ private static final long serialVersionUID = 4L;
private long snapshotId = 0;
private boolean isStreaming = false;
@@ -50,6 +52,8 @@ public class DataSplit implements Split {
private int bucket = -1;
private List<DataFileMeta> dataFiles;
+ private List<RawFile> rawFiles = Collections.emptyList();
+
public DataSplit() {}
public long snapshotId() {
@@ -89,6 +93,15 @@ public class DataSplit implements Split {
return rowCount;
}
+ @Override
+ public Optional<List<RawFile>> convertToRawFiles() {
+ if (rawFiles.isEmpty()) {
+ return Optional.empty();
+ } else {
+ return Optional.of(rawFiles);
+ }
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -102,12 +115,13 @@ public class DataSplit implements Split {
&& Objects.equals(partition, split.partition)
&& Objects.equals(beforeFiles, split.beforeFiles)
&& Objects.equals(dataFiles, split.dataFiles)
- && isStreaming == split.isStreaming;
+ && isStreaming == split.isStreaming
+ && Objects.equals(rawFiles, split.rawFiles);
}
@Override
public int hashCode() {
- return Objects.hash(partition, bucket, beforeFiles, dataFiles,
isStreaming);
+ return Objects.hash(partition, bucket, beforeFiles, dataFiles,
isStreaming, rawFiles);
}
private void writeObject(ObjectOutputStream out) throws IOException {
@@ -125,6 +139,7 @@ public class DataSplit implements Split {
this.beforeFiles = other.beforeFiles;
this.dataFiles = other.dataFiles;
this.isStreaming = other.isStreaming;
+ this.rawFiles = other.rawFiles;
}
public void serialize(DataOutputView out) throws IOException {
@@ -144,6 +159,11 @@ public class DataSplit implements Split {
}
out.writeBoolean(isStreaming);
+
+ out.writeInt(rawFiles.size());
+ for (RawFile rawFile : rawFiles) {
+ rawFile.serialize(out);
+ }
}
public static DataSplit deserialize(DataInputView in) throws IOException {
@@ -166,6 +186,12 @@ public class DataSplit implements Split {
boolean isStreaming = in.readBoolean();
+ int rawFileNum = in.readInt();
+ List<RawFile> rawFiles = new ArrayList<>();
+ for (int i = 0; i < rawFileNum; i++) {
+ rawFiles.add(RawFile.deserialize(in));
+ }
+
return builder()
.withSnapshot(snapshotId)
.withPartition(partition)
@@ -173,6 +199,7 @@ public class DataSplit implements Split {
.withBeforeFiles(beforeFiles)
.withDataFiles(dataFiles)
.isStreaming(isStreaming)
+ .rawFiles(rawFiles)
.build();
}
@@ -215,6 +242,11 @@ public class DataSplit implements Split {
return this;
}
+ public Builder rawFiles(List<RawFile> rawFiles) {
+ this.split.rawFiles = rawFiles;
+ return this;
+ }
+
public DataSplit build() {
checkArgument(split.partition != null);
checkArgument(split.bucket != -1);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/RawFile.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/RawFile.java
index a1c1f07d2..2923304e5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/RawFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/RawFile.java
@@ -20,7 +20,10 @@ package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.Public;
+import org.apache.paimon.io.DataInputView;
+import org.apache.paimon.io.DataOutputView;
+import java.io.IOException;
import java.util.Objects;
/**
@@ -73,6 +76,24 @@ public class RawFile {
return schemaId;
}
+ public void serialize(DataOutputView out) throws IOException {
+ out.writeUTF(path);
+ out.writeLong(offset);
+ out.writeLong(length);
+ out.writeUTF(format);
+ out.writeLong(schemaId);
+ }
+
+ public static RawFile deserialize(DataInputView in) throws IOException {
+ String path = in.readUTF();
+ long offset = in.readLong();
+ long length = in.readLong();
+ String format = in.readUTF();
+ long schemaId = in.readLong();
+
+ return new RawFile(path, offset, length, format, schemaId);
+ }
+
@Override
public boolean equals(Object o) {
if (!(o instanceof RawFile)) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java
index b4cbbbd4e..ee96943a4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java
@@ -21,6 +21,8 @@ package org.apache.paimon.table.source;
import org.apache.paimon.annotation.Public;
import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
/**
* An input split for reading.
@@ -31,4 +33,13 @@ import java.io.Serializable;
public interface Split extends Serializable {
long rowCount();
+
+ /**
+ * If all files in this split can be read without merging, returns an {@link Optional} wrapping
+ * a list of {@link RawFile}s to be read without merging. Otherwise, returns {@link
+ * Optional#empty()}.
+ */
+ default Optional<List<RawFile>> convertToRawFiles() {
+ return Optional.empty();
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java
index e9e876dd1..ba8b0b5e1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java
@@ -23,7 +23,6 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.table.Table;
import java.util.List;
-import java.util.Optional;
/**
* A scan of {@link Table} to generate {@link Split} splits.
@@ -39,14 +38,6 @@ public interface TableScan {
/** Get partitions from simple manifest entries. */
List<BinaryRow> listPartitions();
- /**
- * If all files in this split can be read without merging, returns an {@link Optional} wrapping
- * a list of {@link RawFile}s, otherwise returns {@link Optional#empty()}.
- */
- default Optional<List<RawFile>> convertToRawFiles(Split split) {
- return Optional.empty();
- }
-
/**
* Plan of scan.
*
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 10d51684b..4af797c0a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -24,7 +24,6 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.RawFile;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.SplitGenerator;
@@ -36,7 +35,6 @@ import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
/** Read splits from specified {@link Snapshot} with given configuration. */
public interface SnapshotReader {
@@ -76,12 +74,6 @@ public interface SnapshotReader {
/** Get partitions from a snapshot. */
List<BinaryRow> partitions();
- /**
- * If all files in this split can be read without merging, returns an {@link Optional} wrapping
- * a list of {@link RawFile}s, otherwise returns {@link Optional#empty()}.
- */
- Optional<List<RawFile>> convertToRawFiles(DataSplit split);
-
/** Result plan of this scan. */
interface Plan extends TableScan.Plan {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index a5f5cc36e..1c6bdc64d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
-import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.RecordComparator;
import org.apache.paimon.consumer.ConsumerManager;
@@ -258,6 +257,40 @@ public class SnapshotReaderImpl implements SnapshotReader {
};
}
+ private List<DataSplit> generateSplits(
+ long snapshotId,
+ boolean isStreaming,
+ SplitGenerator splitGenerator,
+ Map<BinaryRow, Map<Integer, List<DataFileMeta>>> groupedDataFiles)
{
+ List<DataSplit> splits = new ArrayList<>();
+ for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> entry :
+ groupedDataFiles.entrySet()) {
+ BinaryRow partition = entry.getKey();
+ Map<Integer, List<DataFileMeta>> buckets = entry.getValue();
+ for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
buckets.entrySet()) {
+ int bucket = bucketEntry.getKey();
+ List<DataFileMeta> bucketFiles = bucketEntry.getValue();
+ DataSplit.Builder builder =
+ DataSplit.builder()
+ .withSnapshot(snapshotId)
+ .withPartition(partition)
+ .withBucket(bucket)
+ .isStreaming(isStreaming);
+ List<List<DataFileMeta>> splitGroups =
+ isStreaming
+ ? splitGenerator.splitForStreaming(bucketFiles)
+ : splitGenerator.splitForBatch(bucketFiles);
+ for (List<DataFileMeta> dataFiles : splitGroups) {
+ splits.add(
+ builder.withDataFiles(dataFiles)
+ .rawFiles(convertToRawFiles(partition,
bucket, dataFiles))
+ .build());
+ }
+ }
+ }
+ return splits;
+ }
+
@Override
public List<BinaryRow> partitions() {
List<ManifestEntry> entryList = scan.plan().files();
@@ -275,47 +308,6 @@ public class SnapshotReaderImpl implements SnapshotReader {
.collect(Collectors.toList());
}
- @Override
- public Optional<List<RawFile>> convertToRawFiles(DataSplit split) {
- String bucketPath = pathFactory.bucketPath(split.partition(),
split.bucket()).toString();
- List<DataFileMeta> dataFiles = split.dataFiles();
-
- // bucket with only one file can be returned
- if (dataFiles.size() == 1) {
- return Optional.of(
- Collections.singletonList(makeRawTableFile(bucketPath,
dataFiles.get(0))));
- }
-
- // append only files can be returned
- if (tableSchema.primaryKeys().isEmpty()) {
- return Optional.of(makeRawTableFiles(bucketPath, dataFiles));
- }
-
- // bucket containing only one level (except level 0) can be returned
- Set<Integer> levels =
-
dataFiles.stream().map(DataFileMeta::level).collect(Collectors.toSet());
- if (levels.size() == 1 && !levels.contains(0)) {
- return Optional.of(makeRawTableFiles(bucketPath, dataFiles));
- }
-
- return Optional.empty();
- }
-
- private List<RawFile> makeRawTableFiles(String bucketPath,
List<DataFileMeta> dataFiles) {
- return dataFiles.stream()
- .map(f -> makeRawTableFile(bucketPath, f))
- .collect(Collectors.toList());
- }
-
- private RawFile makeRawTableFile(String bucketPath, DataFileMeta meta) {
- return new RawFile(
- bucketPath + "/" + meta.fileName(),
- 0,
- meta.fileSize(),
- new
CoreOptions(tableSchema.options()).formatType().toString().toLowerCase(),
- meta.schemaId());
- }
-
@Override
public Plan readChanges() {
withMode(ScanMode.DELTA);
@@ -368,6 +360,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
.withBeforeFiles(before)
.withDataFiles(data)
.isStreaming(isStreaming)
+ .rawFiles(convertToRawFiles(part, bucket,
data))
.build();
splits.add(split);
}
@@ -414,36 +407,42 @@ public class SnapshotReaderImpl implements SnapshotReader
{
return lazyPartitionComparator;
}
- @VisibleForTesting
- public static List<DataSplit> generateSplits(
- long snapshotId,
- boolean isStreaming,
- SplitGenerator splitGenerator,
- Map<BinaryRow, Map<Integer, List<DataFileMeta>>> groupedDataFiles)
{
- List<DataSplit> splits = new ArrayList<>();
- for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> entry :
- groupedDataFiles.entrySet()) {
- BinaryRow partition = entry.getKey();
- Map<Integer, List<DataFileMeta>> buckets = entry.getValue();
- for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
buckets.entrySet()) {
- int bucket = bucketEntry.getKey();
- List<DataFileMeta> bucketFiles = bucketEntry.getValue();
- DataSplit.Builder builder =
- DataSplit.builder()
- .withSnapshot(snapshotId)
- .withPartition(partition)
- .withBucket(bucket)
- .isStreaming(isStreaming);
- List<List<DataFileMeta>> splitGroups =
- isStreaming
- ? splitGenerator.splitForStreaming(bucketFiles)
- : splitGenerator.splitForBatch(bucketFiles);
- splitGroups.stream()
- .map(builder::withDataFiles)
- .map(DataSplit.Builder::build)
- .forEach(splits::add);
- }
+ private List<RawFile> convertToRawFiles(
+ BinaryRow partition, int bucket, List<DataFileMeta> dataFiles) {
+ String bucketPath = pathFactory.bucketPath(partition,
bucket).toString();
+
+ // bucket with only one file can be returned
+ if (dataFiles.size() == 1) {
+ return Collections.singletonList(makeRawTableFile(bucketPath,
dataFiles.get(0)));
}
- return splits;
+
+ // append only files can be returned
+ if (tableSchema.primaryKeys().isEmpty()) {
+ return makeRawTableFiles(bucketPath, dataFiles);
+ }
+
+ // bucket containing only one level (except level 0) can be returned
+ Set<Integer> levels =
+
dataFiles.stream().map(DataFileMeta::level).collect(Collectors.toSet());
+ if (levels.size() == 1 && !levels.contains(0)) {
+ return makeRawTableFiles(bucketPath, dataFiles);
+ }
+
+ return Collections.emptyList();
+ }
+
+ private List<RawFile> makeRawTableFiles(String bucketPath,
List<DataFileMeta> dataFiles) {
+ return dataFiles.stream()
+ .map(f -> makeRawTableFile(bucketPath, f))
+ .collect(Collectors.toList());
+ }
+
+ private RawFile makeRawTableFile(String bucketPath, DataFileMeta meta) {
+ return new RawFile(
+ bucketPath + "/" + meta.fileName(),
+ 0,
+ meta.fileSize(),
+ new
CoreOptions(tableSchema.options()).formatType().toString().toLowerCase(),
+ meta.schemaId());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 990c71257..f8e84fa72 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -37,11 +37,9 @@ import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerStreamTableScan;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
-import org.apache.paimon.table.source.RawFile;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.SplitGenerator;
@@ -281,13 +279,6 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
public List<BinaryRow> partitions() {
return snapshotReader.partitions();
}
-
- @Override
- public Optional<List<RawFile>> convertToRawFiles(DataSplit split) {
- // we can't return snapshotReader.convertToRawFiles(split),
- // because AuditLogTable must use its special reader
- return Optional.empty();
- }
}
private class AuditLogBatchScan implements InnerTableScan {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java
index 9de27a222..edb3f0a86 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java
@@ -70,7 +70,7 @@ public class SnapshotReaderTest {
}
@Test
- public void testGetPrimaryKeyRawTableFiles() throws Exception {
+ public void testGetPrimaryKeyRawFiles() throws Exception {
RowType rowType =
RowType.of(
new DataType[] {DataTypes.STRING(), DataTypes.INT(),
DataTypes.BIGINT()},
@@ -98,7 +98,7 @@ public class SnapshotReaderTest {
assertThat(dataSplit.dataFiles()).hasSize(1);
DataFileMeta meta = dataSplit.dataFiles().get(0);
String partition = dataSplit.partition().getString(0).toString();
- assertThat(reader.convertToRawFiles(dataSplit))
+ assertThat(dataSplit.convertToRawFiles())
.hasValue(
Collections.singletonList(
new RawFile(
@@ -123,7 +123,7 @@ public class SnapshotReaderTest {
assertThat(dataSplits).hasSize(2);
for (DataSplit dataSplit : dataSplits) {
assertThat(dataSplit.dataFiles()).hasSize(2);
- assertThat(reader.convertToRawFiles(dataSplit)).isNotPresent();
+ assertThat(dataSplit.convertToRawFiles()).isNotPresent();
}
// compact all files
@@ -146,7 +146,7 @@ public class SnapshotReaderTest {
assertThat(dataSplit.dataFiles()).hasSize(1);
DataFileMeta meta = dataSplit.dataFiles().get(0);
String partition = dataSplit.partition().getString(0).toString();
- assertThat(reader.convertToRawFiles(dataSplit))
+ assertThat(dataSplit.convertToRawFiles())
.hasValue(
Collections.singletonList(
new RawFile(
@@ -171,7 +171,7 @@ public class SnapshotReaderTest {
assertThat(dataSplits).hasSize(2);
for (DataSplit dataSplit : dataSplits) {
assertThat(dataSplit.dataFiles()).hasSize(2);
- assertThat(reader.convertToRawFiles(dataSplit)).isNotPresent();
+ assertThat(dataSplit.convertToRawFiles()).isNotPresent();
}
write.close();
@@ -179,7 +179,7 @@ public class SnapshotReaderTest {
}
@Test
- public void testGetAppendOnlyRawTableFiles() throws Exception {
+ public void testGetAppendOnlyRawFiles() throws Exception {
RowType rowType =
RowType.of(
new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
@@ -205,7 +205,7 @@ public class SnapshotReaderTest {
DataSplit dataSplit = dataSplits.get(0);
assertThat(dataSplit.dataFiles()).hasSize(1);
DataFileMeta meta = dataSplit.dataFiles().get(0);
- assertThat(reader.convertToRawFiles(dataSplit))
+ assertThat(dataSplit.convertToRawFiles())
.hasValue(
Collections.singletonList(
new RawFile(
@@ -237,7 +237,7 @@ public class SnapshotReaderTest {
assertThat(dataSplit.dataFiles()).hasSize(2);
DataFileMeta meta0 = dataSplit.dataFiles().get(0);
DataFileMeta meta1 = dataSplit.dataFiles().get(1);
- assertThat(reader.convertToRawFiles(dataSplit))
+ assertThat(dataSplit.convertToRawFiles())
.hasValue(
Arrays.asList(
new RawFile(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
index 880f3288e..5015e9219 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
@@ -19,20 +19,13 @@
package org.apache.paimon.flink.source;
import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.manifest.FileKind;
-import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.stats.StatsTestUtils;
import org.apache.paimon.table.source.DataFilePlan;
import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.ScanMode;
-import org.apache.paimon.table.source.SplitGenerator;
-import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
import org.junit.jupiter.api.Test;
-import javax.annotation.Nullable;
-
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
@@ -47,62 +40,20 @@ public class FileStoreSourceSplitGeneratorTest {
@Test
public void test() {
- FileStoreScan.Plan plan =
- new FileStoreScan.Plan() {
- @Override
- public Long watermark() {
- return null;
- }
-
- @Nullable
- @Override
- public Long snapshotId() {
- return 1L;
- }
-
- @Override
- public ScanMode scanMode() {
- return ScanMode.ALL;
- }
-
- @Override
- public List<ManifestEntry> files() {
- return Arrays.asList(
- makeEntry(1, 0, "f0"),
- makeEntry(1, 0, "f1"),
- makeEntry(1, 1, "f2"),
- makeEntry(2, 0, "f3"),
- makeEntry(2, 0, "f4"),
- makeEntry(2, 0, "f5"),
- makeEntry(2, 1, "f6"),
- makeEntry(3, 0, "f7"),
- makeEntry(3, 1, "f8"),
- makeEntry(4, 0, "f9"),
- makeEntry(4, 1, "f10"),
- makeEntry(5, 0, "f11"),
- makeEntry(5, 1, "f12"),
- makeEntry(6, 0, "f13"),
- makeEntry(6, 1, "f14"));
- }
- };
List<DataSplit> scanSplits =
- SnapshotReaderImpl.generateSplits(
- 1L,
- false,
- new SplitGenerator() {
- @Override
- public List<List<DataFileMeta>> splitForBatch(
- List<DataFileMeta> files) {
- return Collections.singletonList(files);
- }
-
- @Override
- public List<List<DataFileMeta>> splitForStreaming(
- List<DataFileMeta> files) {
- return null;
- }
- },
-
FileStoreScan.Plan.groupByPartFiles(plan.files(FileKind.ADD)));
+ Arrays.asList(
+ dataSplit(1, 0, "f0", "f1"),
+ dataSplit(1, 1, "f2"),
+ dataSplit(2, 0, "f3", "f4", "f5"),
+ dataSplit(2, 1, "f6"),
+ dataSplit(3, 0, "f7"),
+ dataSplit(3, 1, "f8"),
+ dataSplit(4, 0, "f9"),
+ dataSplit(4, 1, "f10"),
+ dataSplit(5, 0, "f11"),
+ dataSplit(5, 1, "f12"),
+ dataSplit(6, 0, "f13"),
+ dataSplit(6, 1, "f14"));
DataFilePlan tableScanPlan = new DataFilePlan(scanSplits);
List<FileStoreSourceSplit> splits =
@@ -145,24 +96,30 @@ public class FileStoreSourceSplitGeneratorTest {
.isEqualTo(files);
}
- private ManifestEntry makeEntry(int partition, int bucket, String
fileName) {
- return new ManifestEntry(
- FileKind.ADD,
- row(partition), // not used
- bucket, // not used
- 0, // not used
- new DataFileMeta(
- fileName,
- 0, // not used
- 0, // not used
- null, // not used
- null, // not used
- StatsTestUtils.newEmptyTableStats(), // not used
- StatsTestUtils.newEmptyTableStats(), // not used
- 0, // not used
- 0, // not used
- 0, // not used
- 0 // not used
- ));
+ private DataSplit dataSplit(int partition, int bucket, String...
fileNames) {
+ List<DataFileMeta> metas = new ArrayList<>();
+ for (String fileName : fileNames) {
+ metas.add(
+ new DataFileMeta(
+ fileName,
+ 0, // not used
+ 0, // not used
+ null, // not used
+ null, // not used
+ StatsTestUtils.newEmptyTableStats(), // not used
+ StatsTestUtils.newEmptyTableStats(), // not used
+ 0, // not used
+ 0, // not used
+ 0, // not used
+ 0 // not used
+ ));
+ }
+ return DataSplit.builder()
+ .withSnapshot(1)
+ .withPartition(row(partition))
+ .withBucket(bucket)
+ .isStreaming(false)
+ .withDataFiles(metas)
+ .build();
}
}