This is an automated email from the ASF dual-hosted git repository. guohao1225 pushed a commit to branch master-batch-allocateblock in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 058231d59039aa5b44eebd05fbc6e9444f892a3f Author: guohao1 <[email protected]> AuthorDate: Fri Nov 8 19:45:16 2024 +0800 111 --- .../hadoop/hdds/scm/block/BlockManagerImpl.java | 36 +++++++ .../scm/pipeline/WritableContainerFactory.java | 19 ++++ .../scm/pipeline/WritableContainerProvider.java | 5 + .../scm/pipeline/WritableECContainerProvider.java | 13 +++ .../pipeline/WritableRatisContainerProvider.java | 118 +++++++++++++++------ 5 files changed, 161 insertions(+), 30 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java index 5f42fb00e4..8c49e7bb4e 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java @@ -174,6 +174,42 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean { return null; } + public List<AllocatedBlock> allocateBlocks(final long size,int num, + ReplicationConfig replicationConfig, String owner, + ExcludeList excludeList) + throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Size : {} , replicationConfig: {}", size, replicationConfig); + } + if (scm.getScmContext().isInSafeMode()) { + throw new SCMException("SafeModePrecheck failed for allocateBlock", + SCMException.ResultCodes.SAFE_MODE_EXCEPTION); + } + if (size < 0 || size > containerSize) { + LOG.warn("Invalid block size requested : {}", size); + throw new SCMException("Unsupported block size: " + size, + INVALID_BLOCK_SIZE); + } + + List<ContainerInfo> containers = writableContainerFactory.getContainers( + size,num, replicationConfig, owner, excludeList); + + if (containers != null && !containers.isEmpty()) { + List<AllocatedBlock> blocks = new ArrayList<>(); + for (ContainerInfo container : containers) { + AllocatedBlock allocatedBlock = newBlock(container); + blocks.add(allocatedBlock); + } + return blocks; + } + // we have tried all strategies we know and but somehow we are not able + // to get a container for this block. Log that info and return a null. + LOG.error( + "Unable to allocate a block for the size: {}, replicationConfig: {}", + size, replicationConfig); + return null; + } + /** * newBlock - returns a new block assigned to a container. * diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java index 81189538b5..78eac214c7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.pipeline.WritableECContainerProvider.WritableE import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import java.io.IOException; +import java.util.List; import static org.apache.hadoop.hdds.conf.StorageUnit.BYTES; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE; @@ -81,6 +82,24 @@ public class WritableContainerFactory { } } + public List<ContainerInfo> getContainers(final long size, int num, + ReplicationConfig repConfig, String owner, ExcludeList excludeList) + throws IOException { + switch (repConfig.getReplicationType()) { + case STAND_ALONE: + return standaloneProvider + .getContainers(size, num, repConfig, owner, excludeList); + case RATIS: + return ratisProvider.getContainers(size, num, repConfig, owner, excludeList); + case EC: + return ecProvider.getContainers(size, num, (ECReplicationConfig) repConfig, + owner, excludeList); + default: + throw new IOException(repConfig.getReplicationType() + + " is an invalid replication type"); + } + } + private long getConfiguredContainerSize(ConfigurationSource conf) { return (long) conf.getStorageSize(OZONE_SCM_CONTAINER_SIZE, OZONE_SCM_CONTAINER_SIZE_DEFAULT, BYTES); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerProvider.java index 628aba629a..a7635e6a8b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerProvider.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import java.io.IOException; +import java.util.List; /** * Interface used by the WritableContainerFactory to obtain a writable container @@ -55,4 +56,8 @@ public interface WritableContainerProvider<T extends ReplicationConfig> { String owner, ExcludeList excludeList) throws IOException; + List<ContainerInfo> getContainers(long size, int num, + T repConfig, String owner, ExcludeList excludeList) + throws IOException; + } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java index b528a30197..36944ecaa9 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java @@ -184,6 +184,19 @@ public class WritableECContainerProvider } } + @Override + public List<ContainerInfo> getContainers(long size, int num, + ECReplicationConfig repConfig, String owner, ExcludeList excludeList) + throws IOException { + List<ContainerInfo> containers = new ArrayList<>(); + for (int i = 0; i < num; i++) { + ContainerInfo container = getContainer(size, repConfig, owner, + excludeList); + containers.add(container); + } + return containers; + } + private int getMaximumPipelines(ECReplicationConfig repConfig) { final double factor = providerConfig.getPipelinePerVolumeFactor(); int volumeBasedCount = 0; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java index 99a58f690c..455a80a144 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java @@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory; import jakarta.annotation.Nullable; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -80,8 +81,6 @@ public class WritableRatisContainerProvider So we can use different kind of policies. */ - String failureReason = null; - //TODO we need to continue the refactor to use repConfig everywhere //in downstream managers. @@ -94,6 +93,27 @@ public class WritableRatisContainerProvider return containerInfo; } + String failureReason = createPipeline(repConfig, excludeList); + + // If Exception occurred or successful creation of pipeline do one + // final try to fetch pipelines. + containerInfo = getContainer(repConfig, owner, excludeList, req); + if (containerInfo != null) { + return containerInfo; + } + + // we have tried all strategies we know but somehow we are not able + // to get a container for this block. Log that info and throw an exception. + LOG.error( + "Unable to allocate a block for the size: {}, repConfig: {}", + size, repConfig); + throw new IOException( + "Unable to allocate a container to the block of size: " + size + + ", replicationConfig: " + repConfig + ". " + failureReason); + } + + private String createPipeline(ReplicationConfig repConfig, ExcludeList excludeList) { + String failureReason = null; try { // TODO: #CLUTIL Remove creation logic when all replication types // and factors are handled by pipeline creator @@ -104,23 +124,23 @@ public class WritableRatisContainerProvider } catch (SCMException se) { LOG.warn("Pipeline creation failed for repConfig {} " + - "Datanodes may be used up. Try to see if any pipeline is in " + + "Datanodes may be used up. Try to see if any pipeline is in " + "ALLOCATED state, and then will wait for it to be OPEN", - repConfig, se); + repConfig, se); List<Pipeline> allocatedPipelines = findPipelinesByState(repConfig, - excludeList, - Pipeline.PipelineState.ALLOCATED); + excludeList, + Pipeline.PipelineState.ALLOCATED); if (!allocatedPipelines.isEmpty()) { List<PipelineID> allocatedPipelineIDs = - allocatedPipelines.stream() - .map(p -> p.getId()) - .collect(Collectors.toList()); + allocatedPipelines.stream() + .map(p -> p.getId()) + .collect(Collectors.toList()); try { pipelineManager - .waitOnePipelineReady(allocatedPipelineIDs, 0); + .waitOnePipelineReady(allocatedPipelineIDs, 0); } catch (IOException e) { LOG.warn("Waiting for one of pipelines {} to be OPEN failed. ", - allocatedPipelineIDs, e); + allocatedPipelineIDs, e); failureReason = "Waiting for one of pipelines to be OPEN failed. " + e.getMessage(); } @@ -132,38 +152,76 @@ public class WritableRatisContainerProvider + "Retrying get pipelines call once.", repConfig, e); failureReason = e.getMessage(); } + return failureReason; + } - // If Exception occurred or successful creation of pipeline do one - // final try to fetch pipelines. - containerInfo = getContainer(repConfig, owner, excludeList, req); - if (containerInfo != null) { - return containerInfo; - } + @Override + public List<ContainerInfo> getContainers(long size, int num, + ReplicationConfig repConfig, String owner, ExcludeList excludeList) + throws IOException { + PipelineRequestInformation req = + PipelineRequestInformation.Builder.getBuilder().setSize(size).build(); - // we have tried all strategies we know but somehow we are not able - // to get a container for this block. Log that info and throw an exception. - LOG.error( - "Unable to allocate a block for the size: {}, repConfig: {}", - size, repConfig); - throw new IOException( - "Unable to allocate a container to the block of size: " + size - + ", replicationConfig: " + repConfig + ". " + failureReason); + List<ContainerInfo> containers = getContainers(num, repConfig, owner, + excludeList, req); + int need = num - containers.size(); + if (need > 0) { + String failureReason = createPipeline(repConfig, excludeList); + List<ContainerInfo> needContainers = getContainers(need, + repConfig, owner, excludeList, req); + if (needContainers.size() != need) { + // we have tried all strategies we know but somehow we are not able + // to get a container for this block. Log that info and throw an exception. + LOG.error( + "Unable to allocate a block for the size: {}, repConfig: {}", + size, repConfig); + throw new IOException( + "Unable to allocate a container to the block of size: " + size + + ", replicationConfig: " + repConfig + ". " + failureReason); + } + containers.addAll(needContainers); + } + return containers; } - @Nullable - private ContainerInfo getContainer(ReplicationConfig repConfig, String owner, - ExcludeList excludeList, PipelineRequestInformation req) { + private List<Pipeline> getAvailablePipelines(ReplicationConfig repConfig, + ExcludeList excludeList) { // Acquire pipeline manager lock, to avoid any updates to pipeline // while allocate container happens. This is to avoid scenario like // mentioned in HDDS-5655. + List<Pipeline> availablePipelines; pipelineManager.acquireReadLock(); try { - List<Pipeline> availablePipelines = findPipelinesByState(repConfig, + availablePipelines = findPipelinesByState(repConfig, excludeList, Pipeline.PipelineState.OPEN); - return selectContainer(availablePipelines, req, owner, excludeList); } finally { pipelineManager.releaseReadLock(); } + return availablePipelines; + } + + @Nullable + private ContainerInfo getContainer(ReplicationConfig repConfig, String owner, + ExcludeList excludeList, PipelineRequestInformation req) { + List<Pipeline> availablePipelines = getAvailablePipelines(repConfig, + excludeList); + return selectContainer(availablePipelines, req, owner, excludeList); + } + + private List<ContainerInfo> getContainers(int num, + ReplicationConfig repConfig, String owner, ExcludeList excludeList, + PipelineRequestInformation req) { + List<ContainerInfo> containers = new ArrayList<>(); + List<Pipeline> availablePipelines = getAvailablePipelines(repConfig, + excludeList); + for (int i = 0; i < num; i++) { + ContainerInfo containerInfo = selectContainer(availablePipelines, req, + owner, excludeList); + if (containerInfo != null) { + containers.add(containerInfo); + } + } + return containers; } private List<Pipeline> findPipelinesByState( --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
