This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 97d6f34e2 [client] Fix the MetadataRequest sent by LogFetcher timeout 
exception when upgrading cluster (#1666)
97d6f34e2 is described below

commit 97d6f34e2b174615841d2b71d248ff0cf6ac97b4
Author: yunhong <[email protected]>
AuthorDate: Mon Sep 15 22:29:33 2025 +0800

    [client] Fix the MetadataRequest sent by LogFetcher timeout exception when 
upgrading cluster (#1666)
---
 .../fluss/client/metadata/MetadataUpdater.java     |  2 +-
 .../fluss/client/table/scanner/log/LogFetcher.java | 59 ++++++++++++++++------
 .../client/admin/ClientToServerITCaseBase.java     |  2 +
 .../fluss/client/metadata/MetadataUpdaterTest.java |  4 +-
 .../client/table/scanner/log/LogFetcherTest.java   | 43 ++++++++++++++--
 .../org/apache/fluss/config/ConfigOptions.java     |  4 +-
 website/docs/maintenance/configuration.md          |  2 +-
 7 files changed, 92 insertions(+), 24 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
index 1730235f4..92d81bd10 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
@@ -254,7 +254,7 @@ public class MetadataUpdater {
     }
 
     @VisibleForTesting
-    protected void updateMetadata(
+    public void updateMetadata(
             @Nullable Set<TablePath> tablePaths,
             @Nullable Collection<PhysicalTablePath> tablePartitionNames,
             @Nullable Collection<Long> tablePartitionIds)
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
index d195dc6f0..ff67575e2 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
@@ -162,14 +162,19 @@ public class LogFetcher implements Closeable {
      * Set up a fetch request for any node that we have assigned buckets for 
which doesn't already
      * have an in-flight fetch or pending fetch data.
      */
-    public synchronized void sendFetches() {
-        Map<Integer, FetchLogRequest> fetchRequestMap = 
prepareFetchLogRequests();
-        fetchRequestMap.forEach(
-                (nodeId, fetchLogRequest) -> {
-                    LOG.debug("Adding pending request for node id {}", nodeId);
-                    nodesWithPendingFetchRequests.add(nodeId);
-                    sendFetchRequest(nodeId, fetchLogRequest);
-                });
+    public void sendFetches() {
+        checkAndUpdateMetadata(fetchableBuckets());
+        synchronized (this) {
+            // NOTE: Don't perform heavy I/O operations or synchronous waits 
inside this lock, to
+            // avoid blocking the completion of the FetchLogResponse future.
+            Map<Integer, FetchLogRequest> fetchRequestMap = 
prepareFetchLogRequests();
+            fetchRequestMap.forEach(
+                    (nodeId, fetchLogRequest) -> {
+                        LOG.debug("Adding pending request for node id {}", 
nodeId);
+                        nodesWithPendingFetchRequests.add(nodeId);
+                        sendFetchRequest(nodeId, fetchLogRequest);
+                    });
+        }
     }
 
     /**
@@ -190,6 +195,31 @@ public class LogFetcher implements Closeable {
         logFetchBuffer.wakeup();
     }
 
+    private void checkAndUpdateMetadata(List<TableBucket> tableBuckets) {
+        // If the table is a partitioned table, check if we need to update 
partition metadata.
+        List<Long> partitionIds = isPartitioned ? new ArrayList<>() : null;
+        // If the table is a non-partitioned table, check if we need to update 
table metadata.
+        boolean needUpdate = false;
+        for (TableBucket tb : tableBuckets) {
+            if (getTableBucketLeader(tb) != null) {
+                continue;
+            }
+
+            if (isPartitioned) {
+                partitionIds.add(tb.getPartitionId());
+            } else {
+                needUpdate = true;
+                break;
+            }
+        }
+
+        if (isPartitioned && !partitionIds.isEmpty()) {
+            metadataUpdater.updateMetadata(Collections.singleton(tablePath), 
null, partitionIds);
+        } else if (needUpdate) {
+            metadataUpdater.updateTableOrPartitionMetadata(tablePath, null);
+        }
+    }
+
     private void sendFetchRequest(int destination, FetchLogRequest 
fetchLogRequest) {
         TableOrPartitions tableOrPartitionsInFetchRequest =
                 getTableOrPartitionsInFetchRequest(fetchLogRequest);
@@ -243,18 +273,21 @@ public class LogFetcher implements Closeable {
         return new TableOrPartitions(tableIdsInFetchRequest, 
tablePartitionsInFetchRequest);
     }
 
-    private static class TableOrPartitions {
+    /** A helper class to hold table ids or table partitions. */
+    @VisibleForTesting
+    static class TableOrPartitions {
         private final @Nullable Set<Long> tableIds;
         private final @Nullable Set<TablePartition> tablePartitions;
 
-        private TableOrPartitions(
+        TableOrPartitions(
                 @Nullable Set<Long> tableIds, @Nullable Set<TablePartition> 
tablePartitions) {
             this.tableIds = tableIds;
             this.tablePartitions = tablePartitions;
         }
     }
 
-    private void invalidTableOrPartitions(TableOrPartitions tableOrPartitions) 
{
+    @VisibleForTesting
+    void invalidTableOrPartitions(TableOrPartitions tableOrPartitions) {
         Set<PhysicalTablePath> physicalTablePaths =
                 metadataUpdater.getPhysicalTablePathByIds(
                         tableOrPartitions.tableIds, 
tableOrPartitions.tablePartitions);
@@ -404,9 +437,6 @@ public class LogFetcher implements Closeable {
                 LOG.trace(
                         "Skipping fetch request for bucket {} because leader 
is not available.",
                         tb);
-                // try to get the latest metadata info of this table because 
the leader for this
-                // bucket is unknown.
-                metadataUpdater.updateTableOrPartitionMetadata(tablePath, 
tb.getPartitionId());
             } else if (nodesWithPendingFetchRequests.contains(leader)) {
                 LOG.trace(
                         "Skipping fetch request for bucket {} because previous 
request "
@@ -472,7 +502,6 @@ public class LogFetcher implements Closeable {
     }
 
     private Integer getTableBucketLeader(TableBucket tableBucket) {
-        metadataUpdater.checkAndUpdateMetadata(tablePath, tableBucket);
         if (metadataUpdater.getBucketLocation(tableBucket).isPresent()) {
             BucketLocation bucketLocation = 
metadataUpdater.getBucketLocation(tableBucket).get();
             if (bucketLocation.getLeader() != null) {
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
index 3925384e7..f26ee3d6a 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
@@ -121,6 +121,8 @@ public abstract class ClientToServerITCaseBase {
         conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, 
MemorySize.parse("1kb"));
         conf.set(ConfigOptions.MAX_PARTITION_NUM, 10);
         conf.set(ConfigOptions.MAX_BUCKET_NUM, 30);
+
+        conf.set(ConfigOptions.NETTY_CLIENT_NUM_NETWORK_THREADS, 1);
         return conf;
     }
 
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java
index 514025ae9..5471d8b60 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java
@@ -20,7 +20,6 @@ package org.apache.fluss.client.metadata;
 import org.apache.fluss.client.Connection;
 import org.apache.fluss.client.ConnectionFactory;
 import org.apache.fluss.client.admin.Admin;
-import org.apache.fluss.client.utils.MetadataUtils;
 import org.apache.fluss.cluster.Cluster;
 import org.apache.fluss.cluster.ServerNode;
 import org.apache.fluss.config.Configuration;
@@ -34,6 +33,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import java.util.Collections;
 import java.util.List;
 
+import static 
org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -64,7 +64,7 @@ class MetadataUpdaterTest {
         // any N levels UnmodifiableCollection
         for (int i = 0; i < 20000; i++) {
             cluster =
-                    MetadataUtils.sendMetadataRequestAndRebuildCluster(
+                    sendMetadataRequestAndRebuildCluster(
                             FLUSS_CLUSTER_EXTENSION.newCoordinatorClient(),
                             true,
                             cluster,
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
index 634bcb5ea..b131986e7 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
@@ -39,6 +39,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.fluss.record.TestData.DATA1;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
@@ -55,6 +59,7 @@ public class LogFetcherTest extends ClientToServerITCaseBase {
     private long tableId;
     private final int bucketId0 = 0;
     private final int bucketId1 = 1;
+    private LogScannerStatus logScannerStatus;
 
     // TODO covert this test to UT as kafka.
 
@@ -74,9 +79,8 @@ public class LogFetcherTest extends ClientToServerITCaseBase {
         // add bucket 0 and bucket 1 to log scanner status.
         scanBuckets.put(new TableBucket(tableId, bucketId0), 0L);
         scanBuckets.put(new TableBucket(tableId, bucketId1), 0L);
-        LogScannerStatus logScannerStatus = new LogScannerStatus();
+        logScannerStatus = new LogScannerStatus();
         logScannerStatus.assignScanBuckets(scanBuckets);
-        TestingScannerMetricGroup scannerMetricGroup = 
TestingScannerMetricGroup.newInstance();
         logFetcher =
                 new LogFetcher(
                         DATA1_TABLE_INFO,
@@ -84,7 +88,7 @@ public class LogFetcherTest extends ClientToServerITCaseBase {
                         logScannerStatus,
                         clientConf,
                         metadataUpdater,
-                        scannerMetricGroup,
+                        TestingScannerMetricGroup.newInstance(),
                         new RemoteFileDownloader(1));
     }
 
@@ -183,6 +187,39 @@ public class LogFetcherTest extends 
ClientToServerITCaseBase {
         assertThat(records.get(tb0).size()).isEqualTo(10);
     }
 
+    @Test
+    void testFetchWithInvalidTableOrPartitions() throws Exception {
+        MetadataUpdater metadataUpdater1 =
+                new MetadataUpdater(clientConf, 
FLUSS_CLUSTER_EXTENSION.getRpcClient());
+        logFetcher =
+                new LogFetcher(
+                        DATA1_TABLE_INFO,
+                        null,
+                        logScannerStatus,
+                        clientConf,
+                        metadataUpdater1,
+                        TestingScannerMetricGroup.newInstance(),
+                        new RemoteFileDownloader(1));
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        Future<?> future =
+                executor.submit(
+                        () -> {
+                            // If this test blocks, please check whether 
it is blocked for
+                            // the same reason as 
https://github.com/apache/fluss/pull/1666
+                            for (int i = 0; i < 1000; i++) {
+                                logFetcher.sendFetches();
+                                logFetcher.invalidTableOrPartitions(
+                                        new LogFetcher.TableOrPartitions(
+                                                
Collections.singleton(tableId), null));
+                            }
+                        });
+
+        future.get(30, TimeUnit.SECONDS);
+        assertThat(future.isDone()).isTrue();
+        executor.shutdownNow();
+    }
+
     private void addRecordsToBucket(
             TableBucket tableBucket, MemoryLogRecords logRecords, long 
expectedBaseOffset)
             throws Exception {
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java 
b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index 343434d38..07de50bfc 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -763,10 +763,10 @@ public class ConfigOptions {
     public static final ConfigOption<Integer> NETTY_CLIENT_NUM_NETWORK_THREADS 
=
             key("netty.client.num-network-threads")
                     .intType()
-                    .defaultValue(3)
+                    .defaultValue(4)
                     .withDescription(
                             "The number of threads that the client uses for 
sending requests to the "
-                                    + "network and receiving responses from 
network. The default value is 3");
+                                    + "network and receiving responses from 
network. The default value is 4");
 
     // ------------------------------------------------------------------------
     //  Client Settings
diff --git a/website/docs/maintenance/configuration.md 
b/website/docs/maintenance/configuration.md
index c5dc0ed7e..79ca3c347 100644
--- a/website/docs/maintenance/configuration.md
+++ b/website/docs/maintenance/configuration.md
@@ -92,7 +92,7 @@ during the Fluss cluster working.
 | netty.server.num-worker-threads  | Integer  | 8       | The number of 
threads that the server uses for processing requests, which may include disk 
and remote I/O.                                  |
 | netty.server.max-queued-requests | Integer  | 500     | The number of queued 
requests allowed for worker threads, before blocking the I/O threads.           
                                       |
 | netty.connection.max-idle-time   | Duration | 10min   | Close idle 
connections after the given time specified by this config.                      
                                                 |
-| netty.client.num-network-threads | Integer  | 3       | The number of 
threads that the client uses for sending requests to the network and receiving 
responses from network. The default value is 3 |
+| netty.client.num-network-threads | Integer  | 4       | The number of 
threads that the client uses for sending requests to the network and receiving 
responses from network. The default value is 4 |
 
 ## Log
 

Reply via email to