This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 440ad13ac [flink] refactor the code of the lookup and support
computing the changelog generated by compact during read time. (#4110)
440ad13ac is described below
commit 440ad13ac8104ee90806f7f08a8bc9a2c6c95194
Author: liming.1018 <[email protected]>
AuthorDate: Tue Sep 24 11:02:14 2024 +0800
[flink] refactor the code of the lookup and support computing the changelog
generated by compact during read time. (#4110)
---
.../paimon/table/source/DataTableStreamScan.java | 4 +-
.../org/apache/paimon/table/TableTestBase.java | 13 +-
.../lookup/CompactionDiffFollowUpScanner.java | 51 ++++++
.../flink/lookup/FileStoreLookupFunction.java | 4 +-
.../paimon/flink/lookup/FullCacheLookupTable.java | 2 +-
.../lookup/IncrementalCompactDiffSplitRead.java | 77 +++++++++
.../paimon/flink/lookup/LookupCompactDiffRead.java | 87 ++++++++++
.../paimon/flink/lookup/LookupDataTableScan.java | 87 ++++++++++
.../paimon/flink/lookup/LookupFileStoreTable.java | 178 +++++++++++++++++++++
.../paimon/flink/lookup/LookupStreamingReader.java | 31 +---
.../flink/lookup/PrimaryKeyPartialLookupTable.java | 18 +--
.../apache/paimon/flink/utils/TableScanUtils.java | 23 +++
.../paimon/flink/lookup/LookupTableTest.java | 103 ++++++++++++
13 files changed, 631 insertions(+), 47 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
index f315bdfa9..4cd221996 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
@@ -219,7 +219,7 @@ public class DataTableStreamScan extends
AbstractDataTableScan implements Stream
return false;
}
- private FollowUpScanner createFollowUpScanner() {
+ protected FollowUpScanner createFollowUpScanner() {
CoreOptions.StreamScanMode type =
options.toConfiguration().get(CoreOptions.STREAM_SCAN_MODE);
switch (type) {
@@ -249,7 +249,7 @@ public class DataTableStreamScan extends
AbstractDataTableScan implements Stream
return followUpScanner;
}
- private BoundedChecker createBoundedChecker() {
+ protected BoundedChecker createBoundedChecker() {
Long boundedWatermark = options.scanBoundedWatermark();
return boundedWatermark != null
? BoundedChecker.watermark(boundedWatermark)
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
index cc8fc98dd..eaaf8ca70 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
@@ -127,10 +127,21 @@ public abstract class TableTestBase {
}
protected void compact(Table table, BinaryRow partition, int bucket)
throws Exception {
+ compact(table, partition, bucket, null, true);
+ }
+
+ protected void compact(
+ Table table,
+ BinaryRow partition,
+ int bucket,
+ IOManager ioManager,
+ boolean fullCompaction)
+ throws Exception {
BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
try (BatchTableWrite write = writeBuilder.newWrite();
BatchTableCommit commit = writeBuilder.newCommit()) {
- write.compact(partition, bucket, true);
+ write.withIOManager(ioManager);
+ write.compact(partition, bucket, fullCompaction);
commit.commit(write.prepareCommit());
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/CompactionDiffFollowUpScanner.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/CompactionDiffFollowUpScanner.java
new file mode 100644
index 000000000..fd909c596
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/CompactionDiffFollowUpScanner.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.table.source.snapshot.FollowUpScanner;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link FollowUpScanner} for reading all changed files after compact: only snapshots whose
+ * commit kind is COMPACT are scanned, and their changes are read in {@link ScanMode#DELTA} mode.
+ */
+public class CompactionDiffFollowUpScanner implements FollowUpScanner {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CompactionDiffFollowUpScanner.class);
+
+    /** Returns {@code true} only for snapshots committed with kind COMPACT. */
+    @Override
+    public boolean shouldScanSnapshot(Snapshot snapshot) {
+        Snapshot.CommitKind commitKind = snapshot.commitKind();
+        boolean compacted = commitKind == Snapshot.CommitKind.COMPACT;
+        if (!compacted) {
+            // Every other commit kind is skipped; the log line only helps troubleshooting.
+            LOG.debug(
+                    "Next snapshot id {} is not COMPACT, but is {}, check next one.",
+                    snapshot.id(),
+                    commitKind);
+        }
+        return compacted;
+    }
+
+    /** Reads the changes of the given snapshot with the reader switched to DELTA mode. */
+    @Override
+    public SnapshotReader.Plan scan(Snapshot snapshot, SnapshotReader snapshotReader) {
+        return snapshotReader
+                .withMode(ScanMode.DELTA)
+                .withSnapshot(snapshot)
+                .readChanges();
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 01ebbde20..4090193de 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -98,7 +98,9 @@ public class FileStoreLookupFunction implements Serializable,
Closeable {
public FileStoreLookupFunction(
Table table, int[] projection, int[] joinKeyIndex, @Nullable
Predicate predicate) {
- TableScanUtils.streamingReadingValidate(table);
+ if (!TableScanUtils.supportCompactDiffStreamingReading(table)) {
+ TableScanUtils.streamingReadingValidate(table);
+ }
this.table = table;
this.partitionLoader = DynamicPartitionLoader.of(table);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index 28b0da0d1..15b82fbe4 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -350,7 +350,7 @@ public abstract class FullCacheLookupTable implements
LookupTable {
File tempPath,
List<String> joinKey,
@Nullable Set<Integer> requiredCachedBucketIds) {
- this.table = table;
+ this.table = new LookupFileStoreTable(table, joinKey);
this.projection = projection;
this.tablePredicate = tablePredicate;
this.projectedPredicate = projectedPredicate;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/IncrementalCompactDiffSplitRead.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/IncrementalCompactDiffSplitRead.java
new file mode 100644
index 000000000..fef74127e
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/IncrementalCompactDiffSplitRead.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.operation.MergeFileSplitRead;
+import org.apache.paimon.operation.SplitRead;
+import org.apache.paimon.reader.EmptyRecordReader;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.splitread.IncrementalDiffSplitRead;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link SplitRead} for streaming incremental diff after compaction.
+ *
+ * <p>A split whose before-files contain no level-0 file produces an empty reader. For any other
+ * split, the level-0 files are dropped from both sides before delegating to {@link
+ * IncrementalDiffSplitRead}.
+ */
+public class IncrementalCompactDiffSplitRead extends IncrementalDiffSplitRead {
+
+    public IncrementalCompactDiffSplitRead(MergeFileSplitRead mergeRead) {
+        super(mergeRead);
+    }
+
+    @Override
+    public RecordReader<InternalRow> createReader(DataSplit split) throws IOException {
+        boolean level0InBefore =
+                split.beforeFiles().stream().anyMatch(file -> file.level() == 0);
+        if (!level0InBefore) {
+            // No level-0 file was involved on the "before" side: nothing to emit.
+            return new EmptyRecordReader<>();
+        }
+        return super.createReader(filterLevel0Files(split));
+    }
+
+    /** Rebuilds the split keeping only the files above level 0 on both sides. */
+    private DataSplit filterLevel0Files(DataSplit split) {
+        DataSplit.Builder builder =
+                new DataSplit.Builder()
+                        .withSnapshot(split.snapshotId())
+                        .withPartition(split.partition())
+                        .withBucket(split.bucket())
+                        .withBucketPath(split.bucketPath())
+                        .withBeforeFiles(aboveLevel0(split.beforeFiles()))
+                        .withDataFiles(aboveLevel0(split.dataFiles()))
+                        .isStreaming(split.isStreaming())
+                        .rawConvertible(split.rawConvertible());
+
+        // NOTE(review): the deletion-file lists are passed through unfiltered while the data
+        // files are filtered by level - confirm the builder/reader does not expect both lists
+        // to stay index-aligned.
+        split.beforeDeletionFiles().ifPresent(builder::withBeforeDeletionFiles);
+        split.deletionFiles().ifPresent(builder::withDataDeletionFiles);
+        return builder.build();
+    }
+
+    /** Returns the files whose level is greater than 0, in their original order. */
+    private static List<DataFileMeta> aboveLevel0(List<DataFileMeta> files) {
+        return files.stream().filter(file -> file.level() > 0).collect(Collectors.toList());
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
new file mode 100644
index 000000000..71a8a73ad
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.operation.MergeFileSplitRead;
+import org.apache.paimon.operation.SplitRead;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.source.AbstractDataTableRead;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+
+import java.io.IOException;
+
+import static org.apache.paimon.table.source.KeyValueTableRead.unwrap;
+
+/**
+ * An {@link InnerTableRead} that reads the data changed before and after compaction.
+ *
+ * <p>Splits without before-files are served by a plain merge read (the full reading phase); all
+ * other splits are served by {@link IncrementalCompactDiffSplitRead}. Every configuration call is
+ * forwarded to both underlying reads.
+ */
+public class LookupCompactDiffRead extends AbstractDataTableRead<KeyValue> {
+
+    private final SplitRead<InternalRow> fullPhaseMergeRead;
+    private final SplitRead<InternalRow> incrementalDiffRead;
+
+    public LookupCompactDiffRead(MergeFileSplitRead mergeRead, TableSchema schema) {
+        super(schema);
+        this.incrementalDiffRead = new IncrementalCompactDiffSplitRead(mergeRead);
+        this.fullPhaseMergeRead =
+                SplitRead.convert(mergeRead, split -> unwrap(mergeRead.createReader(split)));
+    }
+
+    @Override
+    public void projection(int[][] projection) {
+        fullPhaseMergeRead.withProjection(projection);
+        incrementalDiffRead.withProjection(projection);
+    }
+
+    @Override
+    public RecordReader<InternalRow> reader(Split split) throws IOException {
+        DataSplit dataSplit = (DataSplit) split;
+        // An empty "before" side marks the full reading phase; otherwise read the compact diff.
+        SplitRead<InternalRow> delegate =
+                dataSplit.beforeFiles().isEmpty() ? fullPhaseMergeRead : incrementalDiffRead;
+        return delegate.createReader(dataSplit);
+    }
+
+    @Override
+    protected InnerTableRead innerWithFilter(Predicate predicate) {
+        fullPhaseMergeRead.withFilter(predicate);
+        incrementalDiffRead.withFilter(predicate);
+        return this;
+    }
+
+    @Override
+    public InnerTableRead forceKeepDelete() {
+        fullPhaseMergeRead.forceKeepDelete();
+        incrementalDiffRead.forceKeepDelete();
+        return this;
+    }
+
+    @Override
+    public TableRead withIOManager(IOManager ioManager) {
+        fullPhaseMergeRead.withIOManager(ioManager);
+        incrementalDiffRead.withIOManager(ioManager);
+        return this;
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
new file mode 100644
index 000000000..fe064b80e
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.operation.DefaultValueAssigner;
+import org.apache.paimon.table.source.DataTableStreamScan;
+import org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner;
+import org.apache.paimon.table.source.snapshot.BoundedChecker;
+import org.apache.paimon.table.source.snapshot.FollowUpScanner;
+import org.apache.paimon.table.source.snapshot.FullStartingScanner;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.table.source.snapshot.StartingScanner;
+import org.apache.paimon.utils.SnapshotManager;
+
+import static org.apache.paimon.CoreOptions.StartupMode;
+import static
org.apache.paimon.flink.lookup.LookupFileStoreTable.LookupStreamScanMode;
+
+/**
+ * {@link org.apache.paimon.table.source.StreamTableScan} implementation for lookup streaming
+ * planning.
+ *
+ * <p>The starting scanner is a {@link FullStartingScanner} unless the startup mode is
+ * COMPACTED_FULL, the follow-up scanner is chosen from the {@link LookupStreamScanMode}, and the
+ * scan never ends.
+ */
+public class LookupDataTableScan extends DataTableStreamScan {
+
+    // Both values are fixed at construction time and never reassigned.
+    private final StartupMode startupMode;
+    private final LookupStreamScanMode lookupScanMode;
+
+    public LookupDataTableScan(
+            CoreOptions options,
+            SnapshotReader snapshotReader,
+            SnapshotManager snapshotManager,
+            boolean supportStreamingReadOverwrite,
+            DefaultValueAssigner defaultValueAssigner,
+            LookupStreamScanMode lookupScanMode) {
+        super(
+                options,
+                snapshotReader,
+                snapshotManager,
+                supportStreamingReadOverwrite,
+                defaultValueAssigner);
+        this.startupMode = options.startupMode();
+        this.lookupScanMode = lookupScanMode;
+    }
+
+    /**
+     * Delegates to the parent only for the COMPACTED_FULL startup mode; every other startup mode
+     * starts from a {@link FullStartingScanner}.
+     */
+    @Override
+    protected StartingScanner createStartingScanner(boolean isStreaming) {
+        if (startupMode == StartupMode.COMPACTED_FULL) {
+            return super.createStartingScanner(isStreaming);
+        }
+        return new FullStartingScanner(snapshotReader.snapshotManager());
+    }
+
+    /**
+     * Picks the follow-up scanner matching the lookup stream scan mode.
+     *
+     * @throws UnsupportedOperationException if the mode is not one of the known constants
+     */
+    @Override
+    protected FollowUpScanner createFollowUpScanner() {
+        switch (lookupScanMode) {
+            case CHANGELOG:
+                return super.createFollowUpScanner();
+            case FILE_MONITOR:
+                return new AllDeltaFollowUpScanner();
+            case COMPACT_DELTA_MONITOR:
+                return new CompactionDiffFollowUpScanner();
+            default:
+                throw new UnsupportedOperationException(
+                        "Unknown lookup stream scan mode: " + lookupScanMode.name());
+        }
+    }
+
+    @Override
+    protected BoundedChecker createBoundedChecker() {
+        return BoundedChecker.neverEnd(); // dim table should never end
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java
new file mode 100644
index 000000000..bb5274bc6
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValueFileStore;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.utils.TableScanUtils;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.operation.DefaultValueAssigner;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.options.description.DescribedEnum;
+import org.apache.paimon.options.description.InlineElement;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.DelegatedFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.ReadBuilderImpl;
+import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.utils.SimpleFileReader;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
+import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE;
+import static org.apache.paimon.options.description.TextElement.text;
+
+/**
+ * {@link FileStoreTable} for lookup table.
+ *
+ * <p>Wraps another {@link FileStoreTable} and overrides read and stream-scan creation according
+ * to a {@link LookupStreamScanMode}. Every method that derives a new table re-wraps the derived
+ * table so that the scan mode is preserved.
+ */
+public class LookupFileStoreTable extends DelegatedFileStoreTable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final LookupStreamScanMode lookupScanMode;
+
+    /** Creates a lookup table whose scan mode is derived from the table options and join keys. */
+    public LookupFileStoreTable(FileStoreTable wrapped, List<String> joinKeys) {
+        super(wrapped);
+        this.lookupScanMode = lookupStreamScanMode(wrapped, joinKeys);
+    }
+
+    /** Creates a lookup table with an explicitly chosen scan mode. */
+    public LookupFileStoreTable(FileStoreTable wrapped, LookupStreamScanMode lookupScanMode) {
+        super(wrapped);
+        this.lookupScanMode = lookupScanMode;
+    }
+
+    @Override
+    public ReadBuilder newReadBuilder() {
+        // Built on this wrapper rather than on the wrapped table.
+        return new ReadBuilderImpl(this);
+    }
+
+    /**
+     * Returns the wrapped table's read for CHANGELOG and FILE_MONITOR, and a {@link
+     * LookupCompactDiffRead} for COMPACT_DELTA_MONITOR.
+     *
+     * @throws UnsupportedOperationException if the mode is not one of the known constants
+     */
+    @Override
+    public InnerTableRead newRead() {
+        switch (lookupScanMode) {
+            case CHANGELOG:
+            case FILE_MONITOR:
+                return wrapped.newRead();
+            case COMPACT_DELTA_MONITOR:
+                return new LookupCompactDiffRead(
+                        ((KeyValueFileStore) wrapped.store()).newRead(), wrapped.schema());
+            default:
+                throw new UnsupportedOperationException(
+                        "Unknown lookup stream scan mode: " + lookupScanMode.name());
+        }
+    }
+
+    @Override
+    public StreamDataTableScan newStreamScan() {
+        return new LookupDataTableScan(
+                wrapped.coreOptions(),
+                wrapped.newSnapshotReader(),
+                wrapped.snapshotManager(),
+                wrapped.supportStreamingReadOverwrite(),
+                DefaultValueAssigner.create(wrapped.schema()),
+                lookupScanMode);
+    }
+
+    @Override
+    public SimpleFileReader<ManifestFileMeta> manifestListReader() {
+        return wrapped.manifestListReader();
+    }
+
+    @Override
+    public SimpleFileReader<ManifestEntry> manifestFileReader() {
+        return wrapped.manifestFileReader();
+    }
+
+    @Override
+    public SimpleFileReader<IndexManifestEntry> indexManifestFileReader() {
+        return wrapped.indexManifestFileReader();
+    }
+
+    @Override
+    public FileStoreTable copy(Map<String, String> dynamicOptions) {
+        return new LookupFileStoreTable(wrapped.copy(dynamicOptions), lookupScanMode);
+    }
+
+    @Override
+    public FileStoreTable copy(TableSchema newTableSchema) {
+        return new LookupFileStoreTable(wrapped.copy(newTableSchema), lookupScanMode);
+    }
+
+    @Override
+    public FileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions) {
+        // Delegate to the matching method of the wrapped table; calling plain copy(...) here
+        // would bypass the wrapped table's copyWithoutTimeTravel behaviour.
+        return new LookupFileStoreTable(
+                wrapped.copyWithoutTimeTravel(dynamicOptions), lookupScanMode);
+    }
+
+    @Override
+    public FileStoreTable copyWithLatestSchema() {
+        return new LookupFileStoreTable(wrapped.copyWithLatestSchema(), lookupScanMode);
+    }
+
+    @Override
+    public FileStoreTable switchToBranch(String branchName) {
+        // Wrap the table returned by the delegate instead of discarding it, so the caller
+        // really gets a table for the requested branch, consistent with the copy* methods.
+        return new LookupFileStoreTable(wrapped.switchToBranch(branchName), lookupScanMode);
+    }
+
+    /**
+     * Derives the scan mode: FILE_MONITOR when the lookup cache mode is AUTO and the join keys
+     * equal the primary keys; COMPACT_DELTA_MONITOR for a primary-key table without changelog
+     * producer that supports compact-diff streaming reading; CHANGELOG otherwise.
+     */
+    private LookupStreamScanMode lookupStreamScanMode(FileStoreTable table, List<String> joinKeys) {
+        Options options = Options.fromMap(table.options());
+        if (options.get(LOOKUP_CACHE_MODE) == FlinkConnectorOptions.LookupCacheMode.AUTO
+                && new HashSet<>(table.primaryKeys()).equals(new HashSet<>(joinKeys))) {
+            return LookupStreamScanMode.FILE_MONITOR;
+        } else if (!table.primaryKeys().isEmpty()
+                && options.get(CHANGELOG_PRODUCER) == CoreOptions.ChangelogProducer.NONE
+                && TableScanUtils.supportCompactDiffStreamingReading(table)) {
+            return LookupStreamScanMode.COMPACT_DELTA_MONITOR;
+        } else {
+            return LookupStreamScanMode.CHANGELOG;
+        }
+    }
+
+    /** Inner stream scan mode for lookup table. */
+    public enum LookupStreamScanMode implements DescribedEnum {
+        CHANGELOG("changelog", "Streaming reading based on changelog or delta data files."),
+        FILE_MONITOR("file-monitor", "Monitor data file changes."),
+        COMPACT_DELTA_MONITOR(
+                "compact-delta-monitor",
+                "Streaming reading based on data changes before and after compaction.");
+
+        private final String value;
+        private final String description;
+
+        LookupStreamScanMode(String value, String description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return text(description);
+        }
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
index ceb40c1a8..fa9b7672d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
@@ -22,13 +22,10 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.SplitsParallelReadUtil;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
-import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
@@ -42,10 +39,7 @@ import
org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
import javax.annotation.Nullable;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.function.IntUnaryOperator;
import java.util.stream.IntStream;
@@ -62,20 +56,12 @@ public class LookupStreamingReader {
@Nullable private final Predicate projectedPredicate;
private final StreamTableScan scan;
- private static final List<ConfigOption<?>> TIME_TRAVEL_OPTIONS =
- Arrays.asList(
- CoreOptions.SCAN_TIMESTAMP_MILLIS,
- CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS,
- CoreOptions.SCAN_SNAPSHOT_ID,
- CoreOptions.SCAN_TAG_NAME,
- CoreOptions.SCAN_VERSION);
-
public LookupStreamingReader(
Table table,
int[] projection,
@Nullable Predicate predicate,
Set<Integer> requireCachedBucketIds) {
- this.table = unsetTimeTravelOptions(table);
+ this.table = table;
this.projection = projection;
this.readBuilder =
this.table
@@ -112,21 +98,6 @@ public class LookupStreamingReader {
}
}
- private Table unsetTimeTravelOptions(Table origin) {
- FileStoreTable fileStoreTable = (FileStoreTable) origin;
- Map<String, String> newOptions = new
HashMap<>(fileStoreTable.options());
-
TIME_TRAVEL_OPTIONS.stream().map(ConfigOption::key).forEach(newOptions::remove);
-
- CoreOptions.StartupMode startupMode =
CoreOptions.fromMap(newOptions).startupMode();
- if (startupMode != CoreOptions.StartupMode.COMPACTED_FULL) {
- startupMode = CoreOptions.StartupMode.LATEST_FULL;
- }
- newOptions.put(CoreOptions.SCAN_MODE.key(), startupMode.toString());
-
- TableSchema newSchema = fileStoreTable.schema().copy(newOptions);
- return fileStoreTable.copy(newSchema);
- }
-
public RecordReader<InternalRow> nextBatch(boolean useParallelism) throws
Exception {
List<Split> splits = scan.plan().splits();
CoreOptions options = CoreOptions.fromMap(table.options());
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
index bdf0a1b4a..967826e11 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
@@ -40,16 +40,10 @@ import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.function.Function;
-import static org.apache.paimon.CoreOptions.SCAN_BOUNDED_WATERMARK;
-import static org.apache.paimon.CoreOptions.STREAM_SCAN_MODE;
-import static org.apache.paimon.CoreOptions.StreamScanMode.FILE_MONITOR;
-
/** Lookup table for primary key which supports to read the LSM tree directly.
*/
public class PrimaryKeyPartialLookupTable implements LookupTable {
@@ -158,7 +152,11 @@ public class PrimaryKeyPartialLookupTable implements
LookupTable {
return new PrimaryKeyPartialLookupTable(
filter ->
new LocalQueryExecutor(
- table, projection, tempPath, filter,
requireCachedBucketIds),
+ new LookupFileStoreTable(table, joinKey),
+ projection,
+ tempPath,
+ filter,
+ requireCachedBucketIds),
table,
joinKey);
}
@@ -192,12 +190,8 @@ public class PrimaryKeyPartialLookupTable implements
LookupTable {
.withValueProjection(Projection.of(projection).toNestedIndexes())
.withIOManager(new
IOManagerImpl(tempPath.toString()));
- Map<String, String> dynamicOptions = new HashMap<>();
- dynamicOptions.put(STREAM_SCAN_MODE.key(),
FILE_MONITOR.getValue());
- dynamicOptions.put(SCAN_BOUNDED_WATERMARK.key(), null);
this.scan =
- table.copy(dynamicOptions)
- .newReadBuilder()
+ table.newReadBuilder()
.withFilter(filter)
.withBucketFilter(
requireCachedBucketIds == null
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
index a5645302f..30b7bbdd5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
@@ -20,12 +20,15 @@ package org.apache.paimon.flink.utils;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
+import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.TableScan;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Optional;
+import java.util.Set;
/** Utility methods for {@link TableScan}, such as validating. */
public class TableScanUtils {
@@ -59,4 +62,24 @@ public class TableScanUtils {
}
return Optional.empty();
}
+
+    /**
+     * Check whether streaming reading is supported based on the data changed before and after
+     * compact.
+     *
+     * <p>This holds when the table options need lookup, the merge engine is PARTIAL_UPDATE or
+     * AGGREGATE, and {@code PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE} is not enabled.
+     *
+     * @param table the table whose options are inspected
+     * @return {@code true} if compact-diff streaming reading can be used for the table
+     */
+    public static boolean supportCompactDiffStreamingReading(Table table) {
+        CoreOptions options = CoreOptions.fromMap(table.options());
+        // A plain HashSet replaces double-brace initialization, which declares an anonymous
+        // HashSet subclass just to run two add() calls.
+        Set<CoreOptions.MergeEngine> compactDiffReadingEngine = new HashSet<>();
+        compactDiffReadingEngine.add(CoreOptions.MergeEngine.PARTIAL_UPDATE);
+        compactDiffReadingEngine.add(CoreOptions.MergeEngine.AGGREGATE);
+
+        return options.needLookup()
+                && compactDiffReadingEngine.contains(options.mergeEngine())
+                && !Options.fromMap(options.toMap())
+                        .get(CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE);
+    }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
index 14643542e..619cb4c1d 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.lookup;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
@@ -722,6 +723,108 @@ public class LookupTableTest extends TableTestBase {
table.close();
}
+    /**
+     * Full-cache lookup over a partial-update table with force-lookup enabled: writes go through
+     * a write-only table, so the cached value is expected to change only after a compaction has
+     * been committed through {@code compactTable}.
+     */
+    @Test
+    public void testFullCacheLookupTableWithForceLookup() throws Exception {
+        Options options = new Options();
+        options.set(CoreOptions.MERGE_ENGINE, CoreOptions.MergeEngine.PARTIAL_UPDATE);
+        options.set(
+                FlinkConnectorOptions.LOOKUP_CACHE_MODE,
+                FlinkConnectorOptions.LookupCacheMode.FULL);
+        options.set(CoreOptions.WRITE_ONLY, true);
+        options.set(CoreOptions.FORCE_LOOKUP, true);
+        options.set(CoreOptions.BUCKET, 1);
+        FileStoreTable storeTable = createTable(singletonList("f0"), options);
+        // Same table with write-only switched off; used below to run the compactions.
+        FileStoreTable compactTable =
+                storeTable.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
+        FullCacheLookupTable.Context context =
+                new FullCacheLookupTable.Context(
+                        storeTable,
+                        new int[] {0, 1, 2},
+                        null,
+                        null,
+                        tempDir.toFile(),
+                        singletonList("f0"),
+                        null);
+        // The second argument is randomly 0 or 10.
+        // NOTE(review): presumably a threshold selecting the refresh strategy - confirm.
+        table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10);
+
+        // initialize: write one row and fully compact it before opening the lookup table
+        write(storeTable, ioManager, GenericRow.of(1, 11, 111));
+        compact(compactTable, BinaryRow.EMPTY_ROW, 0, ioManager, true);
+        table.open();
+
+        List<InternalRow> result = table.get(row(1));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 1, 11, 111);
+
+        // first write: only the third field is set, the second one is null
+        write(storeTable, GenericRow.of(1, null, 222));
+        table.refresh();
+        result = table.get(row(1));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 1, 11, 111); // old value because there is no compact
+
+        // non-full compaction (fullCompaction = false)
+        compact(compactTable, BinaryRow.EMPTY_ROW, 0, ioManager, false);
+        table.refresh();
+        result = table.get(row(1));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 1, 11, 222); // get new value after compact
+
+        // second write: only the second field is set, the third one is null
+        write(storeTable, GenericRow.of(1, 22, null));
+        table.refresh();
+        result = table.get(row(1));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 1, 11, 222); // old value
+
+        // full compact
+        compact(compactTable, BinaryRow.EMPTY_ROW, 0, ioManager, true);
+        table.refresh();
+        result = table.get(row(1));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 1, 22, 222); // new value
+    }
+
+    /**
+     * Partial (local) lookup over a partial-update table with force-lookup enabled and no
+     * changelog producer: newly written rows become visible after {@code refresh()}.
+     */
+    @Test
+    public void testPartialLookupTableWithForceLookup() throws Exception {
+        Options options = new Options();
+        options.set(CoreOptions.MERGE_ENGINE, CoreOptions.MergeEngine.PARTIAL_UPDATE);
+        options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.NONE);
+        options.set(CoreOptions.FORCE_LOOKUP, true);
+        options.set(CoreOptions.BUCKET, 1);
+        FileStoreTable dimTable = createTable(singletonList("f0"), options);
+
+        PrimaryKeyPartialLookupTable table =
+                PrimaryKeyPartialLookupTable.createLocalTable(
+                        dimTable,
+                        new int[] {0, 1, 2},
+                        tempDir.toFile(),
+                        ImmutableList.of("f0"),
+                        null);
+        table.open();
+        // This local variable is declared inside the test, so it has to be closed here.
+        try {
+            List<InternalRow> result = table.get(row(1, -1));
+            assertThat(result).hasSize(0);
+
+            // written but not refreshed yet: still invisible
+            write(dimTable, ioManager, GenericRow.of(1, -1, 11), GenericRow.of(2, -2, 22));
+            result = table.get(row(1));
+            assertThat(result).hasSize(0);
+
+            table.refresh();
+            result = table.get(row(1));
+            assertThat(result).hasSize(1);
+            assertRow(result.get(0), 1, -1, 11);
+            result = table.get(row(2));
+            assertThat(result).hasSize(1);
+            assertRow(result.get(0), 2, -2, 22);
+
+            // partial update: the null field keeps its previous value
+            write(dimTable, ioManager, GenericRow.of(1, null, 111));
+            table.refresh();
+            result = table.get(row(1));
+            assertThat(result).hasSize(1);
+            assertRow(result.get(0), 1, -1, 111);
+        } finally {
+            table.close();
+        }
+    }
+
private FileStoreTable createDimTable() throws Exception {
FileIO fileIO = LocalFileIO.create();
org.apache.paimon.fs.Path tablePath =