This is an automated email from the ASF dual-hosted git repository.

simbadzina pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ebbe9628d34 HDFS-17558. RBF: Make maxSizeOfFederatedStateToPropagate 
work on setResponseHeaderState. (#6902)
ebbe9628d34 is described below

commit ebbe9628d34476939343a94484528d3754e92eb9
Author: fuchaohong <1783129...@qq.com>
AuthorDate: Thu Jul 18 00:43:00 2024 +0800

    HDFS-17558. RBF: Make maxSizeOfFederatedStateToPropagate work on 
setResponseHeaderState. (#6902)
    
    Co-authored-by: fuchaohong <fuchaoh...@chinatelecom.cn>
---
 .../federation/router/RouterStateIdContext.java    |  8 ++---
 .../federation/router/TestObserverWithRouter.java  | 40 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 4 deletions(-)

diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java
index 5c9ceb865a5..e239e5e9059 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java
@@ -104,7 +104,9 @@ class RouterStateIdContext implements AlignmentContext {
         builder.putNamespaceStateIds(k, v.get());
       }
     });
-    headerBuilder.setRouterFederatedState(builder.build().toByteString());
+    if (builder.getNamespaceStateIdsCount() <= 
maxSizeOfFederatedStateToPropagate) {
+      headerBuilder.setRouterFederatedState(builder.build().toByteString());
+    }
   }
 
   public LongAccumulator getNamespaceStateId(String nsId) {
@@ -155,9 +157,7 @@ class RouterStateIdContext implements AlignmentContext {
 
   @Override
   public void updateResponseState(RpcResponseHeaderProto.Builder header) {
-    if (namespaceIdMap.size() <= maxSizeOfFederatedStateToPropagate) {
-      setResponseHeaderState(header);
-    }
+    setResponseHeaderState(header);
   }
 
   @Override
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
index eaee5b8b146..3f773efd63d 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
@@ -641,6 +641,46 @@ public class TestObserverWithRouter {
     Assertions.assertEquals(10L, latestFederateState.get("ns0"));
   }
 
+  @Test
+  @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
+  public void testRouterResponseHeaderStateMaxSizeLimit() {
+    Configuration conf = new Configuration();
+    conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true);
+    
conf.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE,
 1);
+
+    RouterStateIdContext routerStateIdContext = new RouterStateIdContext(conf);
+
+    ConcurrentHashMap<String, LongAccumulator> namespaceIdMap =
+        routerStateIdContext.getNamespaceIdMap();
+    namespaceIdMap.put("ns0", new LongAccumulator(Math::max, 10));
+    namespaceIdMap.put("ns1", new LongAccumulator(Math::max, Long.MIN_VALUE));
+
+    RpcHeaderProtos.RpcResponseHeaderProto.Builder responseHeaderBuilder =
+        RpcHeaderProtos.RpcResponseHeaderProto
+            .newBuilder()
+            .setCallId(1)
+            
.setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS);
+    routerStateIdContext.updateResponseState(responseHeaderBuilder);
+
+    Map<String, Long> latestFederateState = 
RouterStateIdContext.getRouterFederatedStateMap(
+        responseHeaderBuilder.build().getRouterFederatedState());
+    // Validate that ns0 is still part of the header
+    Assertions.assertEquals(1, latestFederateState.size());
+
+    namespaceIdMap.put("ns2", new LongAccumulator(Math::max, 20));
+    // Rebuild header
+    responseHeaderBuilder =
+        RpcHeaderProtos.RpcResponseHeaderProto
+            .newBuilder()
+            .setCallId(1)
+            
.setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS);
+    routerStateIdContext.updateResponseState(responseHeaderBuilder);
+    latestFederateState = RouterStateIdContext.getRouterFederatedStateMap(
+        responseHeaderBuilder.build().getRouterFederatedState());
+    // Validate that ns0 is still part of the header
+    Assertions.assertEquals(0, latestFederateState.size());
+  }
+
   @EnumSource(ConfigSetting.class)
   @ParameterizedTest
   public void testStateIdProgressionInRouter(ConfigSetting configSetting) 
throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to