This is an automated email from the ASF dual-hosted git repository. simbadzina pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 7d3b6a36b8aa HDFS-17306. RBF: Router should not return nameservices that does not enable observer nodes in RpcResponseHeaderProto (#6385) 7d3b6a36b8aa is described below commit 7d3b6a36b8aad3b417d806edbe6586c2b14e9fd5 Author: LiuGuH <444506...@qq.com> AuthorDate: Fri Jan 5 06:43:11 2024 +0800 HDFS-17306. RBF: Router should not return nameservices that does not enable observer nodes in RpcResponseHeaderProto (#6385) --- .../federation/router/RouterStateIdContext.java | 10 ++++++- .../federation/router/TestObserverWithRouter.java | 34 ++++++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java index b3bab732c029..14adc16d99dd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java @@ -85,7 +85,11 @@ class RouterStateIdContext implements AlignmentContext { return; } RouterFederatedStateProto.Builder builder = RouterFederatedStateProto.newBuilder(); - namespaceIdMap.forEach((k, v) -> builder.putNamespaceStateIds(k, v.get())); + namespaceIdMap.forEach((k, v) -> { + if (v.get() != Long.MIN_VALUE) { + builder.putNamespaceStateIds(k, v.get()); + } + }); headerBuilder.setRouterFederatedState(builder.build().toByteString()); } @@ -97,6 +101,10 @@ class RouterStateIdContext implements AlignmentContext { return Collections.list(namespaceIdMap.keys()); } + public ConcurrentHashMap<String, LongAccumulator> getNamespaceIdMap() { + return namespaceIdMap; + } + public void removeNamespaceStateId(String nsId) { namespaceIdMap.remove(nsId); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java index a81f773804fc..e20e3ad2a0a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAccumulator; @@ -586,6 +587,39 @@ public class TestObserverWithRouter { Assertions.assertEquals(10L, latestFederateState.get("ns0")); } + @Test + @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) + public void testRouterResponseHeaderState() { + RouterStateIdContext routerStateIdContext = new RouterStateIdContext(new Configuration()); + + ConcurrentHashMap<String, LongAccumulator> namespaceIdMap = + routerStateIdContext.getNamespaceIdMap(); + namespaceIdMap.put("ns0", new LongAccumulator(Math::max, 10)); + namespaceIdMap.put("ns1", new LongAccumulator(Math::max, 100)); + namespaceIdMap.put("ns2", new LongAccumulator(Math::max, Long.MIN_VALUE)); + + Map<String, Long> mockMapping = new HashMap<>(); + mockMapping.put("ns0", 10L); + mockMapping.put("ns2", 100L); + mockMapping.put("ns3", Long.MIN_VALUE); + RouterFederatedStateProto.Builder builder = RouterFederatedStateProto.newBuilder(); + mockMapping.forEach(builder::putNamespaceStateIds); + + RpcHeaderProtos.RpcResponseHeaderProto.Builder responseHeaderBuilder = + RpcHeaderProtos.RpcResponseHeaderProto + .newBuilder() + .setCallId(1) + .setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS) + .setRouterFederatedState(builder.build().toByteString()); + routerStateIdContext.updateResponseState(responseHeaderBuilder); + + Map<String, Long> latestFederateState = RouterStateIdContext.getRouterFederatedStateMap( + responseHeaderBuilder.build().getRouterFederatedState()); + Assertions.assertEquals(2, latestFederateState.size()); + Assertions.assertEquals(10L, latestFederateState.get("ns0")); + Assertions.assertEquals(100L, latestFederateState.get("ns1")); + } + @EnumSource(ConfigSetting.class) @ParameterizedTest public void testStateIdProgressionInRouter(ConfigSetting configSetting) throws Exception { --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org