This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a26dd5851 [flink] Allow dynamic configuration of 
'continuous.discovery-interval' using hints in Flink lookup joins. (#2021)
a26dd5851 is described below

commit a26dd585112c2b0a630436e2a9e72eccaaf01ee0
Author: sai <[email protected]>
AuthorDate: Wed Sep 20 21:12:53 2023 +0800

    [flink] Allow dynamic configuration of 'continuous.discovery-interval' 
using hints in Flink lookup joins. (#2021)
---
 docs/layouts/shortcodes/generated/rocksdb_configuration.html   |  6 ++++++
 .../src/main/java/org/apache/paimon/flink/RocksDBOptions.java  |  8 ++++++++
 .../apache/paimon/flink/lookup/FileStoreLookupFunction.java    | 10 +++++++---
 .../paimon/flink/lookup/FileStoreLookupFunctionTest.java       |  3 ++-
 4 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/rocksdb_configuration.html 
b/docs/layouts/shortcodes/generated/rocksdb_configuration.html
index a7ff2b0f7..0a6d09235 100644
--- a/docs/layouts/shortcodes/generated/rocksdb_configuration.html
+++ b/docs/layouts/shortcodes/generated/rocksdb_configuration.html
@@ -32,6 +32,12 @@ under the License.
             <td>Long</td>
             <td>The maximum number of rows to store in the cache.</td>
         </tr>
+        <tr>
+            <td><h5>lookup.continuous.discovery-interval</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Duration</td>
+            <td>The discovery interval of lookup continuous reading. This is 
used as an SQL hint. If it's not configured, the lookup function will fallback 
to 'continuous.discovery-interval'.</td>
+        </tr>
         <tr>
             <td><h5>rocksdb.block.blocksize</h5></td>
             <td style="word-wrap: break-word;">4 kb</td>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/RocksDBOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/RocksDBOptions.java
index 42f13aa73..d54a5ef8a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/RocksDBOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/RocksDBOptions.java
@@ -35,6 +35,7 @@ import org.rocksdb.PlainTableConfig;
 import org.rocksdb.TableFormatConfig;
 
 import java.io.File;
+import java.time.Duration;
 
 import static org.apache.paimon.options.ConfigOptions.key;
 import static org.apache.paimon.options.description.LinkElement.link;
@@ -55,6 +56,13 @@ public class RocksDBOptions {
                     .defaultValue(10_000L)
                     .withDescription("The maximum number of rows to store in 
the cache.");
 
+    public static final ConfigOption<Duration> 
LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL =
+            key("lookup.continuous.discovery-interval")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The discovery interval of lookup continuous 
reading. This is used as an SQL hint. If it's not configured, the lookup 
function will fallback to 'continuous.discovery-interval'.");
+
     // 
--------------------------------------------------------------------------
     // Provided configurable DBOptions within Flink
     // 
--------------------------------------------------------------------------
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 4715c3244..36f393210 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.lookup;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkRowData;
 import org.apache.paimon.flink.FlinkRowWrapper;
@@ -59,7 +58,9 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.paimon.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
 import static org.apache.paimon.flink.RocksDBOptions.LOOKUP_CACHE_ROWS;
+import static 
org.apache.paimon.flink.RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL;
 import static 
org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
 
 /** A lookup {@link TableFunction} for file store. */
@@ -123,7 +124,9 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
 
     private void open() throws Exception {
         Options options = Options.fromMap(table.options());
-        this.refreshInterval = 
options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL);
+        this.refreshInterval =
+                options.getOptional(LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL)
+                        .orElse(options.get(CONTINUOUS_DISCOVERY_INTERVAL));
         this.stateFactory = new RocksDBStateFactory(path.toString(), options, 
null);
 
         List<String> fieldNames = table.rowType().getFieldNames();
@@ -194,7 +197,8 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
         }
         if (nextLoadTime > 0) {
             LOG.info(
-                    "Lookup table has refreshed after {} second(s), 
refreshing",
+                    "Lookup table {} has refreshed after {} second(s), 
refreshing",
+                    table.name(),
                     refreshInterval.toMillis() / 1000);
         }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
index b0931f23d..5c666c7fe 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.flink.RocksDBOptions;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
@@ -73,7 +74,7 @@ public class FileStoreLookupFunctionTest {
         conf.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
         conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 3);
         conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 2);
-        conf.set(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL, Duration.ZERO);
+        conf.set(RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL, 
Duration.ofSeconds(1));
 
         RowType rowType =
                 RowType.of(

Reply via email to