This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-7593 by this push:
     new 1cf9e95eef HDDS-10427. Retry read wait based on policy. (#6292)
1cf9e95eef is described below

commit 1cf9e95eef51b03cecbc3b956e968f6608c33a4c
Author: Ashish Kumar <[email protected]>
AuthorDate: Mon Mar 4 09:20:17 2024 +0530

    HDDS-10427. Retry read wait based on policy. (#6292)
---
 .../apache/hadoop/hdds/scm/storage/BlockInputStream.java  | 15 ++++++++++++++-
 .../hadoop/hdds/scm/storage/TestBlockInputStream.java     |  7 +++++++
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index d06b1816dc..2c30abd3d5 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -574,7 +574,20 @@ public class BlockInputStream extends BlockExtendedInputStream {
     } catch (Exception e) {
       throw new IOException(e);
     }
-    return retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
+    if (retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
+      if (retryAction.delayMillis > 0) {
+        try {
+          LOG.debug("Retry read after {}ms", retryAction.delayMillis);
+          Thread.sleep(retryAction.delayMillis);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          String msg = "Interrupted: action=" + retryAction.action + ", retry policy=" + retryPolicy;
+          throw new IOException(msg, e);
+        }
+      }
+      return true;
+    }
+    return false;
   }
 
   private void handleReadError(IOException cause) throws IOException {
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
index 4db569b7c0..f755817816 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.ozone.common.Checksum;
 
 import org.apache.hadoop.ozone.common.OzoneChecksumException;
+import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ratis.thirdparty.io.grpc.Status;
 import org.apache.ratis.thirdparty.io.grpc.StatusException;
 import org.junit.jupiter.api.BeforeEach;
@@ -42,6 +43,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.stubbing.OngoingStubbing;
+import org.slf4j.event.Level;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -58,6 +60,7 @@ import java.util.stream.Stream;
 
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
 import static org.apache.hadoop.hdds.scm.storage.TestChunkInputStream.generateRandomData;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -257,6 +260,9 @@ public class TestBlockInputStream {
 
   @Test
   public void testRefreshPipelineFunction() throws Exception {
+    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+        .captureLogs(BlockInputStream.LOG);
+    GenericTestUtils.setLogLevel(BlockInputStream.LOG, Level.DEBUG);
     BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
     AtomicBoolean isRefreshed = new AtomicBoolean();
     createChunkList(5);
@@ -269,6 +275,7 @@ public class TestBlockInputStream {
       seekAndVerify(50);
       byte[] b = new byte[200];
       blockInputStreamWithRetry.read(b, 0, 200);
+      assertThat(logCapturer.getOutput()).contains("Retry read after");
       assertTrue(isRefreshed.get());
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to