This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e34505c1a [flink] Flink Lookup supports async mode (#1770)
e34505c1a is described below
commit e34505c1af6e7d8090826b97d73bc4f89f808598
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Aug 10 18:28:07 2023 +0800
[flink] Flink Lookup supports async mode (#1770)
---
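Note on the approach: the new AsyncLookupFunctionWrapper in this commit runs the existing synchronous lookup on a fixed thread pool and returns a CompletableFuture. The following is a minimal, stand-alone sketch of that pattern using only the JDK. The class name `AsyncWrapperSketch` and the use of `java.util.function.Function` as a stand-in for the synchronous lookup are assumptions made for illustration; this is not the Paimon class and it does not use any Flink API.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Hypothetical sketch: wraps a synchronous lookup so callers receive a future instead. */
public class AsyncWrapperSketch<K, V> implements AutoCloseable {

    // Stand-in for the synchronous lookup function (an assumption of this sketch).
    private final Function<K, Collection<V>> syncLookup;
    private final int threadNumber;

    // Created on first use, mirroring the lazily created executor in the commit.
    private ExecutorService lazyExecutor;

    public AsyncWrapperSketch(Function<K, Collection<V>> syncLookup, int threadNumber) {
        this.syncLookup = syncLookup;
        this.threadNumber = threadNumber;
    }

    /** Submits the lookup to the pool; the calling thread is not blocked. */
    public CompletableFuture<Collection<V>> asyncLookup(K key) {
        return CompletableFuture.supplyAsync(
                () -> {
                    // The wrapped function is not assumed to be thread-safe,
                    // so the lookups themselves are serialized by this lock.
                    synchronized (syncLookup) {
                        return syncLookup.apply(key);
                    }
                },
                executor());
    }

    private ExecutorService executor() {
        if (lazyExecutor == null) {
            lazyExecutor = Executors.newFixedThreadPool(threadNumber);
        }
        return lazyExecutor;
    }

    @Override
    public void close() {
        if (lazyExecutor != null) {
            lazyExecutor.shutdownNow();
            lazyExecutor = null;
        }
    }

    public static void main(String[] args) {
        try (AsyncWrapperSketch<Integer, String> wrapper =
                new AsyncWrapperSketch<>(k -> Collections.singletonList("v" + k), 4)) {
            System.out.println(wrapper.asyncLookup(1).join()); // prints [v1]
        }
    }
}
```

Because of the lock, the pool does not make individual lookups run in parallel; what the caller gains is that it hands off the lookup and continues, which is what lets a retried record stop holding up the records behind it.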
docs/content/how-to/lookup-joins.md | 43 +++++++++--
.../generated/flink_connector_configuration.html | 12 +++
.../flink/lookup/LookupRuntimeProviderFactory.java | 3 +-
.../flink/lookup/LookupRuntimeProviderFactory.java | 3 +-
.../apache/paimon/flink/FlinkConnectorOptions.java | 12 +++
.../flink/lookup/AsyncLookupFunctionWrapper.java | 85 ++++++++++++++++++++++
.../flink/lookup/FileStoreLookupFunction.java | 14 +++-
.../flink/lookup/LookupRuntimeProviderFactory.java | 10 ++-
.../paimon/flink/source/DataTableSource.java | 9 ++-
.../org/apache/paimon/flink/LookupJoinITCase.java | 23 ++++++
10 files changed, 203 insertions(+), 11 deletions(-)
diff --git a/docs/content/how-to/lookup-joins.md b/docs/content/how-to/lookup-joins.md
index 878bf6eb6..cd5655b60 100644
--- a/docs/content/how-to/lookup-joins.md
+++ b/docs/content/how-to/lookup-joins.md
@@ -30,6 +30,8 @@ under the License.
Paimon supports lookup joins on tables with primary keys and append-only
tables in Flink. The following example illustrates this feature.
+### Prepare
+
First, let's create a Paimon table and update it in real-time.
```sql
@@ -67,6 +69,8 @@ CREATE TEMPORARY TABLE Orders (
);
```
+### Normal Lookup
+
You can now use `customers` in a lookup join query.
```sql
@@ -78,14 +82,43 @@ FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```
-The lookup join operator will maintain a RocksDB cache locally and pull the
latest updates of the table in real time. Lookup join operator will only pull
the necessary data, so your filter conditions are very important for
performance.
+### Retry Lookup
-This feature is only suitable for tables containing at most tens of millions
of records to avoid excessive use of local disks.
-
-{{< hint info >}}
If records of `Orders` (main table) fail to join because the corresponding data of `customers` (lookup table) is not ready yet,
you can consider using Flink's [Delayed Retry Strategy For Lookup](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#3-enable-delayed-retry-strategy-for-lookup).
-{{< /hint >}}
+Only for Flink 1.16+.
+
+```sql
+-- enrich each order with customer information
+SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */
+o.order_id, o.total, c.country, c.zip
+FROM Orders AS o
+JOIN customers
+FOR SYSTEM_TIME AS OF o.proc_time AS c
+ON o.customer_id = c.id;
+```
+
+### Async Retry Lookup
+
+The problem with synchronous retry is that a record being retried blocks all subsequent records, which stalls the entire job.
+Consider combining async lookup with the `allow_unordered` output mode to avoid this: records that miss the join no longer block
+other records.
+
+```sql
+-- enrich each order with customer information
+SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss', 'output-mode'='allow_unordered', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */
+o.order_id, o.total, c.country, c.zip
+FROM Orders AS o
+JOIN customers /*+ OPTIONS('lookup.async'='true', 'lookup.async-thread-number'='16') */
+FOR SYSTEM_TIME AS OF o.proc_time AS c
+ON o.customer_id = c.id;
+```
+
+### Performance
+
+The lookup join operator maintains a local RocksDB cache and pulls the latest updates of the table in real time. It pulls only the necessary data, so your filter conditions are very important for performance.
+
+This feature is only suitable for tables containing at most tens of millions of records to avoid excessive use of local disks.
## RocksDB Cache Options
diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 8ce743cbb..d72eee6f6 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -38,6 +38,18 @@ under the License.
<td>String</td>
<td>The log system used to keep changes of the table.<br /><br
/>Possible values:<br /><ul><li>"none": No log system, the data is written only
to file store, and the streaming read will be directly read from the file
store.</li></ul><ul><li>"kafka": Kafka log system, the data is double written
to file store and kafka, and the streaming read will be read from kafka. If
streaming read from file, configures streaming-read-mode to file.</li></ul></td>
</tr>
+ <tr>
+ <td><h5>lookup.async</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to enable async lookup join.</td>
+ </tr>
+ <tr>
+ <td><h5>lookup.async-thread-number</h5></td>
+ <td style="word-wrap: break-word;">16</td>
+ <td>Integer</td>
+ <td>The thread number for lookup async.</td>
+ </tr>
<tr>
<td><h5>scan.infer-parallelism</h5></td>
<td style="word-wrap: break-word;">true</td>
diff --git a/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/lookup/LookupRuntimeProviderFactory.java b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/lookup/LookupRuntimeProviderFactory.java
index b23b83a3c..4221f4f1d 100644
--- a/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/lookup/LookupRuntimeProviderFactory.java
+++ b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/lookup/LookupRuntimeProviderFactory.java
@@ -24,7 +24,8 @@ import org.apache.flink.table.connector.source.TableFunctionProvider;
/** Factory to create {@link LookupRuntimeProvider}. */
public class LookupRuntimeProviderFactory {
- public static LookupRuntimeProvider create(FileStoreLookupFunction function) {
+ public static LookupRuntimeProvider create(
+ FileStoreLookupFunction function, boolean enableAsync, int asyncThreadNumber) {
return TableFunctionProvider.of(new OldLookupFunction(function));
}
}
diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/lookup/LookupRuntimeProviderFactory.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/lookup/LookupRuntimeProviderFactory.java
index b23b83a3c..4221f4f1d 100644
--- a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/lookup/LookupRuntimeProviderFactory.java
+++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/lookup/LookupRuntimeProviderFactory.java
@@ -24,7 +24,8 @@ import org.apache.flink.table.connector.source.TableFunctionProvider;
/** Factory to create {@link LookupRuntimeProvider}. */
public class LookupRuntimeProviderFactory {
- public static LookupRuntimeProvider create(FileStoreLookupFunction function) {
+ public static LookupRuntimeProvider create(
+ FileStoreLookupFunction function, boolean enableAsync, int asyncThreadNumber) {
return TableFunctionProvider.of(new OldLookupFunction(function));
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index c56369fb3..682053602 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -224,6 +224,18 @@ public class FlinkConnectorOptions {
.withDescription(
"If the new snapshot has not been generated when the checkpoint starts to trigger, the enumerator will block the checkpoint and wait for the new snapshot. Set the maximum waiting time to avoid infinite waiting, if timeout, the checkpoint will fail. Note that it should be set smaller than the checkpoint timeout.");
+ public static final ConfigOption<Boolean> LOOKUP_ASYNC =
+ ConfigOptions.key("lookup.async")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to enable async lookup join.");
+
+ public static final ConfigOption<Integer> LOOKUP_ASYNC_THREAD_NUMBER =
+ ConfigOptions.key("lookup.async-thread-number")
+ .intType()
+ .defaultValue(16)
+ .withDescription("The thread number for lookup async.");
+
public static List<ConfigOption<?>> getOptions() {
final Field[] fields = FlinkConnectorOptions.class.getFields();
final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/AsyncLookupFunctionWrapper.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/AsyncLookupFunctionWrapper.java
new file mode 100644
index 000000000..f8b41d141
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/AsyncLookupFunctionWrapper.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.utils.ExecutorThreadFactory;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncLookupFunction;
+import org.apache.flink.table.functions.FunctionContext;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/** An {@link AsyncLookupFunction} that wraps a synchronous lookup function. */
+public class AsyncLookupFunctionWrapper extends AsyncLookupFunction {
+
+ private final NewLookupFunction function;
+ private final int threadNumber;
+
+ private transient ExecutorService lazyExecutor;
+
+ public AsyncLookupFunctionWrapper(NewLookupFunction function, int threadNumber) {
+ this.function = function;
+ this.threadNumber = threadNumber;
+ }
+
+ @Override
+ public void open(FunctionContext context) throws Exception {
+ function.open(context);
+ }
+
+ private Collection<RowData> lookup(RowData keyRow) {
+ try {
+ synchronized (function) {
+ return function.lookup(keyRow);
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
+ return CompletableFuture.supplyAsync(() -> lookup(keyRow), executor());
+ }
+
+ @Override
+ public void close() throws Exception {
+ function.close();
+ if (lazyExecutor != null) {
+ lazyExecutor.shutdownNow();
+ lazyExecutor = null;
+ }
+ }
+
+ private ExecutorService executor() {
+ if (lazyExecutor == null) {
+ lazyExecutor =
+ Executors.newFixedThreadPool(
+ threadNumber,
+ new ExecutorThreadFactory(Thread.currentThread().getName() + "-async"));
+ }
+ return lazyExecutor;
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 3381cfa7a..55b2fa8d5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -231,7 +231,8 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
try {
Field field = context.getClass().getDeclaredField("context");
field.setAccessible(true);
- StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) field.get(context);
+ StreamingRuntimeContext runtimeContext =
+ extractStreamingRuntimeContext(field.get(context));
String[] tmpDirectories =
runtimeContext.getTaskManagerRuntimeInfo().getTmpDirectories();
return tmpDirectories[ThreadLocalRandom.current().nextInt(tmpDirectories.length)];
@@ -239,4 +240,15 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
throw new RuntimeException(e);
}
}
+
+ private static StreamingRuntimeContext extractStreamingRuntimeContext(Object runtimeContext)
+ throws NoSuchFieldException, IllegalAccessException {
+ if (runtimeContext instanceof StreamingRuntimeContext) {
+ return (StreamingRuntimeContext) runtimeContext;
+ }
+
+ Field field = runtimeContext.getClass().getDeclaredField("runtimeContext");
+ field.setAccessible(true);
+ return extractStreamingRuntimeContext(field.get(runtimeContext));
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupRuntimeProviderFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupRuntimeProviderFactory.java
index 47e99f1ea..8b6957993 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupRuntimeProviderFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupRuntimeProviderFactory.java
@@ -19,12 +19,18 @@
package org.apache.paimon.flink.lookup;
import org.apache.flink.table.connector.source.LookupTableSource.LookupRuntimeProvider;
+import org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider;
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
/** Factory to create {@link LookupRuntimeProvider}. */
public class LookupRuntimeProviderFactory {
- public static LookupRuntimeProvider create(FileStoreLookupFunction function) {
- return LookupFunctionProvider.of(new NewLookupFunction(function));
+ public static LookupRuntimeProvider create(
+ FileStoreLookupFunction function, boolean enableAsync, int asyncThreadNumber) {
+ NewLookupFunction lookup = new NewLookupFunction(function);
+ return enableAsync
+ ? AsyncLookupFunctionProvider.of(
+ new AsyncLookupFunctionWrapper(lookup, asyncThreadNumber))
+ : LookupFunctionProvider.of(lookup);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index d9f72d335..bcda510f1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -61,6 +61,8 @@ import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
import static org.apache.paimon.CoreOptions.LOG_SCAN_REMOVE_NORMALIZE;
+import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_ASYNC;
+import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_ASYNC_THREAD_NUMBER;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_GROUP;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
@@ -276,8 +278,13 @@ public class DataTableSource extends FlinkTableSource {
? IntStream.range(0, table.rowType().getFieldCount()).toArray()
: Projection.of(projectFields).toTopLevelIndexes();
int[] joinKey = Projection.of(context.getKeys()).toTopLevelIndexes();
+ Options options = new Options(table.options());
+ boolean enableAsync = options.get(LOOKUP_ASYNC);
+ int asyncThreadNumber = options.get(LOOKUP_ASYNC_THREAD_NUMBER);
return LookupRuntimeProviderFactory.create(
- new FileStoreLookupFunction(table, projection, joinKey, predicate));
+ new FileStoreLookupFunction(table, projection, joinKey, predicate),
+ enableAsync,
+ asyncThreadNumber);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index a078858ec..713d4ef2a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -517,6 +517,29 @@ public class LookupJoinITCase extends CatalogITCaseBase {
iterator.close();
}
+ @Test
+ public void testAsyncRetryLookup() throws Exception {
+ sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
+
+ String query =
+ "SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss',"
+ + " 'retry-strategy'='fixed_delay', 'output-mode'='allow_unordered', 'fixed-delay'='1s','max-attempts'='60') */"
+ + " T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+ OPTIONS('lookup.async'='true') */ for system_time as of T.proctime AS D ON T.i = D.i";
+ BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ sql("INSERT INTO T VALUES (3)");
+ sql("INSERT INTO T VALUES (2)");
+ sql("INSERT INTO T VALUES (1)");
+ Thread.sleep(2000); // wait
+ assertThat(iterator.collect(2))
+ .containsExactlyInAnyOrder(Row.of(1, 11, 111, 1111), Row.of(2, 22, 222, 2222));
+
+ sql("INSERT INTO DIM VALUES (3, 33, 333, 3333)");
+ assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(3, 33, 333, 3333));
+
+ iterator.close();
+ }
+
@Test
public void testLookupPartitionedTable() throws Exception {
String query =