This is an automated email from the ASF dual-hosted git repository. zanderxu pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
     new e0974298cef HDFS-16826. [RBF SBN] ConnectionManager should advance the client stateId for each request (#5086)
e0974298cef is described below

commit e0974298cef072181ee2b639af8eac46cd712245
Author: ZanderXu <zande...@apache.org>
AuthorDate: Fri Nov 25 09:23:33 2022 +0800

    HDFS-16826. [RBF SBN] ConnectionManager should advance the client stateId for each request (#5086)
---
 .../federation/router/ConnectionManager.java       |  5 ++-
 .../federation/router/PoolAlignmentContext.java    |  7 ++++
 .../federation/router/TestConnectionManager.java   | 49 ++++++++++++++++++++++
 3 files changed, 59 insertions(+), 2 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
index c6db9837c7c..eeaa9cd4b34 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
@@ -226,13 +226,14 @@ public class ConnectionManager {
           this.pools.put(connectionId, pool);
           this.connectionPoolToNamespaceMap.put(connectionId, nsId);
         }
-        long clientStateId = RouterStateIdContext.getClientStateIdFromCurrentCall(nsId);
-        pool.getPoolAlignmentContext().advanceClientStateId(clientStateId);
       } finally {
         writeLock.unlock();
       }
     }
 
+    long clientStateId = RouterStateIdContext.getClientStateIdFromCurrentCall(nsId);
+    pool.getPoolAlignmentContext().advanceClientStateId(clientStateId);
+
     ConnectionContext conn = pool.getConnection();
 
     // Add a new connection to the pool if it wasn't usable
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java
index ccb7e9762af..1f2b12d445f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.federation.router;
 
 import java.io.IOException;
 import java.util.concurrent.atomic.LongAccumulator;
+
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
 
@@ -99,4 +101,9 @@ public class PoolAlignmentContext implements AlignmentContext {
   public void advanceClientStateId(Long clientStateId) {
     poolLocalStateId.accumulate(clientStateId);
   }
+
+  @VisibleForTesting
+  public long getPoolLocalStateId() {
+    return this.poolLocalStateId.get();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
index 067d43dabd5..42288bcf53a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
@@ -18,8 +18,11 @@
 package org.apache.hadoop.hdfs.server.federation.router;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterFederatedStateProto;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -31,6 +34,7 @@ import org.junit.Rule;
 import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -305,6 +309,51 @@ public class TestConnectionManager {
     }
   }
 
+  @Test
+  public void testAdvanceClientStateId() throws IOException {
+    // Start one ConnectionManager
+    Configuration tmpConf = new Configuration();
+    ConnectionManager tmpConnManager = new ConnectionManager(tmpConf);
+    tmpConnManager.start();
+    Map<ConnectionPoolId, ConnectionPool> poolMap = tmpConnManager.getPools();
+
+    // Mock one Server.Call with FederatedNamespaceState that ns0 = 1L.
+    Server.Call mockCall1 = new Server.Call(1, 1, null, null,
+        RPC.RpcKind.RPC_BUILTIN, new byte[] {1, 2, 3});
+    Map<String, Long> nsStateId = new HashMap<>();
+    nsStateId.put("ns0", 1L);
+    RouterFederatedStateProto.Builder stateBuilder = RouterFederatedStateProto.newBuilder();
+    nsStateId.forEach(stateBuilder::putNamespaceStateIds);
+    mockCall1.setFederatedNamespaceState(stateBuilder.build().toByteString());
+
+    Server.getCurCall().set(mockCall1);
+
+    // Create one new connection pool
+    tmpConnManager.getConnection(TEST_USER1, TEST_NN_ADDRESS, NamenodeProtocol.class, "ns0");
+    assertEquals(1, poolMap.size());
+    ConnectionPoolId connectionPoolId = new ConnectionPoolId(TEST_USER1,
+        TEST_NN_ADDRESS, NamenodeProtocol.class);
+    ConnectionPool pool = poolMap.get(connectionPoolId);
+    assertEquals(1L, pool.getPoolAlignmentContext().getPoolLocalStateId());
+
+    // Mock one Server.Call with FederatedNamespaceState that ns0 = 2L.
+    Server.Call mockCall2 = new Server.Call(2, 1, null, null,
+        RPC.RpcKind.RPC_BUILTIN, new byte[] {1, 2, 3});
+    nsStateId.clear();
+    nsStateId.put("ns0", 2L);
+    stateBuilder = RouterFederatedStateProto.newBuilder();
+    nsStateId.forEach(stateBuilder::putNamespaceStateIds);
+    mockCall2.setFederatedNamespaceState(stateBuilder.build().toByteString());
+
+    Server.getCurCall().set(mockCall2);
+
+    // Get one existed connection for ns0
+    tmpConnManager.getConnection(TEST_USER1, TEST_NN_ADDRESS, NamenodeProtocol.class, "ns0");
+    assertEquals(1, poolMap.size());
+    pool = poolMap.get(connectionPoolId);
+    assertEquals(2L, pool.getPoolAlignmentContext().getPoolLocalStateId());
+  }
+
   @Test
   public void testConfigureConnectionActiveRatio() throws IOException {
     // test 1 conn below the threshold and these conns are closed

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org