Revert "Revert "HDFS-10224. Implement asynchronous rename for DistributedFileSystem.  Contributed by Xiaobing Zhou""

This reverts commit 106234d873c60fa52cd0d812fb1cdc0c6b998a6d.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/eded3d10
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/eded3d10
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/eded3d10

Branch: refs/heads/HDFS-7240
Commit: eded3d109e4c5225d8c5cd3c2d82e7ac93841263
Parents: 106234d
Author: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Authored: Mon Jun 6 16:28:21 2016 +0800
Committer: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Committed: Mon Jun 6 16:28:21 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/hadoop/fs/FileSystem.java   |   1 -
 .../main/java/org/apache/hadoop/ipc/Client.java |  11 +-
 .../apache/hadoop/ipc/ProtobufRpcEngine.java    |  34 ++-
 .../org/apache/hadoop/ipc/TestAsyncIPC.java     |   2 +-
 .../hadoop/hdfs/AsyncDistributedFileSystem.java | 110 ++++++++
 .../hadoop/hdfs/DistributedFileSystem.java      |  22 +-
 .../ClientNamenodeProtocolTranslatorPB.java     |  45 +++-
 .../apache/hadoop/hdfs/TestAsyncDFSRename.java  | 258 +++++++++++++++++++
 8 files changed, 463 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
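
A minimal usage sketch of the new API (not part of the patch): end-user code
obtains the AsyncDistributedFileSystem handle from a DistributedFileSystem and
receives a Future<Void> that blocks only on get(). This assumes fs.defaultFS
points at a running HDFS cluster; the paths are hypothetical.

import java.util.concurrent.Future;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.AsyncDistributedFileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class AsyncRenameExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
    AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();

    // The rename call returns immediately; only get() waits for the NameNode.
    Future<Void> result =
        adfs.rename(new Path("/tmp/src"), new Path("/tmp/dst"), Rename.OVERWRITE);
    result.get();

    dfs.close();
  }
}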


http://git-wip-us.apache.org/repos/asf/hadoop/blob/eded3d10/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 0ecd8b7..9e13a7a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -1252,7 +1252,6 @@ public abstract class FileSystem extends Configured implements Closeable {
   /**
    * Renames Path src to Path dst
    * <ul>
-   * <li
    * <li>Fails if src is a file and dst is a directory.
    * <li>Fails if src is a directory and dst is a file.
    * <li>Fails if the parent of dst does not exist or is a file.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eded3d10/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index f206861..d59aeb89 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -119,7 +119,8 @@ public class Client implements AutoCloseable {
 
  private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
  private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
-  private static final ThreadLocal<Future<?>> returnValue = new ThreadLocal<>();
+  private static final ThreadLocal<Future<?>>
+      RETURN_RPC_RESPONSE = new ThreadLocal<>();
   private static final ThreadLocal<Boolean> asynchronousMode =
       new ThreadLocal<Boolean>() {
         @Override
@@ -130,8 +131,8 @@ public class Client implements AutoCloseable {
 
   @SuppressWarnings("unchecked")
   @Unstable
-  public static <T> Future<T> getReturnValue() {
-    return (Future<T>) returnValue.get();
+  public static <T> Future<T> getReturnRpcResponse() {
+    return (Future<T>) RETURN_RPC_RESPONSE.get();
   }
 
   /** Set call id and retry count for the next call. */
@@ -1396,7 +1397,7 @@ public class Client implements AutoCloseable {
         }
       };
 
-      returnValue.set(returnFuture);
+      RETURN_RPC_RESPONSE.set(returnFuture);
       return null;
     } else {
       return getRpcResponse(call, connection);
@@ -1410,7 +1411,7 @@ public class Client implements AutoCloseable {
    *          synchronous mode.
    */
   @Unstable
-  static boolean isAsynchronousMode() {
+  public static boolean isAsynchronousMode() {
     return asynchronousMode.get();
   }
 

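The Client change above relies on a thread-local hand-off: in asynchronous
mode, call() parks the per-call Future in RETURN_RPC_RESPONSE and returns
null, and the caller on the same thread picks the Future up afterwards. A
self-contained sketch of that pattern, independent of the Hadoop classes (all
names here are made up):

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadLocalHandoff {
  private static final ThreadLocal<Future<?>> RETURN_VALUE = new ThreadLocal<>();
  private static final ExecutorService POOL = Executors.newSingleThreadExecutor();

  // Fire the work, park its Future in the ThreadLocal, return immediately.
  static void callAsync(Callable<String> work) {
    RETURN_VALUE.set(POOL.submit(work));
  }

  @SuppressWarnings("unchecked")
  static <T> Future<T> getReturnValue() {
    return (Future<T>) RETURN_VALUE.get();
  }

  public static void main(String[] args) throws Exception {
    callAsync(() -> "done");              // returns at once
    Future<String> f = getReturnValue();  // same thread retrieves the Future
    System.out.println(f.get());          // block only here
    POOL.shutdown();
  }
}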
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eded3d10/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 071e2e8..8fcdb78 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -26,7 +26,9 @@ import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.net.SocketFactory;
@@ -35,6 +37,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataOutputOutputStream;
 import org.apache.hadoop.io.Writable;
@@ -67,7 +70,9 @@ import com.google.protobuf.TextFormat;
 @InterfaceStability.Evolving
 public class ProtobufRpcEngine implements RpcEngine {
   public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
-  
+  private static final ThreadLocal<Callable<?>>
+      RETURN_MESSAGE_CALLBACK = new ThreadLocal<>();
+
   static { // Register the rpcRequest deserializer for WritableRpcEngine 
     org.apache.hadoop.ipc.Server.registerProtocolEngine(
         RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class,
@@ -76,6 +81,12 @@ public class ProtobufRpcEngine implements RpcEngine {
 
   private static final ClientCache CLIENTS = new ClientCache();
 
+  @SuppressWarnings("unchecked")
+  @Unstable
+  public static <T> Callable<T> getReturnMessageCallback() {
+    return (Callable<T>) RETURN_MESSAGE_CALLBACK.get();
+  }
+
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
       InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
       SocketFactory factory, int rpcTimeout) throws IOException {
@@ -189,7 +200,7 @@ public class ProtobufRpcEngine implements RpcEngine {
      * the server.
      */
     @Override
-    public Object invoke(Object proxy, Method method, Object[] args)
+    public Object invoke(Object proxy, final Method method, Object[] args)
         throws ServiceException {
       long startTime = 0;
       if (LOG.isDebugEnabled()) {
@@ -251,6 +262,23 @@ public class ProtobufRpcEngine implements RpcEngine {
         LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
       }
       
+      if (Client.isAsynchronousMode()) {
+        final Future<RpcResponseWrapper> frrw = Client.getReturnRpcResponse();
+        Callable<Message> callback = new Callable<Message>() {
+          @Override
+          public Message call() throws Exception {
+            return getReturnMessage(method, frrw.get());
+          }
+        };
+        RETURN_MESSAGE_CALLBACK.set(callback);
+        return null;
+      } else {
+        return getReturnMessage(method, val);
+      }
+    }
+
+    private Message getReturnMessage(final Method method,
+        final RpcResponseWrapper rrw) throws ServiceException {
       Message prototype = null;
       try {
         prototype = getReturnProtoType(method);
@@ -260,7 +288,7 @@ public class ProtobufRpcEngine implements RpcEngine {
       Message returnMessage;
       try {
         returnMessage = prototype.newBuilderForType()
-            .mergeFrom(val.theResponseRead).build();
+            .mergeFrom(rrw.theResponseRead).build();
 
         if (LOG.isTraceEnabled()) {
           LOG.trace(Thread.currentThread().getId() + ": Response <- " +

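In asynchronous mode the engine above cannot parse the response at call time,
so it parks a Callable that waits on the raw-response Future and deserializes
only when invoked. A stripped-down sketch of that defer-the-decode step, with
hypothetical names and plain JDK types standing in for the protobuf wrappers:

import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DeferredDecode {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();

    // The lower layer hands back a Future of the raw response bytes.
    final Future<byte[]> rawFuture =
        pool.submit(() -> "hello".getBytes(StandardCharsets.UTF_8));

    // Instead of decoding now, park a callback that decodes on demand.
    Callable<String> decodeLater =
        () -> new String(rawFuture.get(), StandardCharsets.UTF_8);

    // The caller resumes other work, then forces the result:
    System.out.println(decodeLater.call()); // blocks here, not at call time
    pool.shutdown();
  }
}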
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eded3d10/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index de4395e..6cf75c7 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -84,7 +84,7 @@ public class TestAsyncIPC {
         try {
           final long param = TestIPC.RANDOM.nextLong();
           TestIPC.call(client, param, server, conf);
-          Future<LongWritable> returnFuture = Client.getReturnValue();
+          Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
           returnFutures.put(i, returnFuture);
           expectedValues.put(i, param);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eded3d10/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
new file mode 100644
index 0000000..37899aa
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
+import org.apache.hadoop.ipc.Client;
+
+import com.google.common.util.concurrent.AbstractFuture;
+
+/****************************************************************
+ * Implementation of the asynchronous distributed file system.
+ * An instance of this class is how end-user code interacts
+ * with a Hadoop DistributedFileSystem in an asynchronous manner.
+ *
+ *****************************************************************/
+@Unstable
+public class AsyncDistributedFileSystem {
+
+  private final DistributedFileSystem dfs;
+
+  AsyncDistributedFileSystem(final DistributedFileSystem dfs) {
+    this.dfs = dfs;
+  }
+
+  static <T> Future<T> getReturnValue() {
+    final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
+        .getReturnValueCallback();
+    Future<T> returnFuture = new AbstractFuture<T>() {
+      public T get() throws InterruptedException, ExecutionException {
+        try {
+          set(returnValueCallback.call());
+        } catch (Exception e) {
+          setException(e);
+        }
+        return super.get();
+      }
+    };
+    return returnFuture;
+  }
+
+  /**
+   * Renames Path src to Path dst
+   * <ul>
+   * <li>Fails if src is a file and dst is a directory.
+   * <li>Fails if src is a directory and dst is a file.
+   * <li>Fails if the parent of dst does not exist or is a file.
+   * </ul>
+   * <p>
+   * If OVERWRITE option is not passed as an argument, rename fails if the dst
+   * already exists.
+   * <p>
+   * If OVERWRITE option is passed as an argument, rename overwrites the dst if
+   * it is a file or an empty directory. Rename fails if dst is a non-empty
+   * directory.
+   * <p>
+   * Note that atomicity of rename is dependent on the file system
+   * implementation. Please refer to the file system documentation for details.
+   * This default implementation is non-atomic.
+   *
+   * @param src
+   *          path to be renamed
+   * @param dst
+   *          new path after rename
+   * @throws IOException
+   *           on failure
+   * @return a Future whose #get is invoked to wait for the
+   *         asynchronous call to finish.
+   */
+  public Future<Void> rename(Path src, Path dst,
+      final Options.Rename... options) throws IOException {
+    dfs.getFsStatistics().incrementWriteOps(1);
+
+    final Path absSrc = dfs.fixRelativePart(src);
+    final Path absDst = dfs.fixRelativePart(dst);
+
+    final boolean isAsync = Client.isAsynchronousMode();
+    Client.setAsynchronousMode(true);
+    try {
+      dfs.getClient().rename(dfs.getPathName(absSrc), dfs.getPathName(absDst),
+          options);
+      return getReturnValue();
+    } finally {
+      Client.setAsynchronousMode(isAsync);
+    }
+  }
+}
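
The getReturnValue() helper above adapts the parked Callable into a Future by
overriding get() on Guava's AbstractFuture, so the callback runs lazily on the
first get(). A stripped-down sketch of that adapter, assuming a Guava version
(like the one Hadoop bundled at the time) whose AbstractFuture#get can be
overridden:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import com.google.common.util.concurrent.AbstractFuture;

public class CallableBackedFuture {
  // Adapt a Callable into a Future that runs the callback lazily on get().
  static <T> Future<T> wrap(final Callable<T> callback) {
    return new AbstractFuture<T>() {
      @Override
      public T get() throws InterruptedException, ExecutionException {
        try {
          set(callback.call());   // success: complete the future
        } catch (Exception e) {
          setException(e);        // failure surfaces as ExecutionException
        }
        return super.get();
      }
    };
  }

  public static void main(String[] args) throws Exception {
    Future<Integer> f = wrap(() -> 6 * 7);
    System.out.println(f.get()); // prints 42
  }
}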

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eded3d10/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 5e54edd..0ae4d70 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -31,6 +31,7 @@ import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BlockStoragePolicySpi;
@@ -204,7 +205,7 @@ public class DistributedFileSystem extends FileSystem {
    * @return path component of {file}
    * @throws IllegalArgumentException if URI does not belong to this DFS
    */
-  private String getPathName(Path file) {
+  String getPathName(Path file) {
     checkPath(file);
     String result = file.toUri().getPath();
     if (!DFSUtilClient.isValidName(result)) {
@@ -2509,4 +2510,23 @@ public class DistributedFileSystem extends FileSystem {
     }
     return ret;
   }
+
+  private final AsyncDistributedFileSystem adfs =
+      new AsyncDistributedFileSystem(this);
+
+  /** @return an {@link AsyncDistributedFileSystem} object. */
+  @Unstable
+  public AsyncDistributedFileSystem getAsyncDistributedFileSystem() {
+    return adfs;
+  }
+
+  @Override
+  protected Path fixRelativePart(Path p) {
+    return super.fixRelativePart(p);
+  }
+
+  Statistics getFsStatistics() {
+    return statistics;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eded3d10/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 513a5e3..f4074b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -24,11 +24,14 @@ import java.util.EnumSet;
 import java.util.List;
 
 import com.google.common.collect.Lists;
+import java.util.concurrent.Callable;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
@@ -55,6 +58,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
@@ -135,7 +139,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Recove
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCacheDirectiveRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UnsetStoragePolicyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameSnapshotRequestProto;
@@ -153,13 +156,15 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetPer
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetQuotaRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UnsetStoragePolicyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.*;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesRequestProto;
@@ -177,8 +182,9 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
@@ -190,12 +196,9 @@ import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenReque
 import org.apache.hadoop.security.token.Token;
 
 import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
 import com.google.protobuf.ServiceException;
 
-import static org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
-import static org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos
-    .EncryptionZoneProto;
-
 /**
  * This class forwards NN's ClientProtocol calls as RPC calls to the NN server
  * while translating from the parameter types used in ClientProtocol to the
@@ -206,6 +209,8 @@ import static org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos
 public class ClientNamenodeProtocolTranslatorPB implements
     ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
   final private ClientNamenodeProtocolPB rpcProxy;
+  private static final ThreadLocal<Callable<?>>
+      RETURN_VALUE_CALLBACK = new ThreadLocal<>();
 
   static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
       GetServerDefaultsRequestProto.newBuilder().build();
@@ -239,6 +244,12 @@ public class ClientNamenodeProtocolTranslatorPB implements
     rpcProxy = proxy;
   }
 
+  @SuppressWarnings("unchecked")
+  @Unstable
+  public static <T> Callable<T> getReturnValueCallback() {
+    return (Callable<T>) RETURN_VALUE_CALLBACK.get();
+  }
+
   @Override
   public void close() {
     RPC.stopProxy(rpcProxy);
@@ -475,6 +486,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
     RenameRequestProto req = RenameRequestProto.newBuilder()
         .setSrc(src)
         .setDst(dst).build();
+
     try {
       return rpcProxy.rename(null, req).getResult();
     } catch (ServiceException e) {
@@ -499,7 +511,22 @@ public class ClientNamenodeProtocolTranslatorPB implements
         setDst(dst).setOverwriteDest(overwrite).
         build();
     try {
-      rpcProxy.rename2(null, req);
+      if (Client.isAsynchronousMode()) {
+        rpcProxy.rename2(null, req);
+
+        final Callable<Message> returnMessageCallback = ProtobufRpcEngine
+            .getReturnMessageCallback();
+        Callable<Void> callBack = new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            returnMessageCallback.call();
+            return null;
+          }
+        };
+        RETURN_VALUE_CALLBACK.set(callBack);
+      } else {
+        rpcProxy.rename2(null, req);
+      }
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }

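The rename2() change above chains callbacks across layers: the translator
picks up the engine's parked Callable<Message> and re-wraps it as a
Callable<Void>, which AsyncDistributedFileSystem later turns into the user's
Future. A minimal sketch of that relay, with hypothetical names standing in
for the two Hadoop layers:

import java.util.concurrent.Callable;

public class CallbackRelay {
  // Lower layer (engine stand-in) parks a response-producing callback.
  private static final ThreadLocal<Callable<String>> LOWER = new ThreadLocal<>();
  // Upper layer (translator stand-in) re-wraps it in its own return type.
  private static final ThreadLocal<Callable<Void>> UPPER = new ThreadLocal<>();

  static void lowerLayerCall() {
    LOWER.set(() -> "decoded-response"); // would really block on the RPC Future
  }

  static void upperLayerRename() {
    lowerLayerCall();
    final Callable<String> raw = LOWER.get();
    UPPER.set(() -> {
      raw.call();   // force completion; failures propagate from here
      return null;  // rename has no return value
    });
  }

  public static void main(String[] args) throws Exception {
    upperLayerRename();
    UPPER.get().call(); // what the user's Future eventually runs
  }
}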
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eded3d10/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
new file mode 100644
index 0000000..9322e1a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestAsyncDFSRename {
+  final Path asyncRenameDir = new Path("/test/async_rename/");
+  public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
+  final private static Configuration CONF = new HdfsConfiguration();
+
+  final private static String GROUP1_NAME = "group1";
+  final private static String GROUP2_NAME = "group2";
+  final private static String USER1_NAME = "user1";
+  private static final UserGroupInformation USER1;
+
+  private MiniDFSCluster gCluster;
+
+  static {
+    // explicitly turn on permission checking
+    CONF.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+
+    // create fake mapping for the groups
+    Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
+    u2g_map.put(USER1_NAME, new String[] { GROUP1_NAME, GROUP2_NAME });
+    DFSTestUtil.updateConfWithFakeGroupMapping(CONF, u2g_map);
+
+    // Initialize the test user
+    USER1 = UserGroupInformation.createUserForTesting(USER1_NAME, new String[] {
+        GROUP1_NAME, GROUP2_NAME });
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    gCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
+    gCluster.waitActive();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (gCluster != null) {
+      gCluster.shutdown();
+      gCluster = null;
+    }
+  }
+
+  static int countLease(MiniDFSCluster cluster) {
+    return TestDFSRename.countLease(cluster);
+  }
+
+  void list(DistributedFileSystem dfs, String name) throws IOException {
+    FileSystem.LOG.info("\n\n" + name);
+    for (FileStatus s : dfs.listStatus(asyncRenameDir)) {
+      FileSystem.LOG.info("" + s.getPath());
+    }
+  }
+
+  static void createFile(DistributedFileSystem dfs, Path f) throws IOException {
+    DataOutputStream a_out = dfs.create(f);
+    a_out.writeBytes("something");
+    a_out.close();
+  }
+
+  /**
+   * Check that the blocks of the dst file are cleaned up after a rename with
+   * overwrite, then restart the NN to verify the rename succeeded.
+   */
+  @Test
+  public void testAsyncRenameWithOverwrite() throws Exception {
+    final short replFactor = 2;
+    final long blockSize = 512;
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        replFactor).build();
+    cluster.waitActive();
+    DistributedFileSystem dfs = cluster.getFileSystem();
+    AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
+
+    try {
+
+      long fileLen = blockSize * 3;
+      String src = "/foo/src";
+      String dst = "/foo/dst";
+      String src2 = "/foo/src2";
+      String dst2 = "/foo/dst2";
+      Path srcPath = new Path(src);
+      Path dstPath = new Path(dst);
+      Path srcPath2 = new Path(src2);
+      Path dstPath2 = new Path(dst2);
+
+      DFSTestUtil.createFile(dfs, srcPath, fileLen, replFactor, 1);
+      DFSTestUtil.createFile(dfs, dstPath, fileLen, replFactor, 1);
+      DFSTestUtil.createFile(dfs, srcPath2, fileLen, replFactor, 1);
+      DFSTestUtil.createFile(dfs, dstPath2, fileLen, replFactor, 1);
+
+      LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(
+          cluster.getNameNode(), dst, 0, fileLen);
+      LocatedBlocks lbs2 = NameNodeAdapter.getBlockLocations(
+          cluster.getNameNode(), dst2, 0, fileLen);
+      BlockManager bm = NameNodeAdapter.getNamesystem(cluster.getNameNode())
+          .getBlockManager();
+      assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
+          .getLocalBlock()) != null);
+      assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
+          .getLocalBlock()) != null);
+
+      Future<Void> retVal1 = adfs.rename(srcPath, dstPath, Rename.OVERWRITE);
+      Future<Void> retVal2 = adfs.rename(srcPath2, dstPath2, Rename.OVERWRITE);
+      retVal1.get();
+      retVal2.get();
+
+      assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
+          .getLocalBlock()) == null);
+      assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
+          .getLocalBlock()) == null);
+
+      // Restart the NN and verify the renames succeeded
+      cluster.restartNameNodes();
+      assertFalse(dfs.exists(srcPath));
+      assertTrue(dfs.exists(dstPath));
+      assertFalse(dfs.exists(srcPath2));
+      assertTrue(dfs.exists(dstPath2));
+    } finally {
+      if (dfs != null) {
+        dfs.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @Test
+  public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
+    final short replFactor = 2;
+    final long blockSize = 512;
+    final Path renameDir = new Path(
+        "/test/concurrent_reanme_with_overwrite_dir/");
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+        .build();
+    cluster.waitActive();
+    DistributedFileSystem dfs = cluster.getFileSystem();
+    AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
+    int count = 1000;
+
+    try {
+      long fileLen = blockSize * 3;
+      assertTrue(dfs.mkdirs(renameDir));
+
+      Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
+
+      // concurrently invoke many renames
+      for (int i = 0; i < count; i++) {
+        Path src = new Path(renameDir, "src" + i);
+        Path dst = new Path(renameDir, "dst" + i);
+        DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
+        DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
+        Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+        returnFutures.put(i, returnFuture);
+      }
+
+      // wait for all the calls to complete
+      for (int i = 0; i < count; i++) {
+        returnFutures.get(i).get();
+      }
+
+      // Restart the NN and verify the renames succeeded
+      cluster.restartNameNodes();
+
+      // verify that each src no longer exists and each dst does
+      for (int i = 0; i < count; i++) {
+        Path src = new Path(renameDir, "src" + i);
+        Path dst = new Path(renameDir, "dst" + i);
+        assertFalse(dfs.exists(src));
+        assertTrue(dfs.exists(dst));
+      }
+    } finally {
+      dfs.delete(renameDir, true);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @Test
+  public void testAsyncRenameWithException() throws Exception {
+    FileSystem rootFs = FileSystem.get(CONF);
+    final Path renameDir = new Path("/test/async_rename_exception/");
+    final Path src = new Path(renameDir, "src");
+    final Path dst = new Path(renameDir, "dst");
+    rootFs.mkdirs(src);
+
+    AsyncDistributedFileSystem adfs = USER1
+        .doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
+          @Override
+          public AsyncDistributedFileSystem run() throws Exception {
+            return gCluster.getFileSystem().getAsyncDistributedFileSystem();
+          }
+        });
+
+    try {
+      Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+      returnFuture.get();
+    } catch (ExecutionException e) {
+      checkPermissionDenied(e, src);
+    }
+  }
+
+  private void checkPermissionDenied(final Exception e, final Path dir) {
+    assertTrue(e.getCause() instanceof ExecutionException);
+    assertTrue("Permission denied messages must carry AccessControlException",
+        e.getMessage().contains("AccessControlException"));
+    assertTrue("Permission denied messages must carry the username", e
+        .getMessage().contains(USER1_NAME));
+    assertTrue("Permission denied messages must carry the path parent", e
+        .getMessage().contains(dir.getParent().toUri().getPath()));
+  }
+}
\ No newline at end of file

