omalley commented on code in PR #4311:
URL: https://github.com/apache/hadoop/pull/4311#discussion_r940558557
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java:
##########
@@ -37,14 +37,27 @@
@InterfaceStability.Evolving
public class ClientGSIContext implements AlignmentContext {
- private final LongAccumulator lastSeenStateId =
- new LongAccumulator(Math::max, Long.MIN_VALUE);
+ private final NamespaceStateId lastSeenStateId;
+ private ByteString routerFederatedState;
+
+ public ClientGSIContext() {
+ this(new NamespaceStateId());
+ }
+
+ public ClientGSIContext(NamespaceStateId lastSeenStateId) {
+ this.lastSeenStateId = lastSeenStateId;
+ routerFederatedState = null;
+ }
@Override
public long getLastSeenStateId() {
return lastSeenStateId.get();
}
+ public void updateLastSeenStateID(Long stateId) {
Review Comment:
I don't see anyone calling this.
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java:
##########
@@ -66,15 +79,23 @@ public void
updateResponseState(RpcResponseHeaderProto.Builder header) {
*/
@Override
public void receiveResponseState(RpcResponseHeaderProto header) {
- lastSeenStateId.accumulate(header.getStateId());
+ lastSeenStateId.update(header.getStateId());
Review Comment:
I think we should branch on the hasRouterFederatedState and update one or
the other. This method should be synchronized because you are updating state in
shared variables.
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import
org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RouterFederatedStateProto;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+
+
+/** Collection of last-seen namespace state Ids for a set of namespaces. */
+public class FederatedNamespaceIds {
Review Comment:
HashMap isn't synchronized, so you need explicit synchronization.
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import
org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RouterFederatedStateProto;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+
+
+/** Collection of last-seen namespace state Ids for a set of namespaces. */
+public class FederatedNamespaceIds {
+ private final Map<String, NamespaceStateId> namespaceIdMap = new
ConcurrentHashMap<>();
+
+ public void
updateStateUsingRequestHeader(RpcHeaderProtos.RpcRequestHeaderProto header) {
+ if (header.hasRouterFederatedState()) {
+ RouterFederatedStateProto federatedState = null;
+ try {
+ federatedState =
RouterFederatedStateProto.parseFrom(header.getRouterFederatedState());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+
federatedState.getNamespaceStateIdsMap().forEach(this::updateNamespaceState);
+ }
+ }
+
+ public void updateNamespaceState(String nsId, Long stateId) {
+ if (!namespaceIdMap.containsKey(nsId)) {
+ namespaceIdMap.putIfAbsent(nsId, new NamespaceStateId());
+ }
+ namespaceIdMap.get(nsId).update(stateId);
+ }
+
+ public void
setResponseHeaderState(RpcHeaderProtos.RpcResponseHeaderProto.Builder
headerBuilder) {
+ RouterFederatedStateProto.Builder federatedStateBuilder =
RouterFederatedStateProto.newBuilder();
+ namespaceIdMap
Review Comment:
This loop also needs a lock around it.
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import
org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RouterFederatedStateProto;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+
+
+/** Collection of last-seen namespace state Ids for a set of namespaces. */
+public class FederatedNamespaceIds {
+ private final Map<String, NamespaceStateId> namespaceIdMap = new
ConcurrentHashMap<>();
+
+ public void
updateStateUsingRequestHeader(RpcHeaderProtos.RpcRequestHeaderProto header) {
+ if (header.hasRouterFederatedState()) {
+ RouterFederatedStateProto federatedState = null;
+ try {
+ federatedState =
RouterFederatedStateProto.parseFrom(header.getRouterFederatedState());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+
federatedState.getNamespaceStateIdsMap().forEach(this::updateNamespaceState);
+ }
+ }
+
+ public void updateNamespaceState(String nsId, Long stateId) {
+ if (!namespaceIdMap.containsKey(nsId)) {
+ namespaceIdMap.putIfAbsent(nsId, new NamespaceStateId());
+ }
+ namespaceIdMap.get(nsId).update(stateId);
+ }
+
+ public void
setResponseHeaderState(RpcHeaderProtos.RpcResponseHeaderProto.Builder
headerBuilder) {
+ RouterFederatedStateProto.Builder federatedStateBuilder =
RouterFederatedStateProto.newBuilder();
+ namespaceIdMap
+ .forEach((k, v) -> federatedStateBuilder.putNamespaceStateIds(k,
v.get()));
+
headerBuilder.setRouterFederatedState(federatedStateBuilder.build().toByteString());
+ }
+
+ public NamespaceStateId getNamespaceId(String nsId) {
Review Comment:
And this method should be synchronized.
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java:
##########
@@ -37,14 +37,27 @@
@InterfaceStability.Evolving
public class ClientGSIContext implements AlignmentContext {
- private final LongAccumulator lastSeenStateId =
- new LongAccumulator(Math::max, Long.MIN_VALUE);
+ private final NamespaceStateId lastSeenStateId;
+ private ByteString routerFederatedState;
+
+ public ClientGSIContext() {
+ this(new NamespaceStateId());
+ }
+
+ public ClientGSIContext(NamespaceStateId lastSeenStateId) {
+ this.lastSeenStateId = lastSeenStateId;
+ routerFederatedState = null;
+ }
@Override
public long getLastSeenStateId() {
return lastSeenStateId.get();
}
+ public void updateLastSeenStateID(Long stateId) {
Review Comment:
I don't see anyone calling this. This method seems problematic in that it
only updates the lastSeenStateId, which shouldn't be changed if this is a router
connection.
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+/**
Review Comment:
This class should be moved to RBF rather than down in the HDFS client.
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import
org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RouterFederatedStateProto;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+
+
+/** Collection of last-seen namespace state Ids for a set of namespaces. */
Review Comment:
What level of sharing is there between these? Is a single one used for a
single user, or is there one per incoming connection to the router? I assume they
are shared across outgoing connections to the different NameNodes. A comment
saying how they are shared would be very useful.
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import
org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RouterFederatedStateProto;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+
+
+/** Collection of last-seen namespace state Ids for a set of namespaces. */
+public class FederatedNamespaceIds {
+ private final Map<String, NamespaceStateId> namespaceIdMap = new
ConcurrentHashMap<>();
+
+ public void
updateStateUsingRequestHeader(RpcHeaderProtos.RpcRequestHeaderProto header) {
+ if (header.hasRouterFederatedState()) {
+ RouterFederatedStateProto federatedState = null;
+ try {
+ federatedState =
RouterFederatedStateProto.parseFrom(header.getRouterFederatedState());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+
federatedState.getNamespaceStateIdsMap().forEach(this::updateNamespaceState);
Review Comment:
This loop needs a lock around it.
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import
org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RouterFederatedStateProto;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+
+
+/** Collection of last-seen namespace state Ids for a set of namespaces. */
+public class FederatedNamespaceIds {
+ private final Map<String, NamespaceStateId> namespaceIdMap = new
ConcurrentHashMap<>();
+
+ public void
updateStateUsingRequestHeader(RpcHeaderProtos.RpcRequestHeaderProto header) {
+ if (header.hasRouterFederatedState()) {
+ RouterFederatedStateProto federatedState = null;
+ try {
+ federatedState =
RouterFederatedStateProto.parseFrom(header.getRouterFederatedState());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+
federatedState.getNamespaceStateIdsMap().forEach(this::updateNamespaceState);
+ }
+ }
+
+ public void updateNamespaceState(String nsId, Long stateId) {
Review Comment:
Make this method synchronized.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]