This is an automated email from the ASF dual-hosted git repository.

simbadzina pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8193a8402030 HDFS-17324. RBF: Router should not return nameservices 
that not enable observer r… (#6412)
8193a8402030 is described below

commit 8193a8402030dceac3c6ab4f4b599349eb114744
Author: LiuGuH <444506...@qq.com>
AuthorDate: Wed Jan 24 04:30:08 2024 +0800

    HDFS-17324. RBF: Router should not return nameservices that not enable 
observer r… (#6412)
---
 .../federation/router/RouterStateIdContext.java    | 25 +++++++++++++++++++++-
 .../federation/router/TestObserverWithRouter.java  | 21 ++++++++----------
 2 files changed, 33 insertions(+), 13 deletions(-)

diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java
index 14adc16d99dd..5c9ceb865a5b 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.server.federation.router;
 
 import java.lang.reflect.Method;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 
@@ -58,6 +59,10 @@ class RouterStateIdContext implements AlignmentContext {
   private final ConcurrentHashMap<String, LongAccumulator> namespaceIdMap;
   // Size limit for the map of state Ids to send to clients.
   private final int maxSizeOfFederatedStateToPropagate;
+  /** Observer read enabled. Default for all nameservices. */
+  private final boolean observerReadEnabledDefault;
+  /** Nameservice specific overrides of the default setting for enabling 
observer reads. */
+  private HashSet<String> observerReadEnabledOverrides = new HashSet<>();
 
   RouterStateIdContext(Configuration conf) {
     this.coordinatedMethods = new HashSet<>();
@@ -75,6 +80,15 @@ class RouterStateIdContext implements AlignmentContext {
     maxSizeOfFederatedStateToPropagate =
         
conf.getInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE,
         
RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE_DEFAULT);
+
+    this.observerReadEnabledDefault = conf.getBoolean(
+        RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY,
+        RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_VALUE);
+    String[] observerReadOverrides =
+        conf.getStrings(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES);
+    if (observerReadOverrides != null) {
+      
observerReadEnabledOverrides.addAll(Arrays.asList(observerReadOverrides));
+    }
   }
 
   /**
@@ -86,7 +100,7 @@ class RouterStateIdContext implements AlignmentContext {
     }
     RouterFederatedStateProto.Builder builder = 
RouterFederatedStateProto.newBuilder();
     namespaceIdMap.forEach((k, v) -> {
-      if (v.get() != Long.MIN_VALUE) {
+      if ((v.get() != Long.MIN_VALUE) && isNamespaceObserverReadEligible(k)) {
         builder.putNamespaceStateIds(k, v.get());
       }
     });
@@ -177,4 +191,13 @@ class RouterStateIdContext implements AlignmentContext {
     return protocolName.equals(ClientProtocol.class.getCanonicalName())
         && coordinatedMethods.contains(methodName);
   }
+
+  /**
+   * Check if a namespace is eligible for observer reads.
+   * @param nsId namespaceID
+   * @return whether the 'namespace' has observer reads enabled.
+   */
+  boolean isNamespaceObserverReadEligible(String nsId) {
+    return observerReadEnabledDefault != 
observerReadEnabledOverrides.contains(nsId);
+  }
 }
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
index e20e3ad2a0a6..0ff166e0c3b3 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
@@ -590,7 +590,12 @@ public class TestObserverWithRouter {
   @Test
   @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
   public void testRouterResponseHeaderState() {
-    RouterStateIdContext routerStateIdContext = new RouterStateIdContext(new 
Configuration());
+    // This conf makes ns1 that is not eligible for observer reads.
+    Configuration conf = new Configuration();
+    conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true);
+    conf.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, "ns1");
+
+    RouterStateIdContext routerStateIdContext = new RouterStateIdContext(conf);
 
     ConcurrentHashMap<String, LongAccumulator> namespaceIdMap =
         routerStateIdContext.getNamespaceIdMap();
@@ -598,26 +603,18 @@ public class TestObserverWithRouter {
     namespaceIdMap.put("ns1", new LongAccumulator(Math::max, 100));
     namespaceIdMap.put("ns2", new LongAccumulator(Math::max, Long.MIN_VALUE));
 
-    Map<String, Long> mockMapping = new HashMap<>();
-    mockMapping.put("ns0", 10L);
-    mockMapping.put("ns2", 100L);
-    mockMapping.put("ns3", Long.MIN_VALUE);
-    RouterFederatedStateProto.Builder builder = 
RouterFederatedStateProto.newBuilder();
-    mockMapping.forEach(builder::putNamespaceStateIds);
-
     RpcHeaderProtos.RpcResponseHeaderProto.Builder responseHeaderBuilder =
         RpcHeaderProtos.RpcResponseHeaderProto
             .newBuilder()
             .setCallId(1)
-            
.setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS)
-            .setRouterFederatedState(builder.build().toByteString());
+            
.setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS);
     routerStateIdContext.updateResponseState(responseHeaderBuilder);
 
     Map<String, Long> latestFederateState = 
RouterStateIdContext.getRouterFederatedStateMap(
         responseHeaderBuilder.build().getRouterFederatedState());
-    Assertions.assertEquals(2, latestFederateState.size());
+    // Only ns0 will be in latestFederateState
+    Assertions.assertEquals(1, latestFederateState.size());
     Assertions.assertEquals(10L, latestFederateState.get("ns0"));
-    Assertions.assertEquals(100L, latestFederateState.get("ns1"));
   }
 
   @EnumSource(ConfigSetting.class)


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to