[
https://issues.apache.org/jira/browse/HDFS-13522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17576940#comment-17576940
]
ASF GitHub Bot commented on HDFS-13522:
---------------------------------------
omalley commented on code in PR #4311:
URL: https://github.com/apache/hadoop/pull/4311#discussion_r940558557
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java:
##########
@@ -37,14 +37,27 @@
 @InterfaceStability.Evolving
 public class ClientGSIContext implements AlignmentContext {
-  private final LongAccumulator lastSeenStateId =
-      new LongAccumulator(Math::max, Long.MIN_VALUE);
+  private final NamespaceStateId lastSeenStateId;
+  private ByteString routerFederatedState;
+
+  public ClientGSIContext() {
+    this(new NamespaceStateId());
+  }
+
+  public ClientGSIContext(NamespaceStateId lastSeenStateId) {
+    this.lastSeenStateId = lastSeenStateId;
+    routerFederatedState = null;
+  }
   @Override
   public long getLastSeenStateId() {
     return lastSeenStateId.get();
   }
+  public void updateLastSeenStateID(Long stateId) {
Review Comment:
I don't see anyone calling this.
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java:
##########
@@ -66,15 +79,23 @@ public void updateResponseState(RpcResponseHeaderProto.Builder header) {
    */
   @Override
   public void receiveResponseState(RpcResponseHeaderProto header) {
-    lastSeenStateId.accumulate(header.getStateId());
+    lastSeenStateId.update(header.getStateId());
Review Comment:
I think we should branch on hasRouterFederatedState and update one or
the other. This method should also be synchronized because it updates state
in shared variables.
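A minimal sketch of the branching suggested here, not the PR's actual code. `Header` is a hypothetical stand-in for `RpcResponseHeaderProto` that models only the three accessors used, and it carries the federated state as a `byte[]` rather than a protobuf `ByteString`. `LongAccumulator` is borrowed from the field this hunk removes, because the implementation of `NamespaceStateId` is not shown in this review.

```java
import java.util.concurrent.atomic.LongAccumulator;

// Hypothetical stand-in for RpcResponseHeaderProto; only the accessors used below are modeled.
final class Header {
  private final long stateId;
  private final byte[] routerFederatedState; // null when the field is absent

  Header(long stateId, byte[] routerFederatedState) {
    this.stateId = stateId;
    this.routerFederatedState = routerFederatedState;
  }

  boolean hasRouterFederatedState() { return routerFederatedState != null; }
  byte[] getRouterFederatedState() { return routerFederatedState; }
  long getStateId() { return stateId; }
}

// Sketch of a client-side context that updates exactly one of its two pieces of state.
final class ClientContextSketch {
  private final LongAccumulator lastSeenStateId =
      new LongAccumulator(Math::max, Long.MIN_VALUE);
  private byte[] routerFederatedState; // guarded by "this"

  // synchronized: both fields may be touched by several RPC threads.
  synchronized void receiveResponseState(Header header) {
    if (header.hasRouterFederatedState()) {
      // Router connection: keep the opaque federated state, leave lastSeenStateId alone.
      routerFederatedState = header.getRouterFederatedState();
    } else {
      // Direct NameNode connection: advance the single state id monotonically.
      lastSeenStateId.accumulate(header.getStateId());
    }
  }

  synchronized long getLastSeenStateId() { return lastSeenStateId.get(); }
  synchronized byte[] getRouterFederatedState() { return routerFederatedState; }
}
```

With this shape, a response that carries router federated state never moves `lastSeenStateId`, which is the property the review asks for.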
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RouterFederatedStateProto;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+
+
+/** Collection of last-seen namespace state Ids for a set of namespaces. */
+public class FederatedNamespaceIds {
Review Comment:
HashMap isn't synchronized, so you need explicit synchronization.
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+
+package org.apache.hadoop.hdfs;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RouterFederatedStateProto;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+
+
+/** Collection of last-seen namespace state Ids for a set of namespaces. */
+public class FederatedNamespaceIds {
+  private final Map<String, NamespaceStateId> namespaceIdMap = new ConcurrentHashMap<>();
+
+  public void updateStateUsingRequestHeader(RpcHeaderProtos.RpcRequestHeaderProto header) {
+    if (header.hasRouterFederatedState()) {
+      RouterFederatedStateProto federatedState = null;
+      try {
+        federatedState = RouterFederatedStateProto.parseFrom(header.getRouterFederatedState());
+      } catch (InvalidProtocolBufferException e) {
+        throw new RuntimeException(e);
+      }
+      federatedState.getNamespaceStateIdsMap().forEach(this::updateNamespaceState);
+    }
+  }
+
+  public void updateNamespaceState(String nsId, Long stateId) {
+    if (!namespaceIdMap.containsKey(nsId)) {
+      namespaceIdMap.putIfAbsent(nsId, new NamespaceStateId());
+    }
+    namespaceIdMap.get(nsId).update(stateId);
+  }
+
+  public void setResponseHeaderState(RpcHeaderProtos.RpcResponseHeaderProto.Builder headerBuilder) {
+    RouterFederatedStateProto.Builder federatedStateBuilder = RouterFederatedStateProto.newBuilder();
+    namespaceIdMap
Review Comment:
This loop also needs a lock around it.
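One way to read this comment together with the matching one on the update loop is that both loops should take the same lock. The following is a minimal sketch under that assumption, not code from the PR. `FederatedIdsSketch`, `updateAll`, and `snapshot` are hypothetical names, and plain `Long` values stand in for `NamespaceStateId`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Sketch: one lock guards both the bulk update and the loop that builds the response.
final class FederatedIdsSketch {
  private final Object lock = new Object();
  private final Map<String, Long> ids = new HashMap<>(); // guarded by lock

  // Applies every entry from one header under the lock, keeping the larger id per namespace.
  void updateAll(Map<String, Long> fromHeader) {
    synchronized (lock) {
      fromHeader.forEach((ns, id) -> ids.merge(ns, id, Long::max));
    }
  }

  // Copies the ids under the same lock, so the copy never reflects a half-applied header.
  Map<String, Long> snapshot() {
    synchronized (lock) {
      return Collections.unmodifiableMap(new HashMap<>(ids));
    }
  }
}
```

A response builder would then iterate over the returned copy outside the lock, which keeps the critical section short.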
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+
+package org.apache.hadoop.hdfs;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RouterFederatedStateProto;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+
+
+/** Collection of last-seen namespace state Ids for a set of namespaces. */
+public class FederatedNamespaceIds {
+  private final Map<String, NamespaceStateId> namespaceIdMap = new ConcurrentHashMap<>();
+
+  public void updateStateUsingRequestHeader(RpcHeaderProtos.RpcRequestHeaderProto header) {
+    if (header.hasRouterFederatedState()) {
+      RouterFederatedStateProto federatedState = null;
+      try {
+        federatedState = RouterFederatedStateProto.parseFrom(header.getRouterFederatedState());
+      } catch (InvalidProtocolBufferException e) {
+        throw new RuntimeException(e);
+      }
+      federatedState.getNamespaceStateIdsMap().forEach(this::updateNamespaceState);
+    }
+  }
+
+  public void updateNamespaceState(String nsId, Long stateId) {
+    if (!namespaceIdMap.containsKey(nsId)) {
+      namespaceIdMap.putIfAbsent(nsId, new NamespaceStateId());
+    }
+    namespaceIdMap.get(nsId).update(stateId);
+  }
+
+  public void setResponseHeaderState(RpcHeaderProtos.RpcResponseHeaderProto.Builder headerBuilder) {
+    RouterFederatedStateProto.Builder federatedStateBuilder = RouterFederatedStateProto.newBuilder();
+    namespaceIdMap
+        .forEach((k, v) -> federatedStateBuilder.putNamespaceStateIds(k, v.get()));
+    headerBuilder.setRouterFederatedState(federatedStateBuilder.build().toByteString());
+  }
+
+  public NamespaceStateId getNamespaceId(String nsId) {
Review Comment:
And this method should be synchronized.
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java:
##########
@@ -37,14 +37,27 @@
 @InterfaceStability.Evolving
 public class ClientGSIContext implements AlignmentContext {
-  private final LongAccumulator lastSeenStateId =
-      new LongAccumulator(Math::max, Long.MIN_VALUE);
+  private final NamespaceStateId lastSeenStateId;
+  private ByteString routerFederatedState;
+
+  public ClientGSIContext() {
+    this(new NamespaceStateId());
+  }
+
+  public ClientGSIContext(NamespaceStateId lastSeenStateId) {
+    this.lastSeenStateId = lastSeenStateId;
+    routerFederatedState = null;
+  }
   @Override
   public long getLastSeenStateId() {
     return lastSeenStateId.get();
   }
+  public void updateLastSeenStateID(Long stateId) {
Review Comment:
I don't see anyone calling this. This method seems problematic in that it
only updates the lastSeenStateId, which shouldn't be changed if this is a
router connection.
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+/**
Review Comment:
This class should be moved to RBF rather than down in the HDFS client.
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+
+package org.apache.hadoop.hdfs;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RouterFederatedStateProto;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+
+
+/** Collection of last-seen namespace state Ids for a set of namespaces. */
Review Comment:
What level of sharing is there between these? Is a single one used for a
single user, or is it per incoming connection to the router? I assume they
are shared across outgoing connections to the different NameNodes. A comment
saying how they are shared would be very useful.
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+
+package org.apache.hadoop.hdfs;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RouterFederatedStateProto;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+
+
+/** Collection of last-seen namespace state Ids for a set of namespaces. */
+public class FederatedNamespaceIds {
+  private final Map<String, NamespaceStateId> namespaceIdMap = new ConcurrentHashMap<>();
+
+  public void updateStateUsingRequestHeader(RpcHeaderProtos.RpcRequestHeaderProto header) {
+    if (header.hasRouterFederatedState()) {
+      RouterFederatedStateProto federatedState = null;
+      try {
+        federatedState = RouterFederatedStateProto.parseFrom(header.getRouterFederatedState());
+      } catch (InvalidProtocolBufferException e) {
+        throw new RuntimeException(e);
+      }
+      federatedState.getNamespaceStateIdsMap().forEach(this::updateNamespaceState);
Review Comment:
This loop needs a lock around it.
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FederatedNamespaceIds.java:
##########
@@ -0,0 +1,85 @@
+
+package org.apache.hadoop.hdfs;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RouterFederatedStateProto;
+import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+
+
+/** Collection of last-seen namespace state Ids for a set of namespaces. */
+public class FederatedNamespaceIds {
+  private final Map<String, NamespaceStateId> namespaceIdMap = new ConcurrentHashMap<>();
+
+  public void updateStateUsingRequestHeader(RpcHeaderProtos.RpcRequestHeaderProto header) {
+    if (header.hasRouterFederatedState()) {
+      RouterFederatedStateProto federatedState = null;
+      try {
+        federatedState = RouterFederatedStateProto.parseFrom(header.getRouterFederatedState());
+      } catch (InvalidProtocolBufferException e) {
+        throw new RuntimeException(e);
+      }
+      federatedState.getNamespaceStateIdsMap().forEach(this::updateNamespaceState);
+    }
+  }
+
+  public void updateNamespaceState(String nsId, Long stateId) {
Review Comment:
Make this method synchronized.
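A minimal sketch of what a synchronized version of this method could look like, assuming a plain `HashMap` behind it. It is not the PR's code: `NamespaceIdsSketch` and its `get` helper are hypothetical, and `LongAccumulator` (taken from the field the PR replaces in `ClientGSIContext`) stands in for `NamespaceStateId`, whose implementation is not shown in this review.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAccumulator;

// Sketch of a synchronized per-namespace update.
// LongAccumulator is an assumed stand-in for NamespaceStateId.
final class NamespaceIdsSketch {
  private final Map<String, LongAccumulator> namespaceIdMap = new HashMap<>();

  // synchronized makes "create if missing, then update" one atomic step,
  // replacing the separate containsKey / putIfAbsent / get calls.
  synchronized void updateNamespaceState(String nsId, long stateId) {
    namespaceIdMap
        .computeIfAbsent(nsId, k -> new LongAccumulator(Math::max, Long.MIN_VALUE))
        .accumulate(stateId);
  }

  // Returns the last-seen id, or Long.MIN_VALUE for a namespace that was never updated.
  synchronized long get(String nsId) {
    LongAccumulator acc = namespaceIdMap.get(nsId);
    return acc == null ? Long.MIN_VALUE : acc.get();
  }
}
```

Because readers go through the same monitor, a plain `HashMap` is sufficient in this sketch.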
> RBF: Support observer node from Router-Based Federation
> -------------------------------------------------------
>
> Key: HDFS-13522
> URL: https://issues.apache.org/jira/browse/HDFS-13522
> Project: Hadoop HDFS
> Issue Type: Sub-task
> Components: federation, namenode
> Reporter: Erik Krogen
> Assignee: Simbarashe Dzinamarira
> Priority: Major
> Labels: pull-request-available
> Attachments: HDFS-13522.001.patch, HDFS-13522.002.patch,
> HDFS-13522_WIP.patch, RBF_ Observer support.pdf, Router+Observer RPC
> clogging.png, ShortTerm-Routers+Observer.png,
> observer_reads_in_rbf_proposal_simbadzina_v1.pdf,
> observer_reads_in_rbf_proposal_simbadzina_v2.pdf
>
> Time Spent: 20h 50m
> Remaining Estimate: 0h
>
> Changes will need to occur to the router to support the new observer node.
> One such change will be to make the router understand the observer state,
> e.g. {{FederationNamenodeServiceState}}.