This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new 8cff9fd HDDS-6368. EC: Fix broken future chain and cleanup
unnecessary validation function. (#3128)
8cff9fd is described below
commit 8cff9fd2ad2c4bcaaf70a64c0ff6b5ebc3f9b876
Author: Gui Hecheng <[email protected]>
AuthorDate: Wed Mar 9 00:34:36 2022 +0800
HDDS-6368. EC: Fix broken future chain and cleanup unnecessary validation
function. (#3128)
---
.../hadoop/hdds/scm/storage/BlockOutputStream.java | 37 +++++++--------
.../ozone/client/io/ECBlockOutputStreamEntry.java | 52 ++++++++++++----------
2 files changed, 47 insertions(+), 42 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index fbf041f..b86464c 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -686,25 +686,26 @@ public class BlockOutputStream extends OutputStream {
try {
XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
blockID.get(), data, token, replicationIndex);
- CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
- asyncReply.getResponse();
- future.thenApplyAsync(e -> {
- try {
- validateResponse(e);
- } catch (IOException sce) {
- future.completeExceptionally(sce);
- }
- return e;
- }, responseExecutor).exceptionally(e -> {
- String msg = "Failed to write chunk " + chunkInfo.getChunkName() + " "
+
- "into block " + blockID;
- LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
- CompletionException ce = new CompletionException(msg, e);
- setIoException(ce);
- throw ce;
- });
+ CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+ respFuture = asyncReply.getResponse();
+ CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+ validateFuture = respFuture.thenApplyAsync(e -> {
+ try {
+ validateResponse(e);
+ } catch (IOException sce) {
+ respFuture.completeExceptionally(sce);
+ }
+ return e;
+ }, responseExecutor).exceptionally(e -> {
+ String msg = "Failed to write chunk " + chunkInfo.getChunkName() +
+ " into block " + blockID;
+ LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
+ CompletionException ce = new CompletionException(msg, e);
+ setIoException(ce);
+ throw ce;
+ });
containerBlockData.addChunks(chunkInfo);
- return future;
+ return validateFuture;
} catch (IOException | ExecutionException e) {
throw new IOException(EXCEPTION_MSG + e.toString(), e);
} catch (InterruptedException ex) {
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
index 88f8d2e..ebfa5cc 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -25,14 +25,14 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
-import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
@@ -60,6 +60,10 @@ import static
org.apache.ratis.util.Preconditions.assertInstanceOf;
* is derived from the original EC pipeline.
*/
public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ECBlockOutputStreamEntry.class);
+
private final ECReplicationConfig replicationConfig;
private final long length;
@@ -304,9 +308,9 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry {
* stripe. After every stripe write finishes, use this method to validate the
* responses of current stripe data writes. This method can also be used to
* validate the stripe put block responses.
- * @param forPutBlock : If true, it will validate the put block response
- * futures. It will validates stripe data write response
- * futures if false.
+ * @param forPutBlock If true, it will validate the put block response
+ * futures. It will validate stripe data write response
+ * futures if false.
* @return
*/
private List<ECBlockOutputStream> getFailedStreams(boolean forPutBlock) {
@@ -317,11 +321,9 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry {
CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
responseFuture = null;
if (forPutBlock) {
- responseFuture =
- stream != null ? stream.getCurrentPutBlkResponseFuture() : null;
+ responseFuture = stream.getCurrentPutBlkResponseFuture();
} else {
- responseFuture =
- stream != null ? stream.getCurrentChunkResponseFuture() : null;
+ responseFuture = stream.getCurrentChunkResponseFuture();
}
if (isFailed(stream, responseFuture)) {
failedStreams.add(stream);
@@ -334,12 +336,19 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry {
ECBlockOutputStream outputStream,
CompletableFuture<ContainerProtos.
ContainerCommandResponseProto> chunkWriteResponseFuture) {
+
+ if (chunkWriteResponseFuture == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to reap response from datanode {}",
+ outputStream.getDatanodeDetails());
+ }
+ return true;
+ }
+
ContainerProtos.ContainerCommandResponseProto containerCommandResponseProto
= null;
try {
- containerCommandResponseProto = chunkWriteResponseFuture != null ?
- chunkWriteResponseFuture.get() :
- null;
+ containerCommandResponseProto = chunkWriteResponseFuture.get();
} catch (InterruptedException e) {
outputStream.setIoException(e);
Thread.currentThread().interrupt();
@@ -347,23 +356,18 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry {
outputStream.setIoException(e);
}
- if ((outputStream != null && containerCommandResponseProto != null)
- && (outputStream.getIoException() != null || isStreamFailed(
- containerCommandResponseProto, outputStream))) {
+ if (outputStream.getIoException() != null) {
return true;
}
- return false;
- }
- boolean isStreamFailed(
- ContainerProtos.ContainerCommandResponseProto responseProto,
- ECBlockOutputStream stream) {
- try {
- ContainerProtocolCalls.validateContainerResponse(responseProto);
- } catch (StorageContainerException sce) {
- stream.setIoException(sce);
+ if (containerCommandResponseProto == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Empty response from datanode {}",
+ outputStream.getDatanodeDetails());
+ }
return true;
}
+
return false;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]