This is an automated email from the ASF dual-hosted git repository.

zanderxu pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8a9bdb1edc4 HDFS-16837. [RBF SBN] ClientGSIContext should merge 
RouterFederatedStates to get the max state id for each namespace (#5123)
8a9bdb1edc4 is described below

commit 8a9bdb1edc41fd2a6a4d2ac497ee29e01d0f67aa
Author: ZanderXu <zande...@apache.org>
AuthorDate: Mon Dec 5 16:15:47 2022 +0800

    HDFS-16837. [RBF SBN] ClientGSIContext should merge RouterFederatedStates 
to get the max state id for each namespace (#5123)
---
 .../org/apache/hadoop/hdfs/ClientGSIContext.java   | 47 +++++++++++++++++++++-
 .../hadoop-hdfs-client/src/main/proto/hdfs.proto   | 12 ++++++
 .../federation/router/RouterStateIdContext.java    | 13 +++---
 .../src/main/proto/FederationProtocol.proto        | 13 ------
 .../federation/router/TestConnectionManager.java   |  2 +-
 .../federation/router/TestObserverWithRouter.java  | 42 ++++++++++++++++++-
 .../router/TestRouterFederatedState.java           |  3 +-
 7 files changed, 108 insertions(+), 24 deletions(-)

diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java
index bcbb4b96c2a..7b03e1f3518 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java
@@ -20,13 +20,19 @@ package org.apache.hadoop.hdfs;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
+import 
org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RouterFederatedStateProto;
 import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.atomic.LongAccumulator;
 import org.apache.hadoop.thirdparty.protobuf.ByteString;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
 
 /**
  * Global State Id context for the client.
@@ -77,12 +83,46 @@ public class ClientGSIContext implements AlignmentContext {
   @Override
   public synchronized void receiveResponseState(RpcResponseHeaderProto header) 
{
     if (header.hasRouterFederatedState()) {
-      routerFederatedState = header.getRouterFederatedState();
+      routerFederatedState = mergeRouterFederatedState(
+          this.routerFederatedState, header.getRouterFederatedState());
     } else {
       lastSeenStateId.accumulate(header.getStateId());
     }
   }
 
+  /**
+   * Utility function to parse routerFederatedState field in RPC headers.
+   */
+  public static Map<String, Long> getRouterFederatedStateMap(ByteString 
byteString) {
+    if (byteString != null) {
+      try {
+        RouterFederatedStateProto federatedState = 
RouterFederatedStateProto.parseFrom(byteString);
+        return federatedState.getNamespaceStateIdsMap();
+      } catch (InvalidProtocolBufferException e) {
+        // Ignore this exception and will return an empty map
+      }
+    }
+    return Collections.emptyMap();
+  }
+
+  /**
+   * Merge state1 and state2 to get the max value for each namespace.
+   * @param state1 input ByteString.
+   * @param state2 input ByteString.
+   * @return one ByteString object which contains the max value of each 
namespace.
+   */
+  public static ByteString mergeRouterFederatedState(ByteString state1, 
ByteString state2) {
+    Map<String, Long> mapping1 = new 
HashMap<>(getRouterFederatedStateMap(state1));
+    Map<String, Long> mapping2 = getRouterFederatedStateMap(state2);
+    mapping2.forEach((k, v) -> {
+      long localValue = mapping1.getOrDefault(k, 0L);
+      mapping1.put(k, Math.max(v, localValue));
+    });
+    RouterFederatedStateProto.Builder federatedBuilder = 
RouterFederatedStateProto.newBuilder();
+    mapping1.forEach(federatedBuilder::putNamespaceStateIds);
+    return federatedBuilder.build().toByteString();
+  }
+
   /**
    * Client side implementation for providing state alignment info in requests.
    */
@@ -106,4 +146,9 @@ public class ClientGSIContext implements AlignmentContext {
     // Do nothing.
     return 0;
   }
+
+  @VisibleForTesting
+  public ByteString getRouterFederatedState() {
+    return this.routerFederatedState;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
index a4d36180c2c..e1e7f7d780d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
@@ -723,3 +723,15 @@ message BlockTokenSecretProto {
   repeated string storageIds = 8;
   optional bytes handshakeSecret = 9;
 }
+
+/////////////////////////////////////////////////
+// Alignment state for namespaces.
+/////////////////////////////////////////////////
+/**
+ * Clients should receive this message in RPC responses and forward it
+ * in RPC requests without interpreting it. It should be encoded
+ * as an obscure byte array when being sent to clients.
+ */
+message RouterFederatedStateProto {
+  map<string, int64> namespaceStateIds = 1; // Last seen state IDs for 
multiple namespaces.
+}
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java
index 9d2b75b0b55..a15a0001e53 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java
@@ -28,8 +28,8 @@ import java.util.concurrent.atomic.LongAccumulator;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import 
org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import 
org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RouterFederatedStateProto;
 import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly;
 import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.ipc.RetriableException;
@@ -83,10 +83,9 @@ class RouterStateIdContext implements AlignmentContext {
     if (namespaceIdMap.isEmpty()) {
       return;
     }
-    HdfsServerFederationProtos.RouterFederatedStateProto.Builder 
federatedStateBuilder =
-        HdfsServerFederationProtos.RouterFederatedStateProto.newBuilder();
-    namespaceIdMap.forEach((k, v) -> 
federatedStateBuilder.putNamespaceStateIds(k, v.get()));
-    
headerBuilder.setRouterFederatedState(federatedStateBuilder.build().toByteString());
+    RouterFederatedStateProto.Builder builder = 
RouterFederatedStateProto.newBuilder();
+    namespaceIdMap.forEach((k, v) -> builder.putNamespaceStateIds(k, v.get()));
+    headerBuilder.setRouterFederatedState(builder.build().toByteString());
   }
 
   public LongAccumulator getNamespaceStateId(String nsId) {
@@ -102,9 +101,9 @@ class RouterStateIdContext implements AlignmentContext {
    */
   public static Map<String, Long> getRouterFederatedStateMap(ByteString 
byteString) {
     if (byteString != null) {
-      HdfsServerFederationProtos.RouterFederatedStateProto federatedState;
+      RouterFederatedStateProto federatedState;
       try {
-        federatedState = 
HdfsServerFederationProtos.RouterFederatedStateProto.parseFrom(byteString);
+        federatedState = RouterFederatedStateProto.parseFrom(byteString);
       } catch (InvalidProtocolBufferException e) {
         throw new RuntimeException(e);
       }
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
index 7f61d80fe1a..c8636826c3c 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
@@ -311,17 +311,4 @@ message GetDisabledNameservicesRequestProto {
 
 message GetDisabledNameservicesResponseProto {
   repeated string nameServiceIds = 1;
-}
-
-/////////////////////////////////////////////////
-// Alignment state for namespaces.
-/////////////////////////////////////////////////
-
-/**
- * Clients should receive this message in RPC responses and forward it
- * in RPC requests without interpreting it. It should be encoded
- * as an obscure byte array when being sent to clients.
- */
-message RouterFederatedStateProto {
-  map<string, int64> namespaceStateIds = 1; // Last seen state IDs for 
multiple namespaces.
 }
\ No newline at end of file
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
index 42288bcf53a..920c9c4e519 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
@@ -18,8 +18,8 @@
 package org.apache.hadoop.hdfs.server.federation.router;
 
 import org.apache.hadoop.conf.Configuration;
-import 
org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterFederatedStateProto;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import 
org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RouterFederatedStateProto;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
index e38b0b2a35a..23e72546aac 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
@@ -27,14 +27,18 @@ import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
 import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.ClientGSIContext;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import 
org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RouterFederatedStateProto;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
 import 
org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
@@ -43,6 +47,8 @@ import 
org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeConte
 import 
org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
 import 
org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -505,4 +511,38 @@ public class TestObserverWithRouter {
     // getList call should be sent to observer
     assertEquals("One call should be sent to observer", 1, 
rpcCountForObserver);
   }
-}
\ No newline at end of file
+
+  @Test
+  @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
+  public void testClientReceiveResponseState() {
+    ClientGSIContext clientGSIContext = new ClientGSIContext();
+
+    Map<String, Long> mockMapping = new HashMap<>();
+    mockMapping.put("ns0", 10L);
+    RouterFederatedStateProto.Builder builder = 
RouterFederatedStateProto.newBuilder();
+    mockMapping.forEach(builder::putNamespaceStateIds);
+    RpcHeaderProtos.RpcResponseHeaderProto header = 
RpcHeaderProtos.RpcResponseHeaderProto
+        .newBuilder()
+        .setCallId(1)
+        
.setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS)
+        .setRouterFederatedState(builder.build().toByteString())
+        .build();
+    clientGSIContext.receiveResponseState(header);
+
+    Map<String, Long> mockLowerMapping = new HashMap<>();
+    mockLowerMapping.put("ns0", 8L);
+    builder = RouterFederatedStateProto.newBuilder();
+    mockLowerMapping.forEach(builder::putNamespaceStateIds);
+    header = RpcHeaderProtos.RpcResponseHeaderProto.newBuilder()
+        .setRouterFederatedState(builder.build().toByteString())
+        .setCallId(2)
+        
.setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS)
+        .build();
+    clientGSIContext.receiveResponseState(header);
+
+    Map<String, Long> latestFederateState = 
ClientGSIContext.getRouterFederatedStateMap(
+        clientGSIContext.getRouterFederatedState());
+    Assertions.assertEquals(1, latestFederateState.size());
+    Assertions.assertEquals(10L, latestFederateState.get("ns0"));
+  }
+}
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java
index 2bc8cfc21b2..be8fcf682bd 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java
@@ -19,12 +19,13 @@ package org.apache.hadoop.hdfs.server.federation.router;
 
 import java.util.HashMap;
 import java.util.Map;
+
+import 
org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RouterFederatedStateProto;
 import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.ipc.ClientId;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RpcConstants;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
-import 
org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterFederatedStateProto;
 import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.util.ProtoUtil;
 import org.junit.Test;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to