This is an automated email from the ASF dual-hosted git repository.

siddhant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new c81615f7bc HDDS-11209. Avoid insufficient EC pipelines in the 
container pipeline cache (#6974)
c81615f7bc is described below

commit c81615f7bc16a41633c9bf214a68d87e8a250b17
Author: Hongbing Wang <[email protected]>
AuthorDate: Wed Aug 21 17:38:50 2024 +0800

    HDDS-11209. Avoid insufficient EC pipelines in the container pipeline cache 
(#6974)
---
 .../ozone/om/TestOmContainerLocationCache.java     | 110 ++++++++++++++++++++-
 .../java/org/apache/hadoop/ozone/om/ScmClient.java |  27 ++++-
 2 files changed, 132 insertions(+), 5 deletions(-)

diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
index e773bf7ed7..f25bb47f0d 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
@@ -18,10 +18,12 @@
 
 package org.apache.hadoop.ozone.om;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.ContainerBlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -89,8 +91,11 @@ import org.mockito.ArgumentMatcher;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -145,10 +150,17 @@ public class TestOmContainerLocationCache {
   private static ObjectStore objectStore;
   private static XceiverClientGrpc mockDn1Protocol;
   private static XceiverClientGrpc mockDn2Protocol;
+  private static XceiverClientGrpc mockDnEcProtocol;
   private static final DatanodeDetails DN1 =
       MockDatanodeDetails.createDatanodeDetails(UUID.randomUUID());
   private static final DatanodeDetails DN2 =
       MockDatanodeDetails.createDatanodeDetails(UUID.randomUUID());
+  private static final DatanodeDetails DN3 =
+      MockDatanodeDetails.createDatanodeDetails(UUID.randomUUID());
+  private static final DatanodeDetails DN4 =
+      MockDatanodeDetails.createDatanodeDetails(UUID.randomUUID());
+  private static final DatanodeDetails DN5 =
+      MockDatanodeDetails.createDatanodeDetails(UUID.randomUUID());
   private static final AtomicLong CONTAINER_ID = new AtomicLong(1);
 
 
@@ -200,6 +212,8 @@ public class TestOmContainerLocationCache {
       throws IOException {
     mockDn1Protocol = spy(new XceiverClientGrpc(createPipeline(DN1), conf));
     mockDn2Protocol = spy(new XceiverClientGrpc(createPipeline(DN2), conf));
+    mockDnEcProtocol = spy(new XceiverClientGrpc(createEcPipeline(
+        ImmutableMap.of(DN1, 1, DN2, 2, DN3, 3, DN4, 4, DN5, 5)), conf));
     XceiverClientManager manager = mock(XceiverClientManager.class);
     when(manager.acquireClient(argThat(matchEmptyPipeline())))
         .thenCallRealMethod();
@@ -217,6 +231,11 @@ public class TestOmContainerLocationCache {
         .thenReturn(mockDn2Protocol);
     when(manager.acquireClientForReadData(argThat(matchPipeline(DN2))))
         .thenReturn(mockDn2Protocol);
+
+    when(manager.acquireClient(argThat(matchEcPipeline())))
+        .thenReturn(mockDnEcProtocol);
+    when(manager.acquireClientForReadData(argThat(matchEcPipeline())))
+        .thenReturn(mockDnEcProtocol);
     return manager;
   }
 
@@ -231,6 +250,11 @@ public class TestOmContainerLocationCache {
         && argument.getNodes().get(0).getUuid().equals(dn.getUuid());
   }
 
+  private static ArgumentMatcher<Pipeline> matchEcPipeline() {
+    return argument -> argument != null && !argument.getNodes().isEmpty()
+        && argument.getReplicationConfig() instanceof ECReplicationConfig;
+  }
+
   private static void createBucket(String volumeName, String bucketName,
                                    boolean isVersionEnabled)
       throws IOException {
@@ -256,12 +280,14 @@ public class TestOmContainerLocationCache {
   public void beforeEach() throws IOException {
     CONTAINER_ID.getAndIncrement();
     reset(mockScmBlockLocationProtocol, mockScmContainerClient,
-        mockDn1Protocol, mockDn2Protocol);
+        mockDn1Protocol, mockDn2Protocol, mockDnEcProtocol);
     InnerNode.Factory factory = InnerNodeImpl.FACTORY;
     when(mockScmBlockLocationProtocol.getNetworkTopology()).thenReturn(
         factory.newInnerNode("", "", null, NetConstants.ROOT_LEVEL, 1));
     when(mockDn1Protocol.getPipeline()).thenReturn(createPipeline(DN1));
     when(mockDn2Protocol.getPipeline()).thenReturn(createPipeline(DN2));
+    when(mockDnEcProtocol.getPipeline()).thenReturn(createEcPipeline(
+        ImmutableMap.of(DN1, 1, DN2, 2, DN3, 3, DN4, 4, DN5, 5)));
   }
 
   /**
@@ -575,6 +601,48 @@ public class TestOmContainerLocationCache {
         .getContainerWithPipelineBatch(newHashSet(CONTAINER_ID.get()));
   }
 
+  @Test
+  public void containerRefreshedOnInsufficientEcPipelines() throws Exception {
+    int chunkSize = 1024 * 1024;
+    int dataBlocks = 3;
+    int parityBlocks = 2;
+    int inputSize = chunkSize * dataBlocks;
+    byte[][] inputChunks = new byte[dataBlocks][chunkSize];
+
+    mockScmAllocationEcPipeline(CONTAINER_ID.get(), 1L);
+    mockWriteChunkResponse(mockDnEcProtocol);
+    mockPutBlockResponse(mockDnEcProtocol, CONTAINER_ID.get(), 1L, null);
+
+    OzoneBucket bucket = 
objectStore.getVolume(VOLUME_NAME).getBucket(BUCKET_NAME);
+
+    String keyName = "ecKey";
+    try (OzoneOutputStream os = bucket.createKey(keyName, inputSize,
+        new ECReplicationConfig(dataBlocks, parityBlocks, 
ECReplicationConfig.EcCodec.RS,
+            chunkSize), new HashMap<>())) {
+      for (int i = 0; i < dataBlocks; i++) {
+        os.write(inputChunks[i]);
+      }
+    }
+
+    // case1: pipeline replicaIndexes missing some data indexes, should not 
cache
+    mockScmGetContainerEcPipeline(CONTAINER_ID.get(), ImmutableMap.of(DN1, 1, 
DN2, 2, DN4, 4));
+    bucket.getKey(keyName);
+    verify(mockScmContainerClient, times(1))
+        .getContainerWithPipelineBatch(newHashSet(CONTAINER_ID.get()));
+    bucket.getKey(keyName);
+    verify(mockScmContainerClient, times(2))
+        .getContainerWithPipelineBatch(newHashSet(CONTAINER_ID.get()));
+
+    // case2: pipeline replicaIndexes contain all data indexes, should cache
+    mockScmGetContainerEcPipeline(CONTAINER_ID.get(), ImmutableMap.of(DN1, 1, 
DN2, 2, DN3, 3, DN4, 4));
+    bucket.getKey(keyName);
+    verify(mockScmContainerClient, times(3))
+        .getContainerWithPipelineBatch(newHashSet(CONTAINER_ID.get()));
+    bucket.getKey(keyName);
+    verify(mockScmContainerClient, times(3))
+        .getContainerWithPipelineBatch(newHashSet(CONTAINER_ID.get()));
+  }
+
   private void mockPutBlockResponse(XceiverClientSpi mockDnProtocol,
                                     long containerId, long localId,
                                     byte[] data)
@@ -661,6 +729,22 @@ public class TestOmContainerLocationCache {
         .thenReturn(Collections.singletonList(block));
   }
 
+  private void mockScmAllocationEcPipeline(long containerID, long localId)
+      throws IOException {
+    ContainerBlockID blockId = new ContainerBlockID(containerID, localId);
+    AllocatedBlock block = new AllocatedBlock.Builder()
+        .setPipeline(createEcPipeline(ImmutableMap.of(DN1, 1, DN2, 2, DN3, 3, 
DN4, 4, DN5, 5)))
+        .setContainerBlockID(blockId)
+        .build();
+    when(mockScmBlockLocationProtocol
+        .allocateBlock(anyLong(), anyInt(),
+            any(ECReplicationConfig.class),
+            anyString(),
+            any(ExcludeList.class),
+            anyString()))
+        .thenReturn(Collections.singletonList(block));
+  }
+
   private void mockScmGetContainerPipeline(long containerId,
                                            DatanodeDetails dn)
       throws IOException {
@@ -690,6 +774,20 @@ public class TestOmContainerLocationCache {
         newHashSet(containerId))).thenReturn(containerWithPipelines);
   }
 
+  private void mockScmGetContainerEcPipeline(long containerId, 
Map<DatanodeDetails, Integer> indexes)
+      throws IOException {
+    Pipeline pipeline = createEcPipeline(indexes);
+    ContainerInfo containerInfo = new ContainerInfo.Builder()
+        .setContainerID(containerId)
+        .setPipelineID(pipeline.getId()).build();
+    List<ContainerWithPipeline> containerWithPipelines =
+        Collections.singletonList(
+            new ContainerWithPipeline(containerInfo, pipeline));
+
+    when(mockScmContainerClient.getContainerWithPipelineBatch(
+        newHashSet(containerId))).thenReturn(containerWithPipelines);
+  }
+
   private void mockGetBlock(XceiverClientGrpc mockDnProtocol,
                             long containerId, long localId,
                             byte[] data,
@@ -788,4 +886,14 @@ public class TestOmContainerLocationCache {
         .setNodes(nodes)
         .build();
   }
+
+  private static Pipeline createEcPipeline(Map<DatanodeDetails, Integer> 
indexes) {
+    return Pipeline.newBuilder()
+        .setState(Pipeline.PipelineState.OPEN)
+        .setId(PipelineID.randomId())
+        .setReplicationConfig(new ECReplicationConfig(3, 2))
+        .setReplicaIndexes(indexes)
+        .setNodes(new ArrayList<>(indexes.keySet()))
+        .build();
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
index 77ed7f63ad..318decfa70 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
@@ -21,6 +21,8 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.CacheLoader.InvalidCacheLoadException;
 import com.google.common.cache.LoadingCache;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -113,12 +115,29 @@ public class ScmClient {
     }
     try {
       Map<Long, Pipeline> result = containerLocationCache.getAll(containerIds);
-      // Don't keep empty pipelines in the cache.
-      List<Long> emptyPipelines = result.entrySet().stream()
-          .filter(e -> e.getValue().isEmpty())
+      // Don't keep empty pipelines or insufficient EC pipelines in the cache.
+      List<Long> uncachePipelines = result.entrySet().stream()
+          .filter(e -> {
+            Pipeline pipeline = e.getValue();
+            // filter empty pipelines
+            if (pipeline.isEmpty()) {
+              return true;
+            }
+            // filter insufficient EC pipelines that are missing any data index
+            ReplicationConfig repConfig = pipeline.getReplicationConfig();
+            if (repConfig instanceof ECReplicationConfig) {
+              int d = ((ECReplicationConfig) repConfig).getData();
+              for (int i = 1; i <= d; i++) {
+                if (!pipeline.getReplicaIndexes().containsValue(i)) {
+                  return true;
+                }
+              }
+            }
+            return false;
+          })
           .map(Map.Entry::getKey)
           .collect(Collectors.toList());
-      containerLocationCache.invalidateAll(emptyPipelines);
+      containerLocationCache.invalidateAll(uncachePipelines);
       return result;
     } catch (ExecutionException e) {
       return handleCacheExecutionException(e);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to