This is an automated email from the ASF dual-hosted git repository.
siddhant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new c81615f7bc HDDS-11209. Avoid insufficient EC pipelines in the
container pipeline cache (#6974)
c81615f7bc is described below
commit c81615f7bc16a41633c9bf214a68d87e8a250b17
Author: Hongbing Wang <[email protected]>
AuthorDate: Wed Aug 21 17:38:50 2024 +0800
HDDS-11209. Avoid insufficient EC pipelines in the container pipeline cache
(#6974)
---
.../ozone/om/TestOmContainerLocationCache.java | 110 ++++++++++++++++++++-
.../java/org/apache/hadoop/ozone/om/ScmClient.java | 27 ++++-
2 files changed, 132 insertions(+), 5 deletions(-)
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
index e773bf7ed7..f25bb47f0d 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
@@ -18,10 +18,12 @@
package org.apache.hadoop.ozone.om;
+import com.google.common.collect.ImmutableMap;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.ContainerBlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -89,8 +91,11 @@ import org.mockito.ArgumentMatcher;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -145,10 +150,17 @@ public class TestOmContainerLocationCache {
private static ObjectStore objectStore;
private static XceiverClientGrpc mockDn1Protocol;
private static XceiverClientGrpc mockDn2Protocol;
+ private static XceiverClientGrpc mockDnEcProtocol;
private static final DatanodeDetails DN1 =
MockDatanodeDetails.createDatanodeDetails(UUID.randomUUID());
private static final DatanodeDetails DN2 =
MockDatanodeDetails.createDatanodeDetails(UUID.randomUUID());
+ private static final DatanodeDetails DN3 =
+ MockDatanodeDetails.createDatanodeDetails(UUID.randomUUID());
+ private static final DatanodeDetails DN4 =
+ MockDatanodeDetails.createDatanodeDetails(UUID.randomUUID());
+ private static final DatanodeDetails DN5 =
+ MockDatanodeDetails.createDatanodeDetails(UUID.randomUUID());
private static final AtomicLong CONTAINER_ID = new AtomicLong(1);
@@ -200,6 +212,8 @@ public class TestOmContainerLocationCache {
throws IOException {
mockDn1Protocol = spy(new XceiverClientGrpc(createPipeline(DN1), conf));
mockDn2Protocol = spy(new XceiverClientGrpc(createPipeline(DN2), conf));
+ mockDnEcProtocol = spy(new XceiverClientGrpc(createEcPipeline(
+ ImmutableMap.of(DN1, 1, DN2, 2, DN3, 3, DN4, 4, DN5, 5)), conf));
XceiverClientManager manager = mock(XceiverClientManager.class);
when(manager.acquireClient(argThat(matchEmptyPipeline())))
.thenCallRealMethod();
@@ -217,6 +231,11 @@ public class TestOmContainerLocationCache {
.thenReturn(mockDn2Protocol);
when(manager.acquireClientForReadData(argThat(matchPipeline(DN2))))
.thenReturn(mockDn2Protocol);
+
+ when(manager.acquireClient(argThat(matchEcPipeline())))
+ .thenReturn(mockDnEcProtocol);
+ when(manager.acquireClientForReadData(argThat(matchEcPipeline())))
+ .thenReturn(mockDnEcProtocol);
return manager;
}
@@ -231,6 +250,11 @@ public class TestOmContainerLocationCache {
&& argument.getNodes().get(0).getUuid().equals(dn.getUuid());
}
+ private static ArgumentMatcher<Pipeline> matchEcPipeline() {
+ return argument -> argument != null && !argument.getNodes().isEmpty()
+ && argument.getReplicationConfig() instanceof ECReplicationConfig;
+ }
+
private static void createBucket(String volumeName, String bucketName,
boolean isVersionEnabled)
throws IOException {
@@ -256,12 +280,14 @@ public class TestOmContainerLocationCache {
public void beforeEach() throws IOException {
CONTAINER_ID.getAndIncrement();
reset(mockScmBlockLocationProtocol, mockScmContainerClient,
- mockDn1Protocol, mockDn2Protocol);
+ mockDn1Protocol, mockDn2Protocol, mockDnEcProtocol);
InnerNode.Factory factory = InnerNodeImpl.FACTORY;
when(mockScmBlockLocationProtocol.getNetworkTopology()).thenReturn(
factory.newInnerNode("", "", null, NetConstants.ROOT_LEVEL, 1));
when(mockDn1Protocol.getPipeline()).thenReturn(createPipeline(DN1));
when(mockDn2Protocol.getPipeline()).thenReturn(createPipeline(DN2));
+ when(mockDnEcProtocol.getPipeline()).thenReturn(createEcPipeline(
+ ImmutableMap.of(DN1, 1, DN2, 2, DN3, 3, DN4, 4, DN5, 5)));
}
/**
@@ -575,6 +601,48 @@ public class TestOmContainerLocationCache {
.getContainerWithPipelineBatch(newHashSet(CONTAINER_ID.get()));
}
+ @Test
+ public void containerRefreshedOnInsufficientEcPipelines() throws Exception {
+ int chunkSize = 1024 * 1024;
+ int dataBlocks = 3;
+ int parityBlocks = 2;
+ int inputSize = chunkSize * dataBlocks;
+ byte[][] inputChunks = new byte[dataBlocks][chunkSize];
+
+ mockScmAllocationEcPipeline(CONTAINER_ID.get(), 1L);
+ mockWriteChunkResponse(mockDnEcProtocol);
+ mockPutBlockResponse(mockDnEcProtocol, CONTAINER_ID.get(), 1L, null);
+
+    OzoneBucket bucket = objectStore.getVolume(VOLUME_NAME).getBucket(BUCKET_NAME);
+
+ String keyName = "ecKey";
+ try (OzoneOutputStream os = bucket.createKey(keyName, inputSize,
+        new ECReplicationConfig(dataBlocks, parityBlocks, ECReplicationConfig.EcCodec.RS,
+        chunkSize), new HashMap<>())) {
+ for (int i = 0; i < dataBlocks; i++) {
+ os.write(inputChunks[i]);
+ }
+ }
+
+    // case1: pipeline replicaIndexes missing some data indexes, should not cache
+    mockScmGetContainerEcPipeline(CONTAINER_ID.get(), ImmutableMap.of(DN1, 1, DN2, 2, DN4, 4));
+ bucket.getKey(keyName);
+ verify(mockScmContainerClient, times(1))
+ .getContainerWithPipelineBatch(newHashSet(CONTAINER_ID.get()));
+ bucket.getKey(keyName);
+ verify(mockScmContainerClient, times(2))
+ .getContainerWithPipelineBatch(newHashSet(CONTAINER_ID.get()));
+
+ // case2: pipeline replicaIndexes contain all data indexes, should cache
+    mockScmGetContainerEcPipeline(CONTAINER_ID.get(), ImmutableMap.of(DN1, 1, DN2, 2, DN3, 3, DN4, 4));
+ bucket.getKey(keyName);
+ verify(mockScmContainerClient, times(3))
+ .getContainerWithPipelineBatch(newHashSet(CONTAINER_ID.get()));
+ bucket.getKey(keyName);
+ verify(mockScmContainerClient, times(3))
+ .getContainerWithPipelineBatch(newHashSet(CONTAINER_ID.get()));
+ }
+
private void mockPutBlockResponse(XceiverClientSpi mockDnProtocol,
long containerId, long localId,
byte[] data)
@@ -661,6 +729,22 @@ public class TestOmContainerLocationCache {
.thenReturn(Collections.singletonList(block));
}
+ private void mockScmAllocationEcPipeline(long containerID, long localId)
+ throws IOException {
+ ContainerBlockID blockId = new ContainerBlockID(containerID, localId);
+ AllocatedBlock block = new AllocatedBlock.Builder()
+        .setPipeline(createEcPipeline(ImmutableMap.of(DN1, 1, DN2, 2, DN3, 3, DN4, 4, DN5, 5)))
+ .setContainerBlockID(blockId)
+ .build();
+ when(mockScmBlockLocationProtocol
+ .allocateBlock(anyLong(), anyInt(),
+ any(ECReplicationConfig.class),
+ anyString(),
+ any(ExcludeList.class),
+ anyString()))
+ .thenReturn(Collections.singletonList(block));
+ }
+
private void mockScmGetContainerPipeline(long containerId,
DatanodeDetails dn)
throws IOException {
@@ -690,6 +774,20 @@ public class TestOmContainerLocationCache {
newHashSet(containerId))).thenReturn(containerWithPipelines);
}
+  private void mockScmGetContainerEcPipeline(long containerId, Map<DatanodeDetails, Integer> indexes)
+ throws IOException {
+ Pipeline pipeline = createEcPipeline(indexes);
+ ContainerInfo containerInfo = new ContainerInfo.Builder()
+ .setContainerID(containerId)
+ .setPipelineID(pipeline.getId()).build();
+ List<ContainerWithPipeline> containerWithPipelines =
+ Collections.singletonList(
+ new ContainerWithPipeline(containerInfo, pipeline));
+
+ when(mockScmContainerClient.getContainerWithPipelineBatch(
+ newHashSet(containerId))).thenReturn(containerWithPipelines);
+ }
+
private void mockGetBlock(XceiverClientGrpc mockDnProtocol,
long containerId, long localId,
byte[] data,
@@ -788,4 +886,14 @@ public class TestOmContainerLocationCache {
.setNodes(nodes)
.build();
}
+
+  private static Pipeline createEcPipeline(Map<DatanodeDetails, Integer> indexes) {
+ return Pipeline.newBuilder()
+ .setState(Pipeline.PipelineState.OPEN)
+ .setId(PipelineID.randomId())
+ .setReplicationConfig(new ECReplicationConfig(3, 2))
+ .setReplicaIndexes(indexes)
+ .setNodes(new ArrayList<>(indexes.keySet()))
+ .build();
+ }
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
index 77ed7f63ad..318decfa70 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
@@ -21,6 +21,8 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.CacheLoader.InvalidCacheLoadException;
import com.google.common.cache.LoadingCache;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -113,12 +115,29 @@ public class ScmClient {
}
try {
Map<Long, Pipeline> result = containerLocationCache.getAll(containerIds);
- // Don't keep empty pipelines in the cache.
- List<Long> emptyPipelines = result.entrySet().stream()
- .filter(e -> e.getValue().isEmpty())
+ // Don't keep empty pipelines or insufficient EC pipelines in the cache.
+ List<Long> uncachePipelines = result.entrySet().stream()
+ .filter(e -> {
+ Pipeline pipeline = e.getValue();
+ // filter empty pipelines
+ if (pipeline.isEmpty()) {
+ return true;
+ }
+ // filter insufficient EC pipelines which missing any data index
+ ReplicationConfig repConfig = pipeline.getReplicationConfig();
+ if (repConfig instanceof ECReplicationConfig) {
+ int d = ((ECReplicationConfig) repConfig).getData();
+ for (int i = 1; i <= d; i++) {
+ if (!pipeline.getReplicaIndexes().containsValue(i)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ })
.map(Map.Entry::getKey)
.collect(Collectors.toList());
- containerLocationCache.invalidateAll(emptyPipelines);
+ containerLocationCache.invalidateAll(uncachePipelines);
return result;
} catch (ExecutionException e) {
return handleCacheExecutionException(e);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]