This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new df2cf190e [flink] Introduce max_pt for Flink lookup join (#2926)
df2cf190e is described below

commit df2cf190e879e533e6139f6a76a74f76732c64dc
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Mar 1 12:01:41 2024 +0800

    [flink] Introduce max_pt for Flink lookup join (#2926)
---
 docs/content/how-to/lookup-joins.md                |  30 ++++++
 .../generated/flink_connector_configuration.html   |  12 +++
 .../java/org/apache/paimon/data/JoinedRow.java     |   4 +
 .../apache/paimon/predicate/PredicateBuilder.java  |  17 ++++
 .../main/java/org/apache/paimon/types/RowType.java |  16 +++
 .../org/apache/paimon/utils/KeyProjectedRow.java   |   4 +
 .../apache/paimon/flink/FlinkConnectorOptions.java |  15 +++
 .../flink/lookup/DynamicPartitionLoader.java       | 112 +++++++++++++++++++++
 .../flink/lookup/FileStoreLookupFunction.java      |  62 +++++++++++-
 .../paimon/flink/lookup/FullCacheLookupTable.java  |  16 ++-
 .../apache/paimon/flink/lookup/LookupTable.java    |   3 +
 .../flink/lookup/NoPrimaryKeyLookupTable.java      |  12 ++-
 .../paimon/flink/lookup/PrimaryKeyLookupTable.java |  29 +++---
 .../flink/lookup/PrimaryKeyPartialLookupTable.java |  72 +++++++++----
 .../flink/lookup/SecondaryIndexLookupTable.java    |  16 ++-
 .../org/apache/paimon/flink/LookupJoinITCase.java  |  38 +++++++
 .../flink/lookup/FileStoreLookupFunctionTest.java  |   4 +-
 .../paimon/flink/lookup/LookupTableTest.java       |  14 +++
 18 files changed, 433 insertions(+), 43 deletions(-)

diff --git a/docs/content/how-to/lookup-joins.md 
b/docs/content/how-to/lookup-joins.md
index e0dba4c59..ee0706c31 100644
--- a/docs/content/how-to/lookup-joins.md
+++ b/docs/content/how-to/lookup-joins.md
@@ -120,6 +120,36 @@ your streaming job may be blocked. You can try to use 
`audit_log` system table f
 (convert CDC stream to append stream).
 {{< /hint >}}
 
+## Dynamic Partition
+
+In traditional data warehouses, each partition often maintains the latest full 
data, so this partition table only 
+needs to join the latest partition. Paimon has specifically developed the 
`max_pt` feature for this scenario.
+
+**Create Paimon Partitioned Table**
+
+```sql
+CREATE TABLE customers (
+  id INT,
+  name STRING,
+  country STRING,
+  zip STRING,
+  dt STRING,
+  PRIMARY KEY (id, dt) NOT ENFORCED
+) PARTITIONED BY (dt);
+```
+
+**Lookup Join**
+
+```sql
+SELECT o.order_id, o.total, c.country, c.zip
+FROM orders AS o
+JOIN customers /*+ OPTIONS('lookup.dynamic-partition'='max_pt()', 
'lookup.dynamic-partition.refresh-interval'='1 h') */
+FOR SYSTEM_TIME AS OF o.proc_time AS c
+ON o.customer_id = c.id;
+```
+
+The Lookup node will automatically refresh the latest partition and query the 
data of the latest partition.
+
 ## Query Service
 
 You can run a Flink Streaming Job to start query service for the table. When 
QueryService exists, Flink Lookup Join
diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 099d9127c..f02046f31 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -56,6 +56,18 @@ under the License.
             <td><p>Enum</p></td>
             <td>The cache mode of lookup join.<br /><br />Possible 
values:<ul><li>"AUTO"</li><li>"FULL"</li></ul></td>
         </tr>
+        <tr>
+            <td><h5>lookup.dynamic-partition</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Specific dynamic partition for lookup, only support 'max_pt()' 
currently.</td>
+        </tr>
+        <tr>
+            <td><h5>lookup.dynamic-partition.refresh-interval</h5></td>
+            <td style="word-wrap: break-word;">1 h</td>
+            <td>Duration</td>
+            <td>Specific dynamic partition refresh interval for lookup, scan 
all partitions and obtain corresponding partition.</td>
+        </tr>
         <tr>
             <td><h5>scan.infer-parallelism</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java 
b/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java
index 526886598..3650000b6 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java
@@ -75,6 +75,10 @@ public class JoinedRow implements InternalRow {
         this.row2 = row2;
     }
 
+    public static JoinedRow join(InternalRow row1, InternalRow row2) {
+        return new JoinedRow(row1, row2);
+    }
+
     /**
      * Replaces the {@link InternalRow} backing this {@link JoinedRow}.
      *
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
index ee84d59d8..f6410adca 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
@@ -46,8 +46,10 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static java.util.Collections.singletonList;
 
@@ -161,6 +163,21 @@ public class PredicateBuilder {
                 .get();
     }
 
+    @Nullable
+    public static Predicate andNullable(Predicate... predicates) {
+        return andNullable(Arrays.asList(predicates));
+    }
+
+    @Nullable
+    public static Predicate andNullable(List<Predicate> predicates) {
+        predicates = 
predicates.stream().filter(Objects::nonNull).collect(Collectors.toList());
+        if (predicates.isEmpty()) {
+            return null;
+        }
+
+        return and(predicates);
+    }
+
     public static Predicate or(Predicate... predicates) {
         return or(Arrays.asList(predicates));
     }
diff --git a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java 
b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
index 692c60e51..1462d330e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
@@ -27,6 +27,7 @@ import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonGenerator
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -184,6 +185,21 @@ public final class RowType extends DataType {
         return new RowType(newFields);
     }
 
+    public RowType project(int[] mapping) {
+        List<DataField> fields = getFields();
+        return new RowType(
+                
Arrays.stream(mapping).mapToObj(fields::get).collect(Collectors.toList()));
+    }
+
+    public RowType project(List<String> names) {
+        List<DataField> fields = getFields();
+        List<String> fieldNames = 
fields.stream().map(DataField::name).collect(Collectors.toList());
+        return new RowType(
+                names.stream()
+                        .map(k -> fields.get(fieldNames.indexOf(k)))
+                        .collect(Collectors.toList()));
+    }
+
     public static RowType of(DataType... types) {
         final List<DataField> fields = new ArrayList<>();
         for (int i = 0; i < types.length; i++) {
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/KeyProjectedRow.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/KeyProjectedRow.java
index ea11105bb..01d9c313a 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/KeyProjectedRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/KeyProjectedRow.java
@@ -44,6 +44,10 @@ public class KeyProjectedRow implements InternalRow {
         return this;
     }
 
+    public int[] indexMapping() {
+        return indexMapping;
+    }
+
     @Override
     public int getFieldCount() {
         return indexMapping.length;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index ae6451916..d98709f05 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -293,6 +293,21 @@ public class FlinkConnectorOptions {
                     .defaultValue(LookupCacheMode.AUTO)
                     .withDescription("The cache mode of lookup join.");
 
+    public static final ConfigOption<String> LOOKUP_DYNAMIC_PARTITION =
+            ConfigOptions.key("lookup.dynamic-partition")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Specific dynamic partition for lookup, only 
support 'max_pt()' currently.");
+
+    public static final ConfigOption<Duration> 
LOOKUP_DYNAMIC_PARTITION_REFRESH_INTERVAL =
+            ConfigOptions.key("lookup.dynamic-partition.refresh-interval")
+                    .durationType()
+                    .defaultValue(Duration.ofHours(1))
+                    .withDescription(
+                            "Specific dynamic partition refresh interval for 
lookup, "
+                                    + "scan all partitions and obtain 
corresponding partition.");
+
     public static final ConfigOption<Boolean> SINK_AUTO_TAG_FOR_SAVEPOINT =
             ConfigOptions.key("sink.savepoint.auto-tag")
                     .booleanType()
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
new file mode 100644
index 000000000..26c172621
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Dynamic partition for lookup. */
+public class DynamicPartitionLoader implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String MAX_PT = "max_pt()";
+
+    private final Table table;
+    private final Duration refreshInterval;
+
+    private TableScan scan;
+    private Comparator<InternalRow> comparator;
+
+    private LocalDateTime lastRefresh;
+    private BinaryRow partition;
+
+    private DynamicPartitionLoader(Table table, Duration refreshInterval) {
+        this.table = table;
+        this.refreshInterval = refreshInterval;
+    }
+
+    public void open() {
+        this.scan = table.newReadBuilder().newScan();
+        RowType partitionType = table.rowType().project(table.partitionKeys());
+        this.comparator =
+                
CodeGenUtils.newRecordComparator(partitionType.getFieldTypes(), "Partition");
+    }
+
+    public void addJoinKeys(List<String> joinKeys) {
+        List<String> partitionKeys = table.partitionKeys();
+        checkArgument(joinKeys.stream().noneMatch(partitionKeys::contains));
+        joinKeys.addAll(partitionKeys);
+    }
+
+    @Nullable
+    public BinaryRow partition() {
+        return partition;
+    }
+
+    /** @return true if partition changed. */
+    public boolean checkRefresh() {
+        if (lastRefresh != null
+                && 
!lastRefresh.plus(refreshInterval).isBefore(LocalDateTime.now())) {
+            return false;
+        }
+
+        BinaryRow previous = this.partition;
+        partition = 
scan.listPartitions().stream().max(comparator).orElse(null);
+        lastRefresh = LocalDateTime.now();
+
+        return !Objects.equals(previous, partition);
+    }
+
+    @Nullable
+    public static DynamicPartitionLoader of(Table table) {
+        Options options = Options.fromMap(table.options());
+        String dynamicPartition = options.get(LOOKUP_DYNAMIC_PARTITION);
+        if (dynamicPartition == null) {
+            return null;
+        }
+
+        if (!dynamicPartition.equalsIgnoreCase(MAX_PT)) {
+            throw new UnsupportedOperationException(
+                    "Unsupported dynamic partition pattern: " + 
dynamicPartition);
+        }
+
+        Duration refresh =
+                
options.get(FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION_REFRESH_INTERVAL);
+        return new DynamicPartitionLoader(table, refresh);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 77ea719a2..a321da634 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -19,17 +19,22 @@
 package org.apache.paimon.flink.lookup;
 
 import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
 import org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode;
 import org.apache.paimon.flink.FlinkRowData;
 import org.apache.paimon.flink.FlinkRowWrapper;
 import org.apache.paimon.flink.utils.TableScanUtils;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.OutOfRangeException;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileIOUtils;
+import org.apache.paimon.utils.InternalRowUtils;
 
 import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
 
@@ -51,6 +56,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.UUID;
@@ -73,6 +79,7 @@ public class FileStoreLookupFunction implements Serializable, 
Closeable {
     private static final Logger LOG = 
LoggerFactory.getLogger(FileStoreLookupFunction.class);
 
     private final Table table;
+    @Nullable private final DynamicPartitionLoader partitionLoader;
     private final List<String> projectFields;
     private final List<String> joinKeys;
     @Nullable private final Predicate predicate;
@@ -89,6 +96,7 @@ public class FileStoreLookupFunction implements Serializable, 
Closeable {
         TableScanUtils.streamingReadingValidate(table);
 
         this.table = table;
+        this.partitionLoader = DynamicPartitionLoader.of(table);
 
         // join keys are based on projection fields
         this.joinKeys =
@@ -96,6 +104,10 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
                         .mapToObj(i -> 
table.rowType().getFieldNames().get(projection[i]))
                         .collect(Collectors.toList());
 
+        if (partitionLoader != null) {
+            partitionLoader.addJoinKeys(joinKeys);
+        }
+
         this.projectFields =
                 Arrays.stream(projection)
                         .mapToObj(i -> table.rowType().getFieldNames().get(i))
@@ -126,6 +138,10 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
     }
 
     private void open() throws Exception {
+        if (partitionLoader != null) {
+            partitionLoader.open();
+        }
+
         this.nextLoadTime = -1;
 
         Options options = Options.fromMap(table.options());
@@ -165,6 +181,7 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
             this.lookupTable = FullCacheLookupTable.create(context, 
options.get(LOOKUP_CACHE_ROWS));
         }
 
+        refreshDynamicPartition(false);
         lookupTable.open();
     }
 
@@ -187,7 +204,17 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
     public Collection<RowData> lookup(RowData keyRow) {
         try {
             checkRefresh();
-            List<InternalRow> results = lookupTable.get(new 
FlinkRowWrapper(keyRow));
+
+            InternalRow key = new FlinkRowWrapper(keyRow);
+            if (partitionLoader != null) {
+                InternalRow partition = refreshDynamicPartition(true);
+                if (partition == null) {
+                    return Collections.emptyList();
+                }
+                key = JoinedRow.join(key, partition);
+            }
+
+            List<InternalRow> results = lookupTable.get(key);
             List<RowData> rows = new ArrayList<>(results.size());
             for (InternalRow matchedRow : results) {
                 rows.add(new FlinkRowData(matchedRow));
@@ -201,6 +228,39 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
         }
     }
 
+    @Nullable
+    private BinaryRow refreshDynamicPartition(boolean reopen) throws Exception 
{
+        if (partitionLoader == null) {
+            return null;
+        }
+
+        boolean partitionChanged = partitionLoader.checkRefresh();
+        BinaryRow partition = partitionLoader.partition();
+        
lookupTable.specificPartitionFilter(createSpecificPartFilter(partition));
+
+        if (partitionChanged && reopen) {
+            lookupTable.close();
+            lookupTable.open();
+        }
+
+        return partition;
+    }
+
+    private Predicate createSpecificPartFilter(BinaryRow partition) {
+        RowType rowType = table.rowType();
+        List<String> fieldNames = rowType.getFieldNames();
+        List<String> partitionKeys = table.partitionKeys();
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        List<Predicate> predicates = new ArrayList<>();
+        for (int i = 0; i < partitionKeys.size(); i++) {
+            int index = fieldNames.indexOf(partitionKeys.get(i));
+            predicates.add(
+                    builder.equal(
+                            index, InternalRowUtils.get(partition, i, 
rowType.getTypeAt(index))));
+        }
+        return PredicateBuilder.and(predicates);
+    }
+
     private void reopen() {
         try {
             close();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index d6ccacf87..7f9b019bd 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -27,11 +27,13 @@ import org.apache.paimon.lookup.BulkLoader;
 import org.apache.paimon.lookup.RocksDBState;
 import org.apache.paimon.lookup.RocksDBStateFactory;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.sort.BinaryExternalSortBuffer;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileIOUtils;
 import org.apache.paimon.utils.MutableObjectIterator;
 import org.apache.paimon.utils.PartialRow;
 import org.apache.paimon.utils.TypeUtils;
@@ -53,9 +55,11 @@ public abstract class FullCacheLookupTable implements 
LookupTable {
     protected final Context context;
     protected final RocksDBStateFactory stateFactory;
     protected final RowType projectedType;
-    private final LookupStreamingReader reader;
     private final boolean sequenceFieldEnabled;
 
+    private LookupStreamingReader reader;
+    private Predicate specificPartition;
+
     public FullCacheLookupTable(Context context) throws IOException {
         this.context = context;
         this.stateFactory =
@@ -64,7 +68,6 @@ public abstract class FullCacheLookupTable implements 
LookupTable {
                         context.table.coreOptions().toConfiguration(),
                         null);
         FileStoreTable table = context.table;
-        this.reader = new LookupStreamingReader(table, context.projection, 
context.tablePredicate);
         this.sequenceFieldEnabled =
                 table.primaryKeys().size() > 0
                         && new 
CoreOptions(table.options()).sequenceField().isPresent();
@@ -75,8 +78,16 @@ public abstract class FullCacheLookupTable implements 
LookupTable {
         this.projectedType = projectedType;
     }
 
+    @Override
+    public void specificPartitionFilter(Predicate filter) {
+        this.specificPartition = filter;
+    }
+
     @Override
     public void open() throws Exception {
+        Predicate scanPredicate =
+                PredicateBuilder.andNullable(context.tablePredicate, 
specificPartition);
+        this.reader = new LookupStreamingReader(context.table, 
context.projection, scanPredicate);
         BinaryExternalSortBuffer bulkLoadSorter =
                 RocksDBState.createBulkLoadSorter(
                         IOManager.create(context.tempPath.toString()), 
context.table.coreOptions());
@@ -155,6 +166,7 @@ public abstract class FullCacheLookupTable implements 
LookupTable {
     @Override
     public void close() throws IOException {
         stateFactory.close();
+        FileIOUtils.deleteDirectory(context.tempPath);
     }
 
     /** Bulk loader for the table. */
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
index e89259073..c13947bbd 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.lookup;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.predicate.Predicate;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -27,6 +28,8 @@ import java.util.List;
 /** A lookup table which provides get and refresh. */
 public interface LookupTable extends Closeable {
 
+    void specificPartitionFilter(Predicate filter);
+
     void open() throws Exception;
 
     List<InternalRow> get(InternalRow key) throws IOException;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
index cd1c5e7a4..b931d18b4 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
@@ -36,22 +36,30 @@ import java.util.List;
 /** A {@link LookupTable} for table without primary key. */
 public class NoPrimaryKeyLookupTable extends FullCacheLookupTable {
 
-    private final RocksDBListState<InternalRow, InternalRow> state;
+    private final long lruCacheSize;
 
     private final KeyProjectedRow joinKeyRow;
 
+    private RocksDBListState<InternalRow, InternalRow> state;
+
     public NoPrimaryKeyLookupTable(Context context, long lruCacheSize) throws 
IOException {
         super(context);
+        this.lruCacheSize = lruCacheSize;
         List<String> fieldNames = projectedType.getFieldNames();
         int[] joinKeyMapping = 
context.joinKey.stream().mapToInt(fieldNames::indexOf).toArray();
         this.joinKeyRow = new KeyProjectedRow(joinKeyMapping);
+    }
+
+    @Override
+    public void open() throws Exception {
         this.state =
                 stateFactory.listState(
                         "join-key-index",
                         InternalSerializers.create(
-                                TypeUtils.project(projectedType, 
joinKeyMapping)),
+                                TypeUtils.project(projectedType, 
joinKeyRow.indexMapping())),
                         InternalSerializers.create(projectedType),
                         lruCacheSize);
+        super.open();
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
index fbec4d443..97034070c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
@@ -39,31 +39,24 @@ import java.util.List;
 /** A {@link LookupTable} for primary key table. */
 public class PrimaryKeyLookupTable extends FullCacheLookupTable {
 
-    protected final RocksDBValueState<InternalRow, InternalRow> tableState;
-
-    protected int[] primaryKeyMapping;
+    protected final long lruCacheSize;
 
     protected final KeyProjectedRow primaryKeyRow;
 
     @Nullable private final ProjectedRow keyRearrange;
 
+    protected RocksDBValueState<InternalRow, InternalRow> tableState;
+
     public PrimaryKeyLookupTable(Context context, long lruCacheSize, 
List<String> joinKey)
             throws IOException {
         super(context);
+        this.lruCacheSize = lruCacheSize;
         List<String> fieldNames = projectedType.getFieldNames();
         FileStoreTable table = context.table;
-        this.primaryKeyMapping =
+        int[] primaryKeyMapping =
                 
table.primaryKeys().stream().mapToInt(fieldNames::indexOf).toArray();
         this.primaryKeyRow = new KeyProjectedRow(primaryKeyMapping);
 
-        this.tableState =
-                stateFactory.valueState(
-                        "table",
-                        InternalSerializers.create(
-                                TypeUtils.project(projectedType, 
primaryKeyMapping)),
-                        InternalSerializers.create(projectedType),
-                        lruCacheSize);
-
         ProjectedRow keyRearrange = null;
         if (!table.primaryKeys().equals(joinKey)) {
             keyRearrange =
@@ -76,6 +69,18 @@ public class PrimaryKeyLookupTable extends 
FullCacheLookupTable {
         this.keyRearrange = keyRearrange;
     }
 
+    @Override
+    public void open() throws Exception {
+        this.tableState =
+                stateFactory.valueState(
+                        "table",
+                        InternalSerializers.create(
+                                TypeUtils.project(projectedType, 
primaryKeyRow.indexMapping())),
+                        InternalSerializers.create(projectedType),
+                        lruCacheSize);
+        super.open();
+    }
+
     @Override
     public List<InternalRow> innerGet(InternalRow key) throws IOException {
         if (keyRearrange != null) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
index 690cca544..7082a670e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManagerImpl;
 import org.apache.paimon.flink.query.RemoteTableQuery;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.query.LocalTableQuery;
@@ -42,6 +43,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 
 import static org.apache.paimon.CoreOptions.SCAN_BOUNDED_WATERMARK;
 import static org.apache.paimon.CoreOptions.STREAM_SCAN_MODE;
@@ -50,17 +52,19 @@ import static 
org.apache.paimon.CoreOptions.StreamScanMode.FILE_MONITOR;
 /** Lookup table for primary key which supports to read the LSM tree directly. 
*/
 public class PrimaryKeyPartialLookupTable implements LookupTable {
 
-    private final QueryExecutor queryExecutor;
+    private final Function<Predicate, QueryExecutor> executorFactory;
     private final FixedBucketFromPkExtractor extractor;
     @Nullable private final ProjectedRow keyRearrange;
+    @Nullable private final ProjectedRow trimmedKeyRearrange;
+
+    private Predicate specificPartition;
+    private QueryExecutor queryExecutor;
 
     private PrimaryKeyPartialLookupTable(
-            QueryExecutor queryExecutor, FileStoreTable table, List<String> 
joinKey) {
-        this.queryExecutor = queryExecutor;
-        if (table.partitionKeys().size() > 0) {
-            throw new UnsupportedOperationException(
-                    "The partitioned table are not supported in partial cache 
mode.");
-        }
+            Function<Predicate, QueryExecutor> executorFactory,
+            FileStoreTable table,
+            List<String> joinKey) {
+        this.executorFactory = executorFactory;
 
         if (table.bucketMode() != BucketMode.FIXED) {
             throw new UnsupportedOperationException(
@@ -79,6 +83,18 @@ public class PrimaryKeyPartialLookupTable implements 
LookupTable {
                                     .toArray());
         }
         this.keyRearrange = keyRearrange;
+
+        List<String> trimmedPrimaryKeys = table.schema().trimmedPrimaryKeys();
+        ProjectedRow trimmedKeyRearrange = null;
+        if (!trimmedPrimaryKeys.equals(joinKey)) {
+            trimmedKeyRearrange =
+                    ProjectedRow.from(
+                            trimmedPrimaryKeys.stream()
+                                    .map(joinKey::indexOf)
+                                    .mapToInt(value -> value)
+                                    .toArray());
+        }
+        this.trimmedKeyRearrange = trimmedKeyRearrange;
     }
 
     @VisibleForTesting
@@ -86,21 +102,33 @@ public class PrimaryKeyPartialLookupTable implements 
LookupTable {
         return queryExecutor;
     }
 
+    @Override
+    public void specificPartitionFilter(Predicate filter) {
+        this.specificPartition = filter;
+    }
+
     @Override
     public void open() throws Exception {
+        this.queryExecutor = executorFactory.apply(specificPartition);
         refresh();
     }
 
     @Override
     public List<InternalRow> get(InternalRow key) throws IOException {
+        InternalRow adjustedKey = key;
         if (keyRearrange != null) {
-            key = keyRearrange.replaceRow(key);
+            adjustedKey = keyRearrange.replaceRow(adjustedKey);
         }
-        extractor.setRecord(key);
+        extractor.setRecord(adjustedKey);
         int bucket = extractor.bucket();
         BinaryRow partition = extractor.partition();
 
-        InternalRow kv = queryExecutor.lookup(partition, bucket, key);
+        InternalRow trimmedKey = key;
+        if (trimmedKeyRearrange != null) {
+            trimmedKey = trimmedKeyRearrange.replaceRow(trimmedKey);
+        }
+
+        InternalRow kv = queryExecutor.lookup(partition, bucket, trimmedKey);
         if (kv == null) {
             return Collections.emptyList();
         } else {
@@ -115,19 +143,23 @@ public class PrimaryKeyPartialLookupTable implements 
LookupTable {
 
     @Override
     public void close() throws IOException {
-        queryExecutor.close();
+        if (queryExecutor != null) {
+            queryExecutor.close();
+        }
     }
 
     public static PrimaryKeyPartialLookupTable createLocalTable(
             FileStoreTable table, int[] projection, File tempPath, 
List<String> joinKey) {
-        LocalQueryExecutor queryExecutor = new LocalQueryExecutor(table, 
projection, tempPath);
-        return new PrimaryKeyPartialLookupTable(queryExecutor, table, joinKey);
+        return new PrimaryKeyPartialLookupTable(
+                filter -> new LocalQueryExecutor(table, projection, tempPath, 
filter),
+                table,
+                joinKey);
     }
 
     public static PrimaryKeyPartialLookupTable createRemoteTable(
             FileStoreTable table, int[] projection, List<String> joinKey) {
-        RemoveQueryExecutor queryExecutor = new RemoveQueryExecutor(table, 
projection);
-        return new PrimaryKeyPartialLookupTable(queryExecutor, table, joinKey);
+        return new PrimaryKeyPartialLookupTable(
+                filter -> new RemoteQueryExecutor(table, projection), table, 
joinKey);
     }
 
     interface QueryExecutor extends Closeable {
@@ -142,7 +174,8 @@ public class PrimaryKeyPartialLookupTable implements 
LookupTable {
         private final LocalTableQuery tableQuery;
         private final StreamTableScan scan;
 
-        private LocalQueryExecutor(FileStoreTable table, int[] projection, 
File tempPath) {
+        private LocalQueryExecutor(
+                FileStoreTable table, int[] projection, File tempPath, 
@Nullable Predicate filter) {
             this.tableQuery =
                     table.newLocalTableQuery()
                             
.withValueProjection(Projection.of(projection).toNestedIndexes())
@@ -151,7 +184,8 @@ public class PrimaryKeyPartialLookupTable implements 
LookupTable {
             Map<String, String> dynamicOptions = new HashMap<>();
             dynamicOptions.put(STREAM_SCAN_MODE.key(), 
FILE_MONITOR.getValue());
             dynamicOptions.put(SCAN_BOUNDED_WATERMARK.key(), null);
-            this.scan = 
table.copy(dynamicOptions).newReadBuilder().newStreamScan();
+            this.scan =
+                    
table.copy(dynamicOptions).newReadBuilder().withFilter(filter).newStreamScan();
         }
 
         @Override
@@ -189,11 +223,11 @@ public class PrimaryKeyPartialLookupTable implements 
LookupTable {
         }
     }
 
-    static class RemoveQueryExecutor implements QueryExecutor {
+    static class RemoteQueryExecutor implements QueryExecutor {
 
         private final RemoteTableQuery tableQuery;
 
-        private RemoveQueryExecutor(FileStoreTable table, int[] projection) {
+        private RemoteQueryExecutor(FileStoreTable table, int[] projection) {
             this.tableQuery = new 
RemoteTableQuery(table).withValueProjection(projection);
         }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
index d95481a32..d0731ebdf 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
@@ -34,22 +34,28 @@ import java.util.List;
 /** A {@link LookupTable} for primary key table which provides lookup by 
secondary key. */
 public class SecondaryIndexLookupTable extends PrimaryKeyLookupTable {
 
-    private final RocksDBSetState<InternalRow, InternalRow> indexState;
-
     private final KeyProjectedRow secKeyRow;
 
+    private RocksDBSetState<InternalRow, InternalRow> indexState;
+
     public SecondaryIndexLookupTable(Context context, long lruCacheSize) 
throws IOException {
         super(context, lruCacheSize / 2, context.table.primaryKeys());
         List<String> fieldNames = projectedType.getFieldNames();
         int[] secKeyMapping = 
context.joinKey.stream().mapToInt(fieldNames::indexOf).toArray();
         this.secKeyRow = new KeyProjectedRow(secKeyMapping);
+    }
+
+    @Override
+    public void open() throws Exception {
         this.indexState =
                 stateFactory.setState(
                         "sec-index",
-                        
InternalSerializers.create(TypeUtils.project(projectedType, secKeyMapping)),
                         InternalSerializers.create(
-                                TypeUtils.project(projectedType, 
primaryKeyMapping)),
-                        lruCacheSize / 2);
+                                TypeUtils.project(projectedType, 
secKeyRow.indexMapping())),
+                        InternalSerializers.create(
+                                TypeUtils.project(projectedType, 
primaryKeyRow.indexMapping())),
+                        lruCacheSize);
+        super.open();
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index 5fc9af74b..a11e7887c 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -641,6 +641,44 @@ public class LookupJoinITCase extends CatalogITCaseBase {
         iterator.close();
     }
 
+    @Test
+    public void testLookupMaxPtPartitionedTablePartialCache() throws Exception 
{
+        innerTestLookupMaxPtPartitionedTable(LookupCacheMode.AUTO);
+    }
+
+    @Test
+    public void testLookupMaxPtPartitionedTableFullCache() throws Exception {
+        innerTestLookupMaxPtPartitionedTable(LookupCacheMode.FULL);
+    }
+
+    private void innerTestLookupMaxPtPartitionedTable(LookupCacheMode mode) 
throws Exception {
+        tEnv.executeSql(
+                "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, 
PRIMARY KEY (pt, k) NOT ENFORCED)"
+                        + "PARTITIONED BY (`pt`) WITH ("
+                        + "'bucket' = '1', "
+                        + "'lookup.dynamic-partition' = 'max_pt()', "
+                        + "'lookup.dynamic-partition.refresh-interval' = '1 
ms', "
+                        + String.format("'lookup.cache' = '%s', ", mode)
+                        + "'continuous.discovery-interval'='1 ms')");
+        String query =
+                "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM for 
system_time as of T.proctime AS D ON T.i = D.k";
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
+
+        sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 2)");
+        Thread.sleep(2000); // wait refresh
+        sql("INSERT INTO T VALUES (1)");
+        List<Row> result = iterator.collect(1);
+        assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2));
+
+        sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 3)");
+        Thread.sleep(2000); // wait refresh
+        sql("INSERT INTO T VALUES (1)");
+        result = iterator.collect(1);
+        assertThat(result).containsExactlyInAnyOrder(Row.of(1, 3));
+
+        iterator.close();
+    }
+
     @Test
     public void testLookupNonPkAppendTable() throws Exception {
         sql(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
index 61dc4e230..c21989ee8 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
@@ -24,7 +24,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkRowData;
 import 
org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable.LocalQueryExecutor;
 import 
org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable.QueryExecutor;
-import 
org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable.RemoveQueryExecutor;
+import 
org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable.RemoteQueryExecutor;
 import org.apache.paimon.lookup.RocksDBOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
@@ -142,7 +142,7 @@ public class FileStoreLookupFunctionTest {
         
assertThat(lookupFunction.lookupTable()).isInstanceOf(PrimaryKeyPartialLookupTable.class);
         QueryExecutor queryExecutor =
                 ((PrimaryKeyPartialLookupTable) 
lookupFunction.lookupTable()).queryExecutor();
-        assertThat(queryExecutor).isInstanceOf(RemoveQueryExecutor.class);
+        assertThat(queryExecutor).isInstanceOf(RemoteQueryExecutor.class);
     }
 
     @Test
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
index 9af88502a..3af6b722d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
@@ -116,6 +116,7 @@ public class LookupTableTest extends TableTestBase {
                         tempDir.toFile(),
                         singletonList("f0"));
         table = FullCacheLookupTable.create(context, 
ThreadLocalRandom.current().nextInt(2) * 10);
+        table.open();
 
         // test bulk load error
         {
@@ -174,6 +175,7 @@ public class LookupTableTest extends TableTestBase {
                         tempDir.toFile(),
                         singletonList("f0"));
         table = FullCacheLookupTable.create(context, 
ThreadLocalRandom.current().nextInt(2) * 10);
+        table.open();
 
         List<Pair<byte[], byte[]>> records = new ArrayList<>();
         for (int i = 1; i <= 10; i++) {
@@ -218,6 +220,7 @@ public class LookupTableTest extends TableTestBase {
                         tempDir.toFile(),
                         singletonList("f0"));
         table = FullCacheLookupTable.create(context, 
ThreadLocalRandom.current().nextInt(2) * 10);
+        table.open();
 
         table.refresh(singletonList(sequence(row(1, 11, 111), 
-1L)).iterator(), false);
         List<InternalRow> result = table.get(row(1));
@@ -249,6 +252,7 @@ public class LookupTableTest extends TableTestBase {
                         tempDir.toFile(),
                         singletonList("f0"));
         table = FullCacheLookupTable.create(context, 
ThreadLocalRandom.current().nextInt(2) * 10);
+        table.open();
 
         table.refresh(singletonList(sequence(row(1, 11, 111), 
-1L)).iterator(), false);
         List<InternalRow> result = table.get(row(1));
@@ -272,6 +276,7 @@ public class LookupTableTest extends TableTestBase {
                         tempDir.toFile(),
                         singletonList("f1"));
         table = FullCacheLookupTable.create(context, 
ThreadLocalRandom.current().nextInt(2) * 10);
+        table.open();
 
         // test bulk load 100_000 records
         List<Pair<byte[], byte[]>> records = new ArrayList<>();
@@ -316,6 +321,7 @@ public class LookupTableTest extends TableTestBase {
                         tempDir.toFile(),
                         singletonList("f1"));
         table = FullCacheLookupTable.create(context, 
ThreadLocalRandom.current().nextInt(2) * 10);
+        table.open();
 
         List<Pair<byte[], byte[]>> records = new ArrayList<>();
         Random rnd = new Random();
@@ -370,6 +376,7 @@ public class LookupTableTest extends TableTestBase {
                         tempDir.toFile(),
                         singletonList("f1"));
         table = FullCacheLookupTable.create(context, 
ThreadLocalRandom.current().nextInt(2) * 10);
+        table.open();
 
         table.refresh(singletonList(sequence(row(1, 11, 111), 
-1L)).iterator(), false);
         List<InternalRow> result = table.get(row(11));
@@ -410,6 +417,7 @@ public class LookupTableTest extends TableTestBase {
                         tempDir.toFile(),
                         singletonList("f1"));
         table = FullCacheLookupTable.create(context, 
ThreadLocalRandom.current().nextInt(2) * 10);
+        table.open();
 
         // test bulk load 100_000 records
         List<Pair<byte[], byte[]>> records = new ArrayList<>();
@@ -452,6 +460,7 @@ public class LookupTableTest extends TableTestBase {
                         tempDir.toFile(),
                         singletonList("f1"));
         table = FullCacheLookupTable.create(context, 
ThreadLocalRandom.current().nextInt(2) * 10);
+        table.open();
 
         table.refresh(singletonList(row(1, 11, 333)).iterator(), false);
         List<InternalRow> result = table.get(row(11));
@@ -478,6 +487,8 @@ public class LookupTableTest extends TableTestBase {
                         new int[] {0, 1, 2},
                         tempDir.toFile(),
                         ImmutableList.of("pk1", "pk2"));
+        table.open();
+
         List<InternalRow> result = table.get(row(1, -1));
         assertThat(result).hasSize(0);
 
@@ -508,6 +519,7 @@ public class LookupTableTest extends TableTestBase {
                         new int[] {2, 1},
                         tempDir.toFile(),
                         ImmutableList.of("pk1", "pk2"));
+        table.open();
         List<InternalRow> result = table.get(row(1, -1));
         assertThat(result).hasSize(0);
 
@@ -533,6 +545,8 @@ public class LookupTableTest extends TableTestBase {
                         new int[] {2, 1},
                         tempDir.toFile(),
                         ImmutableList.of("pk2", "pk1"));
+        table.open();
+
         List<InternalRow> result = table.get(row(-1, 1));
         assertThat(result).hasSize(0);
 


Reply via email to