This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 3e79b2ef0 [client] Set the default value of 
'scan.partition.discovery.interval' to 1min to reduce the server load (#1736)
3e79b2ef0 is described below

commit 3e79b2ef07e2a29c2733e965bd5175c6abb796c0
Author: yunhong <[email protected]>
AuthorDate: Sun Sep 28 20:58:21 2025 +0800

    [client] Set the default value of 'scan.partition.discovery.interval' to 
1min to reduce the server load (#1736)
---
 .../apache/fluss/flink/FlinkConnectorOptions.java  |  8 +++++--
 .../fluss/flink/sink/FlinkTableSinkITCase.java     |  9 ++++++-
 .../fluss/flink/source/FlinkTableSourceITCase.java | 16 +++++++++++--
 .../fluss/flink/source/FlussSourceBuilderTest.java |  2 +-
 website/docs/engine-flink/options.md               | 28 +++++++++++-----------
 5 files changed, 43 insertions(+), 20 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
index 53c69fbc0..cfbcccfdc 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
@@ -95,11 +95,15 @@ public class FlinkConnectorOptions {
     public static final ConfigOption<Duration> 
SCAN_PARTITION_DISCOVERY_INTERVAL =
             ConfigOptions.key("scan.partition.discovery.interval")
                     .durationType()
-                    .defaultValue(Duration.ofSeconds(10))
+                    .defaultValue(Duration.ofMinutes(1))
                     .withDescription(
                             "The time interval for the Fluss source to 
discover "
                                     + "the new partitions for partitioned 
table while scanning."
-                                    + " A non-positive value disables the 
partition discovery.");
+                                    + " A non-positive value disables the 
partition discovery. The default value is 1 "
+                                    + "minute. Currently, since Fluss 
Admin#listPartitions(TablePath tablePath) requires a large "
+                                    + "number of requests to ZooKeeper in 
server, this option cannot be set too small, "
+                                    + "as a small value would cause frequent 
requests and increase server load. In the future, "
+                                    + "once list partitions is optimized, the 
default value of this parameter can be reduced.");
 
     public static final ConfigOption<Boolean> SINK_IGNORE_DELETE =
             ConfigOptions.key("sink.ignore-delete")
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index 54fb906ae..b01a7e4b2 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -566,8 +566,15 @@ abstract class FlinkTableSinkITCase extends 
AbstractTestBase {
                                 tableName, String.join(", ", insertValues)))
                 .await();
 
+        // This test requires dynamically discovering newly created 
partitions, so
+        // 'scan.partition.discovery.interval' needs to be set to 2s (default 
is 1 minute),
+        // otherwise the test may hang for 1 minute.
         CloseableIterator<Row> rowIter =
-                tEnv.executeSql(String.format("select * from %s", 
tableName)).collect();
+                tEnv.executeSql(
+                                String.format(
+                                        "select * from %s /*+ 
OPTIONS('scan.partition.discovery.interval' = '2s') */",
+                                        tableName))
+                        .collect();
         assertResultsIgnoreOrder(rowIter, expectedRows, false);
 
         // create two partitions, write data to the new partitions
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
index f9577af97..d765421bc 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
@@ -607,8 +607,15 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
                 writeRowsToPartition(conn, tablePath, 
partitionNameById.values());
         waitUntilAllBucketFinishSnapshot(admin, tablePath, 
partitionNameById.values());
 
+        // This test requires dynamically discovering newly created 
partitions, so
+        // 'scan.partition.discovery.interval' needs to be set to 2s (default 
is 1 minute),
+        // otherwise the test may hang for 1 minute.
         org.apache.flink.util.CloseableIterator<Row> rowIter =
-                tEnv.executeSql(String.format("select * from %s", 
tableName)).collect();
+                tEnv.executeSql(
+                                String.format(
+                                        "select * from %s /*+ 
OPTIONS('scan.partition.discovery.interval' = '2s') */",
+                                        tableName))
+                        .collect();
         assertResultsIgnoreOrder(rowIter, expectedRowValues, false);
 
         // then create some new partitions, and write rows to the new 
partitions
@@ -1047,8 +1054,13 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
                                 + "project=[a, b, d]]], fields=[a, b, d])");
 
         // test partition key prefix match
+        // This test requires dynamically discovering newly created 
partitions, so
+        // 'scan.partition.discovery.interval' needs to be set to 2s (default 
is 1 minute),
+        // otherwise the test may hang for 1 minute.
         org.apache.flink.util.CloseableIterator<Row> rowIter =
-                tEnv.executeSql("select * from multi_partitioned_table where c 
='2025'").collect();
+                tEnv.executeSql(
+                                "select * from multi_partitioned_table /*+ 
OPTIONS('scan.partition.discovery.interval' = '2s') */ where c ='2025'")
+                        .collect();
 
         assertResultsIgnoreOrder(rowIter, expectedRowValues, false);
 
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceBuilderTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceBuilderTest.java
index 9553e70a0..5091d29d4 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceBuilderTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceBuilderTest.java
@@ -192,7 +192,7 @@ public class FlussSourceBuilderTest extends FlinkTestBase {
                         .build();
 
         // Then
-        assertThat(source.scanPartitionDiscoveryIntervalMs).isEqualTo(10000L);
+        assertThat(source.scanPartitionDiscoveryIntervalMs).isEqualTo(60000L);
     }
 
     @Test
diff --git a/website/docs/engine-flink/options.md 
b/website/docs/engine-flink/options.md
index 8eacd0cfd..d53a468e5 100644
--- a/website/docs/engine-flink/options.md
+++ b/website/docs/engine-flink/options.md
@@ -86,20 +86,20 @@ ALTER TABLE log_table SET ('table.log.ttl' = '7d');
 
 ## Read Options
 
-| Option                                              | Type       | Default   
                                      | Description                             
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-|-----------------------------------------------------|------------|-------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
-| scan.startup.mode                                   | Enum       | full      
                                      | The scan startup mode enables you to 
specify the starting point for data consumption. Fluss currently supports the 
following `scan.startup.mode` options: `full` (default), earliest, latest, 
timestamp. See the [Start Reading 
Position](engine-flink/reads.md#start-reading-position) for more details.       
                                                                      [...]
-| scan.startup.timestamp                              | Long       | (None)    
                                      | The timestamp to start reading the data 
from. This option is only valid when `scan.startup.mode` is set to `timestamp`. 
The format is 'milli-second-since-epoch' or `yyyy-MM-dd HH:mm:ss`, like 
`1678883047356` or `2023-12-09 23:09:12`.                                       
                                                                                
                      [...]
-| scan.partition.discovery.interval                   | Duration   | 10s       
                                      | The time interval for the Fluss source 
to discover the new partitions for partitioned table while scanning. A 
non-positive value disables the partition discovery.                            
                                                                                
                                                                                
                        [...]
-| client.scanner.log.check-crc                        | Boolean    | true      
                                      | Automatically check the CRC32 of the 
read records for LogScanner. This ensures no on-the-wire or on-disk corruption 
to the messages occurred. This check adds some overhead, so it may be disabled 
in cases seeking extreme performance.                                           
                                                                                
                    [...]
-| client.scanner.log.max-poll-records                 | Integer    | 500       
                                      | The maximum number of records returned 
in a single call to poll() for LogScanner. Note that this config doesn't impact 
the underlying fetching behavior. The Scanner will cache the records from each 
fetch request and returns them incrementally from each poll.                    
                                                                                
                [...]
-| client.scanner.log.fetch.max-bytes                  | MemorySize | 16mb      
                                      | The maximum amount of data the server 
should return for a fetch request from client. Records are fetched in batches, 
and if the first record batch in the first non-empty bucket of the fetch is 
larger than this value, the record batch will still be returned to ensure that 
the fetch can make progress. As such, this is not an absolute maximum.           
                      [...]
-| client.scanner.log.fetch.max-bytes-for-bucket       | MemorySize | 1mb       
                                      | The maximum amount of data the server 
should return for a table bucket in fetch request from client. Records are 
fetched in batches, and the max bytes size is config by this option.            
                                                                                
                                                                                
                      [...]
-| client.scanner.log.fetch.min-bytes                  | MemorySize | 1b        
                                      | The minimum bytes expected for each 
fetch log request from client to response. If not enough bytes, wait up to 
client.scanner.log.fetch-wait-max-time time to return.                          
                                                                                
                                                                                
                       [...]
-| client.scanner.log.fetch.wait-max-time              | Duration   | 500ms     
                                      | The maximum time to wait for enough 
bytes to be available for a fetch log request from client to response.          
                                                                                
                                                                                
                                                                                
                  [...]
-| client.scanner.io.tmpdir                            | String     | 
System.getProperty("java.io.tmpdir") + "/fluss" | Local directory that is used 
by client for storing the data files (like kv snapshot, log segment files) to 
read temporarily                                                                
                                                                                
                                                                                
                           [...]
-| client.scanner.remote-log.prefetch-num              | Integer    | 4         
                                      | The number of remote log segments to 
keep in local temp file for LogScanner, which download from remote storage. The 
default setting is 4.                                                           
                                                                                
                                                                                
                 [...]
-| client.remote-file.download-thread-num              | Integer    | 3         
                                      | The number of threads the client uses 
to download remote files.                                                       
                                                                                
                                                                                
                                                                                
                [...]
+| Option                                              | Type       | Default   
                                      | Description                             
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+|-----------------------------------------------------|------------|-------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+| scan.startup.mode                                   | Enum       | full      
                                      | The scan startup mode enables you to 
specify the starting point for data consumption. Fluss currently supports the 
following `scan.startup.mode` options: `full` (default), earliest, latest, 
timestamp. See the [Start Reading 
Position](engine-flink/reads.md#start-reading-position) for more details.       
                                                                      [...]
+| scan.startup.timestamp                              | Long       | (None)    
                                      | The timestamp to start reading the data 
from. This option is only valid when `scan.startup.mode` is set to `timestamp`. 
The format is 'milli-second-since-epoch' or `yyyy-MM-dd HH:mm:ss`, like 
`1678883047356` or `2023-12-09 23:09:12`.                                       
                                                                                
                      [...]
+| scan.partition.discovery.interval                   | Duration   | 1min      
                                      | The time interval for the Fluss source 
to discover the new partitions for partitioned table while scanning. A 
non-positive value disables the partition discovery. The default value is 1 
minute. Currently, since Fluss Admin#listPartitions(TablePath tablePath) 
requires a large number of requests to ZooKeeper in server, this option cannot 
be set too small, as a small value  [...]
+| client.scanner.log.check-crc                        | Boolean    | true      
                                      | Automatically check the CRC32 of the 
read records for LogScanner. This ensures no on-the-wire or on-disk corruption 
to the messages occurred. This check adds some overhead, so it may be disabled 
in cases seeking extreme performance.                                           
                                                                                
                    [...]
+| client.scanner.log.max-poll-records                 | Integer    | 500       
                                      | The maximum number of records returned 
in a single call to poll() for LogScanner. Note that this config doesn't impact 
the underlying fetching behavior. The Scanner will cache the records from each 
fetch request and returns them incrementally from each poll.                    
                                                                                
                [...]
+| client.scanner.log.fetch.max-bytes                  | MemorySize | 16mb      
                                      | The maximum amount of data the server 
should return for a fetch request from client. Records are fetched in batches, 
and if the first record batch in the first non-empty bucket of the fetch is 
larger than this value, the record batch will still be returned to ensure that 
the fetch can make progress. As such, this is not an absolute maximum.           
                      [...]
+| client.scanner.log.fetch.max-bytes-for-bucket       | MemorySize | 1mb       
                                      | The maximum amount of data the server 
should return for a table bucket in fetch request from client. Records are 
fetched in batches, and the max bytes size is config by this option.            
                                                                                
                                                                                
                      [...]
+| client.scanner.log.fetch.min-bytes                  | MemorySize | 1b        
                                      | The minimum bytes expected for each 
fetch log request from client to response. If not enough bytes, wait up to 
client.scanner.log.fetch-wait-max-time time to return.                          
                                                                                
                                                                                
                       [...]
+| client.scanner.log.fetch.wait-max-time              | Duration   | 500ms     
                                      | The maximum time to wait for enough 
bytes to be available for a fetch log request from client to response.          
                                                                                
                                                                                
                                                                                
                  [...]
+| client.scanner.io.tmpdir                            | String     | 
System.getProperty("java.io.tmpdir") + "/fluss" | Local directory that is used 
by client for storing the data files (like kv snapshot, log segment files) to 
read temporarily                                                                
                                                                                
                                                                                
                           [...]
+| client.scanner.remote-log.prefetch-num              | Integer    | 4         
                                      | The number of remote log segments to 
keep in local temp file for LogScanner, which download from remote storage. The 
default setting is 4.                                                           
                                                                                
                                                                                
                 [...]
+| client.remote-file.download-thread-num              | Integer    | 3         
                                      | The number of threads the client uses 
to download remote files.                                                       
                                                                                
                                                                                
                                                                                
                [...]
 
 ## Lookup Options
 

Reply via email to