This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 440ad13ac [flink] refactor the code of the lookup and support 
computing the changelog generated by compact during read time. (#4110)
440ad13ac is described below

commit 440ad13ac8104ee90806f7f08a8bc9a2c6c95194
Author: liming.1018 <[email protected]>
AuthorDate: Tue Sep 24 11:02:14 2024 +0800

    [flink] refactor the code of the lookup and support computing the changelog 
generated by compact during read time. (#4110)
---
 .../paimon/table/source/DataTableStreamScan.java   |   4 +-
 .../org/apache/paimon/table/TableTestBase.java     |  13 +-
 .../lookup/CompactionDiffFollowUpScanner.java      |  51 ++++++
 .../flink/lookup/FileStoreLookupFunction.java      |   4 +-
 .../paimon/flink/lookup/FullCacheLookupTable.java  |   2 +-
 .../lookup/IncrementalCompactDiffSplitRead.java    |  77 +++++++++
 .../paimon/flink/lookup/LookupCompactDiffRead.java |  87 ++++++++++
 .../paimon/flink/lookup/LookupDataTableScan.java   |  87 ++++++++++
 .../paimon/flink/lookup/LookupFileStoreTable.java  | 178 +++++++++++++++++++++
 .../paimon/flink/lookup/LookupStreamingReader.java |  31 +---
 .../flink/lookup/PrimaryKeyPartialLookupTable.java |  18 +--
 .../apache/paimon/flink/utils/TableScanUtils.java  |  23 +++
 .../paimon/flink/lookup/LookupTableTest.java       | 103 ++++++++++++
 13 files changed, 631 insertions(+), 47 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
index f315bdfa9..4cd221996 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
@@ -219,7 +219,7 @@ public class DataTableStreamScan extends 
AbstractDataTableScan implements Stream
         return false;
     }
 
-    private FollowUpScanner createFollowUpScanner() {
+    protected FollowUpScanner createFollowUpScanner() {
         CoreOptions.StreamScanMode type =
                 options.toConfiguration().get(CoreOptions.STREAM_SCAN_MODE);
         switch (type) {
@@ -249,7 +249,7 @@ public class DataTableStreamScan extends 
AbstractDataTableScan implements Stream
         return followUpScanner;
     }
 
-    private BoundedChecker createBoundedChecker() {
+    protected BoundedChecker createBoundedChecker() {
         Long boundedWatermark = options.scanBoundedWatermark();
         return boundedWatermark != null
                 ? BoundedChecker.watermark(boundedWatermark)
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
index cc8fc98dd..eaaf8ca70 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
@@ -127,10 +127,21 @@ public abstract class TableTestBase {
     }
 
     protected void compact(Table table, BinaryRow partition, int bucket) 
throws Exception {
+        compact(table, partition, bucket, null, true);
+    }
+
+    protected void compact(
+            Table table,
+            BinaryRow partition,
+            int bucket,
+            IOManager ioManager,
+            boolean fullCompaction)
+            throws Exception {
         BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
         try (BatchTableWrite write = writeBuilder.newWrite();
                 BatchTableCommit commit = writeBuilder.newCommit()) {
-            write.compact(partition, bucket, true);
+            write.withIOManager(ioManager);
+            write.compact(partition, bucket, fullCompaction);
             commit.commit(write.prepareCommit());
         }
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/CompactionDiffFollowUpScanner.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/CompactionDiffFollowUpScanner.java
new file mode 100644
index 000000000..fd909c596
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/CompactionDiffFollowUpScanner.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.table.source.snapshot.FollowUpScanner;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** {@link FollowUpScanner} for reading all files changed by compaction. */
+public class CompactionDiffFollowUpScanner implements FollowUpScanner {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CompactionDiffFollowUpScanner.class);
+
+    @Override
+    public boolean shouldScanSnapshot(Snapshot snapshot) {
+        if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+            return true;
+        }
+
+        LOG.debug(
+                "Next snapshot id {} is not COMPACT, but is {}, check next 
one.",
+                snapshot.id(),
+                snapshot.commitKind());
+        return false;
+    }
+
+    @Override
+    public SnapshotReader.Plan scan(Snapshot snapshot, SnapshotReader 
snapshotReader) {
+        return 
snapshotReader.withMode(ScanMode.DELTA).withSnapshot(snapshot).readChanges();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 01ebbde20..4090193de 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -98,7 +98,9 @@ public class FileStoreLookupFunction implements Serializable, 
Closeable {
 
     public FileStoreLookupFunction(
             Table table, int[] projection, int[] joinKeyIndex, @Nullable 
Predicate predicate) {
-        TableScanUtils.streamingReadingValidate(table);
+        if (!TableScanUtils.supportCompactDiffStreamingReading(table)) {
+            TableScanUtils.streamingReadingValidate(table);
+        }
 
         this.table = table;
         this.partitionLoader = DynamicPartitionLoader.of(table);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index 28b0da0d1..15b82fbe4 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -350,7 +350,7 @@ public abstract class FullCacheLookupTable implements 
LookupTable {
                 File tempPath,
                 List<String> joinKey,
                 @Nullable Set<Integer> requiredCachedBucketIds) {
-            this.table = table;
+            this.table = new LookupFileStoreTable(table, joinKey);
             this.projection = projection;
             this.tablePredicate = tablePredicate;
             this.projectedPredicate = projectedPredicate;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/IncrementalCompactDiffSplitRead.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/IncrementalCompactDiffSplitRead.java
new file mode 100644
index 000000000..fef74127e
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/IncrementalCompactDiffSplitRead.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.operation.MergeFileSplitRead;
+import org.apache.paimon.operation.SplitRead;
+import org.apache.paimon.reader.EmptyRecordReader;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.splitread.IncrementalDiffSplitRead;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** A {@link SplitRead} for streaming incremental diff after compaction. */
+public class IncrementalCompactDiffSplitRead extends IncrementalDiffSplitRead {
+
+    public IncrementalCompactDiffSplitRead(MergeFileSplitRead mergeRead) {
+        super(mergeRead);
+    }
+
+    @Override
+    public RecordReader<InternalRow> createReader(DataSplit split) throws 
IOException {
+        if (split.beforeFiles().stream().noneMatch(file -> file.level() == 0)) 
{
+            return new EmptyRecordReader<>();
+        }
+        return super.createReader(filterLevel0Files(split));
+    }
+
+    private DataSplit filterLevel0Files(DataSplit split) {
+        List<DataFileMeta> beforeFiles =
+                split.beforeFiles().stream()
+                        .filter(file -> file.level() > 0)
+                        .collect(Collectors.toList());
+        List<DataFileMeta> afterFiles =
+                split.dataFiles().stream()
+                        .filter(file -> file.level() > 0)
+                        .collect(Collectors.toList());
+        DataSplit.Builder builder =
+                new DataSplit.Builder()
+                        .withSnapshot(split.snapshotId())
+                        .withPartition(split.partition())
+                        .withBucket(split.bucket())
+                        .withBucketPath(split.bucketPath())
+                        .withBeforeFiles(beforeFiles)
+                        .withDataFiles(afterFiles)
+                        .isStreaming(split.isStreaming())
+                        .rawConvertible(split.rawConvertible());
+
+        if (split.beforeDeletionFiles().isPresent()) {
+            builder.withBeforeDeletionFiles(split.beforeDeletionFiles().get());
+        }
+        if (split.deletionFiles().isPresent()) {
+            builder.withDataDeletionFiles(split.deletionFiles().get());
+        }
+        return builder.build();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
new file mode 100644
index 000000000..71a8a73ad
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.operation.MergeFileSplitRead;
+import org.apache.paimon.operation.SplitRead;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.source.AbstractDataTableRead;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+
+import java.io.IOException;
+
+import static org.apache.paimon.table.source.KeyValueTableRead.unwrap;
+
+/** An {@link InnerTableRead} that reads the data changed before and after 
compaction. */
+public class LookupCompactDiffRead extends AbstractDataTableRead<KeyValue> {
+    private final SplitRead<InternalRow> fullPhaseMergeRead;
+    private final SplitRead<InternalRow> incrementalDiffRead;
+
+    public LookupCompactDiffRead(MergeFileSplitRead mergeRead, TableSchema 
schema) {
+        super(schema);
+        this.incrementalDiffRead = new 
IncrementalCompactDiffSplitRead(mergeRead);
+        this.fullPhaseMergeRead =
+                SplitRead.convert(mergeRead, split -> 
unwrap(mergeRead.createReader(split)));
+    }
+
+    @Override
+    public void projection(int[][] projection) {
+        fullPhaseMergeRead.withProjection(projection);
+        incrementalDiffRead.withProjection(projection);
+    }
+
+    @Override
+    public RecordReader<InternalRow> reader(Split split) throws IOException {
+        DataSplit dataSplit = (DataSplit) split;
+        if (dataSplit.beforeFiles().isEmpty()) {
+            return fullPhaseMergeRead.createReader(dataSplit); // full reading 
phase
+        } else {
+            return incrementalDiffRead.createReader((DataSplit) split);
+        }
+    }
+
+    @Override
+    protected InnerTableRead innerWithFilter(Predicate predicate) {
+        fullPhaseMergeRead.withFilter(predicate);
+        incrementalDiffRead.withFilter(predicate);
+        return this;
+    }
+
+    @Override
+    public InnerTableRead forceKeepDelete() {
+        fullPhaseMergeRead.forceKeepDelete();
+        incrementalDiffRead.forceKeepDelete();
+        return this;
+    }
+
+    @Override
+    public TableRead withIOManager(IOManager ioManager) {
+        fullPhaseMergeRead.withIOManager(ioManager);
+        incrementalDiffRead.withIOManager(ioManager);
+        return this;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
new file mode 100644
index 000000000..fe064b80e
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.operation.DefaultValueAssigner;
+import org.apache.paimon.table.source.DataTableStreamScan;
+import org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner;
+import org.apache.paimon.table.source.snapshot.BoundedChecker;
+import org.apache.paimon.table.source.snapshot.FollowUpScanner;
+import org.apache.paimon.table.source.snapshot.FullStartingScanner;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.table.source.snapshot.StartingScanner;
+import org.apache.paimon.utils.SnapshotManager;
+
+import static org.apache.paimon.CoreOptions.StartupMode;
+import static 
org.apache.paimon.flink.lookup.LookupFileStoreTable.LookupStreamScanMode;
+
+/**
+ * {@link org.apache.paimon.table.source.StreamTableScan} implementation for 
lookup streaming
+ * planning.
+ */
+public class LookupDataTableScan extends DataTableStreamScan {
+
+    private StartupMode startupMode;
+    private LookupStreamScanMode lookupScanMode;
+
+    public LookupDataTableScan(
+            CoreOptions options,
+            SnapshotReader snapshotReader,
+            SnapshotManager snapshotManager,
+            boolean supportStreamingReadOverwrite,
+            DefaultValueAssigner defaultValueAssigner,
+            LookupStreamScanMode lookupScanMode) {
+        super(
+                options,
+                snapshotReader,
+                snapshotManager,
+                supportStreamingReadOverwrite,
+                defaultValueAssigner);
+        this.startupMode = options.startupMode();
+        this.lookupScanMode = lookupScanMode;
+    }
+
+    @Override
+    protected StartingScanner createStartingScanner(boolean isStreaming) {
+        return startupMode != CoreOptions.StartupMode.COMPACTED_FULL
+                ? new FullStartingScanner(snapshotReader.snapshotManager())
+                : super.createStartingScanner(isStreaming);
+    }
+
+    @Override
+    protected FollowUpScanner createFollowUpScanner() {
+        switch (lookupScanMode) {
+            case CHANGELOG:
+                return super.createFollowUpScanner();
+            case FILE_MONITOR:
+                return new AllDeltaFollowUpScanner();
+            case COMPACT_DELTA_MONITOR:
+                return new CompactionDiffFollowUpScanner();
+            default:
+                throw new UnsupportedOperationException(
+                        "Unknown lookup stream scan mode: " + 
lookupScanMode.name());
+        }
+    }
+
+    @Override
+    protected BoundedChecker createBoundedChecker() {
+        return BoundedChecker.neverEnd(); // dim table should never end
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java
new file mode 100644
index 000000000..bb5274bc6
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValueFileStore;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.utils.TableScanUtils;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.operation.DefaultValueAssigner;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.options.description.DescribedEnum;
+import org.apache.paimon.options.description.InlineElement;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.DelegatedFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.ReadBuilderImpl;
+import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.utils.SimpleFileReader;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
+import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE;
+import static org.apache.paimon.options.description.TextElement.text;
+
+/** {@link FileStoreTable} for lookup table. */
+public class LookupFileStoreTable extends DelegatedFileStoreTable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final LookupStreamScanMode lookupScanMode;
+
+    public LookupFileStoreTable(FileStoreTable wrapped, List<String> joinKeys) 
{
+        super(wrapped);
+        this.lookupScanMode = lookupStreamScanMode(wrapped, joinKeys);
+    }
+
+    public LookupFileStoreTable(FileStoreTable wrapped, LookupStreamScanMode 
lookupScanMode) {
+        super(wrapped);
+        this.lookupScanMode = lookupScanMode;
+    }
+
+    @Override
+    public ReadBuilder newReadBuilder() {
+        return new ReadBuilderImpl(this);
+    }
+
+    @Override
+    public InnerTableRead newRead() {
+        switch (lookupScanMode) {
+            case CHANGELOG:
+            case FILE_MONITOR:
+                return wrapped.newRead();
+            case COMPACT_DELTA_MONITOR:
+                return new LookupCompactDiffRead(
+                        ((KeyValueFileStore) wrapped.store()).newRead(), 
wrapped.schema());
+            default:
+                throw new UnsupportedOperationException(
+                        "Unknown lookup stream scan mode: " + 
lookupScanMode.name());
+        }
+    }
+
+    @Override
+    public StreamDataTableScan newStreamScan() {
+        return new LookupDataTableScan(
+                wrapped.coreOptions(),
+                wrapped.newSnapshotReader(),
+                wrapped.snapshotManager(),
+                wrapped.supportStreamingReadOverwrite(),
+                DefaultValueAssigner.create(wrapped.schema()),
+                lookupScanMode);
+    }
+
+    @Override
+    public SimpleFileReader<ManifestFileMeta> manifestListReader() {
+        return wrapped.manifestListReader();
+    }
+
+    @Override
+    public SimpleFileReader<ManifestEntry> manifestFileReader() {
+        return wrapped.manifestFileReader();
+    }
+
+    @Override
+    public SimpleFileReader<IndexManifestEntry> indexManifestFileReader() {
+        return wrapped.indexManifestFileReader();
+    }
+
+    @Override
+    public FileStoreTable copy(Map<String, String> dynamicOptions) {
+        return new LookupFileStoreTable(wrapped.copy(dynamicOptions), 
lookupScanMode);
+    }
+
+    @Override
+    public FileStoreTable copy(TableSchema newTableSchema) {
+        return new LookupFileStoreTable(wrapped.copy(newTableSchema), 
lookupScanMode);
+    }
+
+    @Override
+    public FileStoreTable copyWithoutTimeTravel(Map<String, String> 
dynamicOptions) {
+        return new LookupFileStoreTable(wrapped.copy(dynamicOptions), 
lookupScanMode);
+    }
+
+    @Override
+    public FileStoreTable copyWithLatestSchema() {
+        return new LookupFileStoreTable(wrapped.copyWithLatestSchema(), 
lookupScanMode);
+    }
+
+    @Override
+    public FileStoreTable switchToBranch(String branchName) {
+        wrapped.switchToBranch(branchName);
+        return this;
+    }
+
+    private LookupStreamScanMode lookupStreamScanMode(FileStoreTable table, 
List<String> joinKeys) {
+        Options options = Options.fromMap(table.options());
+        if (options.get(LOOKUP_CACHE_MODE) == 
FlinkConnectorOptions.LookupCacheMode.AUTO
+                && new HashSet<>(table.primaryKeys()).equals(new 
HashSet<>(joinKeys))) {
+            return LookupStreamScanMode.FILE_MONITOR;
+        } else if (table.primaryKeys().size() > 0
+                && options.get(CHANGELOG_PRODUCER) == 
CoreOptions.ChangelogProducer.NONE
+                && TableScanUtils.supportCompactDiffStreamingReading(table)) {
+            return LookupStreamScanMode.COMPACT_DELTA_MONITOR;
+        } else {
+            return LookupStreamScanMode.CHANGELOG;
+        }
+    }
+
+    /** Inner stream scan mode for lookup table. */
+    public enum LookupStreamScanMode implements DescribedEnum {
+        CHANGELOG("changelog", "Streaming reading based on changelog or delta 
data files."),
+        FILE_MONITOR("file-monitor", "Monitor data file changes."),
+        COMPACT_DELTA_MONITOR(
+                "compact-delta-monitor",
+                "Streaming reading based on data changes before and after 
compaction.");
+
+        private final String value;
+        private final String description;
+
+        LookupStreamScanMode(String value, String description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return text(description);
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
index ceb40c1a8..fa9b7672d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
@@ -22,13 +22,10 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.SplitsParallelReadUtil;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
-import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.ReaderSupplier;
 import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
@@ -42,10 +39,7 @@ import 
org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.function.IntUnaryOperator;
 import java.util.stream.IntStream;
@@ -62,20 +56,12 @@ public class LookupStreamingReader {
     @Nullable private final Predicate projectedPredicate;
     private final StreamTableScan scan;
 
-    private static final List<ConfigOption<?>> TIME_TRAVEL_OPTIONS =
-            Arrays.asList(
-                    CoreOptions.SCAN_TIMESTAMP_MILLIS,
-                    CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS,
-                    CoreOptions.SCAN_SNAPSHOT_ID,
-                    CoreOptions.SCAN_TAG_NAME,
-                    CoreOptions.SCAN_VERSION);
-
     public LookupStreamingReader(
             Table table,
             int[] projection,
             @Nullable Predicate predicate,
             Set<Integer> requireCachedBucketIds) {
-        this.table = unsetTimeTravelOptions(table);
+        this.table = table;
         this.projection = projection;
         this.readBuilder =
                 this.table
@@ -112,21 +98,6 @@ public class LookupStreamingReader {
         }
     }
 
-    private Table unsetTimeTravelOptions(Table origin) {
-        FileStoreTable fileStoreTable = (FileStoreTable) origin;
-        Map<String, String> newOptions = new 
HashMap<>(fileStoreTable.options());
-        
TIME_TRAVEL_OPTIONS.stream().map(ConfigOption::key).forEach(newOptions::remove);
-
-        CoreOptions.StartupMode startupMode = 
CoreOptions.fromMap(newOptions).startupMode();
-        if (startupMode != CoreOptions.StartupMode.COMPACTED_FULL) {
-            startupMode = CoreOptions.StartupMode.LATEST_FULL;
-        }
-        newOptions.put(CoreOptions.SCAN_MODE.key(), startupMode.toString());
-
-        TableSchema newSchema = fileStoreTable.schema().copy(newOptions);
-        return fileStoreTable.copy(newSchema);
-    }
-
     public RecordReader<InternalRow> nextBatch(boolean useParallelism) throws 
Exception {
         List<Split> splits = scan.plan().splits();
         CoreOptions options = CoreOptions.fromMap(table.options());
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
index bdf0a1b4a..967826e11 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
@@ -40,16 +40,10 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
 
-import static org.apache.paimon.CoreOptions.SCAN_BOUNDED_WATERMARK;
-import static org.apache.paimon.CoreOptions.STREAM_SCAN_MODE;
-import static org.apache.paimon.CoreOptions.StreamScanMode.FILE_MONITOR;
-
 /** Lookup table for primary key which supports to read the LSM tree directly. 
*/
 public class PrimaryKeyPartialLookupTable implements LookupTable {
 
@@ -158,7 +152,11 @@ public class PrimaryKeyPartialLookupTable implements 
LookupTable {
         return new PrimaryKeyPartialLookupTable(
                 filter ->
                         new LocalQueryExecutor(
-                                table, projection, tempPath, filter, 
requireCachedBucketIds),
+                                new LookupFileStoreTable(table, joinKey),
+                                projection,
+                                tempPath,
+                                filter,
+                                requireCachedBucketIds),
                 table,
                 joinKey);
     }
@@ -192,12 +190,8 @@ public class PrimaryKeyPartialLookupTable implements 
LookupTable {
                             
.withValueProjection(Projection.of(projection).toNestedIndexes())
                             .withIOManager(new 
IOManagerImpl(tempPath.toString()));
 
-            Map<String, String> dynamicOptions = new HashMap<>();
-            dynamicOptions.put(STREAM_SCAN_MODE.key(), 
FILE_MONITOR.getValue());
-            dynamicOptions.put(SCAN_BOUNDED_WATERMARK.key(), null);
             this.scan =
-                    table.copy(dynamicOptions)
-                            .newReadBuilder()
+                    table.newReadBuilder()
                             .withFilter(filter)
                             .withBucketFilter(
                                     requireCachedBucketIds == null
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
index a5645302f..30b7bbdd5 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
@@ -20,12 +20,15 @@ package org.apache.paimon.flink.utils;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.flink.source.FileStoreSourceSplit;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.TableScan;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Optional;
+import java.util.Set;
 
 /** Utility methods for {@link TableScan}, such as validating. */
 public class TableScanUtils {
@@ -59,4 +62,24 @@ public class TableScanUtils {
         }
         return Optional.empty();
     }
+
+    /**
+     * Check whether streaming reading of the data changes produced by compaction (the compact
+     * diff) is supported for the given table.
+     */
+    public static boolean supportCompactDiffStreamingReading(Table table) {
+        CoreOptions options = CoreOptions.fromMap(table.options());
+        Set<CoreOptions.MergeEngine> compactDiffReadingEngine =
+                new HashSet<CoreOptions.MergeEngine>() {
+                    {
+                        add(CoreOptions.MergeEngine.PARTIAL_UPDATE);
+                        add(CoreOptions.MergeEngine.AGGREGATE);
+                    }
+                };
+
+        return options.needLookup()
+                && compactDiffReadingEngine.contains(options.mergeEngine())
+                && !Options.fromMap(options.toMap())
+                        
.get(CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE);
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
index 14643542e..619cb4c1d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.lookup;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.JoinedRow;
@@ -722,6 +723,108 @@ public class LookupTableTest extends TableTestBase {
         table.close();
     }
 
+    @Test
+    public void testFullCacheLookupTableWithForceLookup() throws Exception {
+        Options options = new Options();
+        options.set(CoreOptions.MERGE_ENGINE, 
CoreOptions.MergeEngine.PARTIAL_UPDATE);
+        options.set(
+                FlinkConnectorOptions.LOOKUP_CACHE_MODE,
+                FlinkConnectorOptions.LookupCacheMode.FULL);
+        options.set(CoreOptions.WRITE_ONLY, true);
+        options.set(CoreOptions.FORCE_LOOKUP, true);
+        options.set(CoreOptions.BUCKET, 1);
+        FileStoreTable storeTable = createTable(singletonList("f0"), options);
+        FileStoreTable compactTable =
+                
storeTable.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), 
"false"));
+        FullCacheLookupTable.Context context =
+                new FullCacheLookupTable.Context(
+                        storeTable,
+                        new int[] {0, 1, 2},
+                        null,
+                        null,
+                        tempDir.toFile(),
+                        singletonList("f0"),
+                        null);
+        table = FullCacheLookupTable.create(context, 
ThreadLocalRandom.current().nextInt(2) * 10);
+
+        // initialize
+        write(storeTable, ioManager, GenericRow.of(1, 11, 111));
+        compact(compactTable, BinaryRow.EMPTY_ROW, 0, ioManager, true);
+        table.open();
+
+        List<InternalRow> result = table.get(row(1));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 1, 11, 111);
+
+        // first write
+        write(storeTable, GenericRow.of(1, null, 222));
+        table.refresh();
+        result = table.get(row(1));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 1, 11, 111); // still the old value: no compaction has run yet
+
+        // compact only the L0 files
+        compact(compactTable, BinaryRow.EMPTY_ROW, 0, ioManager, false);
+        table.refresh();
+        result = table.get(row(1));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 1, 11, 222); // get new value after compact
+
+        // second write
+        write(storeTable, GenericRow.of(1, 22, null));
+        table.refresh();
+        result = table.get(row(1));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 1, 11, 222); // old value
+
+        // full compact
+        compact(compactTable, BinaryRow.EMPTY_ROW, 0, ioManager, true);
+        table.refresh();
+        result = table.get(row(1));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 1, 22, 222); // new value
+    }
+
+    @Test
+    public void testPartialLookupTableWithForceLookup() throws Exception {
+        Options options = new Options();
+        options.set(CoreOptions.MERGE_ENGINE, 
CoreOptions.MergeEngine.PARTIAL_UPDATE);
+        options.set(CoreOptions.CHANGELOG_PRODUCER, 
CoreOptions.ChangelogProducer.NONE);
+        options.set(CoreOptions.FORCE_LOOKUP, true);
+        options.set(CoreOptions.BUCKET, 1);
+        FileStoreTable dimTable = createTable(singletonList("f0"), options);
+
+        PrimaryKeyPartialLookupTable table =
+                PrimaryKeyPartialLookupTable.createLocalTable(
+                        dimTable,
+                        new int[] {0, 1, 2},
+                        tempDir.toFile(),
+                        ImmutableList.of("f0"),
+                        null);
+        table.open();
+
+        List<InternalRow> result = table.get(row(1, -1));
+        assertThat(result).hasSize(0);
+
+        write(dimTable, ioManager, GenericRow.of(1, -1, 11), GenericRow.of(2, 
-2, 22));
+        result = table.get(row(1));
+        assertThat(result).hasSize(0);
+
+        table.refresh();
+        result = table.get(row(1));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 1, -1, 11);
+        result = table.get(row(2));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 2, -2, 22);
+
+        write(dimTable, ioManager, GenericRow.of(1, null, 111));
+        table.refresh();
+        result = table.get(row(1));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 1, -1, 111);
+    }
+
     private FileStoreTable createDimTable() throws Exception {
         FileIO fileIO = LocalFileIO.create();
         org.apache.paimon.fs.Path tablePath =


Reply via email to