This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2a928b974 [test] Add test case for read with raw convertible splits (#3163)
2a928b974 is described below
commit 2a928b9747a358865d49ff9b5d3d03cb14a4df98
Author: Zouxxyy <[email protected]>
AuthorDate: Sun Apr 7 13:41:48 2024 +0800
[test] Add test case for read with raw convertible splits (#3163)
---
.../java/org/apache/paimon/options/MemorySize.java | 4 ++
.../paimon/table/PrimaryKeyFileStoreTableTest.java | 78 ++++++++++++++++++++++
.../paimon/table/source/SplitGeneratorTest.java | 21 ++++++
3 files changed, 103 insertions(+)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/options/MemorySize.java
b/paimon-common/src/main/java/org/apache/paimon/options/MemorySize.java
index 9296a7054..d40450e0c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/MemorySize.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/MemorySize.java
@@ -88,6 +88,10 @@ public class MemorySize implements java.io.Serializable,
Comparable<MemorySize>
return new MemorySize(kibiBytes << 10);
}
+ public static MemorySize ofBytes(long bytes) { // factory for a size already expressed in bytes
+ return new MemorySize(bytes); // no unit conversion: the byte count goes to the constructor as-is
+ }
+
// ------------------------------------------------------------------------
/** Gets the memory size in bytes. */
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index a1fc6f0b0..3357c9f7d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -87,7 +87,10 @@ import static
org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MIN;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
+import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.SNAPSHOT_EXPIRE_LIMIT;
+import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST;
+import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
import static org.apache.paimon.CoreOptions.TARGET_FILE_SIZE;
import static org.apache.paimon.Snapshot.CommitKind.COMPACT;
import static org.apache.paimon.data.DataFormatTestUtil.internalRowToString;
@@ -1328,6 +1331,81 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
commit.close();
}
+ @Test
+ public void testReadWithRawConvertibleSplits() throws Exception { // checks split layout, raw-convertibility and read results before and after a full compaction
+ FileStoreTable table =
+ createFileStoreTable(
+ options -> {
+ options.set(FILE_FORMAT,
CoreOptions.FileFormatType.AVRO);
+ options.set(SOURCE_SPLIT_OPEN_FILE_COST,
MemorySize.ofBytes(1));
+ options.set(SOURCE_SPLIT_TARGET_SIZE,
MemorySize.ofKibiBytes(5)); // NOTE(review): 1-byte open cost + 5 KiB target presumably tuned to produce the split layout asserted below — confirm
+ });
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ // file1: one row (1, 0, 0L), committed alone
+ write.write(rowDataWithKind(RowKind.INSERT, 1, 0, 0L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ // file2: 999 rows (1, i, i) for i in 1..999, one commit
+ for (int i = 1; i < 1000; i++) {
+ write.write(rowDataWithKind(RowKind.INSERT, 1, i, (long) i));
+ }
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ // file3: one row (1, 1000, 1000L)
+ write.write(rowDataWithKind(RowKind.INSERT, 1, 1000, 1000L));
+ commit.commit(2, write.prepareCommit(true, 2));
+
+ // file4: same first two fields as file3 (1, 1000) but last field 1001L
+ write.write(rowDataWithKind(RowKind.INSERT, 1, 1000, 1001L));
+ commit.commit(3, write.prepareCommit(true, 3));
+
+ // split1[file1], split2[file2], split3[file3, file4]; only the single-file splits are expected to convert to raw files
+ List<DataSplit> dataSplits =
table.newSnapshotReader().read().dataSplits();
+ assertThat(dataSplits).hasSize(3);
+ assertThat(dataSplits.get(0).dataFiles()).hasSize(1);
+ assertThat(dataSplits.get(0).convertToRawFiles()).isPresent();
+ assertThat(dataSplits.get(1).dataFiles()).hasSize(1);
+ assertThat(dataSplits.get(1).convertToRawFiles()).isPresent();
+ assertThat(dataSplits.get(2).dataFiles()).hasSize(2);
+ assertThat(dataSplits.get(2).convertToRawFiles()).isEmpty();
+
+ Function<InternalRow, String> rowDataToString =
+ row ->
+ internalRowToString(
+ row,
+ DataTypes.ROW(
+ DataTypes.INT(), DataTypes.INT(),
DataTypes.BIGINT()));
+ List<String> result =
+ getResult(table.newRead(), table.newScan().plan().splits(),
rowDataToString);
+ assertThat(result.size()).isEqualTo(1001); // 1001 rows expected: the two (1, 1000, ...) writes yield a single row
+ for (int i = 0; i < 1000; i++) {
+ assertThat(result.get(i)).isEqualTo(String.format("+I[1, %s, %s]",
i, i));
+ }
+ assertThat(result.get(1000)).isEqualTo("+I[1, 1000, 1001]"); // carries file4's value, not file3's
+
+ // compact all files
+ write.compact(binaryRow(1), 0, true); // NOTE(review): args presumably (partition, bucket, fullCompaction) — confirm
+ commit.commit(4, write.prepareCommit(true, 4));
+
+ // split1[compactedFile]: a single file, expected to be raw-convertible
+ dataSplits = table.newSnapshotReader().read().dataSplits();
+ assertThat(dataSplits).hasSize(1);
+ assertThat(dataSplits.get(0).dataFiles()).hasSize(1);
+ assertThat(dataSplits.get(0).convertToRawFiles()).isPresent();
+
+ result = getResult(table.newRead(), table.newScan().plan().splits(),
rowDataToString);
+ assertThat(result.size()).isEqualTo(1001); // same row expectations as before compaction
+ for (int i = 0; i < 1000; i++) {
+ assertThat(result.get(i)).isEqualTo(String.format("+I[1, %s, %s]",
i, i));
+ }
+ assertThat(result.get(1000)).isEqualTo("+I[1, 1000, 1001]");
+
+ write.close();
+ commit.close();
+ }
+
@Test
public void testTableQueryForLookup() throws Exception {
FileStoreTable table =
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
index 82663a72c..a10413005 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
@@ -185,6 +185,27 @@ public class SplitGeneratorTest {
Pair.of(Collections.singletonList("6"), true));
}
+ @Test
+ public void testMergeTreeSplitRawConvertible() { // checks splitForBatch grouping plus the raw-convertible flag of each group
+ Comparator<InternalRow> comparator = Comparator.comparingInt(o ->
o.getInt(0));
+ MergeTreeSplitGenerator mergeTreeSplitGenerator =
+ new MergeTreeSplitGenerator(comparator, 100, 2, false,
DEDUPLICATE); // NOTE(review): 100 / 2 / false presumably target split size, open-file cost, deletion-vectors flag — confirm against the constructor
+
+ List<DataFileMeta> files =
+ Arrays.asList(
+ newFile("1", 0, 0, 10, 10L),
+ newFile("2", 0, 0, 12, 12L),
+ newFile("3", 0, 13, 20, 20L),
+ newFile("4", 0, 21, 200, 200L),
+ newFile("5", 0, 201, 210, 210L),
+ newFile("6", 0, 211, 220, 220L)); // NOTE(review): newFile args presumably (name, level, minKey, maxKey, maxSequence) — confirm
+
 assertThat(toNamesAndRawConvertible(mergeTreeSplitGenerator.splitForBatch(files)))
+ .containsExactlyInAnyOrder(
+ Pair.of(Arrays.asList("1", "2", "3"), false),
+ Pair.of(Collections.singletonList("4"), true), // the only group expected to be raw-convertible
+ Pair.of(Arrays.asList("5", "6"), false));
+ }
+
private List<List<String>> toNames(List<SplitGenerator.SplitGroup>
splitGroups) {
return splitGroups.stream()
.map(