This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a928b974 [test] Add test case for read with raw convertible splits (#3163)
2a928b974 is described below

commit 2a928b9747a358865d49ff9b5d3d03cb14a4df98
Author: Zouxxyy <[email protected]>
AuthorDate: Sun Apr 7 13:41:48 2024 +0800

    [test] Add test case for read with raw convertible splits (#3163)
---
 .../java/org/apache/paimon/options/MemorySize.java |  4 ++
 .../paimon/table/PrimaryKeyFileStoreTableTest.java | 78 ++++++++++++++++++++++
 .../paimon/table/source/SplitGeneratorTest.java    | 21 ++++++
 3 files changed, 103 insertions(+)

diff --git a/paimon-common/src/main/java/org/apache/paimon/options/MemorySize.java b/paimon-common/src/main/java/org/apache/paimon/options/MemorySize.java
index 9296a7054..d40450e0c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/MemorySize.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/MemorySize.java
@@ -88,6 +88,10 @@ public class MemorySize implements java.io.Serializable, Comparable<MemorySize>
         return new MemorySize(kibiBytes << 10);
     }
 
+    public static MemorySize ofBytes(long bytes) {
+        return new MemorySize(bytes);
+    }
+
     // ------------------------------------------------------------------------
 
     /** Gets the memory size in bytes. */
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index a1fc6f0b0..3357c9f7d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -87,7 +87,10 @@ import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MIN;
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
 import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
+import static org.apache.paimon.CoreOptions.FILE_FORMAT;
 import static org.apache.paimon.CoreOptions.SNAPSHOT_EXPIRE_LIMIT;
+import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST;
+import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
 import static org.apache.paimon.CoreOptions.TARGET_FILE_SIZE;
 import static org.apache.paimon.Snapshot.CommitKind.COMPACT;
 import static org.apache.paimon.data.DataFormatTestUtil.internalRowToString;
@@ -1328,6 +1331,81 @@ public class PrimaryKeyFileStoreTableTest extends FileStoreTableTestBase {
         commit.close();
     }
 
+    @Test
+    public void testReadWithRawConvertibleSplits() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        options -> {
+                            options.set(FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+                            options.set(SOURCE_SPLIT_OPEN_FILE_COST, MemorySize.ofBytes(1));
+                            options.set(SOURCE_SPLIT_TARGET_SIZE, MemorySize.ofKibiBytes(5));
+                        });
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        // file1
+        write.write(rowDataWithKind(RowKind.INSERT, 1, 0, 0L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        // file2
+        for (int i = 1; i < 1000; i++) {
+            write.write(rowDataWithKind(RowKind.INSERT, 1, i, (long) i));
+        }
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        // file3
+        write.write(rowDataWithKind(RowKind.INSERT, 1, 1000, 1000L));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        // file4
+        write.write(rowDataWithKind(RowKind.INSERT, 1, 1000, 1001L));
+        commit.commit(3, write.prepareCommit(true, 3));
+
+        // split1[file1], split2[file2], split3[file3, file4]
+        List<DataSplit> dataSplits = table.newSnapshotReader().read().dataSplits();
+        assertThat(dataSplits).hasSize(3);
+        assertThat(dataSplits.get(0).dataFiles()).hasSize(1);
+        assertThat(dataSplits.get(0).convertToRawFiles()).isPresent();
+        assertThat(dataSplits.get(1).dataFiles()).hasSize(1);
+        assertThat(dataSplits.get(1).convertToRawFiles()).isPresent();
+        assertThat(dataSplits.get(2).dataFiles()).hasSize(2);
+        assertThat(dataSplits.get(2).convertToRawFiles()).isEmpty();
+
+        Function<InternalRow, String> rowDataToString =
+                row ->
+                        internalRowToString(
+                                row,
+                                DataTypes.ROW(
+                                        DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()));
+        List<String> result =
+                getResult(table.newRead(), table.newScan().plan().splits(), rowDataToString);
+        assertThat(result.size()).isEqualTo(1001);
+        for (int i = 0; i < 1000; i++) {
+            assertThat(result.get(i)).isEqualTo(String.format("+I[1, %s, %s]", i, i));
+        }
+        assertThat(result.get(1000)).isEqualTo("+I[1, 1000, 1001]");
+
+        // compact all files
+        write.compact(binaryRow(1), 0, true);
+        commit.commit(4, write.prepareCommit(true, 4));
+
+        // split1[compactedFile]
+        dataSplits = table.newSnapshotReader().read().dataSplits();
+        assertThat(dataSplits).hasSize(1);
+        assertThat(dataSplits.get(0).dataFiles()).hasSize(1);
+        assertThat(dataSplits.get(0).convertToRawFiles()).isPresent();
+
+        result = getResult(table.newRead(), table.newScan().plan().splits(), rowDataToString);
+        assertThat(result.size()).isEqualTo(1001);
+        for (int i = 0; i < 1000; i++) {
+            assertThat(result.get(i)).isEqualTo(String.format("+I[1, %s, %s]", i, i));
+        }
+        assertThat(result.get(1000)).isEqualTo("+I[1, 1000, 1001]");
+
+        write.close();
+        commit.close();
+    }
+
     @Test
     public void testTableQueryForLookup() throws Exception {
         FileStoreTable table =
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
index 82663a72c..a10413005 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
@@ -185,6 +185,27 @@ public class SplitGeneratorTest {
                         Pair.of(Collections.singletonList("6"), true));
     }
 
+    @Test
+    public void testMergeTreeSplitRawConvertible() {
+        Comparator<InternalRow> comparator = Comparator.comparingInt(o -> o.getInt(0));
+        MergeTreeSplitGenerator mergeTreeSplitGenerator =
+                new MergeTreeSplitGenerator(comparator, 100, 2, false, DEDUPLICATE);
+
+        List<DataFileMeta> files =
+                Arrays.asList(
+                        newFile("1", 0, 0, 10, 10L),
+                        newFile("2", 0, 0, 12, 12L),
+                        newFile("3", 0, 13, 20, 20L),
+                        newFile("4", 0, 21, 200, 200L),
+                        newFile("5", 0, 201, 210, 210L),
+                        newFile("6", 0, 211, 220, 220L));
+        assertThat(toNamesAndRawConvertible(mergeTreeSplitGenerator.splitForBatch(files)))
+                .containsExactlyInAnyOrder(
+                        Pair.of(Arrays.asList("1", "2", "3"), false),
+                        Pair.of(Collections.singletonList("4"), true),
+                        Pair.of(Arrays.asList("5", "6"), false));
+    }
+
     private List<List<String>> toNames(List<SplitGenerator.SplitGroup> splitGroups) {
         return splitGroups.stream()
                 .map(

Reply via email to