This is an automated email from the ASF dual-hosted git repository. simbadzina pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new ebbe9628d34 HDFS-17558. RBF: Make maxSizeOfFederatedStateToPropagate work on setResponseHeaderState. (#6902) ebbe9628d34 is described below commit ebbe9628d34476939343a94484528d3754e92eb9 Author: fuchaohong <1783129...@qq.com> AuthorDate: Thu Jul 18 00:43:00 2024 +0800 HDFS-17558. RBF: Make maxSizeOfFederatedStateToPropagate work on setResponseHeaderState. (#6902) Co-authored-by: fuchaohong <fuchaoh...@chinatelecom.cn> --- .../federation/router/RouterStateIdContext.java | 8 ++--- .../federation/router/TestObserverWithRouter.java | 40 ++++++++++++++++++++++ 2 files changed, 44 insertions(+), 4 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java index 5c9ceb865a5..e239e5e9059 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java @@ -104,7 +104,9 @@ class RouterStateIdContext implements AlignmentContext { builder.putNamespaceStateIds(k, v.get()); } }); - headerBuilder.setRouterFederatedState(builder.build().toByteString()); + if (builder.getNamespaceStateIdsCount() <= maxSizeOfFederatedStateToPropagate) { + headerBuilder.setRouterFederatedState(builder.build().toByteString()); + } } public LongAccumulator getNamespaceStateId(String nsId) { @@ -155,9 +157,7 @@ class RouterStateIdContext implements AlignmentContext { @Override public void updateResponseState(RpcResponseHeaderProto.Builder header) { - if (namespaceIdMap.size() <= maxSizeOfFederatedStateToPropagate) { - setResponseHeaderState(header); - } + setResponseHeaderState(header); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java index eaee5b8b146..3f773efd63d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java @@ -641,6 +641,46 @@ public class TestObserverWithRouter { Assertions.assertEquals(10L, latestFederateState.get("ns0")); } + @Test + @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) + public void testRouterResponseHeaderStateMaxSizeLimit() { + Configuration conf = new Configuration(); + conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true); + conf.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, 1); + + RouterStateIdContext routerStateIdContext = new RouterStateIdContext(conf); + + ConcurrentHashMap<String, LongAccumulator> namespaceIdMap = + routerStateIdContext.getNamespaceIdMap(); + namespaceIdMap.put("ns0", new LongAccumulator(Math::max, 10)); + namespaceIdMap.put("ns1", new LongAccumulator(Math::max, Long.MIN_VALUE)); + + RpcHeaderProtos.RpcResponseHeaderProto.Builder responseHeaderBuilder = + RpcHeaderProtos.RpcResponseHeaderProto + .newBuilder() + .setCallId(1) + .setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS); + routerStateIdContext.updateResponseState(responseHeaderBuilder); + + Map<String, Long> latestFederateState = RouterStateIdContext.getRouterFederatedStateMap( + responseHeaderBuilder.build().getRouterFederatedState()); + // Validate that ns0 is still part of the header + Assertions.assertEquals(1, latestFederateState.size()); + + namespaceIdMap.put("ns2", new LongAccumulator(Math::max, 20)); + // Rebuild header + responseHeaderBuilder = + RpcHeaderProtos.RpcResponseHeaderProto + .newBuilder() + .setCallId(1) + .setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS); + routerStateIdContext.updateResponseState(responseHeaderBuilder); + latestFederateState = RouterStateIdContext.getRouterFederatedStateMap( + responseHeaderBuilder.build().getRouterFederatedState()); + // Validate that ns0 is still part of the header + Assertions.assertEquals(0, latestFederateState.size()); + } + @EnumSource(ConfigSetting.class) @ParameterizedTest public void testStateIdProgressionInRouter(ConfigSetting configSetting) throws Exception { --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org