wuchong commented on code in PR #1463:
URL: https://github.com/apache/fluss/pull/1463#discussion_r2354709190


##########
fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataFunctionProvider.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.metadata;
+
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePartition;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.RpcServiceBase;
+import org.apache.fluss.server.coordinator.CoordinatorContext;
+import org.apache.fluss.server.coordinator.MetadataManager;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.LeaderAndIsr;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/** The coordinator metadata function provider. */
+public class CoordinatorMetadataFunctionProvider implements MetadataFunctionProvider {

Review Comment:
   We can simplify the names: rename the interface to `MetadataProvider` and the
   implementations to `CoordinatorMetadataProvider` and `TabletServerMetadataProvider`.



##########
fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataFunctionProvider.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.metadata;
+
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePartition;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.RpcServiceBase;
+import org.apache.fluss.server.coordinator.CoordinatorContext;
+import org.apache.fluss.server.coordinator.MetadataManager;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.LeaderAndIsr;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/** The coordinator metadata function provider. */
+public class CoordinatorMetadataFunctionProvider implements MetadataFunctionProvider {
+
+    private final ZooKeeperClient zkClient;
+
+    private final CoordinatorMetadataCache metadataCache;
+
+    private final CoordinatorContext ctx;
+
+    private final MetadataManager metadataManager;
+
+    public CoordinatorMetadataFunctionProvider(
+            ZooKeeperClient zkClient,
+            CoordinatorMetadataCache metadataCache,
+            CoordinatorContext ctx,
+            MetadataManager metadataManager) {
+        this.zkClient = zkClient;
+        this.metadataCache = metadataCache;
+        this.ctx = ctx;
+        this.metadataManager = metadataManager;
+    }
+
+    @Override
+    public Optional<TableMetadata> getTableMetadataFromCache(TablePath tablePath) {
+        TableInfo tableInfo = metadataManager.getTable(tablePath);
+        long tableId = ctx.getTableIdByPath(tablePath);
+        List<BucketMetadata> bucketMetadataList;
+        if (tableId == TableInfo.UNKNOWN_TABLE_ID) {
+            return Optional.empty();
+        }
+        bucketMetadataList =
+                getBucketMetadataFromContext(ctx, tableId, null, ctx.getTableAssignment(tableId));
+        return Optional.of(new TableMetadata(tableInfo, bucketMetadataList));
+    }
+
+    @Override
+    public CompletableFuture<TableMetadata> getTableMetadataFromZk(TablePath tablePath) {
+        TableInfo tableInfo = metadataManager.getTable(tablePath);
+        return RpcServiceBase.getTableMetadataFromZkAsync(

Review Comment:
   `RpcServiceBase.getTableMetadataFromZkAsync` is a pure zk operation and is unrelated
   to `RpcServiceBase`, so we should move it into `ZooKeeperClient`. The same applies to
   `getTableMetadataFromZkAsync` and `batchGetPartitionMetadataFromZkAsync`.



##########
fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataFunctionProvider.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.metadata;
+
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePartition;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.RpcServiceBase;
+import org.apache.fluss.server.coordinator.CoordinatorContext;
+import org.apache.fluss.server.coordinator.MetadataManager;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.LeaderAndIsr;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/** The coordinator metadata function provider. */
+public class CoordinatorMetadataFunctionProvider implements MetadataFunctionProvider {
+
+    private final ZooKeeperClient zkClient;
+
+    private final CoordinatorMetadataCache metadataCache;

Review Comment:
   The `CoordinatorMetadataCache` is not used here. 
   



##########
fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataFunctionProvider.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.metadata;
+
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePartition;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.RpcServiceBase;
+import org.apache.fluss.server.coordinator.CoordinatorContext;
+import org.apache.fluss.server.coordinator.MetadataManager;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.LeaderAndIsr;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/** The coordinator metadata function provider. */
+public class CoordinatorMetadataFunctionProvider implements MetadataFunctionProvider {
+
+    private final ZooKeeperClient zkClient;
+
+    private final CoordinatorMetadataCache metadataCache;
+
+    private final CoordinatorContext ctx;
+
+    private final MetadataManager metadataManager;
+
+    public CoordinatorMetadataFunctionProvider(
+            ZooKeeperClient zkClient,
+            CoordinatorMetadataCache metadataCache,
+            CoordinatorContext ctx,
+            MetadataManager metadataManager) {
+        this.zkClient = zkClient;
+        this.metadataCache = metadataCache;
+        this.ctx = ctx;
+        this.metadataManager = metadataManager;
+    }
+
+    @Override
+    public Optional<TableMetadata> getTableMetadataFromCache(TablePath tablePath) {
+        TableInfo tableInfo = metadataManager.getTable(tablePath);
+        long tableId = ctx.getTableIdByPath(tablePath);
+        List<BucketMetadata> bucketMetadataList;
+        if (tableId == TableInfo.UNKNOWN_TABLE_ID) {
+            return Optional.empty();
+        }
+        bucketMetadataList =
+                getBucketMetadataFromContext(ctx, tableId, null, ctx.getTableAssignment(tableId));
+        return Optional.of(new TableMetadata(tableInfo, bucketMetadataList));
+    }
+
+    @Override
+    public CompletableFuture<TableMetadata> getTableMetadataFromZk(TablePath tablePath) {
+        TableInfo tableInfo = metadataManager.getTable(tablePath);
+        return RpcServiceBase.getTableMetadataFromZkAsync(
+                        zkClient, tablePath, tableInfo.getTableId(), tableInfo.isPartitioned())
+                .thenApply(bucketMetadata -> new TableMetadata(tableInfo, bucketMetadata));
+    }
+
+    @Override
+    public Optional<PhysicalTablePath> getPhysicalTablePathFromCache(long partitionId) {
+        return ctx.getPhysicalTablePath(partitionId);
+    }
+
+    @Override
+    public Optional<PartitionMetadata> getPartitionMetadataFromCache(
+            PhysicalTablePath partitionPath) {
+        TablePath tablePath =
+                new TablePath(partitionPath.getDatabaseName(), partitionPath.getTableName());

Review Comment:
   ```suggestion
           TablePath tablePath = partitionPath.getTablePath();
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java:
##########
@@ -122,18 +120,12 @@ public TableMetadata getTableMetadata(TablePath tablePath) {
         OptionalLong tableIdOpt = snapshot.getTableId(tablePath);
         List<BucketMetadata> bucketMetadataList;
         if (!tableIdOpt.isPresent()) {
-            // TODO no need to get assignment from zk if refactor client metadata cache. Trace by
-            // https://github.com/apache/fluss/issues/483
-            // get table assignment from zk.
-            bucketMetadataList =
-                    getTableMetadataFromZk(
-                            zkClient, tablePath, tableInfo.getTableId(), tableInfo.isPartitioned());
-        } else {
-            // get table assignment from cache.
-            bucketMetadataList =
-                    new ArrayList<>(
-                            snapshot.getBucketMetadataForTable(tableIdOpt.getAsLong()).values());
+
+            return null;

Review Comment:
   Add a `@Nullable` annotation to the method's return type, or return `Optional`
   directly (it appears to be consumed as an `Optional` later).
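
   A minimal sketch of the `Optional` variant, assuming a simplified cache keyed by a
   plain string. `SketchMetadataCache` and its methods are hypothetical stand-ins for
   illustration, not the Fluss API:

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Optional;

   // Hypothetical stand-in for a metadata cache; not TabletServerMetadataCache.
   final class SketchMetadataCache {
       private final Map<String, Long> tableIds = new HashMap<>();

       void put(String tablePath, long tableId) {
           tableIds.put(tablePath, tableId);
       }

       // A cache miss is visible in the return type instead of being a bare null.
       Optional<Long> getTableId(String tablePath) {
           return Optional.ofNullable(tableIds.get(tablePath));
       }
   }

   final class OptionalLookupDemo {
       public static void main(String[] args) {
           SketchMetadataCache cache = new SketchMetadataCache();
           cache.put("testDb.testTable", 1L);
           // Callers must handle the miss explicitly; no null check to forget.
           System.out.println(cache.getTableId("testDb.testTable").isPresent());
           System.out.println(cache.getTableId("testDb.missing").isPresent());
       }
   }
   ```

   With this shape the caller can chain `map`/`orElseGet` for the fallback path
   instead of wrapping a nullable result after the fact.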



##########
fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java:
##########
@@ -155,9 +147,8 @@ public PartitionMetadata getPartitionMetadata(PhysicalTablePath partitionPath) {
                     partitionId,
                     new ArrayList<>(snapshot.getBucketMetadataForPartition(partitionId).values()));
         } else {
-            // TODO no need to get assignment from zk if refactor client metadata cache. Trace by
-            // https://github.com/apache/fluss/issues/483
-            return getPartitionMetadataFromZk(partitionPath, zkClient);
+
+            return null;

Review Comment:
   Add a `@Nullable` annotation to the method's return type, or return `Optional`
   directly (it appears to be consumed as an `Optional` later).



##########
fluss-server/src/test/java/org/apache/fluss/server/RpcServiceBaseTest.java:
##########
@@ -0,0 +1,758 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server;
+
+import org.apache.fluss.cluster.ServerNode;
+import org.apache.fluss.cluster.ServerType;
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.exception.PartitionNotExistException;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePartition;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.rpc.messages.MetadataRequest;
+import org.apache.fluss.rpc.messages.MetadataResponse;
+import org.apache.fluss.rpc.messages.PbPhysicalTablePath;
+import org.apache.fluss.rpc.messages.PbTablePath;
+import org.apache.fluss.rpc.netty.server.Session;
+import org.apache.fluss.security.acl.FlussPrincipal;
+import org.apache.fluss.server.authorizer.Authorizer;
+import org.apache.fluss.server.metadata.BucketMetadata;
+import org.apache.fluss.server.metadata.MetadataFunctionProvider;
+import org.apache.fluss.server.metadata.PartitionMetadata;
+import org.apache.fluss.server.metadata.ServerMetadataCache;
+import org.apache.fluss.server.metadata.TableMetadata;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.BucketAssignment;
+import org.apache.fluss.server.zk.data.LeaderAndIsr;
+import org.apache.fluss.server.zk.data.PartitionAssignment;
+import org.apache.fluss.server.zk.data.TableAssignment;
+
+import org.assertj.core.util.Lists;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.fluss.record.TestData.DATA1_PARTITIONED_TABLE_DESCRIPTOR;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Unit test for {@link RpcServiceBase} async metadata methods. */
+class RpcServiceBaseTest {
+
+    private ZooKeeperClient mockZkClient;
+    private TablePath testTablePath;
+    private PhysicalTablePath testPartitionPath;
+    private long testTableId;
+    private long testPartitionId;
+    private List<BucketMetadata> testBucketMetadata;
+
+    @BeforeEach
+    void beforeEach() {
+        mockZkClient = mock(ZooKeeperClient.class);
+
+        testTablePath = TablePath.of("testDb", "testTable");
+        testPartitionPath = PhysicalTablePath.of(testTablePath, "partition1");
+        testTableId = 1L;
+        testPartitionId = 100L;
+        testBucketMetadata =
+                Lists.newArrayList(
+                        new BucketMetadata(0, 1, 1, Arrays.asList(1, 2, 3)),
+                        new BucketMetadata(1, 2, 2, Arrays.asList(4, 5, 6)));
+    }
+
+    private Session createTestSession() throws java.net.UnknownHostException {
+        return new Session(
+                (short) 1,
+                "default",
+                false,
+                java.net.InetAddress.getByName("127.0.0.1"),
+                FlussPrincipal.ANY);
+    }
+
+    // A test implementation of RpcServiceBase to access the protected processMetadataRequest method
+    private static class TestRpcServiceBase extends RpcServiceBase {
+        public TestRpcServiceBase() {
+            super(null, null, null, null, null);
+        }
+
+        @Override
+        public void processMetadataRequest(

Review Comment:
   Why override this method? It doesn't appear to do anything except call the parent
   method.
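
   For context, a small sketch of the Java access rule that may make the override
   unnecessary, assuming the test sits in the same package as the base class (the
   package declarations in this diff suggest it does). `BaseServiceSketch` and
   `SamePackageCaller` are hypothetical names:

   ```java
   // Hypothetical base class with a protected method, standing in for RpcServiceBase.
   abstract class BaseServiceSketch {
       protected String process(String request) {
           return "handled:" + request;
       }

       abstract String name();
   }

   // Lives in the same package, so it can call the protected method directly.
   final class SamePackageCaller {
       static String call(String request) {
           // Only the abstract method needs an implementation; process() is not overridden.
           BaseServiceSketch service =
                   new BaseServiceSketch() {
                       @Override
                       String name() {
                           return "test";
                       }
                   };
           return service.process(request);
       }

       public static void main(String[] args) {
           System.out.println(call("metadata"));
       }
   }
   ```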



##########
fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataFunctionProvider.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.metadata;
+
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePartition;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.RpcServiceBase;
+import org.apache.fluss.server.coordinator.CoordinatorContext;
+import org.apache.fluss.server.coordinator.MetadataManager;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.LeaderAndIsr;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/** The coordinator metadata function provider. */
+public class CoordinatorMetadataFunctionProvider implements MetadataFunctionProvider {
+
+    private final ZooKeeperClient zkClient;
+
+    private final CoordinatorMetadataCache metadataCache;
+
+    private final CoordinatorContext ctx;
+
+    private final MetadataManager metadataManager;
+
+    public CoordinatorMetadataFunctionProvider(
+            ZooKeeperClient zkClient,
+            CoordinatorMetadataCache metadataCache,
+            CoordinatorContext ctx,
+            MetadataManager metadataManager) {
+        this.zkClient = zkClient;
+        this.metadataCache = metadataCache;
+        this.ctx = ctx;
+        this.metadataManager = metadataManager;
+    }
+
+    @Override
+    public Optional<TableMetadata> getTableMetadataFromCache(TablePath tablePath) {
+        TableInfo tableInfo = metadataManager.getTable(tablePath);

Review Comment:
   1. We can move this after the `if` condition, so the zk operation is skipped when
      the table ID is unknown.
   2. This still reads from zk instead of from the cache. I think we can get the
      `TableInfo` from `ctx` as well.
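
   A rough sketch of point 1, assuming a cheap in-memory ID lookup and a costly info
   lookup. All names here, including the `-1` sentinel, are hypothetical stand-ins
   and not the actual Fluss classes:

   ```java
   import java.util.Optional;

   // Hypothetical stand-in for the provider; strings replace the real metadata types.
   final class DeferredLookupSketch {
       static final long UNKNOWN_TABLE_ID = -1L; // assumed sentinel value
       private int expensiveLookups = 0;

       // Cheap in-memory lookup (plays the role of ctx.getTableIdByPath).
       long cachedTableId(String tablePath) {
           return "db.known".equals(tablePath) ? 1L : UNKNOWN_TABLE_ID;
       }

       // Costly lookup (plays the role of a call that may hit zk).
       String loadTableInfo(String tablePath) {
           expensiveLookups++;
           return "info:" + tablePath;
       }

       Optional<String> getTableMetadataFromCache(String tablePath) {
           long tableId = cachedTableId(tablePath);
           if (tableId == UNKNOWN_TABLE_ID) {
               // Early exit: the costly lookup below never runs for unknown tables.
               return Optional.empty();
           }
           return Optional.of(loadTableInfo(tablePath) + "#" + tableId);
       }

       int expensiveLookupCount() {
           return expensiveLookups;
       }
   }
   ```

   Point 2 would go further and replace the costly call with a cache read entirely.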



##########
fluss-server/src/test/java/org/apache/fluss/server/RpcServiceBaseTest.java:
##########
@@ -0,0 +1,758 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server;
+
+import org.apache.fluss.cluster.ServerNode;
+import org.apache.fluss.cluster.ServerType;
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.exception.PartitionNotExistException;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePartition;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.rpc.messages.MetadataRequest;
+import org.apache.fluss.rpc.messages.MetadataResponse;
+import org.apache.fluss.rpc.messages.PbPhysicalTablePath;
+import org.apache.fluss.rpc.messages.PbTablePath;
+import org.apache.fluss.rpc.netty.server.Session;
+import org.apache.fluss.security.acl.FlussPrincipal;
+import org.apache.fluss.server.authorizer.Authorizer;
+import org.apache.fluss.server.metadata.BucketMetadata;
+import org.apache.fluss.server.metadata.MetadataFunctionProvider;
+import org.apache.fluss.server.metadata.PartitionMetadata;
+import org.apache.fluss.server.metadata.ServerMetadataCache;
+import org.apache.fluss.server.metadata.TableMetadata;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.BucketAssignment;
+import org.apache.fluss.server.zk.data.LeaderAndIsr;
+import org.apache.fluss.server.zk.data.PartitionAssignment;
+import org.apache.fluss.server.zk.data.TableAssignment;
+
+import org.assertj.core.util.Lists;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.fluss.record.TestData.DATA1_PARTITIONED_TABLE_DESCRIPTOR;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Unit test for {@link RpcServiceBase} async metadata methods. */
+class RpcServiceBaseTest {
+
+    private ZooKeeperClient mockZkClient;
+    private TablePath testTablePath;
+    private PhysicalTablePath testPartitionPath;
+    private long testTableId;
+    private long testPartitionId;
+    private List<BucketMetadata> testBucketMetadata;
+
+    @BeforeEach
+    void beforeEach() {
+        mockZkClient = mock(ZooKeeperClient.class);
+
+        testTablePath = TablePath.of("testDb", "testTable");
+        testPartitionPath = PhysicalTablePath.of(testTablePath, "partition1");
+        testTableId = 1L;
+        testPartitionId = 100L;
+        testBucketMetadata =
+                Lists.newArrayList(
+                        new BucketMetadata(0, 1, 1, Arrays.asList(1, 2, 3)),
+                        new BucketMetadata(1, 2, 2, Arrays.asList(4, 5, 6)));
+    }
+
+    private Session createTestSession() throws java.net.UnknownHostException {
+        return new Session(
+                (short) 1,
+                "default",
+                false,
+                java.net.InetAddress.getByName("127.0.0.1"),
+                FlussPrincipal.ANY);
+    }
+
+    // A test implementation of RpcServiceBase to access the protected processMetadataRequest method
+    private static class TestRpcServiceBase extends RpcServiceBase {
+        public TestRpcServiceBase() {
+            super(null, null, null, null, null);
+        }
+
+        @Override
+        public void processMetadataRequest(
+                MetadataRequest request,
+                String listenerName,
+                Session session,
+                Authorizer authorizer,
+                ServerMetadataCache metadataCache,
+                MetadataFunctionProvider functionProvider,
+                CompletableFuture<MetadataResponse> responseFuture) {
+            super.processMetadataRequest(
+                    request,
+                    listenerName,
+                    session,
+                    authorizer,
+                    metadataCache,
+                    functionProvider,
+                    responseFuture);
+        }
+
+        @Override
+        public String name() {
+            return "TestRpcService";
+        }
+
+        @Override
+        public void shutdown() {}
+
+        @Override
+        public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) {
+            return null;
+        }
+    }
+
+    @Test
+    void testGetTableMetadataFromZkAsync_Success() throws Exception {

Review Comment:
   If we decide to move `RpcServiceBase.getTableMetadataFromZkAsync` and related
   methods to `ZooKeeperClient`, we should also move the related test methods to
   `ZooKeeperClientTest`, where we can reuse the embedded zk cluster for testing.



##########
fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java:
##########
@@ -143,6 +155,16 @@ public abstract class RpcServiceBase extends RpcGatewayService implements AdminR
     private long tokenLastUpdateTimeMs = 0;
     private ObtainedSecurityToken securityToken = null;
 
+    private static final ExecutorService ZK_META_UPDATE_EXECUTOR =
+            Executors.newFixedThreadPool(8, new ExecutorThreadFactory("zk-metadata-update-"));
+
+    private static final ConcurrentHashMap<
+                    TableMetadataKey, CompletableFuture<List<BucketMetadata>>>
+            PENDING_TABLE_METADATA_FROM_ZK_FUTURES = MapUtils.newConcurrentHashMap();
+
+    private static final ConcurrentHashMap<PhysicalTablePath, CompletableFuture<PartitionMetadata>>
+            PENDING_PARTITION_METADATA_FROM_ZK_FUTURES = MapUtils.newConcurrentHashMap();
+

Review Comment:
   We should avoid introducing global resources, which are hard to maintain (resource
   leaks in production) and hard to mock (overriding them in tests). To support async
   zk operations, we can learn from Kafka:
   
   1. Use the default executor service in the zk client
      (`curatorFramework.inBackground(...)` in our case).
   2. Extend `ZooKeeperClient` to support **async batch** operations. We have already
      introduced a basic implementation in
      https://github.com/apache/fluss/commit/8b9d2937c35f47dce0d034785ee684d49ba04bc3,
      which is easy to extend to more methods.
   3. Use the **async batch** API of `ZooKeeperClient` to fetch the metadata of
      table/partition lists.
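   
   The async batch idea in the steps above could be sketched roughly as follows. This
   uses only `java.util.concurrent` and is not the Curator or Fluss API: `batchGet`
   is a hypothetical helper, and `asyncGet` stands in for a client call that
   completes its future on the client's own background thread:

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.function.Function;

   final class AsyncBatchSketch {
       // Issues every request up front, then combines the results once all complete.
       // No static executor or static pending-futures map is involved.
       static <K, V> CompletableFuture<Map<K, V>> batchGet(
               List<K> keys, Function<K, CompletableFuture<V>> asyncGet) {
           List<CompletableFuture<V>> inFlight = new ArrayList<>();
           for (K key : keys) {
               inFlight.add(asyncGet.apply(key));
           }
           return CompletableFuture.allOf(inFlight.toArray(new CompletableFuture<?>[0]))
                   .thenApply(
                           ignored -> {
                               Map<K, V> result = new LinkedHashMap<>();
                               for (int i = 0; i < keys.size(); i++) {
                                   // join() does not block here: allOf has already completed.
                                   result.put(keys.get(i), inFlight.get(i).join());
                               }
                               return result;
                           });
       }

       public static void main(String[] args) {
           // Stand-in async getter that completes immediately with the key length.
           Map<String, Integer> sizes =
                   batchGet(
                                   Arrays.asList("tableA", "tableB/p1"),
                                   key -> CompletableFuture.completedFuture(key.length()))
                           .join();
           System.out.println(sizes);
       }
   }
   ```

   If any single request fails, the combined future completes exceptionally, so the
   caller handles errors in one place.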
   
   



##########
fluss-server/src/test/java/org/apache/fluss/server/RpcServiceBaseTest.java:
##########
@@ -0,0 +1,758 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server;
+
+import org.apache.fluss.cluster.ServerNode;
+import org.apache.fluss.cluster.ServerType;
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.exception.PartitionNotExistException;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePartition;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.rpc.messages.MetadataRequest;
+import org.apache.fluss.rpc.messages.MetadataResponse;
+import org.apache.fluss.rpc.messages.PbPhysicalTablePath;
+import org.apache.fluss.rpc.messages.PbTablePath;
+import org.apache.fluss.rpc.netty.server.Session;
+import org.apache.fluss.security.acl.FlussPrincipal;
+import org.apache.fluss.server.authorizer.Authorizer;
+import org.apache.fluss.server.metadata.BucketMetadata;
+import org.apache.fluss.server.metadata.MetadataFunctionProvider;
+import org.apache.fluss.server.metadata.PartitionMetadata;
+import org.apache.fluss.server.metadata.ServerMetadataCache;
+import org.apache.fluss.server.metadata.TableMetadata;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.BucketAssignment;
+import org.apache.fluss.server.zk.data.LeaderAndIsr;
+import org.apache.fluss.server.zk.data.PartitionAssignment;
+import org.apache.fluss.server.zk.data.TableAssignment;
+
+import org.assertj.core.util.Lists;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.fluss.record.TestData.DATA1_PARTITIONED_TABLE_DESCRIPTOR;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Unit test for {@link RpcServiceBase} async metadata methods. */
+class RpcServiceBaseTest {
+
+    private ZooKeeperClient mockZkClient;
+    private TablePath testTablePath;
+    private PhysicalTablePath testPartitionPath;
+    private long testTableId;
+    private long testPartitionId;
+    private List<BucketMetadata> testBucketMetadata;
+
+    @BeforeEach
+    void beforeEach() {
+        mockZkClient = mock(ZooKeeperClient.class);
+
+        testTablePath = TablePath.of("testDb", "testTable");
+        testPartitionPath = PhysicalTablePath.of(testTablePath, "partition1");
+        testTableId = 1L;
+        testPartitionId = 100L;
+        testBucketMetadata =
+                Lists.newArrayList(
+                        new BucketMetadata(0, 1, 1, Arrays.asList(1, 2, 3)),
+                        new BucketMetadata(1, 2, 2, Arrays.asList(4, 5, 6)));
+    }
+
+    private Session createTestSession() throws java.net.UnknownHostException {
+        return new Session(
+                (short) 1,
+                "default",
+                false,
+                java.net.InetAddress.getByName("127.0.0.1"),
+                FlussPrincipal.ANY);
+    }
+
+    // A test implementation of RpcServiceBase to access the protected processMetadataRequest method
+    private static class TestRpcServiceBase extends RpcServiceBase {
+        public TestRpcServiceBase() {
+            super(null, null, null, null, null);
+        }
+
+        @Override
+        public void processMetadataRequest(
+                MetadataRequest request,
+                String listenerName,
+                Session session,
+                Authorizer authorizer,
+                ServerMetadataCache metadataCache,
+                MetadataFunctionProvider functionProvider,
+                CompletableFuture<MetadataResponse> responseFuture) {
+            super.processMetadataRequest(
+                    request,
+                    listenerName,
+                    session,
+                    authorizer,
+                    metadataCache,
+                    functionProvider,
+                    responseFuture);
+        }
+
+        @Override
+        public String name() {
+            return "TestRpcService";
+        }
+
+        @Override
+        public void shutdown() {}
+
+        @Override
+        public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) {
+            return null;
+        }
+    }
+
+    @Test
+    void testGetTableMetadataFromZkAsync_Success() throws Exception {
+        // Setup test data
+        Map<Integer, BucketAssignment> bucketAssignments = new HashMap<>();
+        bucketAssignments.put(0, BucketAssignment.of(1, 2, 3));
+        bucketAssignments.put(1, BucketAssignment.of(2, 3, 4));
+        TableAssignment tableAssignment = new TableAssignment(bucketAssignments);
+
+        // Mock ZooKeeper responses
+        when(mockZkClient.getTableAssignment(testTableId)).thenReturn(Optional.of(tableAssignment));

Review Comment:
   Please do not use Mockito; it increases long-term maintenance overhead and 
should be avoided. In this case, we can simply register the metadata in 
ZooKeeper instead of mocking the ZooKeeper client. 
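
A minimal sketch of the state-based style suggested above: write the data first, then call the code under test, with no stubbing. So that it runs standalone, it uses a hypothetical in-memory store (`InMemoryAssignmentStore`) in place of a `ZooKeeperClient` connected to a test ZooKeeper server; `BucketLister` is likewise a hypothetical stand-in for the code under test, not part of Fluss.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the metadata store. In the real test this role would be
// played by a ZooKeeperClient backed by a test ZooKeeper server.
final class InMemoryAssignmentStore {
    private final Map<Long, Map<Integer, List<Integer>>> assignments = new HashMap<>();

    // Registers bucket -> replicas for a table, the way a test would write it into ZooKeeper.
    void registerTableAssignment(long tableId, Map<Integer, List<Integer>> bucketToReplicas) {
        assignments.put(tableId, new HashMap<>(bucketToReplicas));
    }

    // Returns the registered assignment, or empty if nothing was registered for the table.
    Optional<Map<Integer, List<Integer>>> getTableAssignment(long tableId) {
        return Optional.ofNullable(assignments.get(tableId));
    }
}

// Hypothetical code under test: returns the sorted bucket ids known for a table.
final class BucketLister {
    static List<Integer> bucketIds(InMemoryAssignmentStore store, long tableId) {
        List<Integer> ids = new ArrayList<>();
        store.getTableAssignment(tableId).ifPresent(a -> ids.addAll(a.keySet()));
        Collections.sort(ids);
        return ids;
    }
}
```

The "no assignment" case then needs no special setup: the test simply registers nothing for that table id.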



##########
fluss-server/src/test/java/org/apache/fluss/server/RpcServiceBaseTest.java:
##########
@@ -0,0 +1,758 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server;
+
+import org.apache.fluss.cluster.ServerNode;
+import org.apache.fluss.cluster.ServerType;
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.exception.PartitionNotExistException;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePartition;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.rpc.messages.MetadataRequest;
+import org.apache.fluss.rpc.messages.MetadataResponse;
+import org.apache.fluss.rpc.messages.PbPhysicalTablePath;
+import org.apache.fluss.rpc.messages.PbTablePath;
+import org.apache.fluss.rpc.netty.server.Session;
+import org.apache.fluss.security.acl.FlussPrincipal;
+import org.apache.fluss.server.authorizer.Authorizer;
+import org.apache.fluss.server.metadata.BucketMetadata;
+import org.apache.fluss.server.metadata.MetadataFunctionProvider;
+import org.apache.fluss.server.metadata.PartitionMetadata;
+import org.apache.fluss.server.metadata.ServerMetadataCache;
+import org.apache.fluss.server.metadata.TableMetadata;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.BucketAssignment;
+import org.apache.fluss.server.zk.data.LeaderAndIsr;
+import org.apache.fluss.server.zk.data.PartitionAssignment;
+import org.apache.fluss.server.zk.data.TableAssignment;
+
+import org.assertj.core.util.Lists;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.fluss.record.TestData.DATA1_PARTITIONED_TABLE_DESCRIPTOR;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Unit test for {@link RpcServiceBase} async metadata methods. */
+class RpcServiceBaseTest {
+
+    private ZooKeeperClient mockZkClient;
+    private TablePath testTablePath;
+    private PhysicalTablePath testPartitionPath;
+    private long testTableId;
+    private long testPartitionId;
+    private List<BucketMetadata> testBucketMetadata;
+
+    @BeforeEach
+    void beforeEach() {
+        mockZkClient = mock(ZooKeeperClient.class);
+
+        testTablePath = TablePath.of("testDb", "testTable");
+        testPartitionPath = PhysicalTablePath.of(testTablePath, "partition1");
+        testTableId = 1L;
+        testPartitionId = 100L;
+        testBucketMetadata =
+                Lists.newArrayList(
+                        new BucketMetadata(0, 1, 1, Arrays.asList(1, 2, 3)),
+                        new BucketMetadata(1, 2, 2, Arrays.asList(4, 5, 6)));
+    }
+
+    private Session createTestSession() throws java.net.UnknownHostException {
+        return new Session(
+                (short) 1,
+                "default",
+                false,
+                java.net.InetAddress.getByName("127.0.0.1"),
+                FlussPrincipal.ANY);
+    }
+
+    // A test implementation of RpcServiceBase to access the protected processMetadataRequest method
+    private static class TestRpcServiceBase extends RpcServiceBase {
+        public TestRpcServiceBase() {
+            super(null, null, null, null, null);
+        }
+
+        @Override
+        public void processMetadataRequest(
+                MetadataRequest request,
+                String listenerName,
+                Session session,
+                Authorizer authorizer,
+                ServerMetadataCache metadataCache,
+                MetadataFunctionProvider functionProvider,
+                CompletableFuture<MetadataResponse> responseFuture) {
+            super.processMetadataRequest(
+                    request,
+                    listenerName,
+                    session,
+                    authorizer,
+                    metadataCache,
+                    functionProvider,
+                    responseFuture);
+        }
+
+        @Override
+        public String name() {
+            return "TestRpcService";
+        }
+
+        @Override
+        public void shutdown() {}
+
+        @Override
+        public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) {
+            return null;
+        }
+    }
+
+    @Test
+    void testGetTableMetadataFromZkAsync_Success() throws Exception {
+        // Setup test data
+        Map<Integer, BucketAssignment> bucketAssignments = new HashMap<>();
+        bucketAssignments.put(0, BucketAssignment.of(1, 2, 3));
+        bucketAssignments.put(1, BucketAssignment.of(2, 3, 4));
+        TableAssignment tableAssignment = new TableAssignment(bucketAssignments);
+
+        // Mock ZooKeeper responses
+        when(mockZkClient.getTableAssignment(testTableId)).thenReturn(Optional.of(tableAssignment));
+        when(mockZkClient.getLeaderAndIsr(new TableBucket(testTableId, null, 0)))
+                .thenReturn(Optional.of(new LeaderAndIsr(1, 1, Arrays.asList(1, 2, 3), 1, 1)));
+        when(mockZkClient.getLeaderAndIsr(new TableBucket(testTableId, null, 1)))
+                .thenReturn(Optional.of(new LeaderAndIsr(2, 1, Arrays.asList(2, 3, 4), 1, 1)));
+
+        // Execute test
+        CompletableFuture<List<BucketMetadata>> future =
+                RpcServiceBase.getTableMetadataFromZkAsync(
+                        mockZkClient, testTablePath, testTableId, false);
+
+        // Verify results
+        List<BucketMetadata> result = future.get();
+        assertThat(result).hasSize(2);
+
+        BucketMetadata bucket0 = result.get(0);
+        assertThat(bucket0.getBucketId()).isEqualTo(0);
+        assertThat(bucket0.getLeaderId()).hasValue(1);
+        assertThat(bucket0.getReplicas()).containsExactly(1, 2, 3);
+
+        BucketMetadata bucket1 = result.get(1);
+        assertThat(bucket1.getBucketId()).isEqualTo(1);
+        assertThat(bucket1.getLeaderId()).hasValue(2);
+        assertThat(bucket1.getReplicas()).containsExactly(2, 3, 4);
+    }
+
+    @Test
+    void testGetTableMetadataFromZkAsync_NoTableAssignment() throws Exception {
+        // Mock ZooKeeper to return empty table assignment
+        when(mockZkClient.getTableAssignment(testTableId)).thenReturn(Optional.empty());
+
+        // Execute test
+        CompletableFuture<List<BucketMetadata>> future =
+                RpcServiceBase.getTableMetadataFromZkAsync(
+                        mockZkClient, testTablePath, testTableId, false);
+
+        // Verify results - should return empty list for non-partitioned table
+        List<BucketMetadata> result = future.get();
+        assertThat(result).isEmpty();
+    }
+
+    @Test
+    void testGetTableMetadataFromZkAsync_NoTableAssignmentForPartitionedTable() throws Exception {
+        // Mock ZooKeeper to return empty table assignment for partitioned table
+        when(mockZkClient.getTableAssignment(testTableId)).thenReturn(Optional.empty());
+
+        // Execute test - should log warning but not throw exception
+        CompletableFuture<List<BucketMetadata>> future =
+                RpcServiceBase.getTableMetadataFromZkAsync(
+                        mockZkClient, testTablePath, testTableId, true);
+
+        // Verify results - should return empty list
+        List<BucketMetadata> result = future.get();
+        assertThat(result).isEmpty();
+    }
+
+    @Test
+    void testGetTableMetadataFromZkAsync_ZooKeeperException() throws Exception {
+        // Mock ZooKeeper to throw exception
+        when(mockZkClient.getTableAssignment(testTableId))
+                .thenThrow(new RuntimeException("ZK connection failed"));
+
+        // Execute test
+        CompletableFuture<List<BucketMetadata>> future =
+                RpcServiceBase.getTableMetadataFromZkAsync(
+                        mockZkClient, testTablePath, testTableId, false);
+
+        // Verify exception is thrown
+        assertThatThrownBy(() -> future.get())
+                .isInstanceOf(ExecutionException.class)
+                .hasCauseInstanceOf(FlussRuntimeException.class)
+                .hasMessageContaining("Failed to get metadata for table");
+    }
+
+    @Test
+    void testGetPartitionMetadataFromZkAsync_Success() throws Exception {
+        // Setup test data
+        Map<Integer, BucketAssignment> bucketAssignments = new HashMap<>();
+        bucketAssignments.put(0, BucketAssignment.of(1, 2, 3));
+        bucketAssignments.put(1, BucketAssignment.of(2, 3, 4));
+        TableAssignment tableAssignment = new TableAssignment(bucketAssignments);
+
+        // Mock ZooKeeper responses
+        when(mockZkClient.getPartition(testTablePath, "partition1"))
+                .thenReturn(Optional.of(new TablePartition(testTableId, testPartitionId)));
+        when(mockZkClient.getPartitionAssignment(testPartitionId))
+                .thenReturn(Optional.of(new PartitionAssignment(testTableId, bucketAssignments)));
+        when(mockZkClient.getLeaderAndIsr(new TableBucket(testTableId, testPartitionId, 0)))
+                .thenReturn(Optional.of(new LeaderAndIsr(1, 1, Arrays.asList(1, 2, 3), 1, 1)));
+        when(mockZkClient.getLeaderAndIsr(new TableBucket(testTableId, testPartitionId, 1)))
+                .thenReturn(Optional.of(new LeaderAndIsr(2, 1, Arrays.asList(2, 3, 4), 1, 1)));
+
+        // Execute test
+        CompletableFuture<PartitionMetadata> future =
+                RpcServiceBase.getPartitionMetadataFromZkAsync(testPartitionPath, mockZkClient);
+
+        // Verify results
+        PartitionMetadata result = future.get();
+        assertThat(result.getPartitionId()).isEqualTo(testPartitionId);
+        assertThat(result.getTableId()).isEqualTo(testTableId);
+        assertThat(result.getBucketMetadataList()).hasSize(2);
+
+        BucketMetadata bucket0 = result.getBucketMetadataList().get(0);
+        assertThat(bucket0.getBucketId()).isEqualTo(0);
+        assertThat(bucket0.getLeaderId()).hasValue(1);
+        assertThat(bucket0.getReplicas()).containsExactly(1, 2, 3);
+
+        BucketMetadata bucket1 = result.getBucketMetadataList().get(1);
+        assertThat(bucket1.getBucketId()).isEqualTo(1);
+        assertThat(bucket1.getLeaderId()).hasValue(2);
+        assertThat(bucket1.getReplicas()).containsExactly(2, 3, 4);
+    }
+
+    @Test
+    void testGetPartitionMetadataFromZkAsync_PartitionNotExist() throws Exception {
+        // Mock ZooKeeper to return empty partition
+        when(mockZkClient.getPartition(testTablePath, "nonexistent")).thenReturn(Optional.empty());
+
+        // Execute test
+        PhysicalTablePath nonexistentPath = PhysicalTablePath.of(testTablePath, "nonexistent");
+        CompletableFuture<PartitionMetadata> future =
+                RpcServiceBase.getPartitionMetadataFromZkAsync(nonexistentPath, mockZkClient);
+
+        // Verify exception is thrown
+        assertThatThrownBy(future::get)
+                .isInstanceOf(ExecutionException.class)
+                .hasCauseInstanceOf(PartitionNotExistException.class);
+    }
+
+    @Test
+    void testConcurrentGetTableMetadataFromZkAsync_SameKey() throws Exception {
+        // Setup test data
+        Map<Integer, BucketAssignment> bucketAssignments = new HashMap<>();
+        bucketAssignments.put(0, BucketAssignment.of(1, 2, 3));
+        TableAssignment tableAssignment = new TableAssignment(bucketAssignments);
+
+        // Mock ZooKeeper responses
+        when(mockZkClient.getTableAssignment(testTableId)).thenReturn(Optional.of(tableAssignment));
+        when(mockZkClient.getLeaderAndIsr(any(TableBucket.class)))
+                .thenReturn(Optional.of(new LeaderAndIsr(1, 1, Arrays.asList(1, 2, 3), 1, 1)));
+
+        // Execute concurrent requests with same key
+        CompletableFuture<List<BucketMetadata>> future1 =
+                RpcServiceBase.getTableMetadataFromZkAsync(
+                        mockZkClient, testTablePath, testTableId, false);
+        CompletableFuture<List<BucketMetadata>> future2 =
+                RpcServiceBase.getTableMetadataFromZkAsync(
+                        mockZkClient, testTablePath, testTableId, false);
+
+        // Verify both requests return the same result
+        List<BucketMetadata> result1 = future1.get();
+        List<BucketMetadata> result2 = future2.get();
+        assertThat(result1).isEqualTo(result2);
+        assertThat(result1).hasSize(1);
+    }
+
+    @Test
+    void testConcurrentGetPartitionMetadataFromZkAsync_SameKey() throws Exception {
+        // Setup test data
+        Map<Integer, BucketAssignment> bucketAssignments = new HashMap<>();
+        bucketAssignments.put(0, BucketAssignment.of(1, 2, 3));
+
+        // Mock ZooKeeper responses
+        when(mockZkClient.getPartition(testTablePath, "partition1"))
+                .thenReturn(Optional.of(new TablePartition(testPartitionId, testTableId)));
+        when(mockZkClient.getPartitionAssignment(testPartitionId))
+                .thenReturn(Optional.of(new PartitionAssignment(testTableId, bucketAssignments)));
+        when(mockZkClient.getLeaderAndIsr(any(TableBucket.class)))
+                .thenReturn(Optional.of(new LeaderAndIsr(1, 1, Arrays.asList(1, 2, 3), 1, 1)));
+
+        // Execute concurrent requests with same key
+        CompletableFuture<PartitionMetadata> future1 =
+                RpcServiceBase.getPartitionMetadataFromZkAsync(testPartitionPath, mockZkClient);
+        CompletableFuture<PartitionMetadata> future2 =
+                RpcServiceBase.getPartitionMetadataFromZkAsync(testPartitionPath, mockZkClient);
+
+        // Verify both requests return the same result
+        PartitionMetadata result1 = future1.get();
+        PartitionMetadata result2 = future2.get();
+        assertThat(result1).isEqualTo(result2);
+    }
+
+    @Test
+    void testGetTableMetadataFromZkAsync_MapEntryRemovedOnSuccess() throws Exception {
+        // Setup test data
+        Map<Integer, BucketAssignment> bucketAssignments = new HashMap<>();
+        bucketAssignments.put(0, BucketAssignment.of(1, 2, 3));
+        TableAssignment tableAssignment = new TableAssignment(bucketAssignments);
+
+        // Mock ZooKeeper responses
+        when(mockZkClient.getTableAssignment(testTableId)).thenReturn(Optional.of(tableAssignment));
+        when(mockZkClient.getLeaderAndIsr(any(TableBucket.class)))
+                .thenReturn(Optional.of(new LeaderAndIsr(1, 1, Arrays.asList(1, 2, 3), 1, 1)));
+
+        // Check that the map is initially empty
+        assertThat(getPendingTableMetadataFutureMap()).isEmpty();
+
+        // Execute test
+        CompletableFuture<List<BucketMetadata>> future =
+                RpcServiceBase.getTableMetadataFromZkAsync(
+                        mockZkClient, testTablePath, testTableId, false);
+
+        // The entry should be added to the map
+        assertThat(getPendingTableMetadataFutureMap()).hasSize(1);
+
+        // Wait for completion
+        future.get();
+
+        // After completion, the entry should be removed from the map
+        assertThat(getPendingTableMetadataFutureMap()).isEmpty();
+    }
+
+    @Test
+    void testGetTableMetadataFromZkAsync_MapEntryRemovedOnFailure() throws Exception {
+        // Mock ZooKeeper to throw exception
+        when(mockZkClient.getTableAssignment(testTableId))
+                .thenThrow(new RuntimeException("ZK connection failed"));
+
+        // Check that the map is initially empty
+        assertThat(getPendingTableMetadataFutureMap()).isEmpty();
+
+        // Execute test
+        CompletableFuture<List<BucketMetadata>> future =
+                RpcServiceBase.getTableMetadataFromZkAsync(
+                        mockZkClient, testTablePath, testTableId, false);
+
+        // The entry should be added to the map
+        assertThat(getPendingTableMetadataFutureMap()).hasSize(1);
+
+        // Wait for completion and expect exception
+        assertThatThrownBy(future::get)
+                .isInstanceOf(ExecutionException.class)
+                .hasCauseInstanceOf(FlussRuntimeException.class);
+
+        // After failure, the entry should be removed from the map
+        assertThat(getPendingTableMetadataFutureMap()).isEmpty();
+    }
+
+    @Test
+    void testGetPartitionMetadataFromZkAsync_MapEntryRemovedOnSuccess() throws Exception {
+        // Setup test data
+        Map<Integer, BucketAssignment> bucketAssignments = new HashMap<>();
+        bucketAssignments.put(0, BucketAssignment.of(1, 2, 3));
+        TableAssignment tableAssignment = new TableAssignment(bucketAssignments);
+
+        // Mock ZooKeeper responses
+        when(mockZkClient.getPartition(testTablePath, "partition1"))
+                .thenReturn(Optional.of(new TablePartition(testPartitionId, testTableId)));
+        when(mockZkClient.getPartitionAssignment(testPartitionId))
+                .thenReturn(Optional.of(new PartitionAssignment(testTableId, bucketAssignments)));
+        when(mockZkClient.getLeaderAndIsr(any(TableBucket.class)))
+                .thenReturn(Optional.of(new LeaderAndIsr(1, 1, Arrays.asList(1, 2, 3), 1, 1)));
+
+        // Check that the map is initially empty
+        assertThat(getPendingPartitionMetadataFutureMap()).isEmpty();
+
+        // Execute test
+        CompletableFuture<PartitionMetadata> future =
+                RpcServiceBase.getPartitionMetadataFromZkAsync(testPartitionPath, mockZkClient);
+
+        // The entry should be added to the map
+        assertThat(getPendingPartitionMetadataFutureMap()).hasSize(1);
+
+        // Wait for completion
+        future.get();
+
+        // After completion, the entry should be removed from the map
+        assertThat(getPendingPartitionMetadataFutureMap()).isEmpty();
+    }
+
+    @Test
+    void testGetPartitionMetadataFromZkAsync_MapEntryRemovedOnFailure() throws Exception {
+        // Mock ZooKeeper to throw exception
+        when(mockZkClient.getPartition(testTablePath, "partition1"))
+                .thenThrow(new RuntimeException("ZK connection failed"));
+
+        // Check that the map is initially empty
+        assertThat(getPendingPartitionMetadataFutureMap()).isEmpty();
+
+        // Execute test
+        CompletableFuture<PartitionMetadata> future =
+                RpcServiceBase.getPartitionMetadataFromZkAsync(testPartitionPath, mockZkClient);
+
+        // The entry should be added to the map
+        assertThat(getPendingPartitionMetadataFutureMap()).hasSize(1);
+
+        // Wait for completion and expect exception
+        assertThatThrownBy(future::get)
+                .isInstanceOf(ExecutionException.class)
+                .hasCauseInstanceOf(FlussRuntimeException.class);
+
+        // After failure, the entry should be removed from the map
+        assertThat(getPendingPartitionMetadataFutureMap()).isEmpty();
+    }
+
+    @Test
+    void testProcessMetadataRequest_TableFromCache() throws Exception {
+        // Setup mocks
+        MetadataRequest request = new MetadataRequest();
+        PbTablePath pbTablePath = new PbTablePath().setDatabaseName("db").setTableName("table");
+        request.addTablePath().copyFrom(pbTablePath);
+
+        String listenerName = "default";
+        Session session = createTestSession();
+        Authorizer authorizer = mock(Authorizer.class);
+        ServerMetadataCache metadataCache = mock(ServerMetadataCache.class);
+        MetadataFunctionProvider functionProvider = mock(MetadataFunctionProvider.class);
+
+        when(authorizer.isAuthorized(any(), any(), any())).thenReturn(true);
+        TablePath tablePath = TablePath.of("db", "table");
+        TableInfo partitionTableInfo =
+                TableInfo.of(
+                        tablePath,
+                        testTableId,
+                        1,
+                        DATA1_PARTITIONED_TABLE_DESCRIPTOR,
+                        System.currentTimeMillis(),
+                        System.currentTimeMillis());
+        TableMetadata tableMetadata =
+                new TableMetadata(partitionTableInfo, Collections.emptyList());
+        when(functionProvider.getTableMetadataFromCache(eq(tablePath)))
+                .thenReturn(Optional.of(tableMetadata));

Review Comment:
   Please do not use Mockito; it increases long-term maintenance overhead and 
should be avoided. In this case, we can simply pass `authorizer` as `null` 
and create testing implementations of `MetadataFunctionProvider` and 
`ServerMetadataCache`. 
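
A rough sketch of what such a hand-written testing implementation could look like. The interfaces here (`SimpleMetadataProvider`, `SimpleAuthorizer`) and the `MetadataLookup` helper are hypothetical, heavily simplified stand-ins so the example compiles on its own; the real `MetadataFunctionProvider` has more methods and different types. It also assumes that a `null` authorizer means authorization is skipped.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified shape of the provider interface (not the real Fluss API).
interface SimpleMetadataProvider {
    Optional<String> getTableMetadataFromCache(String tablePath);
}

// Hypothetical authorizer shape; a null authorizer is assumed to mean "authorization disabled".
interface SimpleAuthorizer {
    boolean isAuthorized(String principal, String tablePath);
}

// Hand-written testing implementation: plain state replaces when(...).thenReturn(...),
// and a counter replaces verify(...).
final class TestingMetadataProvider implements SimpleMetadataProvider {
    private final Map<String, String> cache = new HashMap<>();
    private int lookups;

    void putTable(String tablePath, String metadata) {
        cache.put(tablePath, metadata);
    }

    int lookupCount() {
        return lookups;
    }

    @Override
    public Optional<String> getTableMetadataFromCache(String tablePath) {
        lookups++;
        return Optional.ofNullable(cache.get(tablePath));
    }
}

// Hypothetical code under test: checks authorization (if enabled), then consults the provider.
final class MetadataLookup {
    static Optional<String> lookup(
            String principal,
            String tablePath,
            SimpleAuthorizer authorizer, // may be null
            SimpleMetadataProvider provider) {
        if (authorizer != null && !authorizer.isAuthorized(principal, tablePath)) {
            return Optional.empty();
        }
        return provider.getTableMetadataFromCache(tablePath);
    }
}
```

A test would then call `provider.putTable(...)`, invoke the lookup with a `null` authorizer, and assert on the returned value and on `lookupCount()`, which keeps the test independent of any mocking framework.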



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
