This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 984cf66  HDDS-4667. BlockInputStream should give up read retry if pipeline is not updated (#1774)
984cf66 is described below

commit 984cf668a6f3e89dfbba659414de5749390723a6
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Wed Jan 13 07:02:58 2021 +0100

    HDDS-4667. BlockInputStream should give up read retry if pipeline is not updated (#1774)
---
 hadoop-hdds/client/pom.xml                         |  5 ++
 .../hadoop/hdds/scm/storage/BlockInputStream.java  | 32 +++++++--
 .../hdds/scm/storage/TestBlockInputStream.java     | 77 ++++++++++++++++++++++
 .../client/src/test/resources/log4j.properties     | 23 +++++++
 .../ozone/client/rpc/TestKeyInputStream.java       | 15 -----
 5 files changed, 133 insertions(+), 19 deletions(-)

diff --git a/hadoop-hdds/client/pom.xml b/hadoop-hdds/client/pom.xml
index e1b51e8..e3b0824 100644
--- a/hadoop-hdds/client/pom.xml
+++ b/hadoop-hdds/client/pom.xml
@@ -57,5 +57,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 8cb97c1..a5f3091 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 import org.apache.hadoop.fs.CanUnbuffer;
@@ -35,9 +36,12 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockRe
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.security.token.Token;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -66,6 +70,9 @@ public class BlockInputStream extends InputStream
   private XceiverClientFactory xceiverClientFactory;
   private XceiverClientSpi xceiverClient;
   private boolean initialized = false;
+  private final RetryPolicy retryPolicy =
+      HddsClientUtils.createRetryPolicy(3, TimeUnit.SECONDS.toMillis(1));
+  private int retries;
 
   // List of ChunkInputStreams, one for each chunk in the block
   private List<ChunkInputStream> chunkStreams;
@@ -168,7 +175,7 @@ public class BlockInputStream extends InputStream
     if (refreshPipelineFunction != null) {
       LOG.debug("Re-fetching pipeline for block {}", blockID);
       Pipeline newPipeline = refreshPipelineFunction.apply(blockID);
-      if (newPipeline == null || newPipeline.equals(pipeline)) {
+      if (newPipeline == null || newPipeline.sameDatanodes(pipeline)) {
         LOG.warn("No new pipeline for block {}", blockID);
         throw cause;
       } else {
@@ -286,9 +293,14 @@ public class BlockInputStream extends InputStream
       int numBytesRead;
       try {
         numBytesRead = current.read(b, off, numBytesToRead);
-      } catch (IOException e) {
-        handleReadError(e);
-        continue;
+        retries = 0; // reset retries after successful read
+      } catch (StorageContainerException e) {
+        if (shouldRetryRead(e)) {
+          handleReadError(e);
+          continue;
+        } else {
+          throw e;
+        }
       }
 
       if (numBytesRead != numBytesToRead) {
@@ -460,6 +472,18 @@ public class BlockInputStream extends InputStream
     blockPosition = getPos();
   }
 
+  private boolean shouldRetryRead(IOException cause) throws IOException {
+    RetryPolicy.RetryAction retryAction;
+    try {
+      retryAction = retryPolicy.shouldRetry(cause, ++retries, 0, true);
+    } catch (IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+    return retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
+  }
+
   private void handleReadError(IOException cause) throws IOException {
     releaseClient();
     final List<ChunkInputStream> inputStreams = this.chunkStreams;
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
index e1ede97..940caa7 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.storage;
 import com.google.common.primitives.Bytes;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ContainerBlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
@@ -30,6 +31,8 @@ import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.common.Checksum;
 
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -40,6 +43,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -48,10 +52,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_UNHEALTHY;
 import static org.apache.hadoop.hdds.scm.storage.TestChunkInputStream.generateRandomData;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -269,6 +275,77 @@ public class TestBlockInputStream {
   }
 
   @Test
+  public void testRefreshExitsIfPipelineHasSameNodes() throws Exception {
+    // GIVEN
+    BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
+    Pipeline pipeline = MockPipeline.createSingleNodePipeline();
+
+    final int len = 200;
+    final ChunkInputStream stream = mock(ChunkInputStream.class);
+    when(stream.read(any(), anyInt(), anyInt()))
+        .thenThrow(new StorageContainerException("test", CONTAINER_UNHEALTHY));
+    when(stream.getRemaining())
+        .thenReturn((long) len);
+
+    when(refreshPipeline.apply(blockID))
+        .thenAnswer(invocation -> samePipelineWithNewId(pipeline));
+
+    BlockInputStream subject = new DummyBlockInputStream(blockID, blockSize,
+        pipeline, null, false, null, refreshPipeline, chunks, null) {
+      @Override
+      protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) {
+        return stream;
+      }
+    };
+    subject.initialize();
+
+    // WHEN
+    byte[] b = new byte[len];
+    LambdaTestUtils.intercept(StorageContainerException.class,
+        () -> subject.read(b, 0, len));
+
+    // THEN
+    verify(refreshPipeline).apply(blockID);
+  }
+
+  @Test
+  public void testReadNotRetriedOnOtherException() throws Exception {
+    // GIVEN
+    BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
+    Pipeline pipeline = MockPipeline.createSingleNodePipeline();
+
+    final int len = 200;
+    final ChunkInputStream stream = mock(ChunkInputStream.class);
+    when(stream.read(any(), anyInt(), anyInt()))
+        .thenThrow(new OzoneChecksumException("checksum missing"));
+    when(stream.getRemaining())
+        .thenReturn((long) len);
+
+    BlockInputStream subject = new DummyBlockInputStream(blockID, blockSize,
+        pipeline, null, false, null, refreshPipeline, chunks, null) {
+      @Override
+      protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) {
+        return stream;
+      }
+    };
+    subject.initialize();
+
+    // WHEN
+    byte[] b = new byte[len];
+    LambdaTestUtils.intercept(OzoneChecksumException.class,
+        () -> subject.read(b, 0, len));
+
+    // THEN
+    verify(refreshPipeline, never()).apply(blockID);
+  }
+
+  private Pipeline samePipelineWithNewId(Pipeline pipeline) {
+    List<DatanodeDetails> reverseOrder = new ArrayList<>(pipeline.getNodes());
+    Collections.reverse(reverseOrder);
+    return MockPipeline.createPipeline(reverseOrder);
+  }
+
+  @Test
   public void testRefreshOnReadFailureAfterUnbuffer() throws Exception {
     // GIVEN
     BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
diff --git a/hadoop-hdds/client/src/test/resources/log4j.properties b/hadoop-hdds/client/src/test/resources/log4j.properties
new file mode 100644
index 0000000..bb5cbe5
--- /dev/null
+++ b/hadoop-hdds/client/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+#
+#   Licensed to the Apache Software Foundation (ASF) under one or more
+#   contributor license agreements.  See the NOTICE file distributed with
+#   this work for additional information regarding copyright ownership.
+#   The ASF licenses this file to You under the Apache License, Version 2.0
+#   (the "License"); you may not use this file except in compliance with
+#   the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=INFO,stdout
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
index 9c9406b..2cb352d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
@@ -58,7 +58,6 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
 import static org.apache.hadoop.ozone.container.TestHelper.countReplicas;
-import static org.apache.hadoop.ozone.container.TestHelper.waitForReplicaCount;
 import static org.junit.Assert.fail;
 
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -465,21 +464,7 @@ public class TestKeyInputStream {
         keyInputStream.unbuffer();
       }
 
-      // stop one node, wait for container to be replicated to another one
       cluster.shutdownHddsDatanode(pipelineNodes.get(0));
-      waitForNodeToBecomeDead(pipelineNodes.get(0));
-      waitForReplicaCount(containerID, 2, cluster);
-      waitForReplicaCount(containerID, 3, cluster);
-
-      // avoid polluting the logs
-      cluster.getStorageContainerManager().getReplicationManager().stop();
-
-      // stop original pipeline's remaining nodes
-      cluster.shutdownHddsDatanode(pipelineNodes.get(1));
-      cluster.shutdownHddsDatanode(pipelineNodes.get(2));
-
-      // now only the new node has the container
-      waitForReplicaCount(containerID, 1, cluster);
 
       // check that we can still read it
       assertReadFully(data, keyInputStream, dataLength - 1, 1);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to