This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a26dd5851 [flink] Allow dynamic configuration of 'continuous.discovery-interval' using hints in Flink lookup joins. (#2021)
a26dd5851 is described below
commit a26dd585112c2b0a630436e2a9e72eccaaf01ee0
Author: sai <[email protected]>
AuthorDate: Wed Sep 20 21:12:53 2023 +0800
[flink] Allow dynamic configuration of 'continuous.discovery-interval' using hints in Flink lookup joins. (#2021)
---
docs/layouts/shortcodes/generated/rocksdb_configuration.html | 6 ++++++
.../src/main/java/org/apache/paimon/flink/RocksDBOptions.java | 8 ++++++++
.../apache/paimon/flink/lookup/FileStoreLookupFunction.java | 10 +++++++---
.../paimon/flink/lookup/FileStoreLookupFunctionTest.java | 3 ++-
4 files changed, 23 insertions(+), 4 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/rocksdb_configuration.html b/docs/layouts/shortcodes/generated/rocksdb_configuration.html
index a7ff2b0f7..0a6d09235 100644
--- a/docs/layouts/shortcodes/generated/rocksdb_configuration.html
+++ b/docs/layouts/shortcodes/generated/rocksdb_configuration.html
@@ -32,6 +32,12 @@ under the License.
<td>Long</td>
<td>The maximum number of rows to store in the cache.</td>
</tr>
+ <tr>
+ <td><h5>lookup.continuous.discovery-interval</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Duration</td>
+ <td>The discovery interval of lookup continuous reading. This is
used as an SQL hint. If it's not configured, the lookup function will fallback
to 'continuous.discovery-interval'.</td>
+ </tr>
<tr>
<td><h5>rocksdb.block.blocksize</h5></td>
<td style="word-wrap: break-word;">4 kb</td>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/RocksDBOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/RocksDBOptions.java
index 42f13aa73..d54a5ef8a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/RocksDBOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/RocksDBOptions.java
@@ -35,6 +35,7 @@ import org.rocksdb.PlainTableConfig;
import org.rocksdb.TableFormatConfig;
import java.io.File;
+import java.time.Duration;
import static org.apache.paimon.options.ConfigOptions.key;
import static org.apache.paimon.options.description.LinkElement.link;
@@ -55,6 +56,13 @@ public class RocksDBOptions {
.defaultValue(10_000L)
.withDescription("The maximum number of rows to store in
the cache.");
+ public static final ConfigOption<Duration>
LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL =
+ key("lookup.continuous.discovery-interval")
+ .durationType()
+ .noDefaultValue()
+ .withDescription(
+ "The discovery interval of lookup continuous
reading. This is used as an SQL hint. If it's not configured, the lookup
function will fallback to 'continuous.discovery-interval'.");
+
//
--------------------------------------------------------------------------
// Provided configurable DBOptions within Flink
//
--------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 4715c3244..36f393210 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.lookup;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.FlinkRowWrapper;
@@ -59,7 +58,9 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.paimon.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
import static org.apache.paimon.flink.RocksDBOptions.LOOKUP_CACHE_ROWS;
+import static
org.apache.paimon.flink.RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL;
import static
org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
/** A lookup {@link TableFunction} for file store. */
@@ -123,7 +124,9 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
private void open() throws Exception {
Options options = Options.fromMap(table.options());
- this.refreshInterval =
options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL);
+ this.refreshInterval =
+ options.getOptional(LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL)
+ .orElse(options.get(CONTINUOUS_DISCOVERY_INTERVAL));
this.stateFactory = new RocksDBStateFactory(path.toString(), options,
null);
List<String> fieldNames = table.rowType().getFieldNames();
@@ -194,7 +197,8 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
}
if (nextLoadTime > 0) {
LOG.info(
- "Lookup table has refreshed after {} second(s),
refreshing",
+ "Lookup table {} has refreshed after {} second(s),
refreshing",
+ table.name(),
refreshInterval.toMillis() / 1000);
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
index b0931f23d..5c666c7fe 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.flink.RocksDBOptions;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
@@ -73,7 +74,7 @@ public class FileStoreLookupFunctionTest {
conf.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 3);
conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 2);
- conf.set(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL, Duration.ZERO);
+ conf.set(RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL,
Duration.ofSeconds(1));
RowType rowType =
RowType.of(