[ 
https://issues.apache.org/jira/browse/HDFS-13522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17576940#comment-17576940
 ] 

ASF GitHub Bot commented on HDFS-13522:
---------------------------------------

omalley commented on code in PR #4311:
URL: https://github.com/apache/hadoop/pull/4311#discussion_r940558557


##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java:
##########
@@ -37,14 +37,27 @@
 @InterfaceStability.Evolving
 public class ClientGSIContext implements AlignmentContext {
 
-  private final LongAccumulator lastSeenStateId =
-      new LongAccumulator(Math::max, Long.MIN_VALUE);
+  private final NamespaceStateId lastSeenStateId;
+  private ByteString routerFederatedState;
+
+  public ClientGSIContext() {
+    this(new NamespaceStateId());
+  }
+
+  public ClientGSIContext(NamespaceStateId lastSeenStateId) {
+    this.lastSeenStateId = lastSeenStateId;
+    routerFederatedState = null;
+  }
 
   @Override
   public long getLastSeenStateId() {
     return lastSeenStateId.get();
   }
 
+  public void updateLastSeenStateID(Long stateId) {

Review Comment:
   I don't see anyone calling this.



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java:
##########
@@ -66,15 +79,23 @@ public void 
updateResponseState(RpcResponseHeaderProto.Builder header) {
    */
   @Override
   public void receiveResponseState(RpcResponseHeaderProto header) {
-    lastSeenStateId.accumulate(header.getStateId());
+    lastSeenStateId.update(header.getStateId());

Review Comment:
   I think we should branch on the hasRouterFederatedState and update one or 
the other. This method should be synchronized because you are updating state in 
shared variables.



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import 
org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RouterFederatedStateProto;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+
+
+/** Collection of last-seen namespace state Ids for a set of namespaces. */
+public class FederatedNamespaceIds {

Review Comment:
   HashMap isn't synchronized, so you need explicit synchronization.



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import 
org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RouterFederatedStateProto;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+
+
+/** Collection of last-seen namespace state Ids for a set of namespaces. */
+public class FederatedNamespaceIds {
+  private final Map<String, NamespaceStateId> namespaceIdMap = new 
ConcurrentHashMap<>();
+
+  public void 
updateStateUsingRequestHeader(RpcHeaderProtos.RpcRequestHeaderProto header) {
+    if (header.hasRouterFederatedState()) {
+      RouterFederatedStateProto federatedState = null;
+      try {
+        federatedState = 
RouterFederatedStateProto.parseFrom(header.getRouterFederatedState());
+      } catch (InvalidProtocolBufferException e) {
+        throw new RuntimeException(e);
+      }
+      
federatedState.getNamespaceStateIdsMap().forEach(this::updateNamespaceState);
+    }
+  }
+
+  public void updateNamespaceState(String nsId, Long stateId) {
+    if (!namespaceIdMap.containsKey(nsId)) {
+      namespaceIdMap.putIfAbsent(nsId, new NamespaceStateId());
+    }
+    namespaceIdMap.get(nsId).update(stateId);
+  }
+
+  public void 
setResponseHeaderState(RpcHeaderProtos.RpcResponseHeaderProto.Builder 
headerBuilder) {
+    RouterFederatedStateProto.Builder federatedStateBuilder = 
RouterFederatedStateProto.newBuilder();
+    namespaceIdMap

Review Comment:
   This loop also needs a lock around it.



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import 
org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RouterFederatedStateProto;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+
+
+/** Collection of last-seen namespace state Ids for a set of namespaces. */
+public class FederatedNamespaceIds {
+  private final Map<String, NamespaceStateId> namespaceIdMap = new 
ConcurrentHashMap<>();
+
+  public void 
updateStateUsingRequestHeader(RpcHeaderProtos.RpcRequestHeaderProto header) {
+    if (header.hasRouterFederatedState()) {
+      RouterFederatedStateProto federatedState = null;
+      try {
+        federatedState = 
RouterFederatedStateProto.parseFrom(header.getRouterFederatedState());
+      } catch (InvalidProtocolBufferException e) {
+        throw new RuntimeException(e);
+      }
+      
federatedState.getNamespaceStateIdsMap().forEach(this::updateNamespaceState);
+    }
+  }
+
+  public void updateNamespaceState(String nsId, Long stateId) {
+    if (!namespaceIdMap.containsKey(nsId)) {
+      namespaceIdMap.putIfAbsent(nsId, new NamespaceStateId());
+    }
+    namespaceIdMap.get(nsId).update(stateId);
+  }
+
+  public void 
setResponseHeaderState(RpcHeaderProtos.RpcResponseHeaderProto.Builder 
headerBuilder) {
+    RouterFederatedStateProto.Builder federatedStateBuilder = 
RouterFederatedStateProto.newBuilder();
+    namespaceIdMap
+        .forEach((k, v) -> federatedStateBuilder.putNamespaceStateIds(k, 
v.get()));
+    
headerBuilder.setRouterFederatedState(federatedStateBuilder.build().toByteString());
+  }
+
+  public NamespaceStateId getNamespaceId(String nsId) {

Review Comment:
   And this method should be synchronized.



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java:
##########
@@ -37,14 +37,27 @@
 @InterfaceStability.Evolving
 public class ClientGSIContext implements AlignmentContext {
 
-  private final LongAccumulator lastSeenStateId =
-      new LongAccumulator(Math::max, Long.MIN_VALUE);
+  private final NamespaceStateId lastSeenStateId;
+  private ByteString routerFederatedState;
+
+  public ClientGSIContext() {
+    this(new NamespaceStateId());
+  }
+
+  public ClientGSIContext(NamespaceStateId lastSeenStateId) {
+    this.lastSeenStateId = lastSeenStateId;
+    routerFederatedState = null;
+  }
 
   @Override
   public long getLastSeenStateId() {
     return lastSeenStateId.get();
   }
 
+  public void updateLastSeenStateID(Long stateId) {

Review Comment:
   I don't see anyone calling this. This method seems problematic in that it 
only updates the lastSeenStateId, which shouldn't be changed if this a router 
connection.



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+/**

Review Comment:
   This class should be moved to RBF rather than down in the HDFS client.



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import 
org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RouterFederatedStateProto;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+
+
+/** Collection of last-seen namespace state Ids for a set of namespaces. */

Review Comment:
   What level of sharing is there between these? Is a single one used for a 
single user? or is it per an incoming connection to the router? I assume they 
shared across out going connections to the different NameNodes. A comment 
saying how they are shared would be very useful.



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import 
org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RouterFederatedStateProto;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+
+
+/** Collection of last-seen namespace state Ids for a set of namespaces. */
+public class FederatedNamespaceIds {
+  private final Map<String, NamespaceStateId> namespaceIdMap = new 
ConcurrentHashMap<>();
+
+  public void 
updateStateUsingRequestHeader(RpcHeaderProtos.RpcRequestHeaderProto header) {
+    if (header.hasRouterFederatedState()) {
+      RouterFederatedStateProto federatedState = null;
+      try {
+        federatedState = 
RouterFederatedStateProto.parseFrom(header.getRouterFederatedState());
+      } catch (InvalidProtocolBufferException e) {
+        throw new RuntimeException(e);
+      }
+      
federatedState.getNamespaceStateIdsMap().forEach(this::updateNamespaceState);

Review Comment:
   This loop needs a lock around it.



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import 
org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RouterFederatedStateProto;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+
+
+/** Collection of last-seen namespace state Ids for a set of namespaces. */
+public class FederatedNamespaceIds {
+  private final Map<String, NamespaceStateId> namespaceIdMap = new 
ConcurrentHashMap<>();
+
+  public void 
updateStateUsingRequestHeader(RpcHeaderProtos.RpcRequestHeaderProto header) {
+    if (header.hasRouterFederatedState()) {
+      RouterFederatedStateProto federatedState = null;
+      try {
+        federatedState = 
RouterFederatedStateProto.parseFrom(header.getRouterFederatedState());
+      } catch (InvalidProtocolBufferException e) {
+        throw new RuntimeException(e);
+      }
+      
federatedState.getNamespaceStateIdsMap().forEach(this::updateNamespaceState);
+    }
+  }
+
+  public void updateNamespaceState(String nsId, Long stateId) {

Review Comment:
   Make this method synchronized.





> RBF: Support observer node from Router-Based Federation
> -------------------------------------------------------
>
>                 Key: HDFS-13522
>                 URL: https://issues.apache.org/jira/browse/HDFS-13522
>             Project: Hadoop HDFS
>          Issue Type: Sub-task
>          Components: federation, namenode
>            Reporter: Erik Krogen
>            Assignee: Simbarashe Dzinamarira
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HDFS-13522.001.patch, HDFS-13522.002.patch, 
> HDFS-13522_WIP.patch, RBF_ Observer support.pdf, Router+Observer RPC 
> clogging.png, ShortTerm-Routers+Observer.png, 
> observer_reads_in_rbf_proposal_simbadzina_v1.pdf, 
> observer_reads_in_rbf_proposal_simbadzina_v2.pdf
>
>          Time Spent: 20h 50m
>  Remaining Estimate: 0h
>
> Changes will need to occur to the router to support the new observer node.
> One such change will be to make the router understand the observer state, 
> e.g. {{FederationNamenodeServiceState}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to