This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 984cf66 HDDS-4667. BlockInputStream should give up read retry if
pipeline is not updated (#1774)
984cf66 is described below
commit 984cf668a6f3e89dfbba659414de5749390723a6
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Wed Jan 13 07:02:58 2021 +0100
HDDS-4667. BlockInputStream should give up read retry if pipeline is not
updated (#1774)
---
hadoop-hdds/client/pom.xml | 5 ++
.../hadoop/hdds/scm/storage/BlockInputStream.java | 32 +++++++--
.../hdds/scm/storage/TestBlockInputStream.java | 77 ++++++++++++++++++++++
.../client/src/test/resources/log4j.properties | 23 +++++++
.../ozone/client/rpc/TestKeyInputStream.java | 15 -----
5 files changed, 133 insertions(+), 19 deletions(-)
diff --git a/hadoop-hdds/client/pom.xml b/hadoop-hdds/client/pom.xml
index e1b51e8..e3b0824 100644
--- a/hadoop-hdds/client/pom.xml
+++ b/hadoop-hdds/client/pom.xml
@@ -57,5 +57,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 8cb97c1..a5f3091 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.hadoop.fs.CanUnbuffer;
@@ -35,9 +36,12 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockRe
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.security.token.Token;
import com.google.common.annotations.VisibleForTesting;
@@ -66,6 +70,9 @@ public class BlockInputStream extends InputStream
private XceiverClientFactory xceiverClientFactory;
private XceiverClientSpi xceiverClient;
private boolean initialized = false;
+ private final RetryPolicy retryPolicy =
+ HddsClientUtils.createRetryPolicy(3, TimeUnit.SECONDS.toMillis(1));
+ private int retries;
// List of ChunkInputStreams, one for each chunk in the block
private List<ChunkInputStream> chunkStreams;
@@ -168,7 +175,7 @@ public class BlockInputStream extends InputStream
if (refreshPipelineFunction != null) {
LOG.debug("Re-fetching pipeline for block {}", blockID);
Pipeline newPipeline = refreshPipelineFunction.apply(blockID);
- if (newPipeline == null || newPipeline.equals(pipeline)) {
+ if (newPipeline == null || newPipeline.sameDatanodes(pipeline)) {
LOG.warn("No new pipeline for block {}", blockID);
throw cause;
} else {
@@ -286,9 +293,14 @@ public class BlockInputStream extends InputStream
int numBytesRead;
try {
numBytesRead = current.read(b, off, numBytesToRead);
- } catch (IOException e) {
- handleReadError(e);
- continue;
+ retries = 0; // reset retries after successful read
+ } catch (StorageContainerException e) {
+ if (shouldRetryRead(e)) {
+ handleReadError(e);
+ continue;
+ } else {
+ throw e;
+ }
}
if (numBytesRead != numBytesToRead) {
@@ -460,6 +472,18 @@ public class BlockInputStream extends InputStream
blockPosition = getPos();
}
+ private boolean shouldRetryRead(IOException cause) throws IOException {
+ RetryPolicy.RetryAction retryAction;
+ try {
+ retryAction = retryPolicy.shouldRetry(cause, ++retries, 0, true);
+ } catch (IOException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ return retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
+ }
+
private void handleReadError(IOException cause) throws IOException {
releaseClient();
final List<ChunkInputStream> inputStreams = this.chunkStreams;
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
index e1ede97..940caa7 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.storage;
import com.google.common.primitives.Bytes;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ContainerBlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
@@ -30,6 +31,8 @@ import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
+import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -40,6 +43,7 @@ import org.mockito.junit.MockitoJUnitRunner;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -48,10 +52,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_UNHEALTHY;
import static org.apache.hadoop.hdds.scm.storage.TestChunkInputStream.generateRandomData;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -269,6 +275,77 @@ public class TestBlockInputStream {
}
@Test
+ public void testRefreshExitsIfPipelineHasSameNodes() throws Exception {
+ // GIVEN
+ BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
+ Pipeline pipeline = MockPipeline.createSingleNodePipeline();
+
+ final int len = 200;
+ final ChunkInputStream stream = mock(ChunkInputStream.class);
+ when(stream.read(any(), anyInt(), anyInt()))
+ .thenThrow(new StorageContainerException("test", CONTAINER_UNHEALTHY));
+ when(stream.getRemaining())
+ .thenReturn((long) len);
+
+ when(refreshPipeline.apply(blockID))
+ .thenAnswer(invocation -> samePipelineWithNewId(pipeline));
+
+ BlockInputStream subject = new DummyBlockInputStream(blockID, blockSize,
+ pipeline, null, false, null, refreshPipeline, chunks, null) {
+ @Override
+ protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) {
+ return stream;
+ }
+ };
+ subject.initialize();
+
+ // WHEN
+ byte[] b = new byte[len];
+ LambdaTestUtils.intercept(StorageContainerException.class,
+ () -> subject.read(b, 0, len));
+
+ // THEN
+ verify(refreshPipeline).apply(blockID);
+ }
+
+ @Test
+ public void testReadNotRetriedOnOtherException() throws Exception {
+ // GIVEN
+ BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
+ Pipeline pipeline = MockPipeline.createSingleNodePipeline();
+
+ final int len = 200;
+ final ChunkInputStream stream = mock(ChunkInputStream.class);
+ when(stream.read(any(), anyInt(), anyInt()))
+ .thenThrow(new OzoneChecksumException("checksum missing"));
+ when(stream.getRemaining())
+ .thenReturn((long) len);
+
+ BlockInputStream subject = new DummyBlockInputStream(blockID, blockSize,
+ pipeline, null, false, null, refreshPipeline, chunks, null) {
+ @Override
+ protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) {
+ return stream;
+ }
+ };
+ subject.initialize();
+
+ // WHEN
+ byte[] b = new byte[len];
+ LambdaTestUtils.intercept(OzoneChecksumException.class,
+ () -> subject.read(b, 0, len));
+
+ // THEN
+ verify(refreshPipeline, never()).apply(blockID);
+ }
+
+ private Pipeline samePipelineWithNewId(Pipeline pipeline) {
+ List<DatanodeDetails> reverseOrder = new ArrayList<>(pipeline.getNodes());
+ Collections.reverse(reverseOrder);
+ return MockPipeline.createPipeline(reverseOrder);
+ }
+
+ @Test
public void testRefreshOnReadFailureAfterUnbuffer() throws Exception {
// GIVEN
BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
diff --git a/hadoop-hdds/client/src/test/resources/log4j.properties b/hadoop-hdds/client/src/test/resources/log4j.properties
new file mode 100644
index 0000000..bb5cbe5
--- /dev/null
+++ b/hadoop-hdds/client/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=INFO,stdout
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
index 9c9406b..2cb352d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
@@ -58,7 +58,6 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import static org.apache.hadoop.ozone.container.TestHelper.countReplicas;
-import static org.apache.hadoop.ozone.container.TestHelper.waitForReplicaCount;
import static org.junit.Assert.fail;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -465,21 +464,7 @@ public class TestKeyInputStream {
keyInputStream.unbuffer();
}
- // stop one node, wait for container to be replicated to another one
cluster.shutdownHddsDatanode(pipelineNodes.get(0));
- waitForNodeToBecomeDead(pipelineNodes.get(0));
- waitForReplicaCount(containerID, 2, cluster);
- waitForReplicaCount(containerID, 3, cluster);
-
- // avoid polluting the logs
- cluster.getStorageContainerManager().getReplicationManager().stop();
-
- // stop original pipeline's remaining nodes
- cluster.shutdownHddsDatanode(pipelineNodes.get(1));
- cluster.shutdownHddsDatanode(pipelineNodes.get(2));
-
- // now only the new node has the container
- waitForReplicaCount(containerID, 1, cluster);
// check that we can still read it
assertReadFully(data, keyInputStream, dataLength - 1, 1);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]