This is an automated email from the ASF dual-hosted git repository.

guohao1225 pushed a commit to branch master-batch-allocateblock
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 058231d59039aa5b44eebd05fbc6e9444f892a3f
Author: guohao1 <[email protected]>
AuthorDate: Fri Nov 8 19:45:16 2024 +0800

    111
---
 .../hadoop/hdds/scm/block/BlockManagerImpl.java    |  36 +++++++
 .../scm/pipeline/WritableContainerFactory.java     |  19 ++++
 .../scm/pipeline/WritableContainerProvider.java    |   5 +
 .../scm/pipeline/WritableECContainerProvider.java  |  13 +++
 .../pipeline/WritableRatisContainerProvider.java   | 118 +++++++++++++++------
 5 files changed, 161 insertions(+), 30 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 5f42fb00e4..8c49e7bb4e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -174,6 +174,42 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
     return null;
   }
 
+  public List<AllocatedBlock> allocateBlocks(final long size,int num,
+      ReplicationConfig replicationConfig, String owner,
+      ExcludeList excludeList)
+      throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Size : {} , replicationConfig: {}", size, replicationConfig);
+    }
+    if (scm.getScmContext().isInSafeMode()) {
+      throw new SCMException("SafeModePrecheck failed for allocateBlock",
+          SCMException.ResultCodes.SAFE_MODE_EXCEPTION);
+    }
+    if (size < 0 || size > containerSize) {
+      LOG.warn("Invalid block size requested : {}", size);
+      throw new SCMException("Unsupported block size: " + size,
+          INVALID_BLOCK_SIZE);
+    }
+
+    List<ContainerInfo> containers = writableContainerFactory.getContainers(
+        size,num, replicationConfig, owner, excludeList);
+
+    if (containers != null && !containers.isEmpty()) {
+      List<AllocatedBlock> blocks = new ArrayList<>();
+      for (ContainerInfo container : containers) {
+        AllocatedBlock allocatedBlock = newBlock(container);
+        blocks.add(allocatedBlock);
+      }
+      return blocks;
+    }
+    // we have tried all strategies we know, but somehow we are not able
+    // to get a container for this block. Log that info and return null.
+    LOG.error(
+        "Unable to allocate a block for the size: {}, replicationConfig: {}",
+        size, replicationConfig);
+    return null;
+  }
+
   /**
    * newBlock - returns a new block assigned to a container.
    *
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java
index 81189538b5..78eac214c7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.pipeline.WritableECContainerProvider.WritableE
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 
 import java.io.IOException;
+import java.util.List;
 
 import static org.apache.hadoop.hdds.conf.StorageUnit.BYTES;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE;
@@ -81,6 +82,24 @@ public class WritableContainerFactory {
     }
   }
 
+  public List<ContainerInfo> getContainers(final long size, int num,
+      ReplicationConfig repConfig, String owner, ExcludeList excludeList)
+      throws IOException {
+    switch (repConfig.getReplicationType()) {
+    case STAND_ALONE:
+      return standaloneProvider
+          .getContainers(size, num, repConfig, owner, excludeList);
+    case RATIS:
+      return ratisProvider.getContainers(size, num, repConfig, owner, excludeList);
+    case EC:
+      return ecProvider.getContainers(size, num, (ECReplicationConfig) repConfig,
+          owner, excludeList);
+    default:
+      throw new IOException(repConfig.getReplicationType()
+          + " is an invalid replication type");
+    }
+  }
+
   private long getConfiguredContainerSize(ConfigurationSource conf) {
     return (long) conf.getStorageSize(OZONE_SCM_CONTAINER_SIZE,
         OZONE_SCM_CONTAINER_SIZE_DEFAULT, BYTES);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerProvider.java
index 628aba629a..a7635e6a8b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerProvider.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 
 import java.io.IOException;
+import java.util.List;
 
 /**
  * Interface used by the WritableContainerFactory to obtain a writable container
@@ -55,4 +56,8 @@ public interface WritableContainerProvider<T extends ReplicationConfig> {
       String owner, ExcludeList excludeList)
       throws IOException;
 
+  List<ContainerInfo> getContainers(long size, int num,
+      T repConfig, String owner, ExcludeList excludeList)
+      throws IOException;
+
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
index b528a30197..36944ecaa9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
@@ -184,6 +184,19 @@ public class WritableECContainerProvider
     }
   }
 
+  @Override
+  public List<ContainerInfo> getContainers(long size, int num,
+      ECReplicationConfig repConfig, String owner, ExcludeList excludeList)
+      throws IOException {
+    List<ContainerInfo> containers = new ArrayList<>();
+    for (int i = 0; i < num; i++) {
+      ContainerInfo container = getContainer(size, repConfig, owner,
+          excludeList);
+      containers.add(container);
+    }
+    return containers;
+  }
+
   private int getMaximumPipelines(ECReplicationConfig repConfig) {
     final double factor = providerConfig.getPipelinePerVolumeFactor();
     int volumeBasedCount = 0;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java
index 99a58f690c..455a80a144 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import jakarta.annotation.Nullable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -80,8 +81,6 @@ public class WritableRatisContainerProvider
       So we can use different kind of policies.
     */
 
-    String failureReason = null;
-
     //TODO we need to continue the refactor to use repConfig everywhere
     //in downstream managers.
 
@@ -94,6 +93,27 @@ public class WritableRatisContainerProvider
       return containerInfo;
     }
 
+    String failureReason = createPipeline(repConfig, excludeList);
+
+    // If Exception occurred or successful creation of pipeline do one
+    // final try to fetch pipelines.
+    containerInfo = getContainer(repConfig, owner, excludeList, req);
+    if (containerInfo != null) {
+      return containerInfo;
+    }
+
+    // we have tried all strategies we know but somehow we are not able
+    // to get a container for this block. Log that info and throw an exception.
+    LOG.error(
+        "Unable to allocate a block for the size: {}, repConfig: {}",
+        size, repConfig);
+    throw new IOException(
+        "Unable to allocate a container to the block of size: " + size
+            + ", replicationConfig: " + repConfig + ". " + failureReason);
+  }
+
+  private String createPipeline(ReplicationConfig repConfig, ExcludeList excludeList) {
+    String failureReason = null;
     try {
       // TODO: #CLUTIL Remove creation logic when all replication types
       //  and factors are handled by pipeline creator
@@ -104,23 +124,23 @@ public class WritableRatisContainerProvider
 
     } catch (SCMException se) {
       LOG.warn("Pipeline creation failed for repConfig {} " +
-          "Datanodes may be used up. Try to see if any pipeline is in " +
+              "Datanodes may be used up. Try to see if any pipeline is in " +
               "ALLOCATED state, and then will wait for it to be OPEN",
-              repConfig, se);
+          repConfig, se);
       List<Pipeline> allocatedPipelines = findPipelinesByState(repConfig,
-              excludeList,
-              Pipeline.PipelineState.ALLOCATED);
+          excludeList,
+          Pipeline.PipelineState.ALLOCATED);
       if (!allocatedPipelines.isEmpty()) {
         List<PipelineID> allocatedPipelineIDs =
-                allocatedPipelines.stream()
-                        .map(p -> p.getId())
-                        .collect(Collectors.toList());
+            allocatedPipelines.stream()
+                .map(p -> p.getId())
+                .collect(Collectors.toList());
         try {
           pipelineManager
-                  .waitOnePipelineReady(allocatedPipelineIDs, 0);
+              .waitOnePipelineReady(allocatedPipelineIDs, 0);
         } catch (IOException e) {
           LOG.warn("Waiting for one of pipelines {} to be OPEN failed. ",
-                  allocatedPipelineIDs, e);
+              allocatedPipelineIDs, e);
           failureReason = "Waiting for one of pipelines to be OPEN failed. "
               + e.getMessage();
         }
@@ -132,38 +152,76 @@ public class WritableRatisContainerProvider
           + "Retrying get pipelines call once.", repConfig, e);
       failureReason = e.getMessage();
     }
+    return failureReason;
+  }
 
-    // If Exception occurred or successful creation of pipeline do one
-    // final try to fetch pipelines.
-    containerInfo = getContainer(repConfig, owner, excludeList, req);
-    if (containerInfo != null) {
-      return containerInfo;
-    }
+  @Override
+  public List<ContainerInfo> getContainers(long size, int num,
+      ReplicationConfig repConfig, String owner, ExcludeList excludeList)
+      throws IOException {
+    PipelineRequestInformation req =
+        PipelineRequestInformation.Builder.getBuilder().setSize(size).build();
 
-    // we have tried all strategies we know but somehow we are not able
-    // to get a container for this block. Log that info and throw an exception.
-    LOG.error(
-        "Unable to allocate a block for the size: {}, repConfig: {}",
-        size, repConfig);
-    throw new IOException(
-        "Unable to allocate a container to the block of size: " + size
-            + ", replicationConfig: " + repConfig + ". " + failureReason);
+    List<ContainerInfo> containers = getContainers(num, repConfig, owner,
+        excludeList, req);
+    int need = num - containers.size();
+    if (need > 0) {
+      String failureReason = createPipeline(repConfig, excludeList);
+      List<ContainerInfo> needContainers = getContainers(need,
+          repConfig, owner, excludeList, req);
+      if (needContainers.size() != need) {
+        // we have tried all strategies we know but somehow we are not able
+        // to get a container for this block. Log that info and throw an exception.
+        LOG.error(
+            "Unable to allocate a block for the size: {}, repConfig: {}",
+            size, repConfig);
+        throw new IOException(
+            "Unable to allocate a container to the block of size: " + size
+                + ", replicationConfig: " + repConfig + ". " + failureReason);
+      }
+      containers.addAll(needContainers);
+    }
+    return containers;
   }
 
-  @Nullable
-  private ContainerInfo getContainer(ReplicationConfig repConfig, String owner,
-      ExcludeList excludeList, PipelineRequestInformation req) {
+  private List<Pipeline> getAvailablePipelines(ReplicationConfig repConfig,
+        ExcludeList excludeList) {
     // Acquire pipeline manager lock, to avoid any updates to pipeline
     // while allocate container happens. This is to avoid scenario like
     // mentioned in HDDS-5655.
+    List<Pipeline> availablePipelines;
     pipelineManager.acquireReadLock();
     try {
-      List<Pipeline> availablePipelines = findPipelinesByState(repConfig,
+      availablePipelines = findPipelinesByState(repConfig,
           excludeList, Pipeline.PipelineState.OPEN);
-      return selectContainer(availablePipelines, req, owner, excludeList);
     } finally {
       pipelineManager.releaseReadLock();
     }
+    return availablePipelines;
+  }
+
+  @Nullable
+  private ContainerInfo getContainer(ReplicationConfig repConfig, String owner,
+      ExcludeList excludeList, PipelineRequestInformation req) {
+    List<Pipeline> availablePipelines = getAvailablePipelines(repConfig,
+        excludeList);
+    return selectContainer(availablePipelines, req, owner, excludeList);
+  }
+
+  private List<ContainerInfo> getContainers(int num,
+      ReplicationConfig repConfig, String owner, ExcludeList excludeList,
+      PipelineRequestInformation req) {
+    List<ContainerInfo> containers = new ArrayList<>();
+    List<Pipeline> availablePipelines = getAvailablePipelines(repConfig,
+        excludeList);
+    for (int i = 0; i < num; i++) {
+      ContainerInfo containerInfo = selectContainer(availablePipelines, req,
+          owner, excludeList);
+      if (containerInfo != null) {
+        containers.add(containerInfo);
+      }
+    }
+    return containers;
   }
 
   private List<Pipeline> findPipelinesByState(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to