This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new 8cff9fd  HDDS-6368. EC: Fix broken future chain and cleanup 
unnecessary validation function. (#3128)
8cff9fd is described below

commit 8cff9fd2ad2c4bcaaf70a64c0ff6b5ebc3f9b876
Author: Gui Hecheng <[email protected]>
AuthorDate: Wed Mar 9 00:34:36 2022 +0800

    HDDS-6368. EC: Fix broken future chain and cleanup unnecessary validation 
function. (#3128)
---
 .../hadoop/hdds/scm/storage/BlockOutputStream.java | 37 +++++++--------
 .../ozone/client/io/ECBlockOutputStreamEntry.java  | 52 ++++++++++++----------
 2 files changed, 47 insertions(+), 42 deletions(-)

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index fbf041f..b86464c 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -686,25 +686,26 @@ public class BlockOutputStream extends OutputStream {
     try {
       XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
           blockID.get(), data, token, replicationIndex);
-      CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
-          asyncReply.getResponse();
-      future.thenApplyAsync(e -> {
-        try {
-          validateResponse(e);
-        } catch (IOException sce) {
-          future.completeExceptionally(sce);
-        }
-        return e;
-      }, responseExecutor).exceptionally(e -> {
-        String msg = "Failed to write chunk " + chunkInfo.getChunkName() + " " 
+
-            "into block " + blockID;
-        LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
-        CompletionException ce = new CompletionException(msg, e);
-        setIoException(ce);
-        throw ce;
-      });
+      CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+          respFuture = asyncReply.getResponse();
+      CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+          validateFuture = respFuture.thenApplyAsync(e -> {
+            try {
+              validateResponse(e);
+            } catch (IOException sce) {
+              respFuture.completeExceptionally(sce);
+            }
+            return e;
+          }, responseExecutor).exceptionally(e -> {
+            String msg = "Failed to write chunk " + chunkInfo.getChunkName() +
+                " into block " + blockID;
+            LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
+            CompletionException ce = new CompletionException(msg, e);
+            setIoException(ce);
+            throw ce;
+          });
       containerBlockData.addChunks(chunkInfo);
-      return future;
+      return validateFuture;
     } catch (IOException | ExecutionException e) {
       throw new IOException(EXCEPTION_MSG + e.toString(), e);
     } catch (InterruptedException ex) {
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
index 88f8d2e..ebfa5cc 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -25,14 +25,14 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
-import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -60,6 +60,10 @@ import static 
org.apache.ratis.util.Preconditions.assertInstanceOf;
  * is derived from the original EC pipeline.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ECBlockOutputStreamEntry.class);
+
   private final ECReplicationConfig replicationConfig;
   private final long length;
 
@@ -304,9 +308,9 @@ public class ECBlockOutputStreamEntry extends 
BlockOutputStreamEntry {
    * stripe. After every stripe write finishes, use this method to validate the
    * responses of current stripe data writes. This method can also be used to
    * validate the stripe put block responses.
-   * @param forPutBlock : If true, it will validate the put block response
-   *                   futures. It will validates stripe data write response
-   *                   futures if false.
+   * @param forPutBlock If true, it will validate the put block response
+   *                    futures. It will validate stripe data write response
+   *                    futures if false.
    * @return
    */
   private List<ECBlockOutputStream> getFailedStreams(boolean forPutBlock) {
@@ -317,11 +321,9 @@ public class ECBlockOutputStreamEntry extends 
BlockOutputStreamEntry {
       CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
           responseFuture = null;
       if (forPutBlock) {
-        responseFuture =
-            stream != null ? stream.getCurrentPutBlkResponseFuture() : null;
+        responseFuture = stream.getCurrentPutBlkResponseFuture();
       } else {
-        responseFuture =
-            stream != null ? stream.getCurrentChunkResponseFuture() : null;
+        responseFuture = stream.getCurrentChunkResponseFuture();
       }
       if (isFailed(stream, responseFuture)) {
         failedStreams.add(stream);
@@ -334,12 +336,19 @@ public class ECBlockOutputStreamEntry extends 
BlockOutputStreamEntry {
       ECBlockOutputStream outputStream,
       CompletableFuture<ContainerProtos.
           ContainerCommandResponseProto> chunkWriteResponseFuture) {
+
+    if (chunkWriteResponseFuture == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Failed to reap response from datanode {}",
+            outputStream.getDatanodeDetails());
+      }
+      return true;
+    }
+
     ContainerProtos.ContainerCommandResponseProto containerCommandResponseProto
         = null;
     try {
-      containerCommandResponseProto = chunkWriteResponseFuture != null ?
-          chunkWriteResponseFuture.get() :
-          null;
+      containerCommandResponseProto = chunkWriteResponseFuture.get();
     } catch (InterruptedException e) {
       outputStream.setIoException(e);
       Thread.currentThread().interrupt();
@@ -347,23 +356,18 @@ public class ECBlockOutputStreamEntry extends 
BlockOutputStreamEntry {
       outputStream.setIoException(e);
     }
 
-    if ((outputStream != null && containerCommandResponseProto != null)
-        && (outputStream.getIoException() != null || isStreamFailed(
-        containerCommandResponseProto, outputStream))) {
+    if (outputStream.getIoException() != null) {
       return true;
     }
-    return false;
-  }
 
-  boolean isStreamFailed(
-      ContainerProtos.ContainerCommandResponseProto responseProto,
-      ECBlockOutputStream stream) {
-    try {
-      ContainerProtocolCalls.validateContainerResponse(responseProto);
-    } catch (StorageContainerException sce) {
-      stream.setIoException(sce);
+    if (containerCommandResponseProto == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Empty response from datanode {}",
+            outputStream.getDatanodeDetails());
+      }
       return true;
     }
+
     return false;
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to