This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 3bbb5742c5 HDDS-8024. When readChunk from a datanode fails, retry other datanodes. (#4336)
3bbb5742c5 is described below

commit 3bbb5742c5f494d940ced1553e5772eecfb6398c
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Mar 3 17:03:45 2023 -0800

    HDDS-8024. When readChunk from a datanode fails, retry other datanodes. (#4336)
---
 .../apache/hadoop/hdds/scm/pipeline/Pipeline.java  | 39 ++++++++++++--
 .../hdds/scm/storage/ContainerProtocolCalls.java   | 59 +++++++++++++++++++++-
 .../hadoop/ozone/client/io/KeyOutputStream.java    |  5 +-
 .../java/org/apache/hadoop/fs/ozone/TestHSync.java | 24 ++++++---
 .../hadoop/ozone/scm/TestXceiverClientGrpc.java    |  4 +-
 5 files changed, 116 insertions(+), 15 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 116e767667..b91e7f3717 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -153,6 +153,11 @@ public final class Pipeline {
     this.leaderId = leaderId;
   }
 
+  /** @return the number of datanodes in this pipeline. */
+  public int size() {
+    return nodeStatus.size();
+  }
+
   /**
    * Returns the list of nodes which form this pipeline.
    *
@@ -216,18 +221,46 @@ public final class Pipeline {
   }
 
   public DatanodeDetails getFirstNode() throws IOException {
+    return getFirstNode(null);
+  }
+
+  public DatanodeDetails getFirstNode(Set<DatanodeDetails> excluded)
+      throws IOException {
+    if (excluded == null) {
+      excluded = Collections.emptySet();
+    }
     if (nodeStatus.isEmpty()) {
       throw new IOException(String.format("Pipeline=%s is empty", id));
     }
-    return nodeStatus.keySet().iterator().next();
+    for (DatanodeDetails d : nodeStatus.keySet()) {
+      if (!excluded.contains(d)) {
+        return d;
+      }
+    }
+    throw new IOException(String.format(
+        "All nodes are excluded: Pipeline=%s, excluded=%s", id, excluded));
   }
 
   public DatanodeDetails getClosestNode() throws IOException {
+    return getClosestNode(null);
+  }
+
+  public DatanodeDetails getClosestNode(Set<DatanodeDetails> excluded)
+      throws IOException {
+    if (excluded == null) {
+      excluded = Collections.emptySet();
+    }
     if (nodesInOrder.get() == null || nodesInOrder.get().isEmpty()) {
       LOG.debug("Nodes in order is empty, delegate to getFirstNode");
-      return getFirstNode();
+      return getFirstNode(excluded);
+    }
+    for (DatanodeDetails d : nodesInOrder.get()) {
+      if (!excluded.contains(d)) {
+        return d;
+      }
     }
-    return nodesInOrder.get().get(0);
+    throw new IOException(String.format(
+        "All nodes are excluded: Pipeline=%s, excluded=%s", id, excluded));
   }
 
   public boolean isClosed() {
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index b921d4c897..bfe3ebceaa 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -21,8 +21,10 @@ package org.apache.hadoop.hdds.scm.storage;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
@@ -47,6 +49,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockRe
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
@@ -64,12 +67,16 @@ import org.apache.hadoop.security.token.Token;
 
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Implementation of all container protocol calls performed by Container
  * clients.
  */
 public final class ContainerProtocolCalls  {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerProtocolCalls.class);
 
   /**
    * There is no need to instantiate this class.
@@ -278,7 +285,32 @@ public final class ContainerProtocolCalls  {
             .setBlockID(blockID.getDatanodeBlockIDProtobuf())
             .setChunkData(chunk)
             .setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1);
-    String id = xceiverClient.getPipeline().getClosestNode().getUuidString();
+    final Pipeline pipeline = xceiverClient.getPipeline();
+    final Set<DatanodeDetails> excluded = new HashSet<>();
+    for (; ;) {
+      final DatanodeDetails d = pipeline.getClosestNode(excluded);
+
+      try {
+        return readChunk(xceiverClient, chunk, blockID,
+            validators, token, readChunkRequest, d);
+      } catch (IOException e) {
+        excluded.add(d);
+        if (excluded.size() < pipeline.size()) {
+          LOG.warn(toErrorMessage(chunk, blockID, d), e);
+        } else {
+          throw e;
+        }
+      }
+    }
+  }
+
+  private static ContainerProtos.ReadChunkResponseProto readChunk(
+      XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID,
+      List<CheckedBiFunction> validators,
+      Token<? extends TokenIdentifier> token,
+      ReadChunkRequestProto.Builder readChunkRequest,
+      DatanodeDetails d) throws IOException {
+    final String id = d.getUuidString();
     ContainerCommandRequestProto.Builder builder =
         ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadChunk)
             .setContainerID(blockID.getContainerID())
@@ -289,7 +321,30 @@ public final class ContainerProtocolCalls  {
     ContainerCommandRequestProto request = builder.build();
     ContainerCommandResponseProto reply =
         xceiverClient.sendCommand(request, validators);
-    return reply.getReadChunk();
+    final ReadChunkResponseProto response = reply.getReadChunk();
+    final long readLen = getLen(response);
+    if (readLen != chunk.getLen()) {
+      throw new IOException(toErrorMessage(chunk, blockID, d)
+          + ": readLen=" + readLen);
+    }
+    return response;
+  }
+
+  static String toErrorMessage(ChunkInfo chunk, BlockID blockId,
+      DatanodeDetails d) {
+    return String.format("Failed to read chunk %s (len=%s) %s from %s",
+        chunk.getChunkName(), chunk.getLen(), blockId, d);
+  }
+
+  static long getLen(ReadChunkResponseProto response) {
+    if (response.hasData()) {
+      return response.getData().size();
+    } else if (response.hasDataBuffers()) {
+      return response.getDataBuffers() .getBuffersList().stream()
+          .mapToLong(ByteString::size).sum();
+    } else {
+      return -1;
+    }
   }
 
   /**
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 00970788ea..e94074e827 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -452,8 +452,11 @@ public class KeyOutputStream extends OutputStream implements Syncable {
   @Override
   public void hsync() throws IOException {
     checkNotClosed();
+    final long hsyncPos = writeOffset;
     handleFlushOrClose(StreamAction.HSYNC);
-    blockOutputStreamEntryPool.hsyncKey(offset);
+    Preconditions.checkState(offset >= hsyncPos,
+        "offset = %s < hsyncPos = %s", offset, hsyncPos);
+    blockOutputStreamEntryPool.hsyncKey(hsyncPos);
   }
 
   /**
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index 1985449e49..16d968c1b4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -54,6 +54,8 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT;
@@ -73,6 +75,8 @@ import static org.mockito.Mockito.when;
  */
 @Timeout(value = 300)
 public class TestHSync {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestHSync.class);
 
   private static MiniOzoneCluster cluster;
   private static OzoneBucket bucket;
@@ -122,10 +126,11 @@ public class TestHSync {
         OZONE_URI_SCHEME, bucket.getName(), bucket.getVolumeName());
     CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
 
-    final Path file = new Path("/file");
-
     try (FileSystem fs = FileSystem.get(CONF)) {
-      runTestHSync(fs, file);
+      for (int i = 0; i < 10; i++) {
+        final Path file = new Path("/file" + i);
+        runTestHSync(fs, file, 1 << i);
+      }
     }
   }
 
@@ -139,17 +144,20 @@ public class TestHSync {
 
     final String dir = OZONE_ROOT + bucket.getVolumeName()
         + OZONE_URI_DELIMITER + bucket.getName();
-    final Path file = new Path(dir, "file");
 
     try (FileSystem fs = FileSystem.get(CONF)) {
-      runTestHSync(fs, file);
+      for (int i = 0; i < 10; i++) {
+        final Path file = new Path(dir, "file" + i);
+        runTestHSync(fs, file, 1 << i);
+      }
     }
   }
 
-  static void runTestHSync(FileSystem fs, Path file) throws Exception {
+  static void runTestHSync(FileSystem fs, Path file, int initialDataSize)
+      throws Exception {
     try (StreamWithLength out = new StreamWithLength(
         fs.create(file, true))) {
-      runTestHSync(fs, file, out, 1);
+      runTestHSync(fs, file, out, initialDataSize);
       for (int i = 1; i < 5; i++) {
         for (int j = -1; j <= 1; j++) {
           int dataSize = (1 << (i * 5)) + j;
@@ -187,6 +195,8 @@ public class TestHSync {
       StreamWithLength out, int dataSize)
       throws Exception {
     final long length = out.getLength();
+    LOG.info("runTestHSync {} with size {}, skipLength={}",
+        file, dataSize, length);
     final byte[] data = new byte[dataSize];
     ThreadLocalRandom.current().nextBytes(data);
     out.writeAndHsync(data);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java
index d42f29b89e..ca4bb9e20e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java
@@ -173,8 +173,8 @@ public class TestXceiverClientGrpc {
                 .setBytesPerChecksum(512)
                 .setType(ContainerProtos.ChecksumType.CRC32)
                 .build())
-            .setLen(100)
-            .setOffset(100)
+            .setLen(-1)
+            .setOffset(0)
             .build(),
         bid,
         null, null);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to