[ 
https://issues.apache.org/jira/browse/HDFS-17595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17871373#comment-17871373
 ] 

ASF GitHub Bot commented on HDFS-17595:
---------------------------------------

KeeProMise commented on code in PR #6983:
URL: https://github.com/apache/hadoop/pull/6983#discussion_r1705724950


##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java:
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
+import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
+import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
+import 
org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import 
org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer.merge;
+import static 
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
+import static 
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+
+public class AsyncErasureCoding extends ErasureCoding{
+  /** RPC server to receive client calls. */
+  private final RouterRpcServer rpcServer;
+  /** RPC clients to connect to the Namenodes. */
+  private final RouterRpcClient rpcClient;
+  /** Interface to identify the active NN for a nameservice or blockpool ID. */
+  private final ActiveNamenodeResolver namenodeResolver;
+
+  public AsyncErasureCoding(RouterRpcServer server) {
+    super(server);
+    this.rpcServer = server;
+    this.rpcClient =  this.rpcServer.getRPCClient();
+    this.namenodeResolver = this.rpcClient.getNamenodeResolver();
+  }
+
+  public ErasureCodingPolicyInfo[] getErasureCodingPolicies()
+      throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+    RemoteMethod method = new RemoteMethod("getErasureCodingPolicies");
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+
+    // Map<FederationNamespaceInfo, ErasureCodingPolicyInfo[]> ret =
+    rpcClient.invokeConcurrent(
+        nss, method, true, false, ErasureCodingPolicyInfo[].class);
+    asyncApply((ApplyFunction<Map<FederationNamespaceInfo, 
ErasureCodingPolicyInfo[]>, ErasureCodingPolicyInfo[]>) ret -> {
+      return merge(ret, ErasureCodingPolicyInfo.class);
+    });
+

Review Comment:
   Thank you for your contribution. Please delete the comment here and keep 
the line within the length limit.



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java:
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
+import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
+import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
+import 
org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import 
org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer.merge;
+import static 
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
+import static 
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+
+public class AsyncErasureCoding extends ErasureCoding{
+  /** RPC server to receive client calls. */
+  private final RouterRpcServer rpcServer;
+  /** RPC clients to connect to the Namenodes. */
+  private final RouterRpcClient rpcClient;
+  /** Interface to identify the active NN for a nameservice or blockpool ID. */
+  private final ActiveNamenodeResolver namenodeResolver;
+
+  public AsyncErasureCoding(RouterRpcServer server) {
+    super(server);
+    this.rpcServer = server;
+    this.rpcClient =  this.rpcServer.getRPCClient();
+    this.namenodeResolver = this.rpcClient.getNamenodeResolver();
+  }
+
+  public ErasureCodingPolicyInfo[] getErasureCodingPolicies()
+      throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+    RemoteMethod method = new RemoteMethod("getErasureCodingPolicies");
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+
+    // Map<FederationNamespaceInfo, ErasureCodingPolicyInfo[]> ret =
+    rpcClient.invokeConcurrent(
+        nss, method, true, false, ErasureCodingPolicyInfo[].class);
+    asyncApply((ApplyFunction<Map<FederationNamespaceInfo, 
ErasureCodingPolicyInfo[]>, ErasureCodingPolicyInfo[]>) ret -> {
+      return merge(ret, ErasureCodingPolicyInfo.class);
+    });
+
+    return asyncReturn(ErasureCodingPolicyInfo[].class);
+  }
+
+  @Override
+  public Map getErasureCodingCodecs() throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+    RemoteMethod method = new RemoteMethod("getErasureCodingCodecs");
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+
+    rpcClient.invokeConcurrent(
+        nss, method, true, false, Map.class);
+
+    asyncApply((ApplyFunction<Map<FederationNamespaceInfo, Map>, Map>) 
retCodecs -> {
+      Map<String, String> ret = new HashMap<>();
+      Object obj = retCodecs;
+      @SuppressWarnings("unchecked")
+      Map<FederationNamespaceInfo, Map<String, String>> results =
+          (Map<FederationNamespaceInfo, Map<String, String>>)obj;
+      Collection<Map<String, String>> allCodecs = results.values();
+      for (Map<String, String> codecs : allCodecs) {
+        ret.putAll(codecs);
+      }
+      return ret;
+    });
+
+    return asyncReturn(Map.class);
+  }
+
+  @Override
+  public AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
+      ErasureCodingPolicy[] policies) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+    RemoteMethod method = new RemoteMethod("addErasureCodingPolicies",
+        new Class<?>[] {ErasureCodingPolicy[].class}, new Object[] {policies});
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+
+    rpcClient.invokeConcurrent(
+        nss, method, true, false, AddErasureCodingPolicyResponse[].class);
+
+    asyncApply(
+        (ApplyFunction
+            <Map<FederationNamespaceInfo, AddErasureCodingPolicyResponse[]>,
+                AddErasureCodingPolicyResponse[]>) ret -> {
+          return merge(ret, AddErasureCodingPolicyResponse.class);
+        });
+    return asyncReturn(AddErasureCodingPolicyResponse[].class);
+  }
+
+  @Override
+  public ErasureCodingPolicy getErasureCodingPolicy(String src)
+      throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+    final List<RemoteLocation> locations =
+        rpcServer.getLocationsForPath(src, false, false);
+    RemoteMethod remoteMethod = new RemoteMethod("getErasureCodingPolicy",
+        new Class<?>[] {String.class}, new RemoteParam());
+    rpcClient.invokeSequential(
+        locations, remoteMethod, null, null);
+
+    asyncApply(ret -> {
+      return (ErasureCodingPolicy) ret;
+    });
+
+    return asyncReturn(ErasureCodingPolicy.class);
+  }
+
+
+  @Override
+  public ECTopologyVerifierResult getECTopologyResultForPolicies(
+      String[] policyNames) throws IOException {
+    RemoteMethod method = new RemoteMethod("getECTopologyResultForPolicies",
+        new Class<?>[] {String[].class}, new Object[] {policyNames});
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    if (nss.isEmpty()) {
+      throw new IOException("No namespace availaible.");
+    }
+
+    // Map<FederationNamespaceInfo, ECTopologyVerifierResult> ret
+    rpcClient.invokeConcurrent(nss, method, true, false,
+        ECTopologyVerifierResult.class);
+    asyncApply((ApplyFunction<Map<FederationNamespaceInfo, 
ECTopologyVerifierResult>, ECTopologyVerifierResult>) ret -> {
+      for (Map.Entry<FederationNamespaceInfo, ECTopologyVerifierResult> entry 
: ret
+          .entrySet()) {
+        if (!entry.getValue().isSupported()) {
+          return entry.getValue();
+        }
+      }
+      // If no negative result, return the result from the first namespace.
+      return ret.get(nss.iterator().next());
+    });

Review Comment:
   Same as above: please delete the comment here and keep the line within the length limit.





> [ARR] ErasureCoding supports asynchronous rpc.
> ----------------------------------------------
>
>                 Key: HDFS-17595
>                 URL: https://issues.apache.org/jira/browse/HDFS-17595
>             Project: Hadoop HDFS
>          Issue Type: Sub-task
>          Components: rbf
>            Reporter: Jian Zhang
>            Assignee: farmmamba
>            Priority: Major
>              Labels: pull-request-available
>
> *Describe*
> The main new addition is AsyncErasureCoding, which extends ErasureCoding so 
> that it supports asynchronous RPC.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org

Reply via email to