This is an automated email from the ASF dual-hosted git repository.
ritesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new b1588e82b5 HDDS-9709. Avoid empty pipelines in the container pipeline cache. (#5625)
b1588e82b5 is described below
commit b1588e82b586a143d59c5e94fee28f818db287a8
Author: Duong Nguyen <[email protected]>
AuthorDate: Mon Dec 4 09:25:26 2023 -0800
HDDS-9709. Avoid empty pipelines in the container pipeline cache. (#5625)
---
.../ozone/om/TestOmContainerLocationCache.java | 89 +++++++++++++++++++++-
.../java/org/apache/hadoop/ozone/om/ScmClient.java | 10 ++-
2 files changed, 96 insertions(+), 3 deletions(-)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
index 2c9506d530..c4948d404b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
@@ -102,7 +102,9 @@ import java.util.stream.Stream;
import static com.google.common.collect.Sets.newHashSet;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.NO_REPLICA_FOUND;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
@@ -196,6 +198,11 @@ public class TestOmContainerLocationCache {
mockDn1Protocol = spy(new XceiverClientGrpc(createPipeline(DN1), conf));
mockDn2Protocol = spy(new XceiverClientGrpc(createPipeline(DN2), conf));
XceiverClientManager manager = mock(XceiverClientManager.class);
+ when(manager.acquireClient(argThat(matchEmptyPipeline())))
+ .thenCallRealMethod();
+ when(manager.acquireClientForReadData(argThat(matchEmptyPipeline())))
+ .thenCallRealMethod();
+
when(manager.acquireClient(argThat(matchPipeline(DN1))))
.thenReturn(mockDn1Protocol);
when(manager.acquireClientForReadData(argThat(matchPipeline(DN1))))
@@ -208,8 +215,14 @@ public class TestOmContainerLocationCache {
return manager;
}
- private static ArgumentMatcher<Pipeline> matchPipeline(DatanodeDetails dn) {
+ private static ArgumentMatcher<Pipeline> matchEmptyPipeline() {
return argument -> argument != null
+ && argument.getNodes().isEmpty();
+ }
+
+
+ private static ArgumentMatcher<Pipeline> matchPipeline(DatanodeDetails dn) {
+ return argument -> argument != null && !argument.getNodes().isEmpty()
&& argument.getNodes().get(0).getUuid().equals(dn.getUuid());
}
@@ -500,6 +513,60 @@ public class TestOmContainerLocationCache {
.getContainerWithPipelineBatch(newHashSet(CONTAINER_ID.get()));
}
+ /**
+ * Verify that in situation that SCM returns empty pipelines (that prevents
+ * clients from reading data), the empty pipelines are not cached and
+ * subsequent key reads re-fetch container data from SCM.
+ */
+ @Test
+ public void containerRefreshedOnEmptyPipelines() throws Exception {
+ byte[] data = "Test content".getBytes(UTF_8);
+
+ mockScmAllocationOnDn1(CONTAINER_ID.get(), 1L);
+ mockWriteChunkResponse(mockDn1Protocol);
+ mockPutBlockResponse(mockDn1Protocol, CONTAINER_ID.get(), 1L, data);
+
+ OzoneBucket bucket = objectStore.getVolume(VOLUME_NAME)
+ .getBucket(BUCKET_NAME);
+
+ String keyName = "key";
+ try (OzoneOutputStream os = bucket.createKey(keyName, data.length)) {
+ IOUtils.write(data, os);
+ }
+
+ // All datanodes go down and scm returns empty pipeline for the container.
+ mockScmGetContainerPipelineEmpty(CONTAINER_ID.get());
+
+ OzoneKeyDetails key1 = bucket.getKey(keyName);
+
+ verify(mockScmContainerClient, times(1))
+ .getContainerWithPipelineBatch(newHashSet(CONTAINER_ID.get()));
+
+ // verify that the effort to read will result in a NO_REPLICA_FOUND error.
+ Exception ex =
+ assertThrows(IllegalArgumentException.class, () -> {
+ try (InputStream is = key1.getContent()) {
+ IOUtils.read(is, new byte[(int) key1.getDataSize()]);
+ }
+ });
+ assertEquals(NO_REPLICA_FOUND.toString(), ex.getMessage());
+
+ // but the empty pipeline is not cached, and when some data node is back.
+ mockScmGetContainerPipeline(CONTAINER_ID.get(), DN1);
+ mockGetBlock(mockDn1Protocol, CONTAINER_ID.get(), 1L, data, null, null);
+ mockReadChunk(mockDn1Protocol, CONTAINER_ID.get(), 1L, data, null, null);
+ // the subsequent effort to read the key is success.
+ OzoneKeyDetails updatedKey1 = bucket.getKey(keyName);
+ try (InputStream is = updatedKey1.getContent()) {
+ byte[] read = new byte[(int) key1.getDataSize()];
+ IOUtils.read(is, read);
+ Assertions.assertArrayEquals(data, read);
+ }
+ // verify SCM is called one more time to refetch the container pipeline..
+ verify(mockScmContainerClient, times(2))
+ .getContainerWithPipelineBatch(newHashSet(CONTAINER_ID.get()));
+ }
+
private void mockPutBlockResponse(XceiverClientSpi mockDnProtocol,
long containerId, long localId,
byte[] data)
@@ -578,6 +645,20 @@ public class TestOmContainerLocationCache {
newHashSet(containerId))).thenReturn(containerWithPipelines);
}
+ private void mockScmGetContainerPipelineEmpty(long containerId)
+ throws IOException {
+ Pipeline pipeline = createPipeline(Collections.emptyList());
+ ContainerInfo containerInfo = new ContainerInfo.Builder()
+ .setContainerID(containerId)
+ .setPipelineID(pipeline.getId()).build();
+ List<ContainerWithPipeline> containerWithPipelines =
+ Collections.singletonList(
+ new ContainerWithPipeline(containerInfo, pipeline));
+
+ when(mockScmContainerClient.getContainerWithPipelineBatch(
+ newHashSet(containerId))).thenReturn(containerWithPipelines);
+ }
+
private void mockGetBlock(XceiverClientGrpc mockDnProtocol,
long containerId, long localId,
byte[] data,
@@ -664,12 +745,16 @@ public class TestOmContainerLocationCache {
}
private static Pipeline createPipeline(DatanodeDetails dn) {
+ return createPipeline(Collections.singletonList(dn));
+ }
+
+ private static Pipeline createPipeline(List<DatanodeDetails> nodes) {
return Pipeline.newBuilder()
.setState(Pipeline.PipelineState.OPEN)
.setId(PipelineID.randomId())
.setReplicationConfig(
RatisReplicationConfig.getInstance(ReplicationFactor.THREE))
- .setNodes(Collections.singletonList(dn))
+ .setNodes(nodes)
.build();
}
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
index 0718c89c42..3a15f2e8d5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
@@ -31,6 +31,7 @@ import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -111,7 +112,14 @@ public class ScmClient {
containerLocationCache.invalidateAll(containerIds);
}
try {
- return containerLocationCache.getAll(containerIds);
+ Map<Long, Pipeline> result = containerLocationCache.getAll(containerIds);
+ // Don't keep empty pipelines in the cache.
+ List<Long> emptyPipelines = result.entrySet().stream()
+ .filter(e -> e.getValue().isEmpty())
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+ containerLocationCache.invalidateAll(emptyPipelines);
+ return result;
} catch (ExecutionException e) {
return handleCacheExecutionException(e);
} catch (InvalidCacheLoadException e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]