[ 
https://issues.apache.org/jira/browse/HDFS-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17637925#comment-17637925
 ] 

ASF GitHub Bot commented on HDFS-16837:
---------------------------------------

goiri commented on code in PR #5123:
URL: https://github.com/apache/hadoop/pull/5123#discussion_r1030803567


##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java:
##########
@@ -77,11 +83,45 @@ public void 
updateResponseState(RpcResponseHeaderProto.Builder header) {
   @Override
   public synchronized void receiveResponseState(RpcResponseHeaderProto header) 
{
     if (header.hasRouterFederatedState()) {
-      routerFederatedState = header.getRouterFederatedState();
+      routerFederatedState = 
mergeRouterFederatedState(header.getRouterFederatedState());
     } else {
       lastSeenStateId.accumulate(header.getStateId());
     }
   }
+  /**
+   * Utility function to parse routerFederatedState field in RPC headers.
+   */
+  public static Map<String, Long> getRouterFederatedStateMap(ByteString 
byteString) {
+    if (byteString == null) {
+      return Collections.emptyMap();
+    }
+
+    RouterFederatedStateProto federatedState;
+    try {
+      federatedState = RouterFederatedStateProto.parseFrom(byteString);
+    } catch (InvalidProtocolBufferException e) {
+      throw new RuntimeException(e);
+    }
+    return federatedState.getNamespaceStateIdsMap();

Review Comment:
   Might be cleaner to do this inside of the try and move the declaration of 
the var too.
   Then you can return Collections.emptyMap(); at the end



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java:
##########
@@ -83,8 +83,8 @@ public void 
setResponseHeaderState(RpcResponseHeaderProto.Builder headerBuilder)
     if (namespaceIdMap.isEmpty()) {
       return;
     }
-    HdfsServerFederationProtos.RouterFederatedStateProto.Builder 
federatedStateBuilder =
-        HdfsServerFederationProtos.RouterFederatedStateProto.newBuilder();
+    RouterFederatedStateProto.Builder federatedStateBuilder =

Review Comment:
   Single line?



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java:
##########
@@ -77,11 +83,45 @@ public void 
updateResponseState(RpcResponseHeaderProto.Builder header) {
   @Override
   public synchronized void receiveResponseState(RpcResponseHeaderProto header) 
{
     if (header.hasRouterFederatedState()) {
-      routerFederatedState = header.getRouterFederatedState();
+      routerFederatedState = 
mergeRouterFederatedState(header.getRouterFederatedState());
     } else {
       lastSeenStateId.accumulate(header.getStateId());
     }
   }
+  /**
+   * Utility function to parse routerFederatedState field in RPC headers.
+   */
+  public static Map<String, Long> getRouterFederatedStateMap(ByteString 
byteString) {
+    if (byteString == null) {
+      return Collections.emptyMap();
+    }
+
+    RouterFederatedStateProto federatedState;
+    try {
+      federatedState = RouterFederatedStateProto.parseFrom(byteString);
+    } catch (InvalidProtocolBufferException e) {
+      throw new RuntimeException(e);
+    }
+    return federatedState.getNamespaceStateIdsMap();
+  }
+
+  /**
+   * Merge the local FederatedState and RemoteFederateState to get the max 
value for each namespace.
+   * @param remoteState the remote RouterFederatedState.
+   * @return one ByteString object which contains the max value of each 
namespace.
+   */
+  private ByteString mergeRouterFederatedState(ByteString remoteState) {

Review Comment:
   Can this be static?



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java:
##########
@@ -439,4 +445,39 @@ public void testRouterMsync() throws Exception {
     assertEquals("Four calls should be sent to active", 4,
         rpcCountForActive);
   }
+
+  @Test
+  @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
+  public void testClientReceiveResponseState() {
+    ClientGSIContext clientGSIContext = new ClientGSIContext();
+
+    Map<String, Long> mockMapping = new HashMap<>();
+    mockMapping.put("ns0", 10L);
+    RouterFederatedStateProto.Builder federatedStateBuilder =

Review Comment:
   Single line?



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto:
##########
@@ -311,17 +311,4 @@ message GetDisabledNameservicesRequestProto {
 
 message GetDisabledNameservicesResponseProto {
   repeated string nameServiceIds = 1;
-}
-
-/////////////////////////////////////////////////
-// Alignment state for namespaces.
-/////////////////////////////////////////////////
-
-/**
- * Clients should receive this message in RPC responses and forward it
- * in RPC requests without interpreting it. It should be encoded
- * as an obscure byte array when being sent to clients.
- */
-message RouterFederatedStateProto {
-  map<string, int64> namespaceStateIds = 1; // Last seen state IDs for 
multiple namespaces.

Review Comment:
   Can we change the protocol so freely?





> [RBF SBN] ClientGSIContext should merge RouterFederatedStates to get the max 
> state id for each namespace
> --------------------------------------------------------------------------------------------------------
>
>                 Key: HDFS-16837
>                 URL: https://issues.apache.org/jira/browse/HDFS-16837
>             Project: Hadoop HDFS
>          Issue Type: Bug
>            Reporter: ZanderXu
>            Assignee: ZanderXu
>            Priority: Major
>              Labels: pull-request-available
>
> ClientGSIContext should merge local and remote RouterFederatedState to get 
> the max state id for each namespace.
> And the related code as bellows:
> {code:java}
> @Override
> public synchronized void receiveResponseState(RpcResponseHeaderProto header) {
>   if (header.hasRouterFederatedState()) {
>     // BUG here
>     routerFederatedState = header.getRouterFederatedState();
>   } else {
>     lastSeenStateId.accumulate(header.getStateId());
>   }
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to