This is an automated email from the ASF dual-hosted git repository.

ritesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new b1588e82b5 HDDS-9709. Avoid empty pipelines in the container pipeline cache. (#5625)
b1588e82b5 is described below

commit b1588e82b586a143d59c5e94fee28f818db287a8
Author: Duong Nguyen <[email protected]>
AuthorDate: Mon Dec 4 09:25:26 2023 -0800

    HDDS-9709. Avoid empty pipelines in the container pipeline cache. (#5625)
---
 .../ozone/om/TestOmContainerLocationCache.java     | 89 +++++++++++++++++++++-
 .../java/org/apache/hadoop/ozone/om/ScmClient.java | 10 ++-
 2 files changed, 96 insertions(+), 3 deletions(-)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
index 2c9506d530..c4948d404b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
@@ -102,7 +102,9 @@ import java.util.stream.Stream;
 import static com.google.common.collect.Sets.newHashSet;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.NO_REPLICA_FOUND;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.argThat;
@@ -196,6 +198,11 @@ public class TestOmContainerLocationCache {
     mockDn1Protocol = spy(new XceiverClientGrpc(createPipeline(DN1), conf));
     mockDn2Protocol = spy(new XceiverClientGrpc(createPipeline(DN2), conf));
     XceiverClientManager manager = mock(XceiverClientManager.class);
+    when(manager.acquireClient(argThat(matchEmptyPipeline())))
+        .thenCallRealMethod();
+    when(manager.acquireClientForReadData(argThat(matchEmptyPipeline())))
+        .thenCallRealMethod();
+
     when(manager.acquireClient(argThat(matchPipeline(DN1))))
         .thenReturn(mockDn1Protocol);
     when(manager.acquireClientForReadData(argThat(matchPipeline(DN1))))
@@ -208,8 +215,14 @@ public class TestOmContainerLocationCache {
     return manager;
   }
 
-  private static ArgumentMatcher<Pipeline> matchPipeline(DatanodeDetails dn) {
+  private static ArgumentMatcher<Pipeline> matchEmptyPipeline() {
     return argument -> argument != null
+        && argument.getNodes().isEmpty();
+  }
+
+
+  private static ArgumentMatcher<Pipeline> matchPipeline(DatanodeDetails dn) {
+    return argument -> argument != null && !argument.getNodes().isEmpty()
         && argument.getNodes().get(0).getUuid().equals(dn.getUuid());
   }
 
@@ -500,6 +513,60 @@ public class TestOmContainerLocationCache {
         .getContainerWithPipelineBatch(newHashSet(CONTAINER_ID.get()));
   }
 
+  /**
+   * Verify that when SCM returns empty pipelines (which prevent clients
+   * from reading data), the empty pipelines are not cached and
+   * subsequent key reads re-fetch container data from SCM.
+   */
+  @Test
+  public void containerRefreshedOnEmptyPipelines() throws Exception {
+    byte[] data = "Test content".getBytes(UTF_8);
+
+    mockScmAllocationOnDn1(CONTAINER_ID.get(), 1L);
+    mockWriteChunkResponse(mockDn1Protocol);
+    mockPutBlockResponse(mockDn1Protocol, CONTAINER_ID.get(), 1L, data);
+
+    OzoneBucket bucket = objectStore.getVolume(VOLUME_NAME)
+        .getBucket(BUCKET_NAME);
+
+    String keyName = "key";
+    try (OzoneOutputStream os = bucket.createKey(keyName, data.length)) {
+      IOUtils.write(data, os);
+    }
+
+    // All datanodes go down and scm returns empty pipeline for the container.
+    mockScmGetContainerPipelineEmpty(CONTAINER_ID.get());
+
+    OzoneKeyDetails key1 = bucket.getKey(keyName);
+
+    verify(mockScmContainerClient, times(1))
+        .getContainerWithPipelineBatch(newHashSet(CONTAINER_ID.get()));
+
+    // verify that the effort to read will result in a NO_REPLICA_FOUND error.
+    Exception ex =
+        assertThrows(IllegalArgumentException.class, () -> {
+          try (InputStream is = key1.getContent()) {
+            IOUtils.read(is, new byte[(int) key1.getDataSize()]);
+          }
+        });
+    assertEquals(NO_REPLICA_FOUND.toString(), ex.getMessage());
+
+    // The empty pipeline is not cached; now SCM reports a datanode again.
+    mockScmGetContainerPipeline(CONTAINER_ID.get(), DN1);
+    mockGetBlock(mockDn1Protocol, CONTAINER_ID.get(), 1L, data, null, null);
+    mockReadChunk(mockDn1Protocol, CONTAINER_ID.get(), 1L, data, null, null);
+    // The subsequent attempt to read the key succeeds.
+    OzoneKeyDetails updatedKey1 = bucket.getKey(keyName);
+    try (InputStream is = updatedKey1.getContent()) {
+      byte[] read = new byte[(int) key1.getDataSize()];
+      IOUtils.read(is, read);
+      Assertions.assertArrayEquals(data, read);
+    }
+    // verify SCM is called one more time to refetch the container pipeline.
+    verify(mockScmContainerClient, times(2))
+        .getContainerWithPipelineBatch(newHashSet(CONTAINER_ID.get()));
+  }
+
   private void mockPutBlockResponse(XceiverClientSpi mockDnProtocol,
                                     long containerId, long localId,
                                     byte[] data)
@@ -578,6 +645,20 @@ public class TestOmContainerLocationCache {
         newHashSet(containerId))).thenReturn(containerWithPipelines);
   }
 
+  private void mockScmGetContainerPipelineEmpty(long containerId)
+      throws IOException {
+    Pipeline pipeline = createPipeline(Collections.emptyList());
+    ContainerInfo containerInfo = new ContainerInfo.Builder()
+        .setContainerID(containerId)
+        .setPipelineID(pipeline.getId()).build();
+    List<ContainerWithPipeline> containerWithPipelines =
+        Collections.singletonList(
+            new ContainerWithPipeline(containerInfo, pipeline));
+
+    when(mockScmContainerClient.getContainerWithPipelineBatch(
+        newHashSet(containerId))).thenReturn(containerWithPipelines);
+  }
+
   private void mockGetBlock(XceiverClientGrpc mockDnProtocol,
                             long containerId, long localId,
                             byte[] data,
@@ -664,12 +745,16 @@ public class TestOmContainerLocationCache {
   }
 
   private static Pipeline createPipeline(DatanodeDetails dn) {
+    return createPipeline(Collections.singletonList(dn));
+  }
+
+  private static Pipeline createPipeline(List<DatanodeDetails> nodes) {
     return Pipeline.newBuilder()
         .setState(Pipeline.PipelineState.OPEN)
         .setId(PipelineID.randomId())
         .setReplicationConfig(
             RatisReplicationConfig.getInstance(ReplicationFactor.THREE))
-        .setNodes(Collections.singletonList(dn))
+        .setNodes(nodes)
         .build();
   }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
index 0718c89c42..3a15f2e8d5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
@@ -31,6 +31,7 @@ import org.jetbrains.annotations.NotNull;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -111,7 +112,14 @@ public class ScmClient {
       containerLocationCache.invalidateAll(containerIds);
     }
     try {
-      return containerLocationCache.getAll(containerIds);
+      Map<Long, Pipeline> result = containerLocationCache.getAll(containerIds);
+      // Don't keep empty pipelines in the cache.
+      List<Long> emptyPipelines = result.entrySet().stream()
+          .filter(e -> e.getValue().isEmpty())
+          .map(Map.Entry::getKey)
+          .collect(Collectors.toList());
+      containerLocationCache.invalidateAll(emptyPipelines);
+      return result;
     } catch (ExecutionException e) {
       return handleCacheExecutionException(e);
     } catch (InvalidCacheLoadException e) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to