This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-7593 by this push:
new 1cf9e95eef HDDS-10427. Retry read wait based on policy. (#6292)
1cf9e95eef is described below
commit 1cf9e95eef51b03cecbc3b956e968f6608c33a4c
Author: Ashish Kumar <[email protected]>
AuthorDate: Mon Mar 4 09:20:17 2024 +0530
HDDS-10427. Retry read wait based on policy. (#6292)
---
.../apache/hadoop/hdds/scm/storage/BlockInputStream.java | 15 ++++++++++++++-
.../hadoop/hdds/scm/storage/TestBlockInputStream.java | 7 +++++++
2 files changed, 21 insertions(+), 1 deletion(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index d06b1816dc..2c30abd3d5 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -574,7 +574,20 @@ public class BlockInputStream extends
BlockExtendedInputStream {
} catch (Exception e) {
throw new IOException(e);
}
- return retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
+ if (retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
+ if (retryAction.delayMillis > 0) {
+ try {
+ LOG.debug("Retry read after {}ms", retryAction.delayMillis);
+ Thread.sleep(retryAction.delayMillis);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ String msg = "Interrupted: action=" + retryAction.action + ", retry
policy=" + retryPolicy;
+ throw new IOException(msg, e);
+ }
+ }
+ return true;
+ }
+ return false;
}
private void handleReadError(IOException cause) throws IOException {
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
index 4db569b7c0..f755817816 100644
---
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
+++
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
@@ -34,6 +34,7 @@ import
org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
+import org.apache.ozone.test.GenericTestUtils;
import org.apache.ratis.thirdparty.io.grpc.Status;
import org.apache.ratis.thirdparty.io.grpc.StatusException;
import org.junit.jupiter.api.BeforeEach;
@@ -42,6 +43,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.stubbing.OngoingStubbing;
+import org.slf4j.event.Level;
import java.io.EOFException;
import java.io.IOException;
@@ -58,6 +60,7 @@ import java.util.stream.Stream;
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
import static
org.apache.hadoop.hdds.scm.storage.TestChunkInputStream.generateRandomData;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -257,6 +260,9 @@ public class TestBlockInputStream {
@Test
public void testRefreshPipelineFunction() throws Exception {
+ GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+ .captureLogs(BlockInputStream.LOG);
+ GenericTestUtils.setLogLevel(BlockInputStream.LOG, Level.DEBUG);
BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
AtomicBoolean isRefreshed = new AtomicBoolean();
createChunkList(5);
@@ -269,6 +275,7 @@ public class TestBlockInputStream {
seekAndVerify(50);
byte[] b = new byte[200];
blockInputStreamWithRetry.read(b, 0, 200);
+ assertThat(logCapturer.getOutput()).contains("Retry read after");
assertTrue(isRefreshed.get());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]