This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 3e79b2ef0 [client] Set the default value of 'scan.partition.discovery.interval' to 1min to reduce the server load (#1736)
3e79b2ef0 is described below
commit 3e79b2ef07e2a29c2733e965bd5175c6abb796c0
Author: yunhong <[email protected]>
AuthorDate: Sun Sep 28 20:58:21 2025 +0800
[client] Set the default value of 'scan.partition.discovery.interval' to 1min to reduce the server load (#1736)
---
.../apache/fluss/flink/FlinkConnectorOptions.java | 8 +++++--
.../fluss/flink/sink/FlinkTableSinkITCase.java | 9 ++++++-
.../fluss/flink/source/FlinkTableSourceITCase.java | 16 +++++++++++--
.../fluss/flink/source/FlussSourceBuilderTest.java | 2 +-
website/docs/engine-flink/options.md | 28 +++++++++++-----------
5 files changed, 43 insertions(+), 20 deletions(-)
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
index 53c69fbc0..cfbcccfdc 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
@@ -95,11 +95,15 @@ public class FlinkConnectorOptions {
     public static final ConfigOption<Duration> SCAN_PARTITION_DISCOVERY_INTERVAL =
             ConfigOptions.key("scan.partition.discovery.interval")
                     .durationType()
-                    .defaultValue(Duration.ofSeconds(10))
+                    .defaultValue(Duration.ofMinutes(1))
                     .withDescription(
                             "The time interval for the Fluss source to discover "
                                     + "the new partitions for partitioned table while scanning."
-                                    + " A non-positive value disables the partition discovery.");
+                                    + " A non-positive value disables the partition discovery. The default value is 1 "
+                                    + "minute. Currently, since Fluss Admin#listPartitions(TablePath tablePath) requires a large "
+                                    + "number of requests to ZooKeeper in server, this option cannot be set too small, "
+                                    + "as a small value would cause frequent requests and increase server load. In the future, "
+                                    + "once list partitions is optimized, the default value of this parameter can be reduced.");
 
     public static final ConfigOption<Boolean> SINK_IGNORE_DELETE =
             ConfigOptions.key("sink.ignore-delete")
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index 54fb906ae..b01a7e4b2 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -566,8 +566,15 @@ abstract class FlinkTableSinkITCase extends AbstractTestBase {
                                 tableName, String.join(", ", insertValues)))
                 .await();
 
+        // This test requires dynamically discovering newly created partitions, so
+        // 'scan.partition.discovery.interval' needs to be set to 2s (default is 1 minute),
+        // otherwise the test may hang for 1 minute.
         CloseableIterator<Row> rowIter =
-                tEnv.executeSql(String.format("select * from %s", tableName)).collect();
+                tEnv.executeSql(
+                                String.format(
+                                        "select * from %s /*+ OPTIONS('scan.partition.discovery.interval' = '2s') */",
+                                        tableName))
+                        .collect();
         assertResultsIgnoreOrder(rowIter, expectedRows, false);
 
         // create two partitions, write data to the new partitions
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
index f9577af97..d765421bc 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
@@ -607,8 +607,15 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase {
         writeRowsToPartition(conn, tablePath, partitionNameById.values());
         waitUntilAllBucketFinishSnapshot(admin, tablePath, partitionNameById.values());
 
+        // This test requires dynamically discovering newly created partitions, so
+        // 'scan.partition.discovery.interval' needs to be set to 2s (default is 1 minute),
+        // otherwise the test may hang for 1 minute.
         org.apache.flink.util.CloseableIterator<Row> rowIter =
-                tEnv.executeSql(String.format("select * from %s", tableName)).collect();
+                tEnv.executeSql(
+                                String.format(
+                                        "select * from %s /*+ OPTIONS('scan.partition.discovery.interval' = '2s') */",
+                                        tableName))
+                        .collect();
         assertResultsIgnoreOrder(rowIter, expectedRowValues, false);
 
         // then create some new partitions, and write rows to the new partitions
@@ -1047,8 +1054,13 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase {
                                 + "project=[a, b, d]]], fields=[a, b, d])");
 
         // test partition key prefix match
+        // This test requires dynamically discovering newly created partitions, so
+        // 'scan.partition.discovery.interval' needs to be set to 2s (default is 1 minute),
+        // otherwise the test may hang for 1 minute.
         org.apache.flink.util.CloseableIterator<Row> rowIter =
-                tEnv.executeSql("select * from multi_partitioned_table where c ='2025'").collect();
+                tEnv.executeSql(
+                                "select * from multi_partitioned_table /*+ OPTIONS('scan.partition.discovery.interval' = '2s') */ where c ='2025'")
+                        .collect();
         assertResultsIgnoreOrder(rowIter, expectedRowValues, false);
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceBuilderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceBuilderTest.java
index 9553e70a0..5091d29d4 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceBuilderTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceBuilderTest.java
@@ -192,7 +192,7 @@ public class FlussSourceBuilderTest extends FlinkTestBase {
                         .build();
 
         // Then
-        assertThat(source.scanPartitionDiscoveryIntervalMs).isEqualTo(10000L);
+        assertThat(source.scanPartitionDiscoveryIntervalMs).isEqualTo(60000L);
     }
 
     @Test
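
Editor's note: the updated expectation is just the new default converted to milliseconds; a quick sanity check (the class name here is arbitrary):

    import java.time.Duration;

    public class DefaultIntervalCheck {
        public static void main(String[] args) {
            // The new default, Duration.ofMinutes(1), equals 60,000 ms, matching
            // the 60000L assertion above (the old 10s default was 10000L).
            System.out.println(Duration.ofMinutes(1).toMillis()); // prints 60000
        }
    }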
diff --git a/website/docs/engine-flink/options.md b/website/docs/engine-flink/options.md
index 8eacd0cfd..d53a468e5 100644
--- a/website/docs/engine-flink/options.md
+++ b/website/docs/engine-flink/options.md
@@ -86,20 +86,20 @@ ALTER TABLE log_table SET ('table.log.ttl' = '7d');
## Read Options
-| Option                                              | Type       | Default                                          | Description [...]
-|-----------------------------------------------------|------------|-------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ [...]
-| scan.startup.mode                                   | Enum       | full                                             | The scan startup mode enables you to specify the starting point for data consumption. Fluss currently supports the following `scan.startup.mode` options: `full` (default), earliest, latest, timestamp. See the [Start Reading Position](engine-flink/reads.md#start-reading-position) for more details. [...]
-| scan.startup.timestamp                              | Long       | (None)                                           | The timestamp to start reading the data from. This option is only valid when `scan.startup.mode` is set to `timestamp`. The format is 'milli-second-since-epoch' or `yyyy-MM-dd HH:mm:ss`, like `1678883047356` or `2023-12-09 23:09:12`. [...]
-| scan.partition.discovery.interval                   | Duration   | 10s                                              | The time interval for the Fluss source to discover the new partitions for partitioned table while scanning. A non-positive value disables the partition discovery. [...]
-| client.scanner.log.check-crc                        | Boolean    | true                                             | Automatically check the CRC32 of the read records for LogScanner. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. [...]
-| client.scanner.log.max-poll-records                 | Integer    | 500                                              | The maximum number of records returned in a single call to poll() for LogScanner. Note that this config doesn't impact the underlying fetching behavior. The Scanner will cache the records from each fetch request and return them incrementally from each poll. [...]
-| client.scanner.log.fetch.max-bytes                  | MemorySize | 16mb                                             | The maximum amount of data the server should return for a fetch request from client. Records are fetched in batches, and if the first record batch in the first non-empty bucket of the fetch is larger than this value, the record batch will still be returned to ensure that the fetch can make progress. As such, this is not an absolute maximum. [...]
-| client.scanner.log.fetch.max-bytes-for-bucket       | MemorySize | 1mb                                              | The maximum amount of data the server should return for a table bucket in fetch request from client. Records are fetched in batches, and the max bytes size is configured by this option. [...]
-| client.scanner.log.fetch.min-bytes                  | MemorySize | 1b                                               | The minimum bytes expected for each fetch log request from client to response. If not enough bytes, wait up to client.scanner.log.fetch-wait-max-time time to return. [...]
-| client.scanner.log.fetch.wait-max-time              | Duration   | 500ms                                            | The maximum time to wait for enough bytes to be available for a fetch log request from client to response. [...]
-| client.scanner.io.tmpdir                            | String     | System.getProperty("java.io.tmpdir") + "/fluss"  | Local directory that is used by client for storing the data files (like kv snapshot, log segment files) to read temporarily [...]
-| client.scanner.remote-log.prefetch-num              | Integer    | 4                                                | The number of remote log segments to keep in local temp file for LogScanner, which are downloaded from remote storage. The default setting is 4. [...]
-| client.remote-file.download-thread-num              | Integer    | 3                                                | The number of threads the client uses to download remote files. [...]
+| Option                                              | Type       | Default                                          | Description [...]
+|-----------------------------------------------------|------------|-------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ [...]
+| scan.startup.mode                                   | Enum       | full                                             | The scan startup mode enables you to specify the starting point for data consumption. Fluss currently supports the following `scan.startup.mode` options: `full` (default), earliest, latest, timestamp. See the [Start Reading Position](engine-flink/reads.md#start-reading-position) for more details. [...]
+| scan.startup.timestamp                              | Long       | (None)                                           | The timestamp to start reading the data from. This option is only valid when `scan.startup.mode` is set to `timestamp`. The format is 'milli-second-since-epoch' or `yyyy-MM-dd HH:mm:ss`, like `1678883047356` or `2023-12-09 23:09:12`. [...]
+| scan.partition.discovery.interval                   | Duration   | 1min                                             | The time interval for the Fluss source to discover the new partitions for partitioned table while scanning. A non-positive value disables the partition discovery. The default value is 1 minute. Currently, since Fluss Admin#listPartitions(TablePath tablePath) requires a large number of requests to ZooKeeper in server, this option cannot be set too small, as a small value [...]
+| client.scanner.log.check-crc                        | Boolean    | true                                             | Automatically check the CRC32 of the read records for LogScanner. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. [...]
+| client.scanner.log.max-poll-records                 | Integer    | 500                                              | The maximum number of records returned in a single call to poll() for LogScanner. Note that this config doesn't impact the underlying fetching behavior. The Scanner will cache the records from each fetch request and return them incrementally from each poll. [...]
+| client.scanner.log.fetch.max-bytes                  | MemorySize | 16mb                                             | The maximum amount of data the server should return for a fetch request from client. Records are fetched in batches, and if the first record batch in the first non-empty bucket of the fetch is larger than this value, the record batch will still be returned to ensure that the fetch can make progress. As such, this is not an absolute maximum. [...]
+| client.scanner.log.fetch.max-bytes-for-bucket       | MemorySize | 1mb                                              | The maximum amount of data the server should return for a table bucket in fetch request from client. Records are fetched in batches, and the max bytes size is configured by this option. [...]
+| client.scanner.log.fetch.min-bytes                  | MemorySize | 1b                                               | The minimum bytes expected for each fetch log request from client to response. If not enough bytes, wait up to client.scanner.log.fetch-wait-max-time time to return. [...]
+| client.scanner.log.fetch.wait-max-time              | Duration   | 500ms                                            | The maximum time to wait for enough bytes to be available for a fetch log request from client to response. [...]
+| client.scanner.io.tmpdir                            | String     | System.getProperty("java.io.tmpdir") + "/fluss"  | Local directory that is used by client for storing the data files (like kv snapshot, log segment files) to read temporarily [...]
+| client.scanner.remote-log.prefetch-num              | Integer    | 4                                                | The number of remote log segments to keep in local temp file for LogScanner, which are downloaded from remote storage. The default setting is 4. [...]
+| client.remote-file.download-thread-num              | Integer    | 3                                                | The number of threads the client uses to download remote files. [...]
## Lookup Options
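
Editor's note: as a usage note on the Read Options above, these options can be supplied per query in Flink SQL through the same dynamic-options hint the updated tests use. A minimal, hypothetical sketch (the table name 'orders' and the timestamp value are placeholders; Fluss catalog setup is elided):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class ReadOptionsHintExample {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // Start reading from a point in time and relax partition discovery
            // to 5 minutes; the keys and value formats come from the Read
            // Options table above.
            tEnv.executeSql(
                    "SELECT * FROM orders /*+ OPTIONS("
                            + "'scan.startup.mode' = 'timestamp', "
                            + "'scan.startup.timestamp' = '1678883047356', "
                            + "'scan.partition.discovery.interval' = '5min') */");
        }
    }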