This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new df2cf190e [flink] Introduce max_pt for Flink lookup join (#2926)
df2cf190e is described below
commit df2cf190e879e533e6139f6a76a74f76732c64dc
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Mar 1 12:01:41 2024 +0800
[flink] Introduce max_pt for Flink lookup join (#2926)
---
docs/content/how-to/lookup-joins.md | 30 ++++++
.../generated/flink_connector_configuration.html | 12 +++
.../java/org/apache/paimon/data/JoinedRow.java | 4 +
.../apache/paimon/predicate/PredicateBuilder.java | 17 ++++
.../main/java/org/apache/paimon/types/RowType.java | 16 +++
.../org/apache/paimon/utils/KeyProjectedRow.java | 4 +
.../apache/paimon/flink/FlinkConnectorOptions.java | 15 +++
.../flink/lookup/DynamicPartitionLoader.java | 112 +++++++++++++++++++++
.../flink/lookup/FileStoreLookupFunction.java | 62 +++++++++++-
.../paimon/flink/lookup/FullCacheLookupTable.java | 16 ++-
.../apache/paimon/flink/lookup/LookupTable.java | 3 +
.../flink/lookup/NoPrimaryKeyLookupTable.java | 12 ++-
.../paimon/flink/lookup/PrimaryKeyLookupTable.java | 29 +++---
.../flink/lookup/PrimaryKeyPartialLookupTable.java | 72 +++++++++----
.../flink/lookup/SecondaryIndexLookupTable.java | 16 ++-
.../org/apache/paimon/flink/LookupJoinITCase.java | 38 +++++++
.../flink/lookup/FileStoreLookupFunctionTest.java | 4 +-
.../paimon/flink/lookup/LookupTableTest.java | 14 +++
18 files changed, 433 insertions(+), 43 deletions(-)
diff --git a/docs/content/how-to/lookup-joins.md
b/docs/content/how-to/lookup-joins.md
index e0dba4c59..ee0706c31 100644
--- a/docs/content/how-to/lookup-joins.md
+++ b/docs/content/how-to/lookup-joins.md
@@ -120,6 +120,36 @@ your streaming job may be blocked. You can try to use
`audit_log` system table f
(convert CDC stream to append stream).
{{< /hint >}}
+## Dynamic Partition
+
+In traditional data warehouses, each partition often holds a full copy of the latest data, so a lookup join against
+such a partitioned table only needs to read the latest partition. Paimon provides the `max_pt()` dynamic partition for this scenario.
+
+**Create Paimon Partitioned Table**
+
+```sql
+CREATE TABLE customers (
+ id INT,
+ name STRING,
+ country STRING,
+ zip STRING,
+ dt STRING,
+ PRIMARY KEY (id, dt) NOT ENFORCED
+) PARTITIONED BY (dt);
+```
+
+**Lookup Join**
+
+```sql
+SELECT o.order_id, o.total, c.country, c.zip
+FROM orders AS o
+JOIN customers /*+ OPTIONS('lookup.dynamic-partition'='max_pt()',
'lookup.dynamic-partition.refresh-interval'='1 h') */
+FOR SYSTEM_TIME AS OF o.proc_time AS c
+ON o.customer_id = c.id;
+```
+
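+The lookup node automatically re-detects the latest partition (at most once per `lookup.dynamic-partition.refresh-interval`)
+and only queries data of that partition. The join condition must not include the partition fields; they are matched automatically.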
+
## Query Service
You can run a Flink Streaming Job to start query service for the table. When
QueryService exists, Flink Lookup Join
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 099d9127c..f02046f31 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -56,6 +56,18 @@ under the License.
<td><p>Enum</p></td>
<td>The cache mode of lookup join.<br /><br />Possible
values:<ul><li>"AUTO"</li><li>"FULL"</li></ul></td>
</tr>
+ <tr>
+ <td><h5>lookup.dynamic-partition</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Specific dynamic partition for lookup, only support 'max_pt()'
currently.</td>
+ </tr>
+ <tr>
+ <td><h5>lookup.dynamic-partition.refresh-interval</h5></td>
+ <td style="word-wrap: break-word;">1 h</td>
+ <td>Duration</td>
+ <td>Specific dynamic partition refresh interval for lookup, scan
all partitions and obtain corresponding partition.</td>
+ </tr>
<tr>
<td><h5>scan.infer-parallelism</h5></td>
<td style="word-wrap: break-word;">true</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java
b/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java
index 526886598..3650000b6 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java
@@ -75,6 +75,10 @@ public class JoinedRow implements InternalRow {
this.row2 = row2;
}
+ /** Creates a new {@link JoinedRow} backed by {@code row1} and {@code row2}. */
+ public static JoinedRow join(InternalRow row1, InternalRow row2) {
+ return new JoinedRow(row1, row2);
+ }
+
/**
* Replaces the {@link InternalRow} backing this {@link JoinedRow}.
*
diff --git
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
index ee84d59d8..f6410adca 100644
---
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
+++
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
@@ -46,8 +46,10 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.stream.Collectors;
import static java.util.Collections.singletonList;
@@ -161,6 +163,21 @@ public class PredicateBuilder {
.get();
}
+    /** Varargs form of {@link #andNullable(List)}. */
+    @Nullable
+    public static Predicate andNullable(Predicate... predicates) {
+        List<Predicate> asList = Arrays.asList(predicates);
+        return andNullable(asList);
+    }
+
+    /**
+     * Combines the given predicates with AND, skipping null entries.
+     *
+     * @return the conjunction of all non-null predicates, or null if none are left
+     */
+    @Nullable
+    public static Predicate andNullable(List<Predicate> predicates) {
+        List<Predicate> present =
+                predicates.stream().filter(Objects::nonNull).collect(Collectors.toList());
+        return present.isEmpty() ? null : and(present);
+    }
+
public static Predicate or(Predicate... predicates) {
return or(Arrays.asList(predicates));
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
index 692c60e51..1462d330e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
@@ -27,6 +27,7 @@ import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonGenerator
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -184,6 +185,21 @@ public final class RowType extends DataType {
return new RowType(newFields);
}
+ public RowType project(int[] mapping) {
+ List<DataField> fields = getFields();
+ return new RowType(
+
Arrays.stream(mapping).mapToObj(fields::get).collect(Collectors.toList()));
+ }
+
+ public RowType project(List<String> names) {
+ List<DataField> fields = getFields();
+ List<String> fieldNames =
fields.stream().map(DataField::name).collect(Collectors.toList());
+ return new RowType(
+ names.stream()
+ .map(k -> fields.get(fieldNames.indexOf(k)))
+ .collect(Collectors.toList()));
+ }
+
public static RowType of(DataType... types) {
final List<DataField> fields = new ArrayList<>();
for (int i = 0; i < types.length; i++) {
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/KeyProjectedRow.java
b/paimon-common/src/main/java/org/apache/paimon/utils/KeyProjectedRow.java
index ea11105bb..01d9c313a 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/KeyProjectedRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/KeyProjectedRow.java
@@ -44,6 +44,10 @@ public class KeyProjectedRow implements InternalRow {
return this;
}
+ /** Returns the projection's field index mapping; this is the internal array, not a copy, so do not modify it. */
+ public int[] indexMapping() {
+ return indexMapping;
+ }
+
@Override
public int getFieldCount() {
return indexMapping.length;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index ae6451916..d98709f05 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -293,6 +293,21 @@ public class FlinkConnectorOptions {
.defaultValue(LookupCacheMode.AUTO)
.withDescription("The cache mode of lookup join.");
+ /** Dynamic partition that lookup joins are restricted to; currently only "max_pt()" is supported. */
+ public static final ConfigOption<String> LOOKUP_DYNAMIC_PARTITION =
+ ConfigOptions.key("lookup.dynamic-partition")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Specific dynamic partition for lookup, only
support 'max_pt()' currently.");
+
+ /** How often the dynamic lookup partition is re-detected by scanning partitions (default 1 h). */
+ public static final ConfigOption<Duration>
LOOKUP_DYNAMIC_PARTITION_REFRESH_INTERVAL =
+ ConfigOptions.key("lookup.dynamic-partition.refresh-interval")
+ .durationType()
+ .defaultValue(Duration.ofHours(1))
+ .withDescription(
+ "Specific dynamic partition refresh interval for
lookup, "
+ + "scan all partitions and obtain
corresponding partition.");
+
public static final ConfigOption<Boolean> SINK_AUTO_TAG_FOR_SAVEPOINT =
ConfigOptions.key("sink.savepoint.auto-tag")
.booleanType()
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
new file mode 100644
index 000000000..26c172621
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+import static
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Dynamic partition for lookup. Resolves the partition configured by {@code
+ * lookup.dynamic-partition}; only {@code max_pt()} (the greatest partition according to the
+ * partition key order) is supported. The partition is re-detected at most once per {@code
+ * lookup.dynamic-partition.refresh-interval}.
+ */
+public class DynamicPartitionLoader implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String MAX_PT = "max_pt()";
+
+    private final Table table;
+    private final Duration refreshInterval;
+
+    // Runtime-only objects created in open(); transient so that a serialized function never
+    // tries to ship the scan or the code-generated comparator.
+    private transient TableScan scan;
+    private transient Comparator<InternalRow> comparator;
+
+    private LocalDateTime lastRefresh;
+    private BinaryRow partition;
+
+    private DynamicPartitionLoader(Table table, Duration refreshInterval) {
+        this.table = table;
+        this.refreshInterval = refreshInterval;
+    }
+
+    /** Creates the partition scan and comparator; must be called before {@link #checkRefresh()}. */
+    public void open() {
+        this.scan = table.newReadBuilder().newScan();
+        RowType partitionType = table.rowType().project(table.partitionKeys());
+        this.comparator =
+                CodeGenUtils.newRecordComparator(partitionType.getFieldTypes(), "Partition");
+    }
+
+    /**
+     * Appends the partition keys to the given mutable join key list, so that lookup keys consist
+     * of the original join fields followed by the partition fields.
+     *
+     * @throws IllegalArgumentException if the join keys already contain a partition key
+     */
+    public void addJoinKeys(List<String> joinKeys) {
+        List<String> partitionKeys = table.partitionKeys();
+        checkArgument(
+                joinKeys.stream().noneMatch(partitionKeys::contains),
+                "Join keys %s must not contain partition keys %s when '%s' is set.",
+                joinKeys,
+                partitionKeys,
+                LOOKUP_DYNAMIC_PARTITION.key());
+        joinKeys.addAll(partitionKeys);
+    }
+
+    /** Returns the current dynamic partition, or null if no partition has been found yet. */
+    @Nullable
+    public BinaryRow partition() {
+        return partition;
+    }
+
+    /**
+     * Re-scans the table's partitions and keeps the greatest one, unless the previous scan
+     * happened less than the refresh interval ago.
+     *
+     * @return true if partition changed.
+     */
+    public boolean checkRefresh() {
+        if (lastRefresh != null
+                && !lastRefresh.plus(refreshInterval).isBefore(LocalDateTime.now())) {
+            return false;
+        }
+
+        BinaryRow previous = this.partition;
+        partition = scan.listPartitions().stream().max(comparator).orElse(null);
+        lastRefresh = LocalDateTime.now();
+
+        return !Objects.equals(previous, partition);
+    }
+
+    /**
+     * Creates a loader from the table options.
+     *
+     * @return null if {@code lookup.dynamic-partition} is not set
+     * @throws UnsupportedOperationException if the configured pattern is not {@code max_pt()}
+     * @throws IllegalArgumentException if the table has no partition keys
+     */
+    @Nullable
+    public static DynamicPartitionLoader of(Table table) {
+        Options options = Options.fromMap(table.options());
+        String dynamicPartition = options.get(LOOKUP_DYNAMIC_PARTITION);
+        if (dynamicPartition == null) {
+            return null;
+        }
+
+        if (!dynamicPartition.equalsIgnoreCase(MAX_PT)) {
+            throw new UnsupportedOperationException(
+                    "Unsupported dynamic partition pattern: " + dynamicPartition);
+        }
+
+        // max_pt() compares partition key values, so it needs a partitioned table; fail fast
+        // with a clear message instead of failing later while building the partition filter.
+        checkArgument(
+                !table.partitionKeys().isEmpty(),
+                "'%s' can only be set on a partitioned table.",
+                LOOKUP_DYNAMIC_PARTITION.key());
+
+        Duration refresh =
+                options.get(FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION_REFRESH_INTERVAL);
+        return new DynamicPartitionLoader(table, refresh);
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 77ea719a2..a321da634 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -19,17 +19,22 @@
package org.apache.paimon.flink.lookup;
import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.OutOfRangeException;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileIOUtils;
+import org.apache.paimon.utils.InternalRowUtils;
import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
@@ -51,6 +56,7 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
@@ -73,6 +79,7 @@ public class FileStoreLookupFunction implements Serializable,
Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(FileStoreLookupFunction.class);
private final Table table;
+ @Nullable private final DynamicPartitionLoader partitionLoader;
private final List<String> projectFields;
private final List<String> joinKeys;
@Nullable private final Predicate predicate;
@@ -89,6 +96,7 @@ public class FileStoreLookupFunction implements Serializable,
Closeable {
TableScanUtils.streamingReadingValidate(table);
this.table = table;
+ this.partitionLoader = DynamicPartitionLoader.of(table);
// join keys are based on projection fields
this.joinKeys =
@@ -96,6 +104,10 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
.mapToObj(i ->
table.rowType().getFieldNames().get(projection[i]))
.collect(Collectors.toList());
+ if (partitionLoader != null) {
+ partitionLoader.addJoinKeys(joinKeys);
+ }
+
this.projectFields =
Arrays.stream(projection)
.mapToObj(i -> table.rowType().getFieldNames().get(i))
@@ -126,6 +138,10 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
}
private void open() throws Exception {
+ if (partitionLoader != null) {
+ partitionLoader.open();
+ }
+
this.nextLoadTime = -1;
Options options = Options.fromMap(table.options());
@@ -165,6 +181,7 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
this.lookupTable = FullCacheLookupTable.create(context,
options.get(LOOKUP_CACHE_ROWS));
}
+ refreshDynamicPartition(false);
lookupTable.open();
}
@@ -187,7 +204,17 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
public Collection<RowData> lookup(RowData keyRow) {
try {
checkRefresh();
- List<InternalRow> results = lookupTable.get(new
FlinkRowWrapper(keyRow));
+
+ InternalRow key = new FlinkRowWrapper(keyRow);
+ if (partitionLoader != null) {
+ InternalRow partition = refreshDynamicPartition(true);
+ if (partition == null) {
+ return Collections.emptyList();
+ }
+ key = JoinedRow.join(key, partition);
+ }
+
+ List<InternalRow> results = lookupTable.get(key);
List<RowData> rows = new ArrayList<>(results.size());
for (InternalRow matchedRow : results) {
rows.add(new FlinkRowData(matchedRow));
@@ -201,6 +228,39 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
}
}
+    /**
+     * Re-checks the dynamic partition and, when one exists, restricts the lookup table to it.
+     *
+     * @param reopen whether to close and reopen the lookup table if the partition changed
+     * @return the current dynamic partition, or null if there is no loader or no partition yet
+     */
+    @Nullable
+    private BinaryRow refreshDynamicPartition(boolean reopen) throws Exception {
+        if (partitionLoader == null) {
+            return null;
+        }
+
+        boolean partitionChanged = partitionLoader.checkRefresh();
+        BinaryRow partition = partitionLoader.partition();
+        if (partition == null) {
+            // No partition exists yet (e.g. empty table). Building a filter would read fields of
+            // a null row, and lookup() returns no rows for a null partition anyway.
+            return null;
+        }
+
+        lookupTable.specificPartitionFilter(createSpecificPartFilter(partition));
+
+        if (partitionChanged && reopen) {
+            lookupTable.close();
+            lookupTable.open();
+        }
+
+        return partition;
+    }
+
+    /**
+     * Builds a predicate over the table's partition fields that selects exactly the given
+     * partition. Null partition values are matched with IS NULL, because an equality predicate
+     * with a null literal is not expected to match null values.
+     *
+     * @param partition partition row whose fields are ordered like {@code table.partitionKeys()}
+     */
+    private Predicate createSpecificPartFilter(BinaryRow partition) {
+        RowType rowType = table.rowType();
+        List<String> fieldNames = rowType.getFieldNames();
+        List<String> partitionKeys = table.partitionKeys();
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        List<Predicate> predicates = new ArrayList<>(partitionKeys.size());
+        for (int i = 0; i < partitionKeys.size(); i++) {
+            int index = fieldNames.indexOf(partitionKeys.get(i));
+            Object value = InternalRowUtils.get(partition, i, rowType.getTypeAt(index));
+            predicates.add(value == null ? builder.isNull(index) : builder.equal(index, value));
+        }
+        return PredicateBuilder.and(predicates);
+    }
+
private void reopen() {
try {
close();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index d6ccacf87..7f9b019bd 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -27,11 +27,13 @@ import org.apache.paimon.lookup.BulkLoader;
import org.apache.paimon.lookup.RocksDBState;
import org.apache.paimon.lookup.RocksDBStateFactory;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.MutableObjectIterator;
import org.apache.paimon.utils.PartialRow;
import org.apache.paimon.utils.TypeUtils;
@@ -53,9 +55,11 @@ public abstract class FullCacheLookupTable implements
LookupTable {
protected final Context context;
protected final RocksDBStateFactory stateFactory;
protected final RowType projectedType;
- private final LookupStreamingReader reader;
private final boolean sequenceFieldEnabled;
+ private LookupStreamingReader reader;
+ private Predicate specificPartition;
+
public FullCacheLookupTable(Context context) throws IOException {
this.context = context;
this.stateFactory =
@@ -64,7 +68,6 @@ public abstract class FullCacheLookupTable implements
LookupTable {
context.table.coreOptions().toConfiguration(),
null);
FileStoreTable table = context.table;
- this.reader = new LookupStreamingReader(table, context.projection,
context.tablePredicate);
this.sequenceFieldEnabled =
table.primaryKeys().size() > 0
&& new
CoreOptions(table.options()).sequenceField().isPresent();
@@ -75,8 +78,16 @@ public abstract class FullCacheLookupTable implements
LookupTable {
this.projectedType = projectedType;
}
+ /** Stores the partition filter; open() combines it with the table predicate when building the reader. */
+ @Override
+ public void specificPartitionFilter(Predicate filter) {
+ this.specificPartition = filter;
+ }
+
@Override
public void open() throws Exception {
+ Predicate scanPredicate =
+ PredicateBuilder.andNullable(context.tablePredicate,
specificPartition);
+ this.reader = new LookupStreamingReader(context.table,
context.projection, scanPredicate);
BinaryExternalSortBuffer bulkLoadSorter =
RocksDBState.createBulkLoadSorter(
IOManager.create(context.tempPath.toString()),
context.table.coreOptions());
@@ -155,6 +166,7 @@ public abstract class FullCacheLookupTable implements
LookupTable {
@Override
public void close() throws IOException {
stateFactory.close();
+ FileIOUtils.deleteDirectory(context.tempPath);
}
/** Bulk loader for the table. */
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
index e89259073..c13947bbd 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.lookup;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.predicate.Predicate;
import java.io.Closeable;
import java.io.IOException;
@@ -27,6 +28,8 @@ import java.util.List;
/** A lookup table which provides get and refresh. */
public interface LookupTable extends Closeable {
+ /**
+  * Sets a filter restricting which partition data this table serves. The implementations in
+  * this change only apply it in {@link #open()}, so set it before (re)opening the table.
+  */
+ void specificPartitionFilter(Predicate filter);
+
void open() throws Exception;
List<InternalRow> get(InternalRow key) throws IOException;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
index cd1c5e7a4..b931d18b4 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
@@ -36,22 +36,30 @@ import java.util.List;
/** A {@link LookupTable} for table without primary key. */
public class NoPrimaryKeyLookupTable extends FullCacheLookupTable {
- private final RocksDBListState<InternalRow, InternalRow> state;
+ private final long lruCacheSize;
private final KeyProjectedRow joinKeyRow;
+ private RocksDBListState<InternalRow, InternalRow> state;
+
public NoPrimaryKeyLookupTable(Context context, long lruCacheSize) throws
IOException {
super(context);
+ this.lruCacheSize = lruCacheSize;
List<String> fieldNames = projectedType.getFieldNames();
int[] joinKeyMapping =
context.joinKey.stream().mapToInt(fieldNames::indexOf).toArray();
this.joinKeyRow = new KeyProjectedRow(joinKeyMapping);
+ }
+
+ @Override
+ public void open() throws Exception {
this.state =
stateFactory.listState(
"join-key-index",
InternalSerializers.create(
- TypeUtils.project(projectedType,
joinKeyMapping)),
+ TypeUtils.project(projectedType,
joinKeyRow.indexMapping())),
InternalSerializers.create(projectedType),
lruCacheSize);
+ super.open();
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
index fbec4d443..97034070c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
@@ -39,31 +39,24 @@ import java.util.List;
/** A {@link LookupTable} for primary key table. */
public class PrimaryKeyLookupTable extends FullCacheLookupTable {
- protected final RocksDBValueState<InternalRow, InternalRow> tableState;
-
- protected int[] primaryKeyMapping;
+ protected final long lruCacheSize;
protected final KeyProjectedRow primaryKeyRow;
@Nullable private final ProjectedRow keyRearrange;
+ protected RocksDBValueState<InternalRow, InternalRow> tableState;
+
public PrimaryKeyLookupTable(Context context, long lruCacheSize,
List<String> joinKey)
throws IOException {
super(context);
+ this.lruCacheSize = lruCacheSize;
List<String> fieldNames = projectedType.getFieldNames();
FileStoreTable table = context.table;
- this.primaryKeyMapping =
+ int[] primaryKeyMapping =
table.primaryKeys().stream().mapToInt(fieldNames::indexOf).toArray();
this.primaryKeyRow = new KeyProjectedRow(primaryKeyMapping);
- this.tableState =
- stateFactory.valueState(
- "table",
- InternalSerializers.create(
- TypeUtils.project(projectedType,
primaryKeyMapping)),
- InternalSerializers.create(projectedType),
- lruCacheSize);
-
ProjectedRow keyRearrange = null;
if (!table.primaryKeys().equals(joinKey)) {
keyRearrange =
@@ -76,6 +69,18 @@ public class PrimaryKeyLookupTable extends
FullCacheLookupTable {
this.keyRearrange = keyRearrange;
}
+ /**
+  * Creates the RocksDB value state keyed by the primary key fields, then delegates to {@link
+  * FullCacheLookupTable#open()}.
+  */
+ @Override
+ public void open() throws Exception {
+ // Created here instead of in the constructor, apparently so the table can go through
+ // close()/open() cycles; it must exist before super.open() runs, which presumably fills it.
+ // NOTE(review): FullCacheLookupTable.close() closes stateFactory and deletes tempPath —
+ // confirm the factory is still usable when open() is called again after close().
+ this.tableState =
+ stateFactory.valueState(
+ "table",
+ InternalSerializers.create(
+ TypeUtils.project(projectedType,
primaryKeyRow.indexMapping())),
+ InternalSerializers.create(projectedType),
+ lruCacheSize);
+ super.open();
+ }
+
@Override
public List<InternalRow> innerGet(InternalRow key) throws IOException {
if (keyRearrange != null) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
index 690cca544..7082a670e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.flink.query.RemoteTableQuery;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.query.LocalTableQuery;
@@ -42,6 +43,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
import static org.apache.paimon.CoreOptions.SCAN_BOUNDED_WATERMARK;
import static org.apache.paimon.CoreOptions.STREAM_SCAN_MODE;
@@ -50,17 +52,19 @@ import static
org.apache.paimon.CoreOptions.StreamScanMode.FILE_MONITOR;
/** Lookup table for primary key which supports to read the LSM tree directly.
*/
public class PrimaryKeyPartialLookupTable implements LookupTable {
- private final QueryExecutor queryExecutor;
+ private final Function<Predicate, QueryExecutor> executorFactory;
private final FixedBucketFromPkExtractor extractor;
@Nullable private final ProjectedRow keyRearrange;
+ @Nullable private final ProjectedRow trimmedKeyRearrange;
+
+ private Predicate specificPartition;
+ private QueryExecutor queryExecutor;
private PrimaryKeyPartialLookupTable(
- QueryExecutor queryExecutor, FileStoreTable table, List<String>
joinKey) {
- this.queryExecutor = queryExecutor;
- if (table.partitionKeys().size() > 0) {
- throw new UnsupportedOperationException(
- "The partitioned table are not supported in partial cache
mode.");
- }
+ Function<Predicate, QueryExecutor> executorFactory,
+ FileStoreTable table,
+ List<String> joinKey) {
+ this.executorFactory = executorFactory;
if (table.bucketMode() != BucketMode.FIXED) {
throw new UnsupportedOperationException(
@@ -79,6 +83,18 @@ public class PrimaryKeyPartialLookupTable implements
LookupTable {
.toArray());
}
this.keyRearrange = keyRearrange;
+
+ List<String> trimmedPrimaryKeys = table.schema().trimmedPrimaryKeys();
+ ProjectedRow trimmedKeyRearrange = null;
+ if (!trimmedPrimaryKeys.equals(joinKey)) {
+ trimmedKeyRearrange =
+ ProjectedRow.from(
+ trimmedPrimaryKeys.stream()
+ .map(joinKey::indexOf)
+ .mapToInt(value -> value)
+ .toArray());
+ }
+ this.trimmedKeyRearrange = trimmedKeyRearrange;
}
@VisibleForTesting
@@ -86,21 +102,33 @@ public class PrimaryKeyPartialLookupTable implements
LookupTable {
return queryExecutor;
}
+ /**
+  * Stores the partition filter used when open() creates the query executor. NOTE: the
+  * executor factory of createRemoteTable ignores the filter; only local lookups apply it.
+  */
+ @Override
+ public void specificPartitionFilter(Predicate filter) {
+ this.specificPartition = filter;
+ }
+
@Override
public void open() throws Exception {
+ this.queryExecutor = executorFactory.apply(specificPartition);
refresh();
}
@Override
public List<InternalRow> get(InternalRow key) throws IOException {
+ InternalRow adjustedKey = key;
if (keyRearrange != null) {
- key = keyRearrange.replaceRow(key);
+ adjustedKey = keyRearrange.replaceRow(adjustedKey);
}
- extractor.setRecord(key);
+ extractor.setRecord(adjustedKey);
int bucket = extractor.bucket();
BinaryRow partition = extractor.partition();
- InternalRow kv = queryExecutor.lookup(partition, bucket, key);
+ InternalRow trimmedKey = key;
+ if (trimmedKeyRearrange != null) {
+ trimmedKey = trimmedKeyRearrange.replaceRow(trimmedKey);
+ }
+
+ InternalRow kv = queryExecutor.lookup(partition, bucket, trimmedKey);
if (kv == null) {
return Collections.emptyList();
} else {
@@ -115,19 +143,23 @@ public class PrimaryKeyPartialLookupTable implements
LookupTable {
@Override
public void close() throws IOException {
- queryExecutor.close();
+ if (queryExecutor != null) {
+ queryExecutor.close();
+ }
}
public static PrimaryKeyPartialLookupTable createLocalTable(
FileStoreTable table, int[] projection, File tempPath,
List<String> joinKey) {
- LocalQueryExecutor queryExecutor = new LocalQueryExecutor(table,
projection, tempPath);
- return new PrimaryKeyPartialLookupTable(queryExecutor, table, joinKey);
+ return new PrimaryKeyPartialLookupTable(
+ filter -> new LocalQueryExecutor(table, projection, tempPath,
filter),
+ table,
+ joinKey);
}
public static PrimaryKeyPartialLookupTable createRemoteTable(
FileStoreTable table, int[] projection, List<String> joinKey) {
- RemoveQueryExecutor queryExecutor = new RemoveQueryExecutor(table,
projection);
- return new PrimaryKeyPartialLookupTable(queryExecutor, table, joinKey);
+ return new PrimaryKeyPartialLookupTable(
+ filter -> new RemoteQueryExecutor(table, projection), table,
joinKey);
}
interface QueryExecutor extends Closeable {
@@ -142,7 +174,8 @@ public class PrimaryKeyPartialLookupTable implements
LookupTable {
private final LocalTableQuery tableQuery;
private final StreamTableScan scan;
- private LocalQueryExecutor(FileStoreTable table, int[] projection,
File tempPath) {
+ private LocalQueryExecutor(
+ FileStoreTable table, int[] projection, File tempPath,
@Nullable Predicate filter) {
this.tableQuery =
table.newLocalTableQuery()
.withValueProjection(Projection.of(projection).toNestedIndexes())
@@ -151,7 +184,8 @@ public class PrimaryKeyPartialLookupTable implements
LookupTable {
Map<String, String> dynamicOptions = new HashMap<>();
dynamicOptions.put(STREAM_SCAN_MODE.key(),
FILE_MONITOR.getValue());
dynamicOptions.put(SCAN_BOUNDED_WATERMARK.key(), null);
- this.scan =
table.copy(dynamicOptions).newReadBuilder().newStreamScan();
+ this.scan =
+
table.copy(dynamicOptions).newReadBuilder().withFilter(filter).newStreamScan();
}
@Override
@@ -189,11 +223,11 @@ public class PrimaryKeyPartialLookupTable implements
LookupTable {
}
}
- static class RemoveQueryExecutor implements QueryExecutor {
+ static class RemoteQueryExecutor implements QueryExecutor {
private final RemoteTableQuery tableQuery;
- private RemoveQueryExecutor(FileStoreTable table, int[] projection) {
+ private RemoteQueryExecutor(FileStoreTable table, int[] projection) {
this.tableQuery = new
RemoteTableQuery(table).withValueProjection(projection);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
index d95481a32..d0731ebdf 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
@@ -34,22 +34,28 @@ import java.util.List;
/** A {@link LookupTable} for primary key table which provides lookup by
secondary key. */
public class SecondaryIndexLookupTable extends PrimaryKeyLookupTable {
- private final RocksDBSetState<InternalRow, InternalRow> indexState;
-
private final KeyProjectedRow secKeyRow;
+ private RocksDBSetState<InternalRow, InternalRow> indexState;
+
public SecondaryIndexLookupTable(Context context, long lruCacheSize)
throws IOException {
super(context, lruCacheSize / 2, context.table.primaryKeys());
List<String> fieldNames = projectedType.getFieldNames();
int[] secKeyMapping =
context.joinKey.stream().mapToInt(fieldNames::indexOf).toArray();
this.secKeyRow = new KeyProjectedRow(secKeyMapping);
+ }
+
+ @Override
+ public void open() throws Exception {
this.indexState =
stateFactory.setState(
"sec-index",
-
InternalSerializers.create(TypeUtils.project(projectedType, secKeyMapping)),
InternalSerializers.create(
- TypeUtils.project(projectedType,
primaryKeyMapping)),
- lruCacheSize / 2);
+ TypeUtils.project(projectedType,
secKeyRow.indexMapping())),
+ InternalSerializers.create(
+ TypeUtils.project(projectedType,
primaryKeyRow.indexMapping())),
+ lruCacheSize);
+ super.open();
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index 5fc9af74b..a11e7887c 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -641,6 +641,44 @@ public class LookupJoinITCase extends CatalogITCaseBase {
iterator.close();
}
+ @Test
+ public void testLookupMaxPtPartitionedTablePartialCache() throws Exception
{
+ innerTestLookupMaxPtPartitionedTable(LookupCacheMode.AUTO);
+ }
+
+ @Test
+ public void testLookupMaxPtPartitionedTableFullCache() throws Exception {
+ innerTestLookupMaxPtPartitionedTable(LookupCacheMode.FULL);
+ }
+
+ private void innerTestLookupMaxPtPartitionedTable(LookupCacheMode mode)
throws Exception {
+ tEnv.executeSql(
+ "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT,
PRIMARY KEY (pt, k) NOT ENFORCED)"
+ + "PARTITIONED BY (`pt`) WITH ("
+ + "'bucket' = '1', "
+ + "'lookup.dynamic-partition' = 'max_pt()', "
+ + "'lookup.dynamic-partition.refresh-interval' = '1
ms', "
+ + String.format("'lookup.cache' = '%s', ", mode)
+ + "'continuous.discovery-interval'='1 ms')");
+ String query =
+ "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM for
system_time as of T.proctime AS D ON T.i = D.k";
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 2)");
+ Thread.sleep(2000); // wait refresh
+ sql("INSERT INTO T VALUES (1)");
+ List<Row> result = iterator.collect(1);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2));
+
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 3)");
+ Thread.sleep(2000); // wait refresh
+ sql("INSERT INTO T VALUES (1)");
+ result = iterator.collect(1);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 3));
+
+ iterator.close();
+ }
+
@Test
public void testLookupNonPkAppendTable() throws Exception {
sql(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
index 61dc4e230..c21989ee8 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
@@ -24,7 +24,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkRowData;
import
org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable.LocalQueryExecutor;
import
org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable.QueryExecutor;
-import
org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable.RemoveQueryExecutor;
+import
org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable.RemoteQueryExecutor;
import org.apache.paimon.lookup.RocksDBOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
@@ -142,7 +142,7 @@ public class FileStoreLookupFunctionTest {
assertThat(lookupFunction.lookupTable()).isInstanceOf(PrimaryKeyPartialLookupTable.class);
QueryExecutor queryExecutor =
((PrimaryKeyPartialLookupTable)
lookupFunction.lookupTable()).queryExecutor();
- assertThat(queryExecutor).isInstanceOf(RemoveQueryExecutor.class);
+ assertThat(queryExecutor).isInstanceOf(RemoteQueryExecutor.class);
}
@Test
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
index 9af88502a..3af6b722d 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
@@ -116,6 +116,7 @@ public class LookupTableTest extends TableTestBase {
tempDir.toFile(),
singletonList("f0"));
table = FullCacheLookupTable.create(context,
ThreadLocalRandom.current().nextInt(2) * 10);
+ table.open();
// test bulk load error
{
@@ -174,6 +175,7 @@ public class LookupTableTest extends TableTestBase {
tempDir.toFile(),
singletonList("f0"));
table = FullCacheLookupTable.create(context,
ThreadLocalRandom.current().nextInt(2) * 10);
+ table.open();
List<Pair<byte[], byte[]>> records = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
@@ -218,6 +220,7 @@ public class LookupTableTest extends TableTestBase {
tempDir.toFile(),
singletonList("f0"));
table = FullCacheLookupTable.create(context,
ThreadLocalRandom.current().nextInt(2) * 10);
+ table.open();
table.refresh(singletonList(sequence(row(1, 11, 111),
-1L)).iterator(), false);
List<InternalRow> result = table.get(row(1));
@@ -249,6 +252,7 @@ public class LookupTableTest extends TableTestBase {
tempDir.toFile(),
singletonList("f0"));
table = FullCacheLookupTable.create(context,
ThreadLocalRandom.current().nextInt(2) * 10);
+ table.open();
table.refresh(singletonList(sequence(row(1, 11, 111),
-1L)).iterator(), false);
List<InternalRow> result = table.get(row(1));
@@ -272,6 +276,7 @@ public class LookupTableTest extends TableTestBase {
tempDir.toFile(),
singletonList("f1"));
table = FullCacheLookupTable.create(context,
ThreadLocalRandom.current().nextInt(2) * 10);
+ table.open();
// test bulk load 100_000 records
List<Pair<byte[], byte[]>> records = new ArrayList<>();
@@ -316,6 +321,7 @@ public class LookupTableTest extends TableTestBase {
tempDir.toFile(),
singletonList("f1"));
table = FullCacheLookupTable.create(context,
ThreadLocalRandom.current().nextInt(2) * 10);
+ table.open();
List<Pair<byte[], byte[]>> records = new ArrayList<>();
Random rnd = new Random();
@@ -370,6 +376,7 @@ public class LookupTableTest extends TableTestBase {
tempDir.toFile(),
singletonList("f1"));
table = FullCacheLookupTable.create(context,
ThreadLocalRandom.current().nextInt(2) * 10);
+ table.open();
table.refresh(singletonList(sequence(row(1, 11, 111),
-1L)).iterator(), false);
List<InternalRow> result = table.get(row(11));
@@ -410,6 +417,7 @@ public class LookupTableTest extends TableTestBase {
tempDir.toFile(),
singletonList("f1"));
table = FullCacheLookupTable.create(context,
ThreadLocalRandom.current().nextInt(2) * 10);
+ table.open();
// test bulk load 100_000 records
List<Pair<byte[], byte[]>> records = new ArrayList<>();
@@ -452,6 +460,7 @@ public class LookupTableTest extends TableTestBase {
tempDir.toFile(),
singletonList("f1"));
table = FullCacheLookupTable.create(context,
ThreadLocalRandom.current().nextInt(2) * 10);
+ table.open();
table.refresh(singletonList(row(1, 11, 333)).iterator(), false);
List<InternalRow> result = table.get(row(11));
@@ -478,6 +487,8 @@ public class LookupTableTest extends TableTestBase {
new int[] {0, 1, 2},
tempDir.toFile(),
ImmutableList.of("pk1", "pk2"));
+ table.open();
+
List<InternalRow> result = table.get(row(1, -1));
assertThat(result).hasSize(0);
@@ -508,6 +519,7 @@ public class LookupTableTest extends TableTestBase {
new int[] {2, 1},
tempDir.toFile(),
ImmutableList.of("pk1", "pk2"));
+ table.open();
List<InternalRow> result = table.get(row(1, -1));
assertThat(result).hasSize(0);
@@ -533,6 +545,8 @@ public class LookupTableTest extends TableTestBase {
new int[] {2, 1},
tempDir.toFile(),
ImmutableList.of("pk2", "pk1"));
+ table.open();
+
List<InternalRow> result = table.get(row(-1, 1));
assertThat(result).hasSize(0);