Repository: hadoop
Updated Branches:
  refs/heads/ozone-0.3 575613ef7 -> 3191afa12


HDDS-676. Enable Read from open Containers via Standalone Protocol. Contributed by Shashikant Banerjee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3191afa1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3191afa1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3191afa1

Branch: refs/heads/ozone-0.3
Commit: 3191afa12134cb85ff3af68d50906a352b4b6979
Parents: 575613e
Author: Shashikant Banerjee <shashik...@apache.org>
Authored: Tue Oct 23 19:19:58 2018 +0530
Committer: Shashikant Banerjee <shashik...@apache.org>
Committed: Tue Oct 23 19:19:58 2018 +0530

----------------------------------------------------------------------
 .../hadoop/hdds/scm/XceiverClientGrpc.java      | 194 +++++++++++++------
 .../hadoop/hdds/scm/XceiverClientManager.java   |  14 +-
 .../hadoop/hdds/scm/XceiverClientRatis.java     |   6 +
 .../scm/container/common/helpers/Pipeline.java  |   4 +
 .../ozone/client/io/ChunkGroupInputStream.java  |   9 +-
 .../ozone/client/io/ChunkGroupOutputStream.java |   4 +
 .../hadoop/ozone/TestMiniOzoneCluster.java      |   2 +-
 .../ozone/client/rpc/TestOzoneRpcClient.java    | 107 ++++++++++
 .../ozone/scm/TestXceiverClientManager.java     |  15 +-
 9 files changed, 285 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3191afa1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 2f11872..9526be3 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
@@ -40,6 +41,9 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.UUID;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
@@ -50,9 +54,9 @@ public class XceiverClientGrpc extends XceiverClientSpi {
   static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class);
   private final Pipeline pipeline;
   private final Configuration config;
-  private XceiverClientProtocolServiceStub asyncStub;
+  private Map<UUID, XceiverClientProtocolServiceStub> asyncStubs;
   private XceiverClientMetrics metrics;
-  private ManagedChannel channel;
+  private Map<UUID, ManagedChannel> channels;
   private final Semaphore semaphore;
   private boolean closed = false;
 
@@ -72,46 +76,62 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     this.semaphore =
         new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
     this.metrics = XceiverClientManager.getXceiverClientMetrics();
+    this.channels = new HashMap<>();
+    this.asyncStubs = new HashMap<>();
   }
 
   @Override
   public void connect() throws Exception {
+
+    // leader by default is the 1st datanode in the datanode list of the pipeline
     DatanodeDetails leader = this.pipeline.getLeader();
+    // just make a connection to the 1st datanode at the beginning
+    connectToDatanode(leader);
+  }
 
+  private void connectToDatanode(DatanodeDetails dn) {
     // read port from the data node, on failure use default configured
     // port.
-    int port = leader.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
+    int port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
     if (port == 0) {
       port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
           OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
     }
-    LOG.debug("Connecting to server Port : " + leader.getIpAddress());
-    channel = NettyChannelBuilder.forAddress(leader.getIpAddress(), port)
-        .usePlaintext()
-        .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
-        .build();
-    asyncStub = XceiverClientProtocolServiceGrpc.newStub(channel);
+    LOG.debug("Connecting to server : " + dn.getIpAddress() + ":" + port);
+    ManagedChannel channel =
+        NettyChannelBuilder.forAddress(dn.getIpAddress(), port).usePlaintext()
+            .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
+            .build();
+    XceiverClientProtocolServiceStub asyncStub =
+        XceiverClientProtocolServiceGrpc.newStub(channel);
+    asyncStubs.put(dn.getUuid(), asyncStub);
+    channels.put(dn.getUuid(), channel);
   }
-
   /**
-   * Returns if the xceiver client connects to a server.
+   * Returns if the xceiver client is connected to the given server.
    *
    * @return True if the connection is alive, false otherwise.
    */
   @VisibleForTesting
-  public boolean isConnected() {
-    return !channel.isTerminated() && !channel.isShutdown();
+  public boolean isConnected(DatanodeDetails details) {
+    return isConnected(channels.get(details.getUuid()));
+  }
+
+  private boolean isConnected(ManagedChannel channel) {
+    return channel != null && !channel.isTerminated() && !channel.isShutdown();
   }
 
   @Override
   public void close() {
     closed = true;
-    channel.shutdownNow();
-    try {
-      channel.awaitTermination(60, TimeUnit.MINUTES);
-    } catch (Exception e) {
-      LOG.error("Unexpected exception while waiting for channel termination",
-          e);
+    for (ManagedChannel channel : channels.values()) {
+      channel.shutdownNow();
+      try {
+        channel.awaitTermination(60, TimeUnit.MINUTES);
+      } catch (Exception e) {
+        LOG.error("Unexpected exception while waiting for channel termination",
+            e);
+      }
     }
   }
 
@@ -120,6 +140,56 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     return pipeline;
   }
 
+  @Override
+  public ContainerCommandResponseProto sendCommand(
+      ContainerCommandRequestProto request) throws IOException {
+    return sendCommandWithRetry(request);
+  }
+
+  public ContainerCommandResponseProto sendCommandWithRetry(
+      ContainerCommandRequestProto request) throws IOException {
+    int size = pipeline.getMachines().size();
+    ContainerCommandResponseProto responseProto = null;
+    DatanodeDetails dn = null;
+
+    // In case of an exception or an error, we will try to read from the
+    // datanodes in the pipeline in a round robin fashion.
+
+    // TODO: cache the correct leader info in here, so that any subsequent calls
+    // should first go to leader
+    for (int dnIndex = 0; dnIndex < size; dnIndex++) {
+      try {
+        dn = pipeline.getMachines().get(dnIndex);
+        LOG.debug("Executing command " + request + " on datanode " + dn);
+        // In case the command gets retried on a 2nd datanode,
+        // sendCommandAsyncCall will create a new channel and async stub
+        // in case these don't exist for the specific datanode.
+        responseProto = sendCommandAsync(request, dn).get();
+        if (responseProto.getResult() == ContainerProtos.Result.SUCCESS) {
+          break;
+        }
+      } catch (ExecutionException | InterruptedException e) {
+        LOG.warn("Failed to execute command " + request + " on datanode " + dn
+            .getUuidString(), e);
+      }
+    }
+
+    if (responseProto != null) {
+      return responseProto;
+    } else {
+      throw new IOException(
+          "Failed to execute command " + request + " on the pipeline "
+              + pipeline.getId());
+    }
+  }
+
+  // TODO: for a true async API, once the waitable future for a command
+  // executed on one channel fails, the command should be retried
+  // asynchronously on the remaining datanodes.
+
+  // Note: this async API is not currently used in any active I/O path.
+  // In case it gets used, the asynchronous retry logic needs to be plugged
+  // in here.
   /**
    * Sends a given command to server gets a waitable future back.
    *
@@ -128,15 +198,25 @@ public class XceiverClientGrpc extends XceiverClientSpi {
    * @throws IOException
    */
   @Override
-  public CompletableFuture<ContainerCommandResponseProto>
-      sendCommandAsync(ContainerCommandRequestProto request)
+  public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
+      ContainerCommandRequestProto request)
       throws IOException, ExecutionException, InterruptedException {
-    if(closed){
+    return sendCommandAsync(request, pipeline.getLeader());
+  }
+
+  private CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
+      ContainerCommandRequestProto request, DatanodeDetails dn)
+      throws IOException, ExecutionException, InterruptedException {
+    if (closed) {
       throw new IOException("This channel is not connected.");
     }
 
-    if(channel == null || !isConnected()) {
-      reconnect();
+    UUID dnId = dn.getUuid();
+    ManagedChannel channel = channels.get(dnId);
+    // If the channel doesn't exist for this specific datanode or the channel
+    // is closed, just reconnect
+    if (!isConnected(channel)) {
+      reconnect(dn);
     }
 
     final CompletableFuture<ContainerCommandResponseProto> replyFuture =
@@ -145,48 +225,54 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     long requestTime = Time.monotonicNowNanos();
     metrics.incrPendingContainerOpsMetrics(request.getCmdType());
     // create a new grpc stream for each non-async call.
+
+    // TODO: for async calls, we should reuse StreamObserver resources.
     final StreamObserver<ContainerCommandRequestProto> requestObserver =
-        asyncStub.send(new StreamObserver<ContainerCommandResponseProto>() {
-          @Override
-          public void onNext(ContainerCommandResponseProto value) {
-            replyFuture.complete(value);
-            metrics.decrPendingContainerOpsMetrics(request.getCmdType());
-            metrics.addContainerOpsLatency(request.getCmdType(),
-                Time.monotonicNowNanos() - requestTime);
-            semaphore.release();
-          }
-          @Override
-          public void onError(Throwable t) {
-            replyFuture.completeExceptionally(t);
-            metrics.decrPendingContainerOpsMetrics(request.getCmdType());
-            metrics.addContainerOpsLatency(request.getCmdType(),
-                Time.monotonicNowNanos() - requestTime);
-            semaphore.release();
-          }
-
-          @Override
-          public void onCompleted() {
-            if (!replyFuture.isDone()) {
-              replyFuture.completeExceptionally(
-                  new IOException("Stream completed but no reply for request "
-                      + request));
-            }
-          }
-        });
+        asyncStubs.get(dnId)
+            .send(new StreamObserver<ContainerCommandResponseProto>() {
+              @Override
+              public void onNext(ContainerCommandResponseProto value) {
+                replyFuture.complete(value);
+                metrics.decrPendingContainerOpsMetrics(request.getCmdType());
+                metrics.addContainerOpsLatency(request.getCmdType(),
+                    Time.monotonicNowNanos() - requestTime);
+                semaphore.release();
+              }
+
+              @Override
+              public void onError(Throwable t) {
+                replyFuture.completeExceptionally(t);
+                metrics.decrPendingContainerOpsMetrics(request.getCmdType());
+                metrics.addContainerOpsLatency(request.getCmdType(),
+                    Time.monotonicNowNanos() - requestTime);
+                semaphore.release();
+              }
+
+              @Override
+              public void onCompleted() {
+                if (!replyFuture.isDone()) {
+                  replyFuture.completeExceptionally(new IOException(
+                      "Stream completed but no reply for request " + request));
+                }
+              }
+            });
     requestObserver.onNext(request);
     requestObserver.onCompleted();
     return replyFuture;
   }
 
-  private void reconnect() throws IOException {
+  private void reconnect(DatanodeDetails dn)
+      throws IOException {
+    ManagedChannel channel;
     try {
-      connect();
+      connectToDatanode(dn);
+      channel = channels.get(dn.getUuid());
     } catch (Exception e) {
       LOG.error("Error while connecting: ", e);
       throw new IOException(e);
     }
 
-    if (channel == null || !isConnected()) {
+    if (channel == null || !isConnected(channel)) {
       throw new IOException("This channel is not connected.");
     }
   }

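The retry loop above is the core of the gRPC client change: instead of a
single channel to the pipeline leader, the client now keeps one channel and
one async stub per datanode UUID, and sendCommandWithRetry walks the pipeline
until some datanode answers with SUCCESS. A minimal, self-contained sketch of
that pattern follows; Endpoint, Reply and sendTo are hypothetical stand-ins
for DatanodeDetails, ContainerCommandResponseProto and sendCommandAsync, not
HDDS types.

  import java.io.IOException;
  import java.util.List;
  import java.util.concurrent.CompletableFuture;
  import java.util.concurrent.ExecutionException;

  final class RoundRobinRetrySketch {
    record Endpoint(String id) { }
    record Reply(boolean success) { }

    Reply sendWithRetry(List<Endpoint> pipeline, String request)
        throws IOException {
      Reply reply = null;
      for (Endpoint dn : pipeline) {
        try {
          // Like sendCommandAsync(request, dn).get(): a channel is created
          // lazily for a datanode that has not been contacted before.
          reply = sendTo(dn, request).get();
          if (reply.success()) {
            break;              // first healthy replica wins
          }
        } catch (ExecutionException | InterruptedException e) {
          // fall through and try the next datanode in the pipeline
        }
      }
      if (reply == null) {
        throw new IOException("Failed to execute " + request + " on pipeline");
      }
      return reply;
    }

    // Stand-in for the per-datanode async gRPC call.
    private CompletableFuture<Reply> sendTo(Endpoint dn, String request) {
      return CompletableFuture.completedFuture(new Reply(true));
    }
  }
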
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3191afa1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index d542abc..83b5a4c 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -27,7 +27,6 @@ import com.google.common.cache.RemovalNotification;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -59,7 +58,7 @@ public class XceiverClientManager implements Closeable {
 
   //TODO : change this to SCM configuration class
   private final Configuration conf;
-  private final Cache<PipelineID, XceiverClientSpi> clientCache;
+  private final Cache<String, XceiverClientSpi> clientCache;
   private final boolean useRatis;
 
   private static XceiverClientMetrics metrics;
@@ -83,10 +82,10 @@ public class XceiverClientManager implements Closeable {
         .expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS)
         .maximumSize(maxSize)
         .removalListener(
-            new RemovalListener<PipelineID, XceiverClientSpi>() {
+            new RemovalListener<String, XceiverClientSpi>() {
             @Override
             public void onRemoval(
-                RemovalNotification<PipelineID, XceiverClientSpi>
+                RemovalNotification<String, XceiverClientSpi>
                   removalNotification) {
               synchronized (clientCache) {
                 // Mark the entry as evicted
@@ -98,7 +97,7 @@ public class XceiverClientManager implements Closeable {
   }
 
   @VisibleForTesting
-  public Cache<PipelineID, XceiverClientSpi> getClientCache() {
+  public Cache<String, XceiverClientSpi> getClientCache() {
     return clientCache;
   }
 
@@ -140,13 +139,14 @@ public class XceiverClientManager implements Closeable {
 
   private XceiverClientSpi getClient(Pipeline pipeline)
       throws IOException {
+    HddsProtos.ReplicationType type = pipeline.getType();
     try {
-      return clientCache.get(pipeline.getId(),
+      return clientCache.get(pipeline.getId().getId().toString() + type,
           new Callable<XceiverClientSpi>() {
           @Override
           public XceiverClientSpi call() throws Exception {
             XceiverClientSpi client = null;
-            switch (pipeline.getType()) {
+            switch (type) {
             case RATIS:
              client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf);
               break;

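With reads forced onto the Standalone protocol (see ChunkGroupInputStream
below), the same pipeline can now be reached by both a Ratis client and a
Standalone client, so caching clients by PipelineID alone would return the
wrong kind. The cache key therefore becomes the pipeline id string
concatenated with the replication type. A sketch of that keying, assuming
Guava's Cache as the diff uses; the Object value type and the maximumSize
value are placeholders:

  import com.google.common.cache.Cache;
  import com.google.common.cache.CacheBuilder;

  final class ClientCacheKeySketch {
    enum ReplicationType { RATIS, STAND_ALONE }

    private final Cache<String, Object> clientCache =
        CacheBuilder.newBuilder().maximumSize(256).build();

    // One cached client per (pipeline, protocol) pair.
    Object getClient(String pipelineId, ReplicationType type)
        throws Exception {
      return clientCache.get(pipelineId + type,
          () -> "client for " + pipelineId + " over " + type);
    }
  }
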
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3191afa1/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 4efe7ba..fda30fb 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm;
 
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.io.MultipleIOException;
+import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.thirdparty.com.google.protobuf
     .InvalidProtocolBufferException;
@@ -51,6 +52,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -190,6 +192,10 @@ public final class XceiverClientRatis extends XceiverClientSpi {
         getClient().sendAsync(() -> byteString);
   }
 
+  public void watchForCommit(long index, long timeout) throws Exception {
+    getClient().sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED)
+        .get(timeout, TimeUnit.MILLISECONDS);
+  }
   /**
    * Sends a given command to server gets a waitable future back.
    *

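watchForCommit blocks until the given raft log index is replicated at
ReplicationLevel.ALL_COMMITTED, or fails after the timeout; the new test
below uses it to make sure all three replicas hold the data before datanodes
are shut down. Reduced to the timeout handling alone, a hedged sketch with
the Ratis sendWatchAsync call mocked by a completed future:

  import java.util.concurrent.CompletableFuture;
  import java.util.concurrent.TimeUnit;

  final class WatchForCommitSketch {
    public static void main(String[] args) throws Exception {
      // Stand-in for getClient().sendWatchAsync(index, ALL_COMMITTED).
      CompletableFuture<Long> watch = CompletableFuture.supplyAsync(() -> 42L);
      // get() throws TimeoutException if the index is not replicated to all
      // nodes within the timeout, surfacing an unhealthy pipeline early.
      long committedIndex = watch.get(5000, TimeUnit.MILLISECONDS);
      System.out.println("committed up to index " + committedIndex);
    }
  }
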
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3191afa1/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
index 49c00a8..7fa4c39 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
@@ -301,6 +301,10 @@ public class Pipeline {
     return b.toString();
   }
 
+  public void setType(HddsProtos.ReplicationType type) {
+    this.type = type;
+  }
+
   /**
    * Returns a JSON string of this object.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3191afa1/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
index 3158334..3772c59 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
@@ -22,7 +22,9 @@ import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
@@ -276,8 +278,13 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
       long containerID = blockID.getContainerID();
       ContainerWithPipeline containerWithPipeline =
           storageContainerLocationClient.getContainerWithPipeline(containerID);
+      Pipeline pipeline = containerWithPipeline.getPipeline();
+
+      // irrespective of the container state, we will always read via Standalone
+      // protocol.
+      pipeline.setType(HddsProtos.ReplicationType.STAND_ALONE);
       XceiverClientSpi xceiverClient = xceiverClientManager
-          .acquireClient(containerWithPipeline.getPipeline());
+          .acquireClient(pipeline);
       boolean success = false;
       containerKey = omKeyLocationInfo.getLocalID();
       try {

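This hunk carries the main idea of HDDS-676: whatever protocol wrote the key,
the read path rewrites the pipeline type to STAND_ALONE, so an open container
can be read over gRPC from any single live replica. A sketch of the pattern
using the calls visible in the diff; the helper class and method name are
illustrative, not part of the client API:

  import java.io.IOException;

  import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
  import org.apache.hadoop.hdds.scm.XceiverClientManager;
  import org.apache.hadoop.hdds.scm.XceiverClientSpi;
  import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;

  final class StandaloneReadSketch {
    // Hypothetical helper mirroring the hunk above.
    static XceiverClientSpi acquireForRead(XceiverClientManager manager,
        Pipeline pipeline) throws IOException {
      // Reads bypass Ratis: force the Standalone protocol so any single
      // live replica can serve the data, even for an open container.
      pipeline.setType(HddsProtos.ReplicationType.STAND_ALONE);
      return manager.acquireClient(pipeline);
    }
  }
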
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3191afa1/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
index 0a38a5a..5966718 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
@@ -116,6 +116,10 @@ public class ChunkGroupOutputStream extends OutputStream {
   public List<ChunkOutputStreamEntry> getStreamEntries() {
     return streamEntries;
   }
+  @VisibleForTesting
+  public XceiverClientManager getXceiverClientManager() {
+    return xceiverClientManager;
+  }
 
   public List<OmKeyLocationInfo> getLocationInfoList() throws IOException {
     List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3191afa1/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
index 302ea46..bf6a189 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
@@ -102,7 +102,7 @@ public class TestMiniOzoneCluster {
       // Verify client is able to connect to the container
       try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf)){
         client.connect();
-        assertTrue(client.isConnected());
+        assertTrue(client.isConnected(pipeline.getLeader()));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3191afa1/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
index b60343a..1a401d6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
@@ -24,6 +24,10 @@ import org.apache.hadoop.hdds.protocol.StorageType;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.ozone.*;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -31,6 +35,7 @@ import org.apache.hadoop.ozone.client.*;
 import org.apache.hadoop.hdds.client.OzoneQuota;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
@@ -597,6 +602,108 @@ public class TestOzoneRpcClient {
   }
 
   @Test
+  public void testPutKeyAndGetKeyThreeNodes()
+      throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+
+    String value = "sample value";
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    String keyName = UUID.randomUUID().toString();
+
+    OzoneOutputStream out = bucket
+        .createKey(keyName, value.getBytes().length, ReplicationType.RATIS,
+            ReplicationFactor.THREE);
+    ChunkGroupOutputStream groupOutputStream =
+        (ChunkGroupOutputStream) out.getOutputStream();
+    XceiverClientManager manager = groupOutputStream.getXceiverClientManager();
+    out.write(value.getBytes());
+    out.close();
+    // First, confirm the key info from the client matches the info in OM.
+    OmKeyArgs.Builder builder = new OmKeyArgs.Builder();
+    builder.setVolumeName(volumeName).setBucketName(bucketName)
+        .setKeyName(keyName);
+    OmKeyLocationInfo keyInfo = ozoneManager.lookupKey(builder.build()).
+        getKeyLocationVersions().get(0).getBlocksLatestVersionOnly().get(0);
+    long containerID = keyInfo.getContainerID();
+    long localID = keyInfo.getLocalID();
+    OzoneKeyDetails keyDetails = bucket.getKey(keyName);
+    Assert.assertEquals(keyName, keyDetails.getName());
+
+    List<OzoneKeyLocation> keyLocations = keyDetails.getOzoneKeyLocations();
+    Assert.assertEquals(1, keyLocations.size());
+    Assert.assertEquals(containerID, keyLocations.get(0).getContainerID());
+    Assert.assertEquals(localID, keyLocations.get(0).getLocalID());
+
+    // Make sure that the data size matched.
+    Assert
+        .assertEquals(value.getBytes().length, keyLocations.get(0).getLength());
+
+    ContainerWithPipeline container =
+        cluster.getStorageContainerManager().getContainerManager()
+            .getContainerWithPipeline(containerID);
+    Pipeline pipeline = container.getPipeline();
+    List<DatanodeDetails> datanodes = pipeline.getMachines();
+
+    DatanodeDetails datanodeDetails = datanodes.get(0);
+    Assert.assertNotNull(datanodeDetails);
+
+    XceiverClientSpi clientSpi = manager.acquireClient(pipeline);
+    Assert.assertTrue(clientSpi instanceof XceiverClientRatis);
+    XceiverClientRatis ratisClient = (XceiverClientRatis)clientSpi;
+
+    ratisClient.watchForCommit(keyInfo.getBlockCommitSequenceId(), 5000);
+    // shutdown the datanode
+    cluster.shutdownHddsDatanode(datanodeDetails);
+
+    Assert.assertTrue(container.getContainerInfo().getState()
+        == HddsProtos.LifeCycleState.OPEN);
+    // try to read, this should be successful
+    readKey(bucket, keyName, value);
+
+    Assert.assertTrue(container.getContainerInfo().getState()
+        == HddsProtos.LifeCycleState.OPEN);
+    // shutdown the second datanode
+    datanodeDetails = datanodes.get(1);
+    cluster.shutdownHddsDatanode(datanodeDetails);
+    Assert.assertTrue(container.getContainerInfo().getState()
+        == HddsProtos.LifeCycleState.OPEN);
+
+    // the container is open and with the loss of 2 nodes we should still be
+    // able to read via the Standalone protocol
+    // try to read
+    readKey(bucket, keyName, value);
+
+    // shutdown the 3rd datanode
+    datanodeDetails = datanodes.get(2);
+    cluster.shutdownHddsDatanode(datanodeDetails);
+    try {
+      // try to read
+      readKey(bucket, keyName, value);
+      Assert.fail("Expected exception not thrown");
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage().contains("Failed to execute command"));
+      Assert.assertTrue(
+          e.getMessage().contains("on the pipeline " + pipeline.getId()));
+    }
+    manager.releaseClient(clientSpi);
+  }
+
+  private void readKey(OzoneBucket bucket, String keyName, String data)
+      throws IOException {
+    OzoneKey key = bucket.getKey(keyName);
+    Assert.assertEquals(keyName, key.getName());
+    OzoneInputStream is = bucket.readKey(keyName);
+    byte[] fileContent = new byte[data.getBytes().length];
+    is.read(fileContent);
+    is.close();
+  }
+
+  @Test
   public void testGetKeyDetails() throws IOException, OzoneException {
     String volumeName = UUID.randomUUID().toString();
     String bucketName = UUID.randomUUID().toString();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3191afa1/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
index da445bf..8b35bbb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.scm;
 import com.google.common.cache.Cache;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -107,7 +106,7 @@ public class TestXceiverClientManager {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
     XceiverClientManager clientManager = new XceiverClientManager(conf);
-    Cache<PipelineID, XceiverClientSpi> cache =
+    Cache<String, XceiverClientSpi> cache =
         clientManager.getClientCache();
 
     ContainerWithPipeline container1 =
@@ -130,8 +129,9 @@ public class TestXceiverClientManager {
     Assert.assertNotEquals(client1, client2);
 
     // least recent container (i.e containerName1) is evicted
-    XceiverClientSpi nonExistent1 = cache
-        .getIfPresent(container1.getContainerInfo().getPipelineID());
+    XceiverClientSpi nonExistent1 = cache.getIfPresent(
+        container1.getContainerInfo().getPipelineID().getId().toString()
+            + container1.getContainerInfo().getReplicationType());
     Assert.assertEquals(null, nonExistent1);
     // However container call should succeed because of refcount on the client.
     String traceID1 = "trace" + RandomStringUtils.randomNumeric(4);
@@ -160,7 +160,7 @@ public class TestXceiverClientManager {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
     XceiverClientManager clientManager = new XceiverClientManager(conf);
-    Cache<PipelineID, XceiverClientSpi> cache =
+    Cache<String, XceiverClientSpi> cache =
         clientManager.getClientCache();
 
     ContainerWithPipeline container1 =
@@ -183,8 +183,9 @@ public class TestXceiverClientManager {
     Assert.assertNotEquals(client1, client2);
 
     // now client 1 should be evicted
-    XceiverClientSpi nonExistent = cache
-        .getIfPresent(container1.getContainerInfo().getPipelineID());
+    XceiverClientSpi nonExistent = cache.getIfPresent(
+        container1.getContainerInfo().getPipelineID().getId().toString()
+            + container1.getContainerInfo().getReplicationType());
     Assert.assertEquals(null, nonExistent);
 
     // Any container operation should now fail

