This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 37e8fb02e [common] supports configuring force-lookup. (#3905)
37e8fb02e is described below

commit 37e8fb02e7ef185b74765ca07486e977e2893583
Author: liming.1018 <[email protected]>
AuthorDate: Wed Aug 7 19:02:30 2024 +0800

    [common] supports configuring force-lookup. (#3905)
---
 .../shortcodes/generated/core_configuration.html   |  6 ++
 .../main/java/org/apache/paimon/CoreOptions.java   | 10 +++-
 .../org/apache/paimon/lookup/LookupStrategy.java   | 15 +++--
 .../LookupChangelogMergeFunctionWrapperTest.java   |  6 +-
 .../paimon/table/PrimaryKeyFileStoreTableTest.java | 64 ++++++++++++++++++++++
 .../paimon/flink/sink/AsyncLookupSinkWrite.java    |  2 +-
 6 files changed, 94 insertions(+), 9 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 784719d76..b0497b345 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -345,6 +345,12 @@ Mainly to resolve data skew on primary keys. We recommend starting with 64 mb wh
             <td>Integer</td>
             <td>The maximal fan-in for external merge sort. It limits the number of file handles. If it is too small, may cause intermediate merging. But if it is too large, it will cause too many files opened at the same time, consume memory and lead to random reading.</td>
         </tr>
+        <tr>
+            <td><h5>force-lookup</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to force the use of lookup for compaction.</td>
+        </tr>
         <tr>
             <td><h5>lookup-wait</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index bd10ff000..aa4bcb685 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1258,6 +1258,13 @@ public class CoreOptions implements Serializable {
                     .noDefaultValue()
                     .withDescription("Specifies the commit user prefix.");
 
+    @Immutable
+    public static final ConfigOption<Boolean> FORCE_LOOKUP =
+            key("force-lookup")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to force the use of lookup for compaction.");
+
     public static final ConfigOption<Boolean> LOOKUP_WAIT =
             key("lookup-wait")
                     .booleanType()
@@ -1705,7 +1712,8 @@ public class CoreOptions implements Serializable {
         return LookupStrategy.from(
                 mergeEngine().equals(MergeEngine.FIRST_ROW),
                 changelogProducer().equals(ChangelogProducer.LOOKUP),
-                deletionVectorsEnabled());
+                deletionVectorsEnabled(),
+                options.get(FORCE_LOOKUP));
     }
 
     public boolean changelogRowDeduplicate() {
diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStrategy.java b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStrategy.java
index 24e03ebdb..f01c7c967 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStrategy.java
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStrategy.java
@@ -29,15 +29,22 @@ public class LookupStrategy {
 
     public final boolean deletionVector;
 
-    private LookupStrategy(boolean isFirstRow, boolean produceChangelog, boolean deletionVector) {
+    private LookupStrategy(
+            boolean isFirstRow,
+            boolean produceChangelog,
+            boolean deletionVector,
+            boolean forceLookup) {
         this.isFirstRow = isFirstRow;
         this.produceChangelog = produceChangelog;
         this.deletionVector = deletionVector;
-        this.needLookup = produceChangelog || deletionVector || isFirstRow;
+        this.needLookup = produceChangelog || deletionVector || isFirstRow || forceLookup;
     }
 
     public static LookupStrategy from(
-            boolean isFirstRow, boolean produceChangelog, boolean deletionVector) {
-        return new LookupStrategy(isFirstRow, produceChangelog, deletionVector);
+            boolean isFirstRow,
+            boolean produceChangelog,
+            boolean deletionVector,
+            boolean forceLookup) {
+        return new LookupStrategy(isFirstRow, produceChangelog, deletionVector, forceLookup);
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index dbee0805a..e2d37eae4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -73,7 +73,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
                         highLevel::get,
                         EQUALISER,
                         changelogRowDeduplicate,
-                        LookupStrategy.from(false, true, false),
+                        LookupStrategy.from(false, true, false, false),
                         null,
                         null);
 
@@ -233,7 +233,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
                         key -> null,
                         EQUALISER,
                         changelogRowDeduplicate,
-                        LookupStrategy.from(false, true, false),
+                        LookupStrategy.from(false, true, false, false),
                         null,
                         null);
 
@@ -322,7 +322,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
                         highLevel::get,
                         EQUALISER,
                         false,
-                        LookupStrategy.from(false, true, false),
+                        LookupStrategy.from(false, true, false, false),
                         null,
                         UserDefinedSeqComparator.create(
                                 RowType.builder().field("f0", DataTypes.INT()).build(),
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 09bd6eea1..e8ece0779 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -29,6 +29,8 @@ import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.disk.IOManagerImpl;
 import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
@@ -65,9 +67,11 @@ import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CompatibilityTestUtils;
+import org.apache.paimon.utils.Pair;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
@@ -95,7 +99,11 @@ import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
 import static org.apache.paimon.CoreOptions.FILE_FORMAT;
 import static org.apache.paimon.CoreOptions.LOOKUP_LOCAL_FILE_TYPE;
 import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
+import static org.apache.paimon.CoreOptions.MergeEngine;
+import static org.apache.paimon.CoreOptions.MergeEngine.AGGREGATE;
+import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
 import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
+import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
 import static org.apache.paimon.CoreOptions.SNAPSHOT_EXPIRE_LIMIT;
 import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST;
 import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
@@ -1670,6 +1678,62 @@ public class PrimaryKeyFileStoreTableTest extends FileStoreTableTestBase {
         // table-path/changelog/EARLIEST
     }
 
+    @ParameterizedTest
+    @EnumSource(CoreOptions.MergeEngine.class)
+    public void testForceLookupCompaction(CoreOptions.MergeEngine mergeEngine) throws Exception {
+        Map<MergeEngine, Pair<Long, Long>> testData = new HashMap<>();
+        testData.put(DEDUPLICATE, Pair.of(50L, 100L));
+        testData.put(PARTIAL_UPDATE, Pair.of(null, 100L));
+        testData.put(AGGREGATE, Pair.of(30L, 70L));
+        testData.put(FIRST_ROW, Pair.of(100L, 70L));
+
+        Pair<Long, Long> currentTestData = testData.get(mergeEngine);
+        FileStoreTable table =
+                createFileStoreTable(
+                        options -> {
+                            options.set(CoreOptions.FORCE_LOOKUP, true);
+                            options.set(MERGE_ENGINE, mergeEngine);
+                            if (mergeEngine == AGGREGATE) {
+                                options.set("fields.b.aggregate-function", "sum");
+                            }
+                        });
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+        write.withIOManager(IOManager.create(tempDir.toString()));
+
+        // write data
+        write.write(rowData(1, 10, currentTestData.getLeft()));
+        commit.commit(1, write.prepareCommit(true, 1));
+        assertThat(table.snapshotManager().snapshotCount()).isEqualTo(2L);
+
+        write.write(rowData(1, 10, currentTestData.getRight()));
+        commit.commit(0, write.prepareCommit(true, 0));
+        write.close();
+        commit.close();
+        assertThat(table.snapshotManager().snapshotCount()).isEqualTo(4L);
+        assertThat(table.snapshotManager().latestSnapshot())
+                .matches(snapshot -> snapshot.commitKind() == COMPACT);
+
+        // 3 data files + bucket-0 directory
+        List<java.nio.file.Path> files =
+                Files.walk(new File(tablePath.toUri().getPath(), "pt=1/bucket-0").toPath())
+                        .collect(Collectors.toList());
+        assertThat(files.size()).isEqualTo(4);
+
+        // 2 data files compact into 1 file
+        FileStoreScan scan = table.store().newScan().withKind(ScanMode.DELTA);
+        assertThat(scan.plan().files(FileKind.ADD).size()).isEqualTo(1);
+        assertThat(scan.plan().files(FileKind.DELETE).size()).isEqualTo(2);
+
+        // check result
+        List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+        TableRead read = table.newRead();
+        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
+                .isEqualTo(
+                        Collections.singletonList(
+                                "1|10|100|binary|varbinary|mapKey:mapVal|multiset"));
+    }
+
     private void assertReadChangelog(int id, FileStoreTable table) throws Exception {
         // read the changelog at #{id}
         table =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java
index 9c814463c..fcb4c89eb 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java
@@ -34,7 +34,7 @@ import java.util.Map;
 
 /**
  * {@link StoreSinkWrite} for tables with lookup changelog producer and {@link
- * org.apache.paimon.CoreOptions#CHANGELOG_PRODUCER_LOOKUP_WAIT} set to false.
+ * org.apache.paimon.CoreOptions#LOOKUP_WAIT} set to false.
  */
 public class AsyncLookupSinkWrite extends StoreSinkWriteImpl {
 

Reply via email to