wuchong commented on code in PR #1463: URL: https://github.com/apache/fluss/pull/1463#discussion_r2354709190
########## fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataFunctionProvider.java: ########## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.metadata; + +import org.apache.fluss.metadata.PhysicalTablePath; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePartition; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.RpcServiceBase; +import org.apache.fluss.server.coordinator.CoordinatorContext; +import org.apache.fluss.server.coordinator.MetadataManager; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.LeaderAndIsr; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +/** The coordinator metadata function provider. */ +public class CoordinatorMetadataFunctionProvider implements MetadataFunctionProvider { Review Comment: We can simplify the interface to `MetadataProvider` and `CoordinatorMetadataProvider`, `TabletServerMetadataProvider`. 
########## fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataFunctionProvider.java: ########## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.metadata; + +import org.apache.fluss.metadata.PhysicalTablePath; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePartition; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.RpcServiceBase; +import org.apache.fluss.server.coordinator.CoordinatorContext; +import org.apache.fluss.server.coordinator.MetadataManager; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.LeaderAndIsr; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +/** The coordinator metadata function provider. 
*/ +public class CoordinatorMetadataFunctionProvider implements MetadataFunctionProvider { + + private final ZooKeeperClient zkClient; + + private final CoordinatorMetadataCache metadataCache; + + private final CoordinatorContext ctx; + + private final MetadataManager metadataManager; + + public CoordinatorMetadataFunctionProvider( + ZooKeeperClient zkClient, + CoordinatorMetadataCache metadataCache, + CoordinatorContext ctx, + MetadataManager metadataManager) { + this.zkClient = zkClient; + this.metadataCache = metadataCache; + this.ctx = ctx; + this.metadataManager = metadataManager; + } + + @Override + public Optional<TableMetadata> getTableMetadataFromCache(TablePath tablePath) { + TableInfo tableInfo = metadataManager.getTable(tablePath); + long tableId = ctx.getTableIdByPath(tablePath); + List<BucketMetadata> bucketMetadataList; + if (tableId == TableInfo.UNKNOWN_TABLE_ID) { + return Optional.empty(); + } + bucketMetadataList = + getBucketMetadataFromContext(ctx, tableId, null, ctx.getTableAssignment(tableId)); + return Optional.of(new TableMetadata(tableInfo, bucketMetadataList)); + } + + @Override + public CompletableFuture<TableMetadata> getTableMetadataFromZk(TablePath tablePath) { + TableInfo tableInfo = metadataManager.getTable(tablePath); + return RpcServiceBase.getTableMetadataFromZkAsync( Review Comment: `RpcServiceBase.getTableMetadataFromZkAsync` is a purely zk operation, and is not related to `RpcServiceBase`, we should put it in `ZookeeperClient`. The same to the `getTableMetadataFromZkAsync` and `batchGetPartitionMetadataFromZkAsync`. ########## fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataFunctionProvider.java: ########## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.metadata; + +import org.apache.fluss.metadata.PhysicalTablePath; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePartition; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.RpcServiceBase; +import org.apache.fluss.server.coordinator.CoordinatorContext; +import org.apache.fluss.server.coordinator.MetadataManager; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.LeaderAndIsr; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +/** The coordinator metadata function provider. */ +public class CoordinatorMetadataFunctionProvider implements MetadataFunctionProvider { + + private final ZooKeeperClient zkClient; + + private final CoordinatorMetadataCache metadataCache; Review Comment: The `CoordinatorMetadataCache` is not used here. ########## fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataFunctionProvider.java: ########## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.metadata; + +import org.apache.fluss.metadata.PhysicalTablePath; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePartition; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.RpcServiceBase; +import org.apache.fluss.server.coordinator.CoordinatorContext; +import org.apache.fluss.server.coordinator.MetadataManager; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.LeaderAndIsr; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +/** The coordinator metadata function provider. 
*/ +public class CoordinatorMetadataFunctionProvider implements MetadataFunctionProvider { + + private final ZooKeeperClient zkClient; + + private final CoordinatorMetadataCache metadataCache; + + private final CoordinatorContext ctx; + + private final MetadataManager metadataManager; + + public CoordinatorMetadataFunctionProvider( + ZooKeeperClient zkClient, + CoordinatorMetadataCache metadataCache, + CoordinatorContext ctx, + MetadataManager metadataManager) { + this.zkClient = zkClient; + this.metadataCache = metadataCache; + this.ctx = ctx; + this.metadataManager = metadataManager; + } + + @Override + public Optional<TableMetadata> getTableMetadataFromCache(TablePath tablePath) { + TableInfo tableInfo = metadataManager.getTable(tablePath); + long tableId = ctx.getTableIdByPath(tablePath); + List<BucketMetadata> bucketMetadataList; + if (tableId == TableInfo.UNKNOWN_TABLE_ID) { + return Optional.empty(); + } + bucketMetadataList = + getBucketMetadataFromContext(ctx, tableId, null, ctx.getTableAssignment(tableId)); + return Optional.of(new TableMetadata(tableInfo, bucketMetadataList)); + } + + @Override + public CompletableFuture<TableMetadata> getTableMetadataFromZk(TablePath tablePath) { + TableInfo tableInfo = metadataManager.getTable(tablePath); + return RpcServiceBase.getTableMetadataFromZkAsync( + zkClient, tablePath, tableInfo.getTableId(), tableInfo.isPartitioned()) + .thenApply(bucketMetadata -> new TableMetadata(tableInfo, bucketMetadata)); + } + + @Override + public Optional<PhysicalTablePath> getPhysicalTablePathFromCache(long partitionId) { + return ctx.getPhysicalTablePath(partitionId); + } + + @Override + public Optional<PartitionMetadata> getPartitionMetadataFromCache( + PhysicalTablePath partitionPath) { + TablePath tablePath = + new TablePath(partitionPath.getDatabaseName(), partitionPath.getTableName()); Review Comment: ```suggestion TablePath tablePath = partitionPath.getTablePath(); ``` ########## 
fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java: ########## @@ -122,18 +120,12 @@ public TableMetadata getTableMetadata(TablePath tablePath) { OptionalLong tableIdOpt = snapshot.getTableId(tablePath); List<BucketMetadata> bucketMetadataList; if (!tableIdOpt.isPresent()) { - // TODO no need to get assignment from zk if refactor client metadata cache. Trace by - // https://github.com/apache/fluss/issues/483 - // get table assignment from zk. - bucketMetadataList = - getTableMetadataFromZk( - zkClient, tablePath, tableInfo.getTableId(), tableInfo.isPartitioned()); - } else { - // get table assignment from cache. - bucketMetadataList = - new ArrayList<>( - snapshot.getBucketMetadataForTable(tableIdOpt.getAsLong()).values()); + + return null; Review Comment: Add `@Nullable` annotation to the return type of the method, or return `Optional` directly (it seems it is used as `Optional` later). ########## fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java: ########## @@ -155,9 +147,8 @@ public PartitionMetadata getPartitionMetadata(PhysicalTablePath partitionPath) { partitionId, new ArrayList<>(snapshot.getBucketMetadataForPartition(partitionId).values())); } else { - // TODO no need to get assignment from zk if refactor client metadata cache. Trace by - // https://github.com/apache/fluss/issues/483 - return getPartitionMetadataFromZk(partitionPath, zkClient); + + return null; Review Comment: Add `@Nullable` annotation to the return type of the method, or return `Optional` directly (it seems it is used as `Optional` later). ########## fluss-server/src/test/java/org/apache/fluss/server/RpcServiceBaseTest.java: ########## @@ -0,0 +1,758 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server; + +import org.apache.fluss.cluster.ServerNode; +import org.apache.fluss.cluster.ServerType; +import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.exception.PartitionNotExistException; +import org.apache.fluss.metadata.PhysicalTablePath; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePartition; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.rpc.messages.MetadataRequest; +import org.apache.fluss.rpc.messages.MetadataResponse; +import org.apache.fluss.rpc.messages.PbPhysicalTablePath; +import org.apache.fluss.rpc.messages.PbTablePath; +import org.apache.fluss.rpc.netty.server.Session; +import org.apache.fluss.security.acl.FlussPrincipal; +import org.apache.fluss.server.authorizer.Authorizer; +import org.apache.fluss.server.metadata.BucketMetadata; +import org.apache.fluss.server.metadata.MetadataFunctionProvider; +import org.apache.fluss.server.metadata.PartitionMetadata; +import org.apache.fluss.server.metadata.ServerMetadataCache; +import org.apache.fluss.server.metadata.TableMetadata; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.BucketAssignment; +import org.apache.fluss.server.zk.data.LeaderAndIsr; +import org.apache.fluss.server.zk.data.PartitionAssignment; +import 
org.apache.fluss.server.zk.data.TableAssignment; + +import org.assertj.core.util.Lists; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; + +import static org.apache.fluss.record.TestData.DATA1_PARTITIONED_TABLE_DESCRIPTOR; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** Unit test for {@link RpcServiceBase} async metadata methods. 
*/ +class RpcServiceBaseTest { + + private ZooKeeperClient mockZkClient; + private TablePath testTablePath; + private PhysicalTablePath testPartitionPath; + private long testTableId; + private long testPartitionId; + private List<BucketMetadata> testBucketMetadata; + + @BeforeEach + void beforeEach() { + mockZkClient = mock(ZooKeeperClient.class); + + testTablePath = TablePath.of("testDb", "testTable"); + testPartitionPath = PhysicalTablePath.of(testTablePath, "partition1"); + testTableId = 1L; + testPartitionId = 100L; + testBucketMetadata = + Lists.newArrayList( + new BucketMetadata(0, 1, 1, Arrays.asList(1, 2, 3)), + new BucketMetadata(1, 2, 2, Arrays.asList(4, 5, 6))); + } + + private Session createTestSession() throws java.net.UnknownHostException { + return new Session( + (short) 1, + "default", + false, + java.net.InetAddress.getByName("127.0.0.1"), + FlussPrincipal.ANY); + } + + // A test implementation of RpcServiceBase to access the protected processMetadataRequest method + private static class TestRpcServiceBase extends RpcServiceBase { + public TestRpcServiceBase() { + super(null, null, null, null, null); + } + + @Override + public void processMetadataRequest( Review Comment: Why override this method? It seems we didn't do anything except calling parent method? ########## fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataFunctionProvider.java: ########## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.metadata; + +import org.apache.fluss.metadata.PhysicalTablePath; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePartition; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.RpcServiceBase; +import org.apache.fluss.server.coordinator.CoordinatorContext; +import org.apache.fluss.server.coordinator.MetadataManager; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.LeaderAndIsr; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +/** The coordinator metadata function provider. 
*/ +public class CoordinatorMetadataFunctionProvider implements MetadataFunctionProvider { + + private final ZooKeeperClient zkClient; + + private final CoordinatorMetadataCache metadataCache; + + private final CoordinatorContext ctx; + + private final MetadataManager metadataManager; + + public CoordinatorMetadataFunctionProvider( + ZooKeeperClient zkClient, + CoordinatorMetadataCache metadataCache, + CoordinatorContext ctx, + MetadataManager metadataManager) { + this.zkClient = zkClient; + this.metadataCache = metadataCache; + this.ctx = ctx; + this.metadataManager = metadataManager; + } + + @Override + public Optional<TableMetadata> getTableMetadataFromCache(TablePath tablePath) { + TableInfo tableInfo = metadataManager.getTable(tablePath); Review Comment: 1. we can move this after the `if` condition, to reduce the cost of zk operation. 2. this is still reading from zk instead of from cache, I think we can get TableInfo from `ctx` as well ########## fluss-server/src/test/java/org/apache/fluss/server/RpcServiceBaseTest.java: ########## @@ -0,0 +1,758 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server; + +import org.apache.fluss.cluster.ServerNode; +import org.apache.fluss.cluster.ServerType; +import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.exception.PartitionNotExistException; +import org.apache.fluss.metadata.PhysicalTablePath; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePartition; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.rpc.messages.MetadataRequest; +import org.apache.fluss.rpc.messages.MetadataResponse; +import org.apache.fluss.rpc.messages.PbPhysicalTablePath; +import org.apache.fluss.rpc.messages.PbTablePath; +import org.apache.fluss.rpc.netty.server.Session; +import org.apache.fluss.security.acl.FlussPrincipal; +import org.apache.fluss.server.authorizer.Authorizer; +import org.apache.fluss.server.metadata.BucketMetadata; +import org.apache.fluss.server.metadata.MetadataFunctionProvider; +import org.apache.fluss.server.metadata.PartitionMetadata; +import org.apache.fluss.server.metadata.ServerMetadataCache; +import org.apache.fluss.server.metadata.TableMetadata; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.BucketAssignment; +import org.apache.fluss.server.zk.data.LeaderAndIsr; +import org.apache.fluss.server.zk.data.PartitionAssignment; +import org.apache.fluss.server.zk.data.TableAssignment; + +import org.assertj.core.util.Lists; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; + +import static 
org.apache.fluss.record.TestData.DATA1_PARTITIONED_TABLE_DESCRIPTOR; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** Unit test for {@link RpcServiceBase} async metadata methods. */ +class RpcServiceBaseTest { + + private ZooKeeperClient mockZkClient; + private TablePath testTablePath; + private PhysicalTablePath testPartitionPath; + private long testTableId; + private long testPartitionId; + private List<BucketMetadata> testBucketMetadata; + + @BeforeEach + void beforeEach() { + mockZkClient = mock(ZooKeeperClient.class); + + testTablePath = TablePath.of("testDb", "testTable"); + testPartitionPath = PhysicalTablePath.of(testTablePath, "partition1"); + testTableId = 1L; + testPartitionId = 100L; + testBucketMetadata = + Lists.newArrayList( + new BucketMetadata(0, 1, 1, Arrays.asList(1, 2, 3)), + new BucketMetadata(1, 2, 2, Arrays.asList(4, 5, 6))); + } + + private Session createTestSession() throws java.net.UnknownHostException { + return new Session( + (short) 1, + "default", + false, + java.net.InetAddress.getByName("127.0.0.1"), + FlussPrincipal.ANY); + } + + // A test implementation of RpcServiceBase to access the protected processMetadataRequest method + private static class TestRpcServiceBase extends RpcServiceBase { + public TestRpcServiceBase() { + super(null, null, null, null, null); + } + + @Override + public void processMetadataRequest( + MetadataRequest request, + String listenerName, + Session session, + Authorizer authorizer, + ServerMetadataCache metadataCache, + MetadataFunctionProvider functionProvider, + CompletableFuture<MetadataResponse> responseFuture) { + super.processMetadataRequest( + request, + 
listenerName, + session, + authorizer, + metadataCache, + functionProvider, + responseFuture); + } + + @Override + public String name() { + return "TestRpcService"; + } + + @Override + public void shutdown() {} + + @Override + public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) { + return null; + } + } + + @Test + void testGetTableMetadataFromZkAsync_Success() throws Exception { Review Comment: If we decide to move `RpcServiceBase.getTableMetadataFromZkAsync` and related methods to `ZookeeperClient`, we should also move all the related test methods to `ZookeeperClientTest` where we can reuse the embedded zk cluster for testing. ########## fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java: ########## @@ -143,6 +155,16 @@ public abstract class RpcServiceBase extends RpcGatewayService implements AdminR private long tokenLastUpdateTimeMs = 0; private ObtainedSecurityToken securityToken = null; + private static final ExecutorService ZK_META_UPDATE_EXECUTOR = + Executors.newFixedThreadPool(8, new ExecutorThreadFactory("zk-metadata-update-")); + + private static final ConcurrentHashMap< + TableMetadataKey, CompletableFuture<List<BucketMetadata>>> + PENDING_TABLE_METADATA_FROM_ZK_FUTURES = MapUtils.newConcurrentHashMap(); + + private static final ConcurrentHashMap<PhysicalTablePath, CompletableFuture<PartitionMetadata>> + PENDING_PARTITION_METADATA_FROM_ZK_FUTURES = MapUtils.newConcurrentHashMap(); + Review Comment: We should avoid introducing global resources which are hard to maintain (resource leak in production) and mock (override in tests). In order to support async zk operations, we can learn from Kafka that: 1. Use the default executor service in zk client (`curatorFramework.inBackground(...)` in our case) 2. Extend the `ZookeeperClient` to support **async batch** operations. 
We have already introduced a basic implementation in https://github.com/apache/fluss/commit/8b9d2937c35f47dce0d034785ee684d49ba04bc3, which is easy to extend for more methods. 3. Use the **async batch** API of `ZookeeperClient` for fetching the metadata of table/partition lists. ########## fluss-server/src/test/java/org/apache/fluss/server/RpcServiceBaseTest.java: ########## @@ -0,0 +1,758 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server; + +import org.apache.fluss.cluster.ServerNode; +import org.apache.fluss.cluster.ServerType; +import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.exception.PartitionNotExistException; +import org.apache.fluss.metadata.PhysicalTablePath; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePartition; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.rpc.messages.MetadataRequest; +import org.apache.fluss.rpc.messages.MetadataResponse; +import org.apache.fluss.rpc.messages.PbPhysicalTablePath; +import org.apache.fluss.rpc.messages.PbTablePath; +import org.apache.fluss.rpc.netty.server.Session; +import org.apache.fluss.security.acl.FlussPrincipal; +import org.apache.fluss.server.authorizer.Authorizer; +import org.apache.fluss.server.metadata.BucketMetadata; +import org.apache.fluss.server.metadata.MetadataFunctionProvider; +import org.apache.fluss.server.metadata.PartitionMetadata; +import org.apache.fluss.server.metadata.ServerMetadataCache; +import org.apache.fluss.server.metadata.TableMetadata; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.BucketAssignment; +import org.apache.fluss.server.zk.data.LeaderAndIsr; +import org.apache.fluss.server.zk.data.PartitionAssignment; +import org.apache.fluss.server.zk.data.TableAssignment; + +import org.assertj.core.util.Lists; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; + +import static 
org.apache.fluss.record.TestData.DATA1_PARTITIONED_TABLE_DESCRIPTOR; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** Unit test for {@link RpcServiceBase} async metadata methods. */ +class RpcServiceBaseTest { + + private ZooKeeperClient mockZkClient; + private TablePath testTablePath; + private PhysicalTablePath testPartitionPath; + private long testTableId; + private long testPartitionId; + private List<BucketMetadata> testBucketMetadata; + + @BeforeEach + void beforeEach() { + mockZkClient = mock(ZooKeeperClient.class); + + testTablePath = TablePath.of("testDb", "testTable"); + testPartitionPath = PhysicalTablePath.of(testTablePath, "partition1"); + testTableId = 1L; + testPartitionId = 100L; + testBucketMetadata = + Lists.newArrayList( + new BucketMetadata(0, 1, 1, Arrays.asList(1, 2, 3)), + new BucketMetadata(1, 2, 2, Arrays.asList(4, 5, 6))); + } + + private Session createTestSession() throws java.net.UnknownHostException { + return new Session( + (short) 1, + "default", + false, + java.net.InetAddress.getByName("127.0.0.1"), + FlussPrincipal.ANY); + } + + // A test implementation of RpcServiceBase to access the protected processMetadataRequest method + private static class TestRpcServiceBase extends RpcServiceBase { + public TestRpcServiceBase() { + super(null, null, null, null, null); + } + + @Override + public void processMetadataRequest( + MetadataRequest request, + String listenerName, + Session session, + Authorizer authorizer, + ServerMetadataCache metadataCache, + MetadataFunctionProvider functionProvider, + CompletableFuture<MetadataResponse> responseFuture) { + super.processMetadataRequest( + request, + 
listenerName, + session, + authorizer, + metadataCache, + functionProvider, + responseFuture); + } + + @Override + public String name() { + return "TestRpcService"; + } + + @Override + public void shutdown() {} + + @Override + public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) { + return null; + } + } + + @Test + void testGetTableMetadataFromZkAsync_Success() throws Exception { + // Setup test data + Map<Integer, BucketAssignment> bucketAssignments = new HashMap<>(); + bucketAssignments.put(0, BucketAssignment.of(1, 2, 3)); + bucketAssignments.put(1, BucketAssignment.of(2, 3, 4)); + TableAssignment tableAssignment = new TableAssignment(bucketAssignments); + + // Mock ZooKeeper responses + when(mockZkClient.getTableAssignment(testTableId)).thenReturn(Optional.of(tableAssignment)); Review Comment: Please do not use Mockito, it increases maintenance overhead in the long term, which should be avoided. In this case, we can simply register metadata into zk without mocking the zk client. ########## fluss-server/src/test/java/org/apache/fluss/server/RpcServiceBaseTest.java: ########## @@ -0,0 +1,758 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server; + +import org.apache.fluss.cluster.ServerNode; +import org.apache.fluss.cluster.ServerType; +import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.exception.PartitionNotExistException; +import org.apache.fluss.metadata.PhysicalTablePath; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePartition; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.rpc.messages.MetadataRequest; +import org.apache.fluss.rpc.messages.MetadataResponse; +import org.apache.fluss.rpc.messages.PbPhysicalTablePath; +import org.apache.fluss.rpc.messages.PbTablePath; +import org.apache.fluss.rpc.netty.server.Session; +import org.apache.fluss.security.acl.FlussPrincipal; +import org.apache.fluss.server.authorizer.Authorizer; +import org.apache.fluss.server.metadata.BucketMetadata; +import org.apache.fluss.server.metadata.MetadataFunctionProvider; +import org.apache.fluss.server.metadata.PartitionMetadata; +import org.apache.fluss.server.metadata.ServerMetadataCache; +import org.apache.fluss.server.metadata.TableMetadata; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.BucketAssignment; +import org.apache.fluss.server.zk.data.LeaderAndIsr; +import org.apache.fluss.server.zk.data.PartitionAssignment; +import org.apache.fluss.server.zk.data.TableAssignment; + +import org.assertj.core.util.Lists; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; + +import static 
org.apache.fluss.record.TestData.DATA1_PARTITIONED_TABLE_DESCRIPTOR; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** Unit test for {@link RpcServiceBase} async metadata methods. */ +class RpcServiceBaseTest { + + private ZooKeeperClient mockZkClient; + private TablePath testTablePath; + private PhysicalTablePath testPartitionPath; + private long testTableId; + private long testPartitionId; + private List<BucketMetadata> testBucketMetadata; + + @BeforeEach + void beforeEach() { + mockZkClient = mock(ZooKeeperClient.class); + + testTablePath = TablePath.of("testDb", "testTable"); + testPartitionPath = PhysicalTablePath.of(testTablePath, "partition1"); + testTableId = 1L; + testPartitionId = 100L; + testBucketMetadata = + Lists.newArrayList( + new BucketMetadata(0, 1, 1, Arrays.asList(1, 2, 3)), + new BucketMetadata(1, 2, 2, Arrays.asList(4, 5, 6))); + } + + private Session createTestSession() throws java.net.UnknownHostException { + return new Session( + (short) 1, + "default", + false, + java.net.InetAddress.getByName("127.0.0.1"), + FlussPrincipal.ANY); + } + + // A test implementation of RpcServiceBase to access the protected processMetadataRequest method + private static class TestRpcServiceBase extends RpcServiceBase { + public TestRpcServiceBase() { + super(null, null, null, null, null); + } + + @Override + public void processMetadataRequest( + MetadataRequest request, + String listenerName, + Session session, + Authorizer authorizer, + ServerMetadataCache metadataCache, + MetadataFunctionProvider functionProvider, + CompletableFuture<MetadataResponse> responseFuture) { + super.processMetadataRequest( + request, + 
listenerName, + session, + authorizer, + metadataCache, + functionProvider, + responseFuture); + } + + @Override + public String name() { + return "TestRpcService"; + } + + @Override + public void shutdown() {} + + @Override + public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) { + return null; + } + } + + @Test + void testGetTableMetadataFromZkAsync_Success() throws Exception { + // Setup test data + Map<Integer, BucketAssignment> bucketAssignments = new HashMap<>(); + bucketAssignments.put(0, BucketAssignment.of(1, 2, 3)); + bucketAssignments.put(1, BucketAssignment.of(2, 3, 4)); + TableAssignment tableAssignment = new TableAssignment(bucketAssignments); + + // Mock ZooKeeper responses + when(mockZkClient.getTableAssignment(testTableId)).thenReturn(Optional.of(tableAssignment)); + when(mockZkClient.getLeaderAndIsr(new TableBucket(testTableId, null, 0))) + .thenReturn(Optional.of(new LeaderAndIsr(1, 1, Arrays.asList(1, 2, 3), 1, 1))); + when(mockZkClient.getLeaderAndIsr(new TableBucket(testTableId, null, 1))) + .thenReturn(Optional.of(new LeaderAndIsr(2, 1, Arrays.asList(2, 3, 4), 1, 1))); + + // Execute test + CompletableFuture<List<BucketMetadata>> future = + RpcServiceBase.getTableMetadataFromZkAsync( + mockZkClient, testTablePath, testTableId, false); + + // Verify results + List<BucketMetadata> result = future.get(); + assertThat(result).hasSize(2); + + BucketMetadata bucket0 = result.get(0); + assertThat(bucket0.getBucketId()).isEqualTo(0); + assertThat(bucket0.getLeaderId()).hasValue(1); + assertThat(bucket0.getReplicas()).containsExactly(1, 2, 3); + + BucketMetadata bucket1 = result.get(1); + assertThat(bucket1.getBucketId()).isEqualTo(1); + assertThat(bucket1.getLeaderId()).hasValue(2); + assertThat(bucket1.getReplicas()).containsExactly(2, 3, 4); + } + + @Test + void testGetTableMetadataFromZkAsync_NoTableAssignment() throws Exception { + // Mock ZooKeeper to return empty table assignment + 
when(mockZkClient.getTableAssignment(testTableId)).thenReturn(Optional.empty()); + + // Execute test + CompletableFuture<List<BucketMetadata>> future = + RpcServiceBase.getTableMetadataFromZkAsync( + mockZkClient, testTablePath, testTableId, false); + + // Verify results - should return empty list for non-partitioned table + List<BucketMetadata> result = future.get(); + assertThat(result).isEmpty(); + } + + @Test + void testGetTableMetadataFromZkAsync_NoTableAssignmentForPartitionedTable() throws Exception { + // Mock ZooKeeper to return empty table assignment for partitioned table + when(mockZkClient.getTableAssignment(testTableId)).thenReturn(Optional.empty()); + + // Execute test - should log warning but not throw exception + CompletableFuture<List<BucketMetadata>> future = + RpcServiceBase.getTableMetadataFromZkAsync( + mockZkClient, testTablePath, testTableId, true); + + // Verify results - should return empty list + List<BucketMetadata> result = future.get(); + assertThat(result).isEmpty(); + } + + @Test + void testGetTableMetadataFromZkAsync_ZooKeeperException() throws Exception { + // Mock ZooKeeper to throw exception + when(mockZkClient.getTableAssignment(testTableId)) + .thenThrow(new RuntimeException("ZK connection failed")); + + // Execute test + CompletableFuture<List<BucketMetadata>> future = + RpcServiceBase.getTableMetadataFromZkAsync( + mockZkClient, testTablePath, testTableId, false); + + // Verify exception is thrown + assertThatThrownBy(() -> future.get()) + .isInstanceOf(ExecutionException.class) + .hasCauseInstanceOf(FlussRuntimeException.class) + .hasMessageContaining("Failed to get metadata for table"); + } + + @Test + void testGetPartitionMetadataFromZkAsync_Success() throws Exception { + // Setup test data + Map<Integer, BucketAssignment> bucketAssignments = new HashMap<>(); + bucketAssignments.put(0, BucketAssignment.of(1, 2, 3)); + bucketAssignments.put(1, BucketAssignment.of(2, 3, 4)); + TableAssignment tableAssignment = new 
TableAssignment(bucketAssignments); + + // Mock ZooKeeper responses + when(mockZkClient.getPartition(testTablePath, "partition1")) + .thenReturn(Optional.of(new TablePartition(testTableId, testPartitionId))); + when(mockZkClient.getPartitionAssignment(testPartitionId)) + .thenReturn(Optional.of(new PartitionAssignment(testTableId, bucketAssignments))); + when(mockZkClient.getLeaderAndIsr(new TableBucket(testTableId, testPartitionId, 0))) + .thenReturn(Optional.of(new LeaderAndIsr(1, 1, Arrays.asList(1, 2, 3), 1, 1))); + when(mockZkClient.getLeaderAndIsr(new TableBucket(testTableId, testPartitionId, 1))) + .thenReturn(Optional.of(new LeaderAndIsr(2, 1, Arrays.asList(2, 3, 4), 1, 1))); + + // Execute test + CompletableFuture<PartitionMetadata> future = + RpcServiceBase.getPartitionMetadataFromZkAsync(testPartitionPath, mockZkClient); + + // Verify results + PartitionMetadata result = future.get(); + assertThat(result.getPartitionId()).isEqualTo(testPartitionId); + assertThat(result.getTableId()).isEqualTo(testTableId); + assertThat(result.getBucketMetadataList()).hasSize(2); + + BucketMetadata bucket0 = result.getBucketMetadataList().get(0); + assertThat(bucket0.getBucketId()).isEqualTo(0); + assertThat(bucket0.getLeaderId()).hasValue(1); + assertThat(bucket0.getReplicas()).containsExactly(1, 2, 3); + + BucketMetadata bucket1 = result.getBucketMetadataList().get(1); + assertThat(bucket1.getBucketId()).isEqualTo(1); + assertThat(bucket1.getLeaderId()).hasValue(2); + assertThat(bucket1.getReplicas()).containsExactly(2, 3, 4); + } + + @Test + void testGetPartitionMetadataFromZkAsync_PartitionNotExist() throws Exception { + // Mock ZooKeeper to return empty partition + when(mockZkClient.getPartition(testTablePath, "nonexistent")).thenReturn(Optional.empty()); + + // Execute test + PhysicalTablePath nonexistentPath = PhysicalTablePath.of(testTablePath, "nonexistent"); + CompletableFuture<PartitionMetadata> future = + 
RpcServiceBase.getPartitionMetadataFromZkAsync(nonexistentPath, mockZkClient); + + // Verify exception is thrown + assertThatThrownBy(future::get) + .isInstanceOf(ExecutionException.class) + .hasCauseInstanceOf(PartitionNotExistException.class); + } + + @Test + void testConcurrentGetTableMetadataFromZkAsync_SameKey() throws Exception { + // Setup test data + Map<Integer, BucketAssignment> bucketAssignments = new HashMap<>(); + bucketAssignments.put(0, BucketAssignment.of(1, 2, 3)); + TableAssignment tableAssignment = new TableAssignment(bucketAssignments); + + // Mock ZooKeeper responses + when(mockZkClient.getTableAssignment(testTableId)).thenReturn(Optional.of(tableAssignment)); + when(mockZkClient.getLeaderAndIsr(any(TableBucket.class))) + .thenReturn(Optional.of(new LeaderAndIsr(1, 1, Arrays.asList(1, 2, 3), 1, 1))); + + // Execute concurrent requests with same key + CompletableFuture<List<BucketMetadata>> future1 = + RpcServiceBase.getTableMetadataFromZkAsync( + mockZkClient, testTablePath, testTableId, false); + CompletableFuture<List<BucketMetadata>> future2 = + RpcServiceBase.getTableMetadataFromZkAsync( + mockZkClient, testTablePath, testTableId, false); + + // Verify both requests return the same result + List<BucketMetadata> result1 = future1.get(); + List<BucketMetadata> result2 = future2.get(); + assertThat(result1).isEqualTo(result2); + assertThat(result1).hasSize(1); + } + + @Test + void testConcurrentGetPartitionMetadataFromZkAsync_SameKey() throws Exception { + // Setup test data + Map<Integer, BucketAssignment> bucketAssignments = new HashMap<>(); + bucketAssignments.put(0, BucketAssignment.of(1, 2, 3)); + + // Mock ZooKeeper responses + when(mockZkClient.getPartition(testTablePath, "partition1")) + .thenReturn(Optional.of(new TablePartition(testPartitionId, testTableId))); + when(mockZkClient.getPartitionAssignment(testPartitionId)) + .thenReturn(Optional.of(new PartitionAssignment(testTableId, bucketAssignments))); + 
when(mockZkClient.getLeaderAndIsr(any(TableBucket.class))) + .thenReturn(Optional.of(new LeaderAndIsr(1, 1, Arrays.asList(1, 2, 3), 1, 1))); + + // Execute concurrent requests with same key + CompletableFuture<PartitionMetadata> future1 = + RpcServiceBase.getPartitionMetadataFromZkAsync(testPartitionPath, mockZkClient); + CompletableFuture<PartitionMetadata> future2 = + RpcServiceBase.getPartitionMetadataFromZkAsync(testPartitionPath, mockZkClient); + + // Verify both requests return the same result + PartitionMetadata result1 = future1.get(); + PartitionMetadata result2 = future2.get(); + assertThat(result1).isEqualTo(result2); + } + + @Test + void testGetTableMetadataFromZkAsync_MapEntryRemovedOnSuccess() throws Exception { + // Setup test data + Map<Integer, BucketAssignment> bucketAssignments = new HashMap<>(); + bucketAssignments.put(0, BucketAssignment.of(1, 2, 3)); + TableAssignment tableAssignment = new TableAssignment(bucketAssignments); + + // Mock ZooKeeper responses + when(mockZkClient.getTableAssignment(testTableId)).thenReturn(Optional.of(tableAssignment)); + when(mockZkClient.getLeaderAndIsr(any(TableBucket.class))) + .thenReturn(Optional.of(new LeaderAndIsr(1, 1, Arrays.asList(1, 2, 3), 1, 1))); + + // Check that the map is initially empty + assertThat(getPendingTableMetadataFutureMap()).isEmpty(); + + // Execute test + CompletableFuture<List<BucketMetadata>> future = + RpcServiceBase.getTableMetadataFromZkAsync( + mockZkClient, testTablePath, testTableId, false); + + // The entry should be added to the map + assertThat(getPendingTableMetadataFutureMap()).hasSize(1); + + // Wait for completion + future.get(); + + // After completion, the entry should be removed from the map + assertThat(getPendingTableMetadataFutureMap()).isEmpty(); + } + + @Test + void testGetTableMetadataFromZkAsync_MapEntryRemovedOnFailure() throws Exception { + // Mock ZooKeeper to throw exception + when(mockZkClient.getTableAssignment(testTableId)) + .thenThrow(new 
RuntimeException("ZK connection failed")); + + // Check that the map is initially empty + assertThat(getPendingTableMetadataFutureMap()).isEmpty(); + + // Execute test + CompletableFuture<List<BucketMetadata>> future = + RpcServiceBase.getTableMetadataFromZkAsync( + mockZkClient, testTablePath, testTableId, false); + + // The entry should be added to the map + assertThat(getPendingTableMetadataFutureMap()).hasSize(1); + + // Wait for completion and expect exception + assertThatThrownBy(future::get) + .isInstanceOf(ExecutionException.class) + .hasCauseInstanceOf(FlussRuntimeException.class); + + // After failure, the entry should be removed from the map + assertThat(getPendingTableMetadataFutureMap()).isEmpty(); + } + + @Test + void testGetPartitionMetadataFromZkAsync_MapEntryRemovedOnSuccess() throws Exception { + // Setup test data + Map<Integer, BucketAssignment> bucketAssignments = new HashMap<>(); + bucketAssignments.put(0, BucketAssignment.of(1, 2, 3)); + TableAssignment tableAssignment = new TableAssignment(bucketAssignments); + + // Mock ZooKeeper responses + when(mockZkClient.getPartition(testTablePath, "partition1")) + .thenReturn(Optional.of(new TablePartition(testPartitionId, testTableId))); + when(mockZkClient.getPartitionAssignment(testPartitionId)) + .thenReturn(Optional.of(new PartitionAssignment(testTableId, bucketAssignments))); + when(mockZkClient.getLeaderAndIsr(any(TableBucket.class))) + .thenReturn(Optional.of(new LeaderAndIsr(1, 1, Arrays.asList(1, 2, 3), 1, 1))); + + // Check that the map is initially empty + assertThat(getPendingPartitionMetadataFutureMap()).isEmpty(); + + // Execute test + CompletableFuture<PartitionMetadata> future = + RpcServiceBase.getPartitionMetadataFromZkAsync(testPartitionPath, mockZkClient); + + // The entry should be added to the map + assertThat(getPendingPartitionMetadataFutureMap()).hasSize(1); + + // Wait for completion + future.get(); + + // After completion, the entry should be removed from the map + 
assertThat(getPendingPartitionMetadataFutureMap()).isEmpty(); + } + + @Test + void testGetPartitionMetadataFromZkAsync_MapEntryRemovedOnFailure() throws Exception { + // Mock ZooKeeper to throw exception + when(mockZkClient.getPartition(testTablePath, "partition1")) + .thenThrow(new RuntimeException("ZK connection failed")); + + // Check that the map is initially empty + assertThat(getPendingPartitionMetadataFutureMap()).isEmpty(); + + // Execute test + CompletableFuture<PartitionMetadata> future = + RpcServiceBase.getPartitionMetadataFromZkAsync(testPartitionPath, mockZkClient); + + // The entry should be added to the map + assertThat(getPendingPartitionMetadataFutureMap()).hasSize(1); + + // Wait for completion and expect exception + assertThatThrownBy(future::get) + .isInstanceOf(ExecutionException.class) + .hasCauseInstanceOf(FlussRuntimeException.class); + + // After failure, the entry should be removed from the map + assertThat(getPendingPartitionMetadataFutureMap()).isEmpty(); + } + + @Test + void testProcessMetadataRequest_TableFromCache() throws Exception { + // Setup mocks + MetadataRequest request = new MetadataRequest(); + PbTablePath pbTablePath = new PbTablePath().setDatabaseName("db").setTableName("table"); + request.addTablePath().copyFrom(pbTablePath); + + String listenerName = "default"; + Session session = createTestSession(); + Authorizer authorizer = mock(Authorizer.class); + ServerMetadataCache metadataCache = mock(ServerMetadataCache.class); + MetadataFunctionProvider functionProvider = mock(MetadataFunctionProvider.class); + + when(authorizer.isAuthorized(any(), any(), any())).thenReturn(true); + TablePath tablePath = TablePath.of("db", "table"); + TableInfo partitionTableInfo = + TableInfo.of( + tablePath, + testTableId, + 1, + DATA1_PARTITIONED_TABLE_DESCRIPTOR, + System.currentTimeMillis(), + System.currentTimeMillis()); + TableMetadata tableMetadata = + new TableMetadata(partitionTableInfo, Collections.emptyList()); + 
when(functionProvider.getTableMetadataFromCache(eq(tablePath))) + .thenReturn(Optional.of(tableMetadata)); Review Comment: Please do not use Mockito, it increases maintenance overhead in the long term, which should be avoided. In this case, we can simply pass `authorizer` as `null`, and create a testing implementation for `MetadataFunctionProvider` and `ServerMetadataCache`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
