omalley commented on code in PR #4311:
URL: https://github.com/apache/hadoop/pull/4311#discussion_r940558557


##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java:
##########
@@ -37,14 +37,27 @@
 @InterfaceStability.Evolving
 public class ClientGSIContext implements AlignmentContext {
 
-  private final LongAccumulator lastSeenStateId =
-      new LongAccumulator(Math::max, Long.MIN_VALUE);
+  private final NamespaceStateId lastSeenStateId;
+  private ByteString routerFederatedState;
+
+  public ClientGSIContext() {
+    this(new NamespaceStateId());
+  }
+
+  public ClientGSIContext(NamespaceStateId lastSeenStateId) {
+    this.lastSeenStateId = lastSeenStateId;
+    routerFederatedState = null;
+  }
 
   @Override
   public long getLastSeenStateId() {
     return lastSeenStateId.get();
   }
 
+  public void updateLastSeenStateID(Long stateId) {

Review Comment:
   I don't see anyone calling this.



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java:
##########
@@ -66,15 +79,23 @@ public void 
updateResponseState(RpcResponseHeaderProto.Builder header) {
    */
   @Override
   public void receiveResponseState(RpcResponseHeaderProto header) {
-    lastSeenStateId.accumulate(header.getStateId());
+    lastSeenStateId.update(header.getStateId());

Review Comment:
   I think we should branch on the hasRouterFederatedState and update one or 
the other. This method should be synchronized because you are updating state in 
shared variables.



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import 
org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RouterFederatedStateProto;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+
+
+/** Collection of last-seen namespace state Ids for a set of namespaces. */
+public class FederatedNamespaceIds {

Review Comment:
   HashMap isn't synchronized, so you need explicit synchronization.



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import 
org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RouterFederatedStateProto;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+
+
+/** Collection of last-seen namespace state Ids for a set of namespaces. */
+public class FederatedNamespaceIds {
+  private final Map<String, NamespaceStateId> namespaceIdMap = new 
ConcurrentHashMap<>();
+
+  public void 
updateStateUsingRequestHeader(RpcHeaderProtos.RpcRequestHeaderProto header) {
+    if (header.hasRouterFederatedState()) {
+      RouterFederatedStateProto federatedState = null;
+      try {
+        federatedState = 
RouterFederatedStateProto.parseFrom(header.getRouterFederatedState());
+      } catch (InvalidProtocolBufferException e) {
+        throw new RuntimeException(e);
+      }
+      
federatedState.getNamespaceStateIdsMap().forEach(this::updateNamespaceState);
+    }
+  }
+
+  public void updateNamespaceState(String nsId, Long stateId) {
+    if (!namespaceIdMap.containsKey(nsId)) {
+      namespaceIdMap.putIfAbsent(nsId, new NamespaceStateId());
+    }
+    namespaceIdMap.get(nsId).update(stateId);
+  }
+
+  public void 
setResponseHeaderState(RpcHeaderProtos.RpcResponseHeaderProto.Builder 
headerBuilder) {
+    RouterFederatedStateProto.Builder federatedStateBuilder = 
RouterFederatedStateProto.newBuilder();
+    namespaceIdMap

Review Comment:
   This loop also needs a lock around it.



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import 
org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RouterFederatedStateProto;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+
+
+/** Collection of last-seen namespace state Ids for a set of namespaces. */
+public class FederatedNamespaceIds {
+  private final Map<String, NamespaceStateId> namespaceIdMap = new 
ConcurrentHashMap<>();
+
+  public void 
updateStateUsingRequestHeader(RpcHeaderProtos.RpcRequestHeaderProto header) {
+    if (header.hasRouterFederatedState()) {
+      RouterFederatedStateProto federatedState = null;
+      try {
+        federatedState = 
RouterFederatedStateProto.parseFrom(header.getRouterFederatedState());
+      } catch (InvalidProtocolBufferException e) {
+        throw new RuntimeException(e);
+      }
+      
federatedState.getNamespaceStateIdsMap().forEach(this::updateNamespaceState);
+    }
+  }
+
+  public void updateNamespaceState(String nsId, Long stateId) {
+    if (!namespaceIdMap.containsKey(nsId)) {
+      namespaceIdMap.putIfAbsent(nsId, new NamespaceStateId());
+    }
+    namespaceIdMap.get(nsId).update(stateId);
+  }
+
+  public void 
setResponseHeaderState(RpcHeaderProtos.RpcResponseHeaderProto.Builder 
headerBuilder) {
+    RouterFederatedStateProto.Builder federatedStateBuilder = 
RouterFederatedStateProto.newBuilder();
+    namespaceIdMap
+        .forEach((k, v) -> federatedStateBuilder.putNamespaceStateIds(k, 
v.get()));
+    
headerBuilder.setRouterFederatedState(federatedStateBuilder.build().toByteString());
+  }
+
+  public NamespaceStateId getNamespaceId(String nsId) {

Review Comment:
   And this method should be synchronized.



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java:
##########
@@ -37,14 +37,27 @@
 @InterfaceStability.Evolving
 public class ClientGSIContext implements AlignmentContext {
 
-  private final LongAccumulator lastSeenStateId =
-      new LongAccumulator(Math::max, Long.MIN_VALUE);
+  private final NamespaceStateId lastSeenStateId;
+  private ByteString routerFederatedState;
+
+  public ClientGSIContext() {
+    this(new NamespaceStateId());
+  }
+
+  public ClientGSIContext(NamespaceStateId lastSeenStateId) {
+    this.lastSeenStateId = lastSeenStateId;
+    routerFederatedState = null;
+  }
 
   @Override
   public long getLastSeenStateId() {
     return lastSeenStateId.get();
   }
 
+  public void updateLastSeenStateID(Long stateId) {

Review Comment:
   I don't see anyone calling this. This method seems problematic in that it 
only updates the lastSeenStateId, which shouldn't be changed if this is a router 
connection.



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+/**

Review Comment:
   This class should be moved to RBF rather than down in the HDFS client.



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import 
org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RouterFederatedStateProto;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+
+
+/** Collection of last-seen namespace state Ids for a set of namespaces. */

Review Comment:
   What level of sharing is there between these? Is a single one used for a 
single user, or is there one per incoming connection to the router? I assume they 
are shared across outgoing connections to the different NameNodes. A comment 
saying how they are shared would be very useful.



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import 
org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RouterFederatedStateProto;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+
+
+/** Collection of last-seen namespace state Ids for a set of namespaces. */
+public class FederatedNamespaceIds {
+  private final Map<String, NamespaceStateId> namespaceIdMap = new 
ConcurrentHashMap<>();
+
+  public void 
updateStateUsingRequestHeader(RpcHeaderProtos.RpcRequestHeaderProto header) {
+    if (header.hasRouterFederatedState()) {
+      RouterFederatedStateProto federatedState = null;
+      try {
+        federatedState = 
RouterFederatedStateProto.parseFrom(header.getRouterFederatedState());
+      } catch (InvalidProtocolBufferException e) {
+        throw new RuntimeException(e);
+      }
+      
federatedState.getNamespaceStateIdsMap().forEach(this::updateNamespaceState);

Review Comment:
   This loop needs a lock around it.



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import 
org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RouterFederatedStateProto;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+
+
+/** Collection of last-seen namespace state Ids for a set of namespaces. */
+public class FederatedNamespaceIds {
+  private final Map<String, NamespaceStateId> namespaceIdMap = new 
ConcurrentHashMap<>();
+
+  public void 
updateStateUsingRequestHeader(RpcHeaderProtos.RpcRequestHeaderProto header) {
+    if (header.hasRouterFederatedState()) {
+      RouterFederatedStateProto federatedState = null;
+      try {
+        federatedState = 
RouterFederatedStateProto.parseFrom(header.getRouterFederatedState());
+      } catch (InvalidProtocolBufferException e) {
+        throw new RuntimeException(e);
+      }
+      
federatedState.getNamespaceStateIdsMap().forEach(this::updateNamespaceState);
+    }
+  }
+
+  public void updateNamespaceState(String nsId, Long stateId) {

Review Comment:
   Make this method synchronized.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to