[ 
https://issues.apache.org/jira/browse/HDFS-17595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889567#comment-17889567
 ] 

ASF GitHub Bot commented on HDFS-17595:
---------------------------------------

Hexiaoqiao commented on code in PR #6983:
URL: https://github.com/apache/hadoop/pull/6983#discussion_r1800787847


##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java:
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
+import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
+import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
+import 
org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import 
org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer.merge;
+import static 
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
+import static 
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+
+public class AsyncErasureCoding extends ErasureCoding {
+  /** RPC server to receive client calls. */
+  private final RouterRpcServer rpcServer;
+  /** RPC clients to connect to the Namenodes. */
+  private final RouterRpcClient rpcClient;
+  /** Interface to identify the active NN for a nameservice or blockpool ID. */
+  private final ActiveNamenodeResolver namenodeResolver;
+
+  public AsyncErasureCoding(RouterRpcServer server) {
+    super(server);
+    this.rpcServer = server;
+    this.rpcClient =  this.rpcServer.getRPCClient();
+    this.namenodeResolver = this.rpcClient.getNamenodeResolver();
+  }
+
+  public ErasureCodingPolicyInfo[] getErasureCodingPolicies()
+      throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+    RemoteMethod method = new RemoteMethod("getErasureCodingPolicies");
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+
+    rpcClient.invokeConcurrent(
+        nss, method, true, false, ErasureCodingPolicyInfo[].class);
+    asyncApply(
+        (ApplyFunction<Map<FederationNamespaceInfo, ErasureCodingPolicyInfo[]>,
+            ErasureCodingPolicyInfo[]>) ret -> merge(ret, 
ErasureCodingPolicyInfo.class));
+
+    return asyncReturn(ErasureCodingPolicyInfo[].class);
+  }
+
+  @Override
+  public Map<String, String> getErasureCodingCodecs() throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+    RemoteMethod method = new RemoteMethod("getErasureCodingCodecs");
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+
+    rpcClient.invokeConcurrent(
+        nss, method, true, false, Map.class);
+
+    asyncApply((ApplyFunction<Map<FederationNamespaceInfo,
+        Map<String, String>>, Map<String, String>>) retCodecs -> {
+        Map<String, String> ret = new HashMap<>();
+        Object obj = retCodecs;
+        @SuppressWarnings("unchecked")
+        Map<FederationNamespaceInfo, Map<String, String>> results =
+            (Map<FederationNamespaceInfo, Map<String, String>>)obj;
+        Collection<Map<String, String>> allCodecs = results.values();
+        for (Map<String, String> codecs : allCodecs) {
+          ret.putAll(codecs);
+        }
+        return ret;
+      });
+
+    return asyncReturn(Map.class);
+  }
+
+  @Override
+  public AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
+      ErasureCodingPolicy[] policies) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+    RemoteMethod method = new RemoteMethod("addErasureCodingPolicies",
+        new Class<?>[] {ErasureCodingPolicy[].class}, new Object[] {policies});
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+
+    rpcClient.invokeConcurrent(
+        nss, method, true, false, AddErasureCodingPolicyResponse[].class);
+
+    asyncApply(
+        (ApplyFunction<Map<FederationNamespaceInfo, 
AddErasureCodingPolicyResponse[]>,
+            AddErasureCodingPolicyResponse[]>) ret -> {
+          return merge(ret, AddErasureCodingPolicyResponse.class);
+        });
+    return asyncReturn(AddErasureCodingPolicyResponse[].class);
+  }
+
+  @Override
+  public ErasureCodingPolicy getErasureCodingPolicy(String src)
+      throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+    final List<RemoteLocation> locations =
+        rpcServer.getLocationsForPath(src, false, false);
+    RemoteMethod remoteMethod = new RemoteMethod("getErasureCodingPolicy",
+        new Class<?>[] {String.class}, new RemoteParam());
+    rpcClient.invokeSequential(
+        locations, remoteMethod, null, null);
+
+    asyncApply(ret -> {
+      return (ErasureCodingPolicy) ret;
+    });
+
+    return asyncReturn(ErasureCodingPolicy.class);
+  }
+
+

Review Comment:
   Extra blank line here; please remove it.



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java:
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
+import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
+import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
+import 
org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import 
org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer.merge;
+import static 
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
+import static 
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+
+public class AsyncErasureCoding extends ErasureCoding {

Review Comment:
   Do other interfaces, such as `removeErasureCodingPolicy`, not need to be 
overridden with async versions as well?





> [ARR] ErasureCoding supports asynchronous rpc.
> ----------------------------------------------
>
>                 Key: HDFS-17595
>                 URL: https://issues.apache.org/jira/browse/HDFS-17595
>             Project: Hadoop HDFS
>          Issue Type: Sub-task
>          Components: rbf
>            Reporter: Jian Zhang
>            Assignee: farmmamba
>            Priority: Major
>              Labels: pull-request-available
>
> *Describe*
> The main new addition is AsyncErasureCoding, which extends ErasureCoding so 
> that it supports asynchronous RPC.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org

Reply via email to