This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new 77eb818744 [flink] fallback to full cache mode when partial-lookup is unavailable. (#5769) 77eb818744 is described below commit 77eb8187448446658191a7166ecd095be711dd0c Author: zhoulii <zhouli....@foxmail.com> AuthorDate: Thu Jun 19 13:12:15 2025 +0800 [flink] fallback to full cache mode when partial-lookup is unavailable. (#5769) --- .../apache/paimon/table/query/LocalTableQuery.java | 19 +------------ .../flink/lookup/FileStoreLookupFunction.java | 7 ++--- .../flink/lookup/PrimaryKeyPartialLookupTable.java | 17 +++++++++++ .../org/apache/paimon/flink/LookupJoinITCase.java | 33 ++++++++++++++++++++++ 4 files changed, 54 insertions(+), 22 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java index d474d4e2d0..4b89a48464 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java @@ -55,7 +55,6 @@ import java.util.List; import java.util.Map; import java.util.function.Supplier; -import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE; import static org.apache.paimon.lookup.LookupStoreFactory.bfGenerator; import static org.apache.paimon.mergetree.LookupFile.localFilePrefix; @@ -105,23 +104,7 @@ public class LocalTableQuery implements TableQuery { options.lookupCacheMaxMemory(), options.lookupCacheHighPrioPoolRatio()), new RowCompactedSerializer(keyType).createSliceComparator()); - - if (options.needLookup()) { - startLevel = 1; - } else { - if (options.sequenceField().size() > 0) { - throw new UnsupportedOperationException( - "Not support sequence field definition, but is: " - + options.sequenceField()); - } - - if (options.mergeEngine() != DEDUPLICATE) { - throw new UnsupportedOperationException( - "Only support deduplicate merge engine, but is: " + options.mergeEngine()); - } - - startLevel = 0; - } + startLevel = options.needLookup() ? 1 : 0; } public void refreshFiles( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index e3e15c4ce6..8faafa3571 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -30,7 +30,6 @@ import org.apache.paimon.flink.utils.RuntimeContextUtils; import org.apache.paimon.flink.utils.TableScanUtils; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; -import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.source.OutOfRangeException; import org.apache.paimon.types.RowType; @@ -199,11 +198,11 @@ public class FileStoreLookupFunction implements Serializable, Closeable { table, projection, path, joinKeys, getRequireCachedBucketIds()); LOG.info( "Remote service isn't available. Created PrimaryKeyPartialLookupTable with LocalQueryExecutor."); - } catch (UnsupportedOperationException ignore) { + } catch (UnsupportedOperationException e) { LOG.info( "Remote service isn't available. Cannot create PrimaryKeyPartialLookupTable with LocalQueryExecutor " - + "because bucket mode isn't {}. Will create FullCacheLookupTable.", - BucketMode.HASH_FIXED); + + "because {}. Will create FullCacheLookupTable.", + e.getMessage()); } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java index 43aeefc26a..c6da347893 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.lookup; +import org.apache.paimon.CoreOptions; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.codegen.CodeGenUtils; import org.apache.paimon.codegen.Projection; @@ -78,6 +79,22 @@ public class PrimaryKeyPartialLookupTable implements LookupTable { "Unsupported mode for partial lookup: " + bucketMode); } + CoreOptions coreOptions = CoreOptions.fromMap(table.options()); + + if (!coreOptions.needLookup() + && coreOptions.mergeEngine() != CoreOptions.MergeEngine.DEDUPLICATE) { + throw new UnsupportedOperationException( + "Only support deduplicate merge engine when table does not need lookup, but merge engine is: " + + coreOptions.mergeEngine()); + } + + if (coreOptions.mergeEngine() == CoreOptions.MergeEngine.DEDUPLICATE + && !coreOptions.sequenceField().isEmpty()) { + throw new UnsupportedOperationException( + "Unsupported sequence fields definition for partial lookup when use deduplicate merge engine, but sequence fields are: " + + coreOptions.sequenceField()); + } + TableSchema schema = table.schema(); this.partitionFromPk = CodeGenUtils.newProjection( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java index 652ef5bd9c..2744650a8e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java @@ -1108,4 +1108,37 @@ public class LookupJoinITCase extends CatalogITCaseBase { assertThat(result) .containsExactlyInAnyOrder(Row.of(1, 211), Row.of(2, 212), Row.of(3, 213)); } + + @Test + public void testFallbackCacheMode() throws Exception { + sql( + "CREATE TABLE DIM_WITH_SEQUENCE (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH" + + " ('continuous.discovery-interval'='1 ms', 'sequence.field' = 'j', 'bucket' = '1')"); + sql("INSERT INTO DIM_WITH_SEQUENCE VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)"); + + String query = + "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM_WITH_SEQUENCE for system_time as of T.proctime AS D ON T.i = D.i"; + BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); + + sql("INSERT INTO T VALUES (1), (2), (3)"); + List<Row> result = iterator.collect(3); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of(1, 11, 111, 1111), + Row.of(2, 22, 222, 2222), + Row.of(3, null, null, null)); + + sql("INSERT INTO DIM_WITH_SEQUENCE VALUES (2, 11, 444, 4444), (3, 33, 333, 3333)"); + Thread.sleep(2000); // wait refresh + sql("INSERT INTO T VALUES (1), (2), (3), (4)"); + result = iterator.collect(4); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of(1, 11, 111, 1111), + Row.of(2, 22, 222, 2222), // not change + Row.of(3, 33, 333, 3333), + Row.of(4, null, null, null)); + + iterator.close(); + } }