[ https://issues.apache.org/jira/browse/HDFS-17640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17909007#comment-17909007 ]

ASF GitHub Bot commented on HDFS-17640:
---------------------------------------

Hexiaoqiao commented on code in PR #7188:
URL: https://github.com/apache/hadoop/pull/7188#discussion_r1899904630


##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java:
##########
@@ -0,0 +1,1083 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
+import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
+import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
+import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException;
+import org.apache.hadoop.hdfs.server.federation.router.NoLocationException;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteParam;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteResult;
+import org.apache.hadoop.hdfs.server.federation.router.RouterClientProtocol;
+import org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncApplyFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.utils.CatchFunction;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.updateMountPointStatus;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCompleteWith;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncForEach;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncTry;
+import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.getCompletableFuture;
+
+/**
+ * Module that implements all the async RPC calls in {@link ClientProtocol}
+ * in the {@link RouterRpcServer}.
+ */
+public class RouterAsyncClientProtocol extends RouterClientProtocol {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RouterAsyncClientProtocol.class.getName());
+
+  private final RouterRpcServer rpcServer;
+  private final RouterRpcClient rpcClient;
+  private final RouterFederationRename rbfRename;
+  private final FileSubclusterResolver subclusterResolver;
+  private final ActiveNamenodeResolver namenodeResolver;
+  /** If it requires response from all subclusters. */
+  private final boolean allowPartialList;
+  /** Time out when getting the mount statistics. */
+  private long mountStatusTimeOut;
+  /** Identifier for the super user. */
+  private String superUser;
+  /** Identifier for the super group. */
+  private final String superGroup;
+  /**
+   * Cache of the server defaults, kept to avoid redundant calls to the
+   * namenode. As in DFSClient, caching saves work when the router
+   * serves many clients.
+   */
+  private volatile FsServerDefaults serverDefaults;
+
+  public RouterAsyncClientProtocol(Configuration conf, RouterRpcServer rpcServer) {
+    super(conf, rpcServer);
+    this.rpcServer = rpcServer;
+    this.rpcClient = rpcServer.getRPCClient();
+    this.rbfRename = getRbfRename();
+    this.subclusterResolver = getSubclusterResolver();
+    this.namenodeResolver = getNamenodeResolver();
+    this.allowPartialList = isAllowPartialList();
+    this.mountStatusTimeOut = getMountStatusTimeOut();
+    this.superUser = getSuperUser();
+    this.superGroup = getSuperGroup();
+  }
+
+  @Override
+  public FsServerDefaults getServerDefaults() throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+    long now = Time.monotonicNow();
+    if ((serverDefaults == null) || (now - getServerDefaultsLastUpdate()
+        > getServerDefaultsValidityPeriod())) {
+      RemoteMethod method = new RemoteMethod("getServerDefaults");
+      rpcServer.invokeAtAvailableNsAsync(method, FsServerDefaults.class);
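+      // Note: the call above only schedules the RPC; the chained asyncApply
+      // below caches the fetched defaults once the call completes.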
+      asyncApply(o -> {
+        serverDefaults = (FsServerDefaults) o;
+        setServerDefaultsLastUpdate(now);
+        return serverDefaults;
+      });
+    } else {
+      asyncComplete(serverDefaults);
+    }
+    return asyncReturn(FsServerDefaults.class);
+  }
+
+  @Override
+  public HdfsFileStatus create(String src, FsPermission masked,
+      String clientName, EnumSetWritable<CreateFlag> flag,
+      boolean createParent, short replication, long blockSize,
+      CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
+      String storagePolicy) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+    if (createParent && rpcServer.isPathAll(src)) {
+      int index = src.lastIndexOf(Path.SEPARATOR);
+      String parent = src.substring(0, index);
+      LOG.debug("Creating {} requires creating parent {}", src, parent);
+      FsPermission parentPermissions = getParentPermission(masked);
+      mkdirs(parent, parentPermissions, createParent);
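+      // mkdirs is asynchronous here as well; the asyncApply below only runs
+      // once the parent directory creation has completed.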
+      asyncApply((ApplyFunction<Boolean, Boolean>) success -> {
+        if (!success) {
+          // This shouldn't happen, as mkdirs either returns true or throws.
+          LOG.error("Couldn't create parents for {}", src);
+        }
+        return success;
+      });
+    }
+
+    RemoteMethod method = new RemoteMethod("create",
+        new Class<?>[] {String.class, FsPermission.class, String.class,
+            EnumSetWritable.class, boolean.class, short.class,
+            long.class, CryptoProtocolVersion[].class,
+            String.class, String.class},
+        new RemoteParam(), masked, clientName, flag, createParent,
+        replication, blockSize, supportedVersions, ecPolicyName, storagePolicy);
+    final List<RemoteLocation> locations =
+        rpcServer.getLocationsForPath(src, true);
+    final RemoteLocation[] createLocation = new RemoteLocation[1];
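+    // Single-element array so the lambda below can record the chosen
+    // location for the fault-tolerant retry in the asyncCatch handler.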
+    asyncTry(() -> {
+      rpcServer.getCreateLocationAsync(src, locations);
+      asyncApply((AsyncApplyFunction<RemoteLocation, Object>) remoteLocation -> {
+        createLocation[0] = remoteLocation;
+        rpcClient.invokeSingle(remoteLocation, method, HdfsFileStatus.class);
+        asyncApply((ApplyFunction<HdfsFileStatus, Object>) status -> {
+          status.setNamespace(remoteLocation.getNameserviceId());
+          return status;
+        });
+      });
+    });
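+    // If the create fails, retry sequentially on the remaining eligible
+    // locations (see checkFaultTolerantRetry).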
+    asyncCatch((AsyncCatchFunction<Object, IOException>) (o, ioe) -> {
+      final List<RemoteLocation> newLocations = checkFaultTolerantRetry(
+          method, src, ioe, createLocation[0], locations);
+      rpcClient.invokeSequential(
+          newLocations, method, HdfsFileStatus.class, null);
+    }, IOException.class);
+
+    return asyncReturn(HdfsFileStatus.class);
+  }
+
+  @Override
+  public LastBlockWithStatus append(
+      String src, String clientName,
+      EnumSetWritable<CreateFlag> flag) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+    List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("append",
+        new Class<?>[] {String.class, String.class, EnumSetWritable.class},
+        new RemoteParam(), clientName, flag);
+    rpcClient.invokeSequential(method, locations, LastBlockWithStatus.class, null);
+    asyncApply((ApplyFunction<RemoteResult, LastBlockWithStatus>) result -> {
+      LastBlockWithStatus lbws = (LastBlockWithStatus) result.getResult();
+      lbws.getFileStatus().setNamespace(result.getLocation().getNameserviceId());
+      return lbws;
+    });
+    return asyncReturn(LastBlockWithStatus.class);
+  }
+
+  @Deprecated
+  @Override
+  public boolean rename(final String src, final String dst)
+      throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+    final List<RemoteLocation> srcLocations =
+        rpcServer.getLocationsForPath(src, true, false);
+    final List<RemoteLocation> dstLocations =
+        rpcServer.getLocationsForPath(dst, false, false);
+    // srcLocations may be trimmed by getRenameDestinations()
+    final List<RemoteLocation> locs = new LinkedList<>(srcLocations);
+    RemoteParam dstParam = getRenameDestinations(locs, dstLocations);
+    if (locs.isEmpty()) {
+      asyncComplete(
+          rbfRename.routerFedRename(src, dst, srcLocations, dstLocations));
+      return asyncReturn(Boolean.class);
+    }
+    RemoteMethod method = new RemoteMethod("rename",
+        new Class<?>[] {String.class, String.class},
+        new RemoteParam(), dstParam);
+    isMultiDestDirectory(src);

Review Comment:
   Duplicate name here?





> [ARR] RouterClientProtocol supports asynchronous rpc.
> -----------------------------------------------------
>
>                 Key: HDFS-17640
>                 URL: https://issues.apache.org/jira/browse/HDFS-17640
>             Project: Hadoop HDFS
>          Issue Type: Sub-task
>          Components: rbf
>            Reporter: farmmamba
>            Assignee: farmmamba
>            Priority: Major
>              Labels: pull-request-available
>
> RouterClientProtocol should support asynchronous rpc.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org
