This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 77eb818744 [flink] fallback to full cache mode when partial-lookup is unavailable. (#5769)
77eb818744 is described below

commit 77eb8187448446658191a7166ecd095be711dd0c
Author: zhoulii <zhouli....@foxmail.com>
AuthorDate: Thu Jun 19 13:12:15 2025 +0800

    [flink] fallback to full cache mode when partial-lookup is unavailable. (#5769)
---
 .../apache/paimon/table/query/LocalTableQuery.java | 19 +------------
 .../flink/lookup/FileStoreLookupFunction.java      |  7 ++---
 .../flink/lookup/PrimaryKeyPartialLookupTable.java | 17 +++++++++++
 .../org/apache/paimon/flink/LookupJoinITCase.java  | 33 ++++++++++++++++++++++
 4 files changed, 54 insertions(+), 22 deletions(-)
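
In short: the pre-conditions that used to be enforced in LocalTableQuery's constructor now live in PrimaryKeyPartialLookupTable, so FileStoreLookupFunction can catch the resulting UnsupportedOperationException and fall back to full cache mode instead of failing the job. A rough sketch of that decision path follows; the helpers createPartialLookupTable(...) and createFullCacheLookupTable(...) and the local variables are placeholders standing in for the existing construction code, which is outside this diff:

    // Sketch only; mirrors the try/catch in FileStoreLookupFunction below.
    LookupTable lookupTable;
    try {
        // Now throws UnsupportedOperationException when the table's merge engine
        // or sequence.field configuration cannot serve a partial (per-key) lookup.
        lookupTable = createPartialLookupTable(table, projection, path, joinKeys);
    } catch (UnsupportedOperationException e) {
        // Fall back: load and cache the whole table instead of querying it per key.
        LOG.info("Cannot create PrimaryKeyPartialLookupTable because {}. Will create FullCacheLookupTable.", e.getMessage());
        lookupTable = createFullCacheLookupTable(table, projection, path, joinKeys);
    }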

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
index d474d4e2d0..4b89a48464 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
@@ -55,7 +55,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.Supplier;
 
-import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
 import static org.apache.paimon.lookup.LookupStoreFactory.bfGenerator;
 import static org.apache.paimon.mergetree.LookupFile.localFilePrefix;
 
@@ -105,23 +104,7 @@ public class LocalTableQuery implements TableQuery {
                                 options.lookupCacheMaxMemory(),
                                 options.lookupCacheHighPrioPoolRatio()),
                         new RowCompactedSerializer(keyType).createSliceComparator());
-
-        if (options.needLookup()) {
-            startLevel = 1;
-        } else {
-            if (options.sequenceField().size() > 0) {
-                throw new UnsupportedOperationException(
-                        "Not support sequence field definition, but is: "
-                                + options.sequenceField());
-            }
-
-            if (options.mergeEngine() != DEDUPLICATE) {
-                throw new UnsupportedOperationException(
-                        "Only support deduplicate merge engine, but is: " + options.mergeEngine());
-            }
-
-            startLevel = 0;
-        }
+        startLevel = options.needLookup() ? 1 : 0;
     }
 
     public void refreshFiles(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index e3e15c4ce6..8faafa3571 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -30,7 +30,6 @@ import org.apache.paimon.flink.utils.RuntimeContextUtils;
 import org.apache.paimon.flink.utils.TableScanUtils;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.OutOfRangeException;
 import org.apache.paimon.types.RowType;
@@ -199,11 +198,11 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
                                     table, projection, path, joinKeys, getRequireCachedBucketIds());
                     LOG.info(
                             "Remote service isn't available. Created PrimaryKeyPartialLookupTable with LocalQueryExecutor.");
-                } catch (UnsupportedOperationException ignore) {
+                } catch (UnsupportedOperationException e) {
                     LOG.info(
                             "Remote service isn't available. Cannot create PrimaryKeyPartialLookupTable with LocalQueryExecutor "
-                                    + "because bucket mode isn't {}. Will create FullCacheLookupTable.",
-                            BucketMode.HASH_FIXED);
+                                    + "because {}. Will create FullCacheLookupTable.",
+                            e.getMessage());
                 }
             }
         }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
index 43aeefc26a..c6da347893 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.lookup;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.codegen.CodeGenUtils;
 import org.apache.paimon.codegen.Projection;
@@ -78,6 +79,22 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
                     "Unsupported mode for partial lookup: " + bucketMode);
         }
 
+        CoreOptions coreOptions = CoreOptions.fromMap(table.options());
+
+        if (!coreOptions.needLookup()
+                && coreOptions.mergeEngine() != CoreOptions.MergeEngine.DEDUPLICATE) {
+            throw new UnsupportedOperationException(
+                    "Only support deduplicate merge engine when table does not need lookup, but merge engine is:  "
+                            + coreOptions.mergeEngine());
+        }
+
+        if (coreOptions.mergeEngine() == CoreOptions.MergeEngine.DEDUPLICATE
+                && !coreOptions.sequenceField().isEmpty()) {
+            throw new UnsupportedOperationException(
+                    "Unsupported sequence fields definition for partial lookup when use deduplicate merge engine, but sequence fields are:  "
+                            + coreOptions.sequenceField());
+        }
+
         TableSchema schema = table.schema();
         this.partitionFromPk =
                 CodeGenUtils.newProjection(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index 652ef5bd9c..2744650a8e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -1108,4 +1108,37 @@ public class LookupJoinITCase extends CatalogITCaseBase {
         assertThat(result)
                 .containsExactlyInAnyOrder(Row.of(1, 211), Row.of(2, 212), Row.of(3, 213));
     }
+
+    @Test
+    public void testFallbackCacheMode() throws Exception {
+        sql(
+                "CREATE TABLE DIM_WITH_SEQUENCE (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH"
+                        + " ('continuous.discovery-interval'='1 ms', 'sequence.field' = 'j', 'bucket' = '1')");
+        sql("INSERT INTO DIM_WITH_SEQUENCE VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
+
+        String query =
+                "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM_WITH_SEQUENCE for system_time as of T.proctime AS D ON T.i = D.i";
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+        sql("INSERT INTO T VALUES (1), (2), (3)");
+        List<Row> result = iterator.collect(3);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111, 1111),
+                        Row.of(2, 22, 222, 2222),
+                        Row.of(3, null, null, null));
+
+        sql("INSERT INTO DIM_WITH_SEQUENCE VALUES (2, 11, 444, 4444), (3, 33, 333, 3333)");
+        Thread.sleep(2000); // wait refresh
+        sql("INSERT INTO T VALUES (1), (2), (3), (4)");
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111, 1111),
+                        Row.of(2, 22, 222, 2222), // not change
+                        Row.of(3, 33, 333, 3333),
+                        Row.of(4, null, null, null));
+
+        iterator.close();
+    }
 }
