This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 7196dbbd22 HDDS-8995. Refactor the datanode BlockDeletingService (#5047)
7196dbbd22 is described below
commit 7196dbbd22e0f4ea2018e2451289798a42ddb79b
Author: hao guo <[email protected]>
AuthorDate: Fri Aug 25 14:19:54 2023 +0800
HDDS-8995. Refactor the datanode BlockDeletingService (#5047)
---
.../helpers/BlockDeletingServiceMetrics.java | 2 +-
.../container/common/helpers/ContainerUtils.java | 15 +
.../common/impl/BlockDeletingService.java | 290 +++++++++
.../RandomContainerDeletionChoosingPolicy.java | 3 +-
...TopNOrderedContainerDeletionChoosingPolicy.java | 15 +-
.../ContainerDeletionChoosingPolicy.java | 2 +-
.../ContainerDeletionChoosingPolicyTemplate.java | 22 +-
.../background/BlockDeletingService.java | 672 ---------------------
.../statemachine/background/BlockDeletingTask.java | 484 +++++++++++++++
.../ozone/container/ozoneimpl/OzoneContainer.java | 2 +-
.../container/common/TestBlockDeletingService.java | 2 +-
.../impl/TestContainerDeletionChoosingPolicy.java | 7 +-
.../testutils/BlockDeletingServiceTestImpl.java | 2 +-
13 files changed, 817 insertions(+), 701 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java
index ba3b817519..f8125cfa82 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
-import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
+import org.apache.hadoop.ozone.container.common.impl.BlockDeletingService;
/**
* Metrics related to Block Deleting Service running on Datanode.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
index ff974b9cf4..3c3371ebf3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -326,4 +327,18 @@ public final class ContainerUtils {
tarName);
}
}
+
+ public static long getPendingDeletionBlocks(ContainerData containerData) {
+ if (containerData.getContainerType()
+ .equals(ContainerProtos.ContainerType.KeyValueContainer)) {
+ return ((KeyValueContainerData) containerData)
+ .getNumPendingDeletionBlocks();
+ } else {
+ // If another ContainerType is available later, implement it
+ throw new IllegalArgumentException(
+ "getPendingDeletionBlocks for ContainerType: " +
+ containerData.getContainerType() +
+ " not support.");
+ }
+ }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/BlockDeletingService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/BlockDeletingService.java
new file mode 100644
index 0000000000..87550d8656
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/BlockDeletingService.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.impl;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.ozone.container.common.helpers.BlockDeletingServiceMetrics;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDeletionChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
+import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingTask;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+/**
+ * A per-datanode container block deleting service takes in charge
+ * of deleting staled ozone blocks.
+ */
+public class BlockDeletingService extends BackgroundService {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(BlockDeletingService.class);
+
+ private final OzoneContainer ozoneContainer;
+ private final ContainerDeletionChoosingPolicy containerDeletionPolicy;
+ private final ConfigurationSource conf;
+
+ private final int blockLimitPerInterval;
+
+ private final BlockDeletingServiceMetrics metrics;
+
+ // Task priority is useful when a to-delete block has weight.
+ private static final int TASK_PRIORITY_DEFAULT = 1;
+
+ public BlockDeletingService(OzoneContainer ozoneContainer,
+ long serviceInterval, long serviceTimeout,
+ TimeUnit timeUnit, int workerSize,
+ ConfigurationSource conf) {
+ super("BlockDeletingService", serviceInterval, timeUnit,
+ workerSize, serviceTimeout);
+ this.ozoneContainer = ozoneContainer;
+ try {
+ containerDeletionPolicy = conf.getClass(
+ ScmConfigKeys.OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY,
+ TopNOrderedContainerDeletionChoosingPolicy.class,
+ ContainerDeletionChoosingPolicy.class).newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ this.conf = conf;
+ DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
+ this.blockLimitPerInterval = dnConf.getBlockDeletionLimit();
+ metrics = BlockDeletingServiceMetrics.create();
+ }
+
+ /**
+ * Pair of container data and the number of blocks to delete.
+ */
+ public static class ContainerBlockInfo {
+ private final ContainerData containerData;
+ private final Long numBlocksToDelete;
+
+ public ContainerBlockInfo(ContainerData containerData, Long blocks) {
+ this.containerData = containerData;
+ this.numBlocksToDelete = blocks;
+ }
+
+ public ContainerData getContainerData() {
+ return containerData;
+ }
+
+ public Long getNumBlocksToDelete() {
+ return numBlocksToDelete;
+ }
+
+ }
+
+ @Override
+ public BackgroundTaskQueue getTasks() {
+ BackgroundTaskQueue queue = new BackgroundTaskQueue();
+
+ try {
+ // We at most list a number of containers a time,
+ // in case there are too many containers and start too many workers.
+ // We must ensure there is no empty container in this result.
+ // The chosen result depends on what container deletion policy is
+ // configured.
+ List<ContainerBlockInfo> containers =
+ chooseContainerForBlockDeletion(blockLimitPerInterval,
+ containerDeletionPolicy);
+
+ BackgroundTask
+ containerBlockInfos = null;
+ long totalBlocks = 0;
+ for (ContainerBlockInfo containerBlockInfo : containers) {
+ BlockDeletingTaskBuilder builder =
+ new BlockDeletingTaskBuilder();
+ builder.setBlockDeletingService(this)
+ .setContainerBlockInfo(containerBlockInfo)
+ .setPriority(TASK_PRIORITY_DEFAULT);
+ containerBlockInfos = builder.build();
+ queue.add(containerBlockInfos);
+ totalBlocks += containerBlockInfo.getNumBlocksToDelete();
+ }
+ metrics.incrTotalBlockChosenCount(totalBlocks);
+ metrics.incrTotalContainerChosenCount(containers.size());
+ if (containers.size() > 0) {
+ LOG.debug("Queued {} blocks from {} containers for deletion",
+ totalBlocks, containers.size());
+ }
+ } catch (StorageContainerException e) {
+ LOG.warn("Failed to initiate block deleting tasks, "
+ + "caused by unable to get containers info. "
+ + "Retry in next interval. ", e);
+ } catch (Exception e) {
+ // In case listContainer call throws any uncaught RuntimeException.
+ LOG.error("Unexpected error occurs during deleting blocks.", e);
+ }
+ return queue;
+ }
+
+ public List<ContainerBlockInfo> chooseContainerForBlockDeletion(
+ int blockLimit, ContainerDeletionChoosingPolicy deletionPolicy)
+ throws StorageContainerException {
+
+ AtomicLong totalPendingBlockCount = new AtomicLong(0L);
+ Map<Long, ContainerData> containerDataMap =
+ ozoneContainer.getContainerSet().getContainerMap().entrySet().stream()
+ .filter(e -> (checkPendingDeletionBlocks(
+ e.getValue().getContainerData())))
+ .filter(e -> isDeletionAllowed(e.getValue().getContainerData(),
+ deletionPolicy)).collect(Collectors
+ .toMap(Map.Entry::getKey, e -> {
+ ContainerData containerData =
+ e.getValue().getContainerData();
+ totalPendingBlockCount
+ .addAndGet(
+ ContainerUtils.getPendingDeletionBlocks(containerData));
+ return containerData;
+ }));
+
+ metrics.setTotalPendingBlockCount(totalPendingBlockCount.get());
+ return deletionPolicy
+ .chooseContainerForBlockDeletion(blockLimit, containerDataMap);
+ }
+
+ private boolean checkPendingDeletionBlocks(ContainerData containerData) {
+ return ContainerUtils.getPendingDeletionBlocks(containerData) > 0;
+ }
+
+ private boolean isDeletionAllowed(ContainerData containerData,
+ ContainerDeletionChoosingPolicy deletionPolicy) {
+ if (!deletionPolicy
+ .isValidContainerType(containerData.getContainerType())) {
+ return false;
+ } else if (!containerData.isClosed()) {
+ return false;
+ } else {
+ if (ozoneContainer.getWriteChannel() instanceof XceiverServerRatis) {
+ XceiverServerRatis ratisServer =
+ (XceiverServerRatis) ozoneContainer.getWriteChannel();
+ final String originPipelineId = containerData.getOriginPipelineId();
+ if (originPipelineId == null || originPipelineId.isEmpty()) {
+ // In case the pipelineID is empty, just mark it for deletion.
+ // TODO: currently EC container goes through this path.
+ return true;
+ }
+ UUID pipelineUUID;
+ try {
+ pipelineUUID = UUID.fromString(originPipelineId);
+ } catch (IllegalArgumentException e) {
+ LOG.warn("Invalid pipelineID {} for container {}",
+ originPipelineId, containerData.getContainerID());
+ return false;
+ }
+ PipelineID pipelineID = PipelineID.valueOf(pipelineUUID);
+ // in case the ratis group does not exist, just mark it for deletion.
+ if (!ratisServer.isExist(pipelineID.getProtobuf())) {
+ return true;
+ }
+ try {
+ long minReplicatedIndex =
+ ratisServer.getMinReplicatedIndex(pipelineID);
+ long containerBCSID = containerData.getBlockCommitSequenceId();
+ if (minReplicatedIndex < containerBCSID) {
+ LOG.warn("Close Container log Index {} is not replicated across all"
+ + " the servers in the pipeline {} as the min replicated "
+ + "index is {}. Deletion is not allowed in this container "
+ + "yet.", containerBCSID,
+ containerData.getOriginPipelineId(), minReplicatedIndex);
+ return false;
+ } else {
+ return true;
+ }
+ } catch (IOException ioe) {
+ // in case of any exception check again whether the pipeline exist
+ // and in case the pipeline got destroyed, just mark it for deletion
+ if (!ratisServer.isExist(pipelineID.getProtobuf())) {
+ return true;
+ } else {
+ LOG.info(ioe.getMessage());
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ }
+
+ public OzoneContainer getOzoneContainer() {
+ return ozoneContainer;
+ }
+
+ public ConfigurationSource getConf() {
+ return conf;
+ }
+
+ public BlockDeletingServiceMetrics getMetrics() {
+ return metrics;
+ }
+
+ private static class BlockDeletingTaskBuilder {
+ private BlockDeletingService blockDeletingService;
+ private BlockDeletingService.ContainerBlockInfo containerBlockInfo;
+ private int priority;
+
+ public BlockDeletingTaskBuilder setBlockDeletingService(
+ BlockDeletingService blockDeletingService) {
+ this.blockDeletingService = blockDeletingService;
+ return this;
+ }
+
+ public BlockDeletingTaskBuilder setContainerBlockInfo(
+ ContainerBlockInfo containerBlockInfo) {
+ this.containerBlockInfo = containerBlockInfo;
+ return this;
+ }
+
+ public BlockDeletingTaskBuilder setPriority(int priority) {
+ this.priority = priority;
+ return this;
+ }
+
+ public BackgroundTask build() {
+ ContainerProtos.ContainerType containerType =
+ containerBlockInfo.getContainerData().getContainerType();
+ if (containerType
+ .equals(ContainerProtos.ContainerType.KeyValueContainer)) {
+ return
+ new BlockDeletingTask(blockDeletingService, containerBlockInfo,
+ priority);
+ } else {
+ // If another ContainerType is available later, implement it
+ throw new IllegalArgumentException(
+ "BlockDeletingTask for ContainerType: " + containerType +
+ "doesn't exist.");
+ }
+ }
+ }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java
index bd8db67690..90ff474827 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.ozone.container.common.impl;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDeletionChoosingPolicyTemplate;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import java.util.Collections;
import java.util.List;
@@ -31,7 +30,7 @@ public class RandomContainerDeletionChoosingPolicy
@Override
protected void orderByDescendingPriority(
- List<KeyValueContainerData> candidateContainers) {
+ List<ContainerData> candidateContainers) {
Collections.shuffle(candidateContainers);
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java
index e45dbbd3c9..4889d5597c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java
@@ -17,8 +17,8 @@
*/
package org.apache.hadoop.ozone.container.common.impl;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDeletionChoosingPolicyTemplate;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import java.util.Comparator;
import java.util.List;
@@ -30,16 +30,15 @@ import java.util.List;
public class TopNOrderedContainerDeletionChoosingPolicy
extends ContainerDeletionChoosingPolicyTemplate {
/** customized comparator used to compare differentiate container data. **/
- private static final Comparator<KeyValueContainerData>
- KEY_VALUE_CONTAINER_DATA_COMPARATOR = (KeyValueContainerData c1,
- KeyValueContainerData c2) ->
- Long.compare(c2.getNumPendingDeletionBlocks(),
- c1.getNumPendingDeletionBlocks());
+ private static final Comparator<ContainerData> CONTAINER_DATA_COMPARATOR =
+ (ContainerData c1, ContainerData c2) -> Long.compare(
+ ContainerUtils.getPendingDeletionBlocks(c2),
+ ContainerUtils.getPendingDeletionBlocks(c1));
@Override
protected void orderByDescendingPriority(
- List<KeyValueContainerData> candidateContainers) {
+ List<ContainerData> candidateContainers) {
// get top N list ordered by pending deletion blocks' number
- candidateContainers.sort(KEY_VALUE_CONTAINER_DATA_COMPARATOR);
+ candidateContainers.sort(CONTAINER_DATA_COMPARATOR);
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicy.java
index 884c6a1627..81f4b66d92 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicy.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService.ContainerBlockInfo;
+import org.apache.hadoop.ozone.container.common.impl.BlockDeletingService.ContainerBlockInfo;
import java.util.List;
import java.util.Map;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicyTemplate.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicyTemplate.java
index d35cead5e4..c584ba7903 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicyTemplate.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicyTemplate.java
@@ -20,9 +20,9 @@ package org.apache.hadoop.ozone.container.common.interfaces;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService.ContainerBlockInfo;
+import org.apache.hadoop.ozone.container.common.impl.BlockDeletingService.ContainerBlockInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,10 +49,10 @@ public abstract class ContainerDeletionChoosingPolicyTemplate
int originalBlockCount = blockCount;
List<ContainerBlockInfo> result = new ArrayList<>();
- List<KeyValueContainerData> orderedList = new LinkedList<>();
+ List<ContainerData> orderedList = new LinkedList<>();
for (ContainerData entry: candidateContainers.values()) {
- orderedList.add((KeyValueContainerData) entry);
+ orderedList.add(entry);
}
orderByDescendingPriority(orderedList);
@@ -64,16 +64,18 @@ public abstract class ContainerDeletionChoosingPolicyTemplate
// container but with container we also return an integer so that total
// blocks don't exceed the number of blocks to be deleted in an interval.
- for (KeyValueContainerData entry : orderedList) {
- if (entry.getNumPendingDeletionBlocks() > 0) {
- long numBlocksToDelete = Math.min(blockCount,
- entry.getNumPendingDeletionBlocks());
+ for (ContainerData entry : orderedList) {
+ long pendingDeletionBlocks =
+ ContainerUtils.getPendingDeletionBlocks(entry);
+
+ if (pendingDeletionBlocks > 0) {
+ long numBlocksToDelete = Math.min(blockCount, pendingDeletionBlocks);
blockCount -= numBlocksToDelete;
result.add(new ContainerBlockInfo(entry, numBlocksToDelete));
if (LOG.isDebugEnabled()) {
LOG.debug("Select container {} for block deletion, "
+ "pending deletion blocks num: {}.", entry.getContainerID(),
- entry.getNumPendingDeletionBlocks());
+ pendingDeletionBlocks);
}
if (blockCount == 0) {
break;
@@ -92,5 +94,5 @@ public abstract class ContainerDeletionChoosingPolicyTemplate
* @param candidateContainers candidate containers to be ordered
*/
protected abstract void orderByDescendingPriority(
- List<KeyValueContainerData> candidateContainers);
+ List<ContainerData> candidateContainers);
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
deleted file mode 100644
index c7bdff0256..0000000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
+++ /dev/null
@@ -1,672 +0,0 @@
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.ozone.container.keyvalue.statemachine.background;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.UUID;
-import java.util.LinkedList;
-import java.util.Objects;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
-import org.apache.hadoop.hdds.utils.db.BatchOperation;
-import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
-import org.apache.hadoop.hdds.utils.BackgroundService;
-import org.apache.hadoop.hdds.utils.BackgroundTask;
-import org.apache.hadoop.hdds.utils.MetadataKeyFilters.KeyPrefixFilter;
-import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.TableIterator;
-import org.apache.hadoop.ozone.container.common.helpers.BlockData;
-import org.apache.hadoop.ozone.container.common.helpers.BlockDeletingServiceMetrics;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import org.apache.hadoop.ozone.container.common.impl.TopNOrderedContainerDeletionChoosingPolicy;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDeletionChoosingPolicy;
-import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
-import org.apache.hadoop.ozone.container.metadata.DeleteTransactionStore;
-import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
-import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
-
-import com.google.common.collect.Lists;
-
-import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V1;
-import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2;
-import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V3;
-
-import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A per-datanode container block deleting service takes in charge
- * of deleting staled ozone blocks.
- */
-
-public class BlockDeletingService extends BackgroundService {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(BlockDeletingService.class);
-
- private OzoneContainer ozoneContainer;
- private ContainerDeletionChoosingPolicy containerDeletionPolicy;
- private final ConfigurationSource conf;
-
- private final int blockLimitPerInterval;
-
- private final BlockDeletingServiceMetrics metrics;
-
- // Task priority is useful when a to-delete block has weight.
- private static final int TASK_PRIORITY_DEFAULT = 1;
-
- public BlockDeletingService(OzoneContainer ozoneContainer,
- long serviceInterval, long serviceTimeout,
- TimeUnit timeUnit, int workerSize,
- ConfigurationSource conf) {
- super("BlockDeletingService", serviceInterval, timeUnit,
- workerSize, serviceTimeout);
- this.ozoneContainer = ozoneContainer;
- try {
- containerDeletionPolicy = conf.getClass(
- ScmConfigKeys.OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY,
- TopNOrderedContainerDeletionChoosingPolicy.class,
- ContainerDeletionChoosingPolicy.class).newInstance();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- this.conf = conf;
- DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
- this.blockLimitPerInterval = dnConf.getBlockDeletionLimit();
- metrics = BlockDeletingServiceMetrics.create();
- }
-
- /**
- * Pair of container data and the number of blocks to delete.
- */
- public static class ContainerBlockInfo {
- private final ContainerData containerData;
- private final Long numBlocksToDelete;
-
- public ContainerBlockInfo(ContainerData containerData, Long blocks) {
- this.containerData = containerData;
- this.numBlocksToDelete = blocks;
- }
-
- public ContainerData getContainerData() {
- return containerData;
- }
-
- public Long getBlocks() {
- return numBlocksToDelete;
- }
-
- }
-
-
- @Override
- public BackgroundTaskQueue getTasks() {
- BackgroundTaskQueue queue = new BackgroundTaskQueue();
- List<ContainerBlockInfo> containers = Lists.newArrayList();
- try {
- // We at most list a number of containers a time,
- // in case there are too many containers and start too many workers.
- // We must ensure there is no empty container in this result.
- // The chosen result depends on what container deletion policy is
- // configured.
- containers = chooseContainerForBlockDeletion(blockLimitPerInterval,
- containerDeletionPolicy);
-
- BlockDeletingTask containerBlockInfos = null;
- long totalBlocks = 0;
- for (ContainerBlockInfo containerBlockInfo : containers) {
- containerBlockInfos =
- new BlockDeletingTask(containerBlockInfo.containerData,
- TASK_PRIORITY_DEFAULT, containerBlockInfo.numBlocksToDelete);
- queue.add(containerBlockInfos);
- totalBlocks += containerBlockInfo.numBlocksToDelete;
- }
- metrics.incrTotalBlockChosenCount(totalBlocks);
- metrics.incrTotalContainerChosenCount(containers.size());
- if (containers.size() > 0) {
- LOG.debug("Queued {} blocks from {} containers for deletion",
- totalBlocks, containers.size());
- }
- } catch (StorageContainerException e) {
- LOG.warn("Failed to initiate block deleting tasks, "
- + "caused by unable to get containers info. "
- + "Retry in next interval. ", e);
- } catch (Exception e) {
- // In case listContainer call throws any uncaught RuntimeException.
- LOG.error("Unexpected error occurs during deleting blocks.", e);
- }
- return queue;
- }
-
- public List<ContainerBlockInfo> chooseContainerForBlockDeletion(
- int blockLimit, ContainerDeletionChoosingPolicy deletionPolicy)
- throws StorageContainerException {
- Map<Long, ContainerData> containerDataMap =
- ozoneContainer.getContainerSet().getContainerMap().entrySet().stream()
- .filter(e -> ((KeyValueContainerData) e.getValue()
- .getContainerData()).getNumPendingDeletionBlocks() > 0)
- .filter(e -> isDeletionAllowed(e.getValue().getContainerData(),
- deletionPolicy)).collect(Collectors
- .toMap(Map.Entry::getKey, e -> e.getValue().getContainerData()));
-
- long totalPendingBlockCount =
- containerDataMap.values().stream().mapToLong(
- containerData -> ((KeyValueContainerData) containerData)
- .getNumPendingDeletionBlocks())
- .sum();
- metrics.setTotalPendingBlockCount(totalPendingBlockCount);
- return deletionPolicy
- .chooseContainerForBlockDeletion(blockLimit, containerDataMap);
- }
-
- private boolean isDeletionAllowed(ContainerData containerData,
- ContainerDeletionChoosingPolicy deletionPolicy) {
- if (!deletionPolicy
- .isValidContainerType(containerData.getContainerType())) {
- return false;
- } else if (!containerData.isClosed()) {
- return false;
- } else {
- if (ozoneContainer.getWriteChannel() instanceof XceiverServerRatis) {
- XceiverServerRatis ratisServer =
- (XceiverServerRatis) ozoneContainer.getWriteChannel();
- final String originPipelineId = containerData.getOriginPipelineId();
- if (originPipelineId == null || originPipelineId.isEmpty()) {
- // In case the pipelineID is empty, just mark it for deletion.
- // TODO: currently EC container goes through this path.
- return true;
- }
- UUID pipelineUUID;
- try {
- pipelineUUID = UUID.fromString(originPipelineId);
- } catch (IllegalArgumentException e) {
- LOG.warn("Invalid pipelineID {} for container {}",
- originPipelineId, containerData.getContainerID());
- return false;
- }
- PipelineID pipelineID = PipelineID.valueOf(pipelineUUID);
- // in case the ratis group does not exist, just mark it for deletion.
- if (!ratisServer.isExist(pipelineID.getProtobuf())) {
- return true;
- }
- try {
- long minReplicatedIndex =
- ratisServer.getMinReplicatedIndex(pipelineID);
- long containerBCSID = containerData.getBlockCommitSequenceId();
- if (minReplicatedIndex < containerBCSID) {
- LOG.warn("Close Container log Index {} is not replicated across all"
- + " the servers in the pipeline {} as the min replicated "
- + "index is {}. Deletion is not allowed in this container "
- + "yet.", containerBCSID,
- containerData.getOriginPipelineId(), minReplicatedIndex);
- return false;
- } else {
- return true;
- }
- } catch (IOException ioe) {
- // in case of any exception check again whether the pipeline exist
- // and in case the pipeline got destroyed, just mark it for deletion
- if (!ratisServer.isExist(pipelineID.getProtobuf())) {
- return true;
- } else {
- LOG.info(ioe.getMessage());
- return false;
- }
- }
- }
- return true;
- }
- }
-
- private static class ContainerBackgroundTaskResult
- implements BackgroundTaskResult {
- private List<String> deletedBlockIds;
-
- ContainerBackgroundTaskResult() {
- deletedBlockIds = new LinkedList<>();
- }
-
- public void addBlockId(String blockId) {
- deletedBlockIds.add(blockId);
- }
-
- public void addAll(List<String> blockIds) {
- deletedBlockIds.addAll(blockIds);
- }
-
- public List<String> getDeletedBlocks() {
- return deletedBlockIds;
- }
-
- @Override
- public int getSize() {
- return deletedBlockIds.size();
- }
- }
-
- private class BlockDeletingTask implements BackgroundTask {
-
- private final int priority;
- private final KeyValueContainerData containerData;
- private final long blocksToDelete;
-
- BlockDeletingTask(ContainerData containerName, int priority,
- long blocksToDelete) {
- this.priority = priority;
- this.containerData = (KeyValueContainerData) containerName;
- this.blocksToDelete = blocksToDelete;
- }
-
- @Override
- public BackgroundTaskResult call() throws Exception {
- ContainerBackgroundTaskResult crr;
- final Container container = ozoneContainer.getContainerSet()
- .getContainer(containerData.getContainerID());
- container.writeLock();
- File dataDir = new File(containerData.getChunksPath());
- long startTime = Time.monotonicNow();
- // Scan container's db and get list of under deletion blocks
- try (DBHandle meta = BlockUtils.getDB(containerData, conf)) {
- if (containerData.hasSchema(SCHEMA_V1)) {
- crr = deleteViaSchema1(meta, container, dataDir, startTime);
- } else if (containerData.hasSchema(SCHEMA_V2)) {
- crr = deleteViaSchema2(meta, container, dataDir, startTime);
- } else if (containerData.hasSchema(SCHEMA_V3)) {
- crr = deleteViaSchema3(meta, container, dataDir, startTime);
- } else {
- throw new UnsupportedOperationException(
- "Only schema version 1,2,3 are supported.");
- }
- return crr;
- } finally {
- container.writeUnlock();
- }
- }
-
- public boolean checkDataDir(File dataDir) {
- boolean b = true;
- if (!dataDir.exists() || !dataDir.isDirectory()) {
- LOG.error("Invalid container data dir {} : "
- + "does not exist or not a directory", dataDir.getAbsolutePath());
- b = false;
- }
- return b;
- }
-
- public ContainerBackgroundTaskResult deleteViaSchema1(
- DBHandle meta, Container container, File dataDir,
- long startTime) throws IOException {
- ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
- if (!checkDataDir(dataDir)) {
- return crr;
- }
- try {
- Table<String, BlockData> blockDataTable =
- meta.getStore().getBlockDataTable();
-
- // # of blocks to delete is throttled
- KeyPrefixFilter filter = containerData.getDeletingBlockKeyFilter();
- List<? extends Table.KeyValue<String, BlockData>> toDeleteBlocks =
- blockDataTable
- .getSequentialRangeKVs(containerData.startKeyEmpty(),
- (int) blocksToDelete, containerData.containerPrefix(),
- filter);
- if (toDeleteBlocks.isEmpty()) {
- LOG.debug("No under deletion block found in container : {}",
- containerData.getContainerID());
- return crr;
- }
-
- List<String> succeedBlocks = new LinkedList<>();
- LOG.debug("Container : {}, To-Delete blocks : {}",
- containerData.getContainerID(), toDeleteBlocks.size());
-
- Handler handler = Objects.requireNonNull(ozoneContainer.getDispatcher()
- .getHandler(container.getContainerType()));
-
- long releasedBytes = 0;
- for (Table.KeyValue<String, BlockData> entry: toDeleteBlocks) {
- String blockName = entry.getKey();
- LOG.debug("Deleting block {}", blockName);
- if (entry.getValue() == null) {
- LOG.warn("Missing delete block(Container = " +
- container.getContainerData().getContainerID() + ", Block = " +
- blockName);
- continue;
- }
- try {
- handler.deleteBlock(container, entry.getValue());
- releasedBytes += KeyValueContainerUtil.getBlockLength(
- entry.getValue());
- succeedBlocks.add(blockName);
- } catch (InvalidProtocolBufferException e) {
- LOG.error("Failed to parse block info for block {}", blockName, e);
- } catch (IOException e) {
- LOG.error("Failed to delete files for block {}", blockName, e);
- }
- }
-
- // Once chunks in the blocks are deleted... remove the blockID from
- // blockDataTable.
- try (BatchOperation batch = meta.getStore().getBatchHandler()
- .initBatchOperation()) {
- for (String entry : succeedBlocks) {
- blockDataTable.deleteWithBatch(batch, entry);
- }
-
- // Handler.deleteBlock calls deleteChunk to delete all the chunks
- // in the block. The ContainerData stats (DB and in-memory) are not
- // updated with decremented used bytes during deleteChunk. This is
- // done here so that all the DB update for block delete can be
- // batched together while committing to DB.
- int deletedBlocksCount = succeedBlocks.size();
- containerData.updateAndCommitDBCounters(meta, batch,
- deletedBlocksCount, releasedBytes);
- // Once DB update is persisted, check if there are any blocks
- // remaining in the DB. This will determine whether the container
- // can be deleted by SCM.
- if (!container.hasBlocks()) {
- containerData.markAsEmpty();
- }
-
- // update count of pending deletion blocks, block count and used
- // bytes in in-memory container status.
- containerData.decrPendingDeletionBlocks(deletedBlocksCount);
- containerData.decrBlockCount(deletedBlocksCount);
- containerData.decrBytesUsed(releasedBytes);
- containerData.getVolume().decrementUsedSpace(releasedBytes);
- metrics.incrSuccessCount(deletedBlocksCount);
- metrics.incrSuccessBytes(releasedBytes);
- }
-
- if (!succeedBlocks.isEmpty()) {
- LOG.debug("Container: {}, deleted blocks: {}, space reclaimed: {}, " +
- "task elapsed time: {}ms", containerData.getContainerID(),
- succeedBlocks.size(), releasedBytes,
- Time.monotonicNow() - startTime);
- }
- crr.addAll(succeedBlocks);
- return crr;
- } catch (IOException exception) {
- LOG.warn("Deletion operation was not successful for container: " +
- container.getContainerData().getContainerID(), exception);
- metrics.incrFailureCount();
- throw exception;
- }
- }
-
- public ContainerBackgroundTaskResult deleteViaSchema2(
- DBHandle meta, Container container, File dataDir,
- long startTime) throws IOException {
- Deleter schema2Deleter = (table, batch, tid) -> {
- Table<Long, DeletedBlocksTransaction> delTxTable =
- (Table<Long, DeletedBlocksTransaction>) table;
- delTxTable.deleteWithBatch(batch, tid);
- };
- Table<Long, DeletedBlocksTransaction> deleteTxns =
- ((DeleteTransactionStore<Long>) meta.getStore())
- .getDeleteTransactionTable();
- try (TableIterator<Long,
- ? extends Table.KeyValue<Long, DeletedBlocksTransaction>>
- iterator = deleteTxns.iterator()) {
- return deleteViaTransactionStore(
- iterator, meta,
- container, dataDir, startTime, schema2Deleter);
- }
- }
-
- public ContainerBackgroundTaskResult deleteViaSchema3(
- DBHandle meta, Container container, File dataDir,
- long startTime) throws IOException {
- Deleter schema3Deleter = (table, batch, tid) -> {
- Table<String, DeletedBlocksTransaction> delTxTable =
- (Table<String, DeletedBlocksTransaction>) table;
- delTxTable.deleteWithBatch(batch,
- containerData.getDeleteTxnKey(tid));
- };
- Table<String, DeletedBlocksTransaction> deleteTxns =
- ((DeleteTransactionStore<String>) meta.getStore())
- .getDeleteTransactionTable();
- try (TableIterator<String,
- ? extends Table.KeyValue<String, DeletedBlocksTransaction>>
- iterator = deleteTxns.iterator(containerData.containerPrefix())) {
- return deleteViaTransactionStore(
- iterator, meta,
- container, dataDir, startTime, schema3Deleter);
- }
- }
-
- private ContainerBackgroundTaskResult deleteViaTransactionStore(
- TableIterator<?, ? extends Table.KeyValue<?, DeletedBlocksTransaction>>
- iter, DBHandle meta, Container container, File dataDir,
- long startTime, Deleter deleter) throws IOException {
- ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
- if (!checkDataDir(dataDir)) {
- return crr;
- }
- try {
- Table<String, BlockData> blockDataTable =
- meta.getStore().getBlockDataTable();
- DeleteTransactionStore<?> txnStore =
- (DeleteTransactionStore<?>) meta.getStore();
- Table<?, DeletedBlocksTransaction> deleteTxns =
- txnStore.getDeleteTransactionTable();
- List<DeletedBlocksTransaction> delBlocks = new ArrayList<>();
- int numBlocks = 0;
- while (iter.hasNext() && (numBlocks < blocksToDelete)) {
- DeletedBlocksTransaction delTx = iter.next().getValue();
- numBlocks += delTx.getLocalIDList().size();
- delBlocks.add(delTx);
- }
- if (delBlocks.isEmpty()) {
- LOG.info("No transaction found in container {} with pending delete " +
- "block count {}",
- containerData.getContainerID(),
- containerData.getNumPendingDeletionBlocks());
- // If the container was queued for delete, it had a positive
- // pending delete block count. After checking the DB there were
- // actually no delete transactions for the container, so reset the
- // pending delete block count to the correct value of zero.
- containerData.resetPendingDeleteBlockCount(meta);
- return crr;
- }
-
- LOG.debug("Container : {}, To-Delete blocks : {}",
- containerData.getContainerID(), delBlocks.size());
-
- Handler handler = Objects.requireNonNull(ozoneContainer.getDispatcher()
- .getHandler(container.getContainerType()));
-
- DeleteTransactionStats deleteBlocksResult =
- deleteTransactions(delBlocks, handler, blockDataTable, container);
- int deletedBlocksProcessed = deleteBlocksResult.getBlocksProcessed();
- int deletedBlocksCount = deleteBlocksResult.getBlocksDeleted();
- long releasedBytes = deleteBlocksResult.getBytesReleased();
-
- // Once blocks are deleted... remove the blockID from blockDataTable
- // and also remove the transactions from txnTable.
- try (BatchOperation batch = meta.getStore().getBatchHandler()
- .initBatchOperation()) {
- for (DeletedBlocksTransaction delTx : delBlocks) {
- deleter.apply(deleteTxns, batch, delTx.getTxID());
- for (Long blk : delTx.getLocalIDList()) {
- blockDataTable.deleteWithBatch(batch,
- containerData.getBlockKey(blk));
- }
- }
-
- // Handler.deleteBlock calls deleteChunk to delete all the chunks
- // in the block. The ContainerData stats (DB and in-memory) are not
- // updated with decremented used bytes during deleteChunk. This is
- // done here so that all the DB updates for block delete can be
- // batched together while committing to DB.
- containerData.updateAndCommitDBCounters(meta, batch,
- deletedBlocksCount, releasedBytes);
- // Once DB update is persisted, check if there are any blocks
- // remaining in the DB. This will determine whether the container
- // can be deleted by SCM.
- if (!container.hasBlocks()) {
- containerData.markAsEmpty();
- }
-
- // update count of pending deletion blocks, block count and used
- // bytes in in-memory container status and used space in volume.
- containerData.decrPendingDeletionBlocks(deletedBlocksProcessed);
- containerData.decrBlockCount(deletedBlocksCount);
- containerData.decrBytesUsed(releasedBytes);
- containerData.getVolume().decrementUsedSpace(releasedBytes);
- metrics.incrSuccessCount(deletedBlocksCount);
- metrics.incrSuccessBytes(releasedBytes);
- }
-
- LOG.debug("Container: {}, deleted blocks: {}, space reclaimed: {}, " +
- "task elapsed time: {}ms", containerData.getContainerID(),
- deletedBlocksCount, releasedBytes, Time.monotonicNow() - startTime);
-
- return crr;
- } catch (IOException exception) {
- LOG.warn("Deletion operation was not successful for container: " +
- container.getContainerData().getContainerID(), exception);
- metrics.incrFailureCount();
- throw exception;
- }
- }
-
- /**
- * Delete the chunks for the given blocks.
- * Return the deletedBlocks count and number of bytes released.
- */
- private DeleteTransactionStats deleteTransactions(
- List<DeletedBlocksTransaction> delBlocks, Handler handler,
- Table<String, BlockData> blockDataTable, Container container)
- throws IOException {
- int blocksProcessed = 0;
- int blocksDeleted = 0;
- long bytesReleased = 0;
- for (DeletedBlocksTransaction entry : delBlocks) {
- for (Long blkLong : entry.getLocalIDList()) {
- String blk = containerData.getBlockKey(blkLong);
- BlockData blkInfo = blockDataTable.get(blk);
- LOG.debug("Deleting block {}", blkLong);
- if (blkInfo == null) {
- try {
- handler.deleteUnreferenced(container, blkLong);
- } catch (IOException e) {
- LOG.error("Failed to delete files for unreferenced block {} of" +
- " container {}", blkLong,
- container.getContainerData().getContainerID(), e);
- } finally {
- blocksProcessed++;
- }
- continue;
- }
-
- boolean deleted = false;
- try {
- handler.deleteBlock(container, blkInfo);
- blocksDeleted++;
- deleted = true;
- } catch (IOException e) {
- // TODO: if deletion of certain block retries exceed the certain
- // number of times, service should skip deleting it,
- // otherwise invalid numPendingDeletionBlocks could accumulate
- // beyond the limit and the following deletion will stop.
- LOG.error("Failed to delete files for block {}", blkLong, e);
- } finally {
- blocksProcessed++;
- }
-
- if (deleted) {
- try {
- bytesReleased += KeyValueContainerUtil.getBlockLength(blkInfo);
- } catch (IOException e) {
- // TODO: handle the bytesReleased correctly for the unexpected
- // exception.
- LOG.error("Failed to get block length for block {}", blkLong, e);
- }
- }
- }
- }
- return new DeleteTransactionStats(blocksProcessed,
- blocksDeleted, bytesReleased);
- }
-
- @Override
- public int getPriority() {
- return priority;
- }
- }
-
- private interface Deleter {
- void apply(Table<?, DeletedBlocksTransaction> deleteTxnsTable,
- BatchOperation batch, long txnID) throws IOException;
- }
-
- public BlockDeletingServiceMetrics getMetrics() {
- return metrics;
- }
-
- /**
- * The wrapper class of the result of deleting transactions.
- */
- private static class DeleteTransactionStats {
-
- private final int blocksProcessed;
- private final int blocksDeleted;
- private final long bytesReleased;
-
- DeleteTransactionStats(int proceeded, int deleted, long released) {
- blocksProcessed = proceeded;
- blocksDeleted = deleted;
- bytesReleased = released;
- }
-
- public int getBlocksProcessed() {
- return blocksProcessed;
- }
-
- public int getBlocksDeleted() {
- return blocksDeleted;
- }
-
- public long getBytesReleased() {
- return bytesReleased;
- }
- }
-}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingTask.java
new file mode 100644
index 0000000000..f2cca28bd8
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingTask.java
@@ -0,0 +1,484 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue.statemachine.background;
+
+import java.io.File;
+import java.io.IOException;
+
+import java.util.LinkedList;
+import java.util.Objects;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.MetadataKeyFilters.KeyPrefixFilter;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.ozone.container.common.helpers.BlockDeletingServiceMetrics;
+import org.apache.hadoop.ozone.container.common.impl.BlockDeletingService;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
+import org.apache.hadoop.ozone.container.metadata.DeleteTransactionStore;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V1;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V3;
+
+import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BlockDeletingTask for KeyValueContainer.
+ */
+public class BlockDeletingTask implements BackgroundTask {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(BlockDeletingTask.class);
+
+ private final BlockDeletingServiceMetrics metrics;
+ private final int priority;
+ private final KeyValueContainerData containerData;
+ private final long blocksToDelete;
+ private final OzoneContainer ozoneContainer;
+ private final ConfigurationSource conf;
+
+  /**
+   * Creates a deletion task for a single KeyValue container.
+   *
+   * @param blockDeletingService service that supplies the OzoneContainer,
+   *                             the metrics sink and the configuration
+   * @param containerBlockInfo   pairs the target container's data with the
+   *                             number of blocks this task may delete
+   * @param priority             value later reported by
+   *                             {@link #getPriority()}
+   */
+  public BlockDeletingTask(
+      BlockDeletingService blockDeletingService,
+      BlockDeletingService.ContainerBlockInfo containerBlockInfo,
+      int priority) {
+    // Collaborators are borrowed from the owning service.
+    ozoneContainer = blockDeletingService.getOzoneContainer();
+    metrics = blockDeletingService.getMetrics();
+    conf = blockDeletingService.getConf();
+    this.priority = priority;
+
+    // This task only handles KeyValue containers, hence the downcast.
+    final KeyValueContainerData target =
+        (KeyValueContainerData) containerBlockInfo.getContainerData();
+    containerData = target;
+    blocksToDelete = containerBlockInfo.getNumBlocksToDelete();
+  }
+
+  /**
+   * Result of one task run: the IDs of the blocks reported as deleted.
+   */
+  private static class ContainerBackgroundTaskResult
+      implements BackgroundTaskResult {
+    // Assigned once in the constructor; afterwards only appended to.
+    private final List<String> deletedBlockIds;
+
+    ContainerBackgroundTaskResult() {
+      this.deletedBlockIds = new LinkedList<>();
+    }
+
+    /** Records a single deleted block ID. */
+    public void addBlockId(String blockId) {
+      this.deletedBlockIds.add(blockId);
+    }
+
+    /** Records every block ID in the given list. */
+    public void addAll(List<String> blockIds) {
+      this.deletedBlockIds.addAll(blockIds);
+    }
+
+    /** Returns the backing list itself, not a copy. */
+    public List<String> getDeletedBlocks() {
+      return this.deletedBlockIds;
+    }
+
+    /** Returns the number of block IDs recorded so far. */
+    @Override
+    public int getSize() {
+      return this.deletedBlockIds.size();
+    }
+  }
+
+
+  /**
+   * Runs one round of block deletion for this task's container while
+   * holding the container write lock, dispatching on the container's
+   * schema version.
+   *
+   * @return the result produced by the schema-specific delete routine
+   * @throws UnsupportedOperationException if the container reports none of
+   *         schema V1, V2 or V3
+   * @throws Exception anything propagated from opening the container DB or
+   *         from the schema-specific delete routine
+   */
+  @Override
+  public BackgroundTaskResult call() throws Exception {
+    final Container container = ozoneContainer.getContainerSet()
+        .getContainer(containerData.getContainerID());
+    container.writeLock();
+    // Everything after the lock acquisition runs inside try/finally so that
+    // an unexpected failure before the DB is opened (for example a null
+    // chunks path) cannot leave the container write-locked.
+    try {
+      final File dataDir = new File(containerData.getChunksPath());
+      final long startTime = Time.monotonicNow();
+      // Scan container's db and get list of under deletion blocks
+      try (DBHandle meta = BlockUtils.getDB(containerData, conf)) {
+        if (containerData.hasSchema(SCHEMA_V1)) {
+          return deleteViaSchema1(meta, container, dataDir, startTime);
+        }
+        if (containerData.hasSchema(SCHEMA_V2)) {
+          return deleteViaSchema2(meta, container, dataDir, startTime);
+        }
+        if (containerData.hasSchema(SCHEMA_V3)) {
+          return deleteViaSchema3(meta, container, dataDir, startTime);
+        }
+        throw new UnsupportedOperationException(
+            "Only schema version 1,2,3 are supported.");
+      }
+    } finally {
+      container.writeUnlock();
+    }
+  }
+
+  /**
+   * Verifies that the container's chunk directory can be used.
+   *
+   * @param dataDir directory expected to hold the container's chunk files
+   * @return {@code true} if {@code dataDir} exists and is a directory;
+   *         {@code false} otherwise, after logging an error
+   */
+  public boolean checkDataDir(File dataDir) {
+    if (dataDir.exists() && dataDir.isDirectory()) {
+      return true;
+    }
+    LOG.error("Invalid container data dir {} : "
+        + "does not exist or not a directory", dataDir.getAbsolutePath());
+    return false;
+  }
+
+  /**
+   * Deletes pending blocks of a schema V1 container. Blocks awaiting
+   * deletion are read from the block data table through the container's
+   * deleting-key filter, their files are removed through the container's
+   * handler, and the table entries plus the container counters are then
+   * updated in one DB batch.
+   *
+   * @param meta      open handle on the container DB
+   * @param container container whose blocks are deleted
+   * @param dataDir   the container's chunk directory; if it is not a usable
+   *                  directory nothing is deleted
+   * @param startTime monotonic start timestamp, used only for logging
+   * @return result listing the keys of the blocks that were deleted
+   * @throws IOException if reading or updating the container DB fails; the
+   *         failure metric is incremented before rethrowing
+   */
+  public ContainerBackgroundTaskResult deleteViaSchema1(
+      DBHandle meta, Container container, File dataDir,
+      long startTime) throws IOException {
+    ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
+    if (!checkDataDir(dataDir)) {
+      return crr;
+    }
+    try {
+      Table<String, BlockData> blockDataTable =
+          meta.getStore().getBlockDataTable();
+
+      // # of blocks to delete is throttled. The table API takes an int
+      // count, so clamp instead of narrowing: a plain (int) cast of a long
+      // above Integer.MAX_VALUE would wrap to an unrelated value.
+      int maxBlocks = (int) Math.min(blocksToDelete, Integer.MAX_VALUE);
+      KeyPrefixFilter filter = containerData.getDeletingBlockKeyFilter();
+      List<? extends Table.KeyValue<String, BlockData>> toDeleteBlocks =
+          blockDataTable.getSequentialRangeKVs(
+              containerData.startKeyEmpty(), maxBlocks,
+              containerData.containerPrefix(), filter);
+      if (toDeleteBlocks.isEmpty()) {
+        LOG.debug("No under deletion block found in container : {}",
+            containerData.getContainerID());
+        return crr;
+      }
+
+      List<String> succeedBlocks = new ArrayList<>(toDeleteBlocks.size());
+      LOG.debug("Container : {}, To-Delete blocks : {}",
+          containerData.getContainerID(), toDeleteBlocks.size());
+
+      Handler handler = Objects.requireNonNull(ozoneContainer.getDispatcher()
+          .getHandler(container.getContainerType()));
+
+      long releasedBytes = 0;
+      for (Table.KeyValue<String, BlockData> entry : toDeleteBlocks) {
+        String blockName = entry.getKey();
+        LOG.debug("Deleting block {}", blockName);
+        if (entry.getValue() == null) {
+          LOG.warn("Missing delete block(Container = {}, Block = {})",
+              container.getContainerData().getContainerID(), blockName);
+          continue;
+        }
+        // A failure on one block is logged and skipped so that the
+        // remaining blocks of this round are still processed.
+        try {
+          handler.deleteBlock(container, entry.getValue());
+          releasedBytes += KeyValueContainerUtil.getBlockLength(
+              entry.getValue());
+          succeedBlocks.add(blockName);
+        } catch (InvalidProtocolBufferException e) {
+          LOG.error("Failed to parse block info for block {}", blockName, e);
+        } catch (IOException e) {
+          LOG.error("Failed to delete files for block {}", blockName, e);
+        }
+      }
+
+      // Once chunks in the blocks are deleted... remove the blockID from
+      // blockDataTable.
+      try (BatchOperation batch = meta.getStore().getBatchHandler()
+          .initBatchOperation()) {
+        for (String entry : succeedBlocks) {
+          blockDataTable.deleteWithBatch(batch, entry);
+        }
+
+        // Handler.deleteBlock calls deleteChunk to delete all the chunks
+        // in the block. The ContainerData stats (DB and in-memory) are not
+        // updated with decremented used bytes during deleteChunk. This is
+        // done here so that all the DB update for block delete can be
+        // batched together while committing to DB.
+        int deletedBlocksCount = succeedBlocks.size();
+        containerData.updateAndCommitDBCounters(meta, batch,
+            deletedBlocksCount, releasedBytes);
+        // Once DB update is persisted, check if there are any blocks
+        // remaining in the DB. This will determine whether the container
+        // can be deleted by SCM.
+        if (!container.hasBlocks()) {
+          containerData.markAsEmpty();
+        }
+
+        // update count of pending deletion blocks, block count and used
+        // bytes in in-memory container status.
+        containerData.decrPendingDeletionBlocks(deletedBlocksCount);
+        containerData.decrBlockCount(deletedBlocksCount);
+        containerData.decrBytesUsed(releasedBytes);
+        containerData.getVolume().decrementUsedSpace(releasedBytes);
+        metrics.incrSuccessCount(deletedBlocksCount);
+        metrics.incrSuccessBytes(releasedBytes);
+      }
+
+      if (!succeedBlocks.isEmpty()) {
+        LOG.debug("Container: {}, deleted blocks: {}, space reclaimed: {}, " +
+            "task elapsed time: {}ms", containerData.getContainerID(),
+            succeedBlocks.size(), releasedBytes,
+            Time.monotonicNow() - startTime);
+      }
+      crr.addAll(succeedBlocks);
+      return crr;
+    } catch (IOException exception) {
+      LOG.warn("Deletion operation was not successful for container: {}",
+          container.getContainerData().getContainerID(), exception);
+      metrics.incrFailureCount();
+      throw exception;
+    }
+  }
+
+  /**
+   * Deletes pending blocks of a schema V2 container by walking its delete
+   * transaction table, whose keys are the transaction IDs themselves.
+   *
+   * @param meta      open handle on the container DB
+   * @param container container whose blocks are deleted
+   * @param dataDir   the container's chunk directory
+   * @param startTime monotonic start timestamp, used only for logging
+   * @return the result of {@code deleteViaTransactionStore}
+   * @throws IOException propagated from the iterator or the delete routine
+   */
+  public ContainerBackgroundTaskResult deleteViaSchema2(
+      DBHandle meta, Container container, File dataDir,
+      long startTime) throws IOException {
+    // The transaction ID is the table key, so it is passed through as is.
+    final Deleter removeByTxnId = (table, batch, txnId) ->
+        ((Table<Long, DeletedBlocksTransaction>) table)
+            .deleteWithBatch(batch, txnId);
+    final Table<Long, DeletedBlocksTransaction> txnTable =
+        ((DeleteTransactionStore<Long>) meta.getStore())
+            .getDeleteTransactionTable();
+    try (TableIterator<Long,
+        ? extends Table.KeyValue<Long, DeletedBlocksTransaction>>
+        txnIterator = txnTable.iterator()) {
+      return deleteViaTransactionStore(
+          txnIterator, meta, container, dataDir, startTime, removeByTxnId);
+    }
+  }
+
+  /**
+   * Deletes pending blocks of a schema V3 container. The delete transaction
+   * table is keyed by strings, so the iterator is restricted to this
+   * container's key prefix and each transaction ID is turned into its table
+   * key before removal.
+   *
+   * @param meta      open handle on the container DB
+   * @param container container whose blocks are deleted
+   * @param dataDir   the container's chunk directory
+   * @param startTime monotonic start timestamp, used only for logging
+   * @return the result of {@code deleteViaTransactionStore}
+   * @throws IOException propagated from the iterator or the delete routine
+   */
+  public ContainerBackgroundTaskResult deleteViaSchema3(
+      DBHandle meta, Container container, File dataDir,
+      long startTime) throws IOException {
+    // Map the numeric transaction ID to this container's string key.
+    final Deleter removeByTxnKey = (table, batch, txnId) ->
+        ((Table<String, DeletedBlocksTransaction>) table)
+            .deleteWithBatch(batch, containerData.getDeleteTxnKey(txnId));
+    final Table<String, DeletedBlocksTransaction> txnTable =
+        ((DeleteTransactionStore<String>) meta.getStore())
+            .getDeleteTransactionTable();
+    try (TableIterator<String,
+        ? extends Table.KeyValue<String, DeletedBlocksTransaction>>
+        txnIterator = txnTable.iterator(containerData.containerPrefix())) {
+      return deleteViaTransactionStore(
+          txnIterator, meta, container, dataDir, startTime, removeByTxnKey);
+    }
+  }
+
+  /**
+   * Shared delete routine for schemas that keep a delete transaction table.
+   * Whole transactions are read from {@code iter} until the block budget
+   * ({@code blocksToDelete}) is reached, so the final transaction may take
+   * the round over the budget. The blocks' files are removed, then the
+   * transactions, the block entries and the container counters are updated
+   * in a single DB batch.
+   *
+   * @param iter      iterator over the container's delete transactions
+   * @param meta      open handle on the container DB
+   * @param container container whose blocks are deleted
+   * @param dataDir   the container's chunk directory; if it is not a usable
+   *                  directory nothing is deleted
+   * @param startTime monotonic start timestamp, used only for logging
+   * @param deleter   schema-specific removal of one transaction from the
+   *                  transaction table
+   * @return a result object; no block IDs are added to it on this path
+   * @throws IOException if reading or updating the container DB fails; the
+   *         failure metric is incremented before rethrowing
+   */
+  private ContainerBackgroundTaskResult deleteViaTransactionStore(
+      TableIterator<?, ? extends Table.KeyValue<?, DeletedBlocksTransaction>>
+          iter, DBHandle meta, Container container, File dataDir,
+      long startTime, Deleter deleter) throws IOException {
+    ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
+    if (!checkDataDir(dataDir)) {
+      return crr;
+    }
+    try {
+      Table<String, BlockData> blockDataTable =
+          meta.getStore().getBlockDataTable();
+      DeleteTransactionStore<?> txnStore =
+          (DeleteTransactionStore<?>) meta.getStore();
+      Table<?, DeletedBlocksTransaction> deleteTxns =
+          txnStore.getDeleteTransactionTable();
+      List<DeletedBlocksTransaction> delBlocks = new ArrayList<>();
+      int numBlocks = 0;
+      while (iter.hasNext() && (numBlocks < blocksToDelete)) {
+        DeletedBlocksTransaction delTx = iter.next().getValue();
+        numBlocks += delTx.getLocalIDList().size();
+        delBlocks.add(delTx);
+      }
+      if (delBlocks.isEmpty()) {
+        LOG.info("No transaction found in container {} with pending delete " +
+            "block count {}",
+            containerData.getContainerID(),
+            containerData.getNumPendingDeletionBlocks());
+        // If the container was queued for delete, it had a positive
+        // pending delete block count. After checking the DB there were
+        // actually no delete transactions for the container, so reset the
+        // pending delete block count to the correct value of zero.
+        containerData.resetPendingDeleteBlockCount(meta);
+        return crr;
+      }
+
+      // Report the number of blocks, not the number of transactions:
+      // delBlocks holds transactions, each of which may list many blocks.
+      LOG.debug("Container : {}, To-Delete blocks : {}",
+          containerData.getContainerID(), numBlocks);
+
+      Handler handler = Objects.requireNonNull(ozoneContainer.getDispatcher()
+          .getHandler(container.getContainerType()));
+
+      DeleteTransactionStats deleteBlocksResult =
+          deleteTransactions(delBlocks, handler, blockDataTable, container);
+      int deletedBlocksProcessed = deleteBlocksResult.getBlocksProcessed();
+      int deletedBlocksCount = deleteBlocksResult.getBlocksDeleted();
+      long releasedBytes = deleteBlocksResult.getBytesReleased();
+
+      // Once blocks are deleted... remove the blockID from blockDataTable
+      // and also remove the transactions from txnTable.
+      try (BatchOperation batch = meta.getStore().getBatchHandler()
+          .initBatchOperation()) {
+        for (DeletedBlocksTransaction delTx : delBlocks) {
+          deleter.apply(deleteTxns, batch, delTx.getTxID());
+          for (Long blk : delTx.getLocalIDList()) {
+            blockDataTable.deleteWithBatch(batch,
+                containerData.getBlockKey(blk));
+          }
+        }
+
+        // Handler.deleteBlock calls deleteChunk to delete all the chunks
+        // in the block. The ContainerData stats (DB and in-memory) are not
+        // updated with decremented used bytes during deleteChunk. This is
+        // done here so that all the DB updates for block delete can be
+        // batched together while committing to DB.
+        containerData.updateAndCommitDBCounters(meta, batch,
+            deletedBlocksCount, releasedBytes);
+        // Once DB update is persisted, check if there are any blocks
+        // remaining in the DB. This will determine whether the container
+        // can be deleted by SCM.
+        if (!container.hasBlocks()) {
+          containerData.markAsEmpty();
+        }
+
+        // update count of pending deletion blocks, block count and used
+        // bytes in in-memory container status and used space in volume.
+        // Pending deletions drop by every processed block, including the
+        // ones whose files could not be removed.
+        containerData.decrPendingDeletionBlocks(deletedBlocksProcessed);
+        containerData.decrBlockCount(deletedBlocksCount);
+        containerData.decrBytesUsed(releasedBytes);
+        containerData.getVolume().decrementUsedSpace(releasedBytes);
+        metrics.incrSuccessCount(deletedBlocksCount);
+        metrics.incrSuccessBytes(releasedBytes);
+      }
+
+      LOG.debug("Container: {}, deleted blocks: {}, space reclaimed: {}, " +
+          "task elapsed time: {}ms", containerData.getContainerID(),
+          deletedBlocksCount, releasedBytes, Time.monotonicNow() - startTime);
+
+      return crr;
+    } catch (IOException exception) {
+      LOG.warn("Deletion operation was not successful for container: {}",
+          container.getContainerData().getContainerID(), exception);
+      metrics.incrFailureCount();
+      throw exception;
+    }
+  }
+
+  /**
+   * Removes the on-disk data of every block named by the given
+   * transactions. A block with no entry in the block data table is handed
+   * to {@code Handler#deleteUnreferenced}; any other block goes through
+   * {@code Handler#deleteBlock}. A failure on one block is logged and does
+   * not stop the remaining blocks.
+   *
+   * @return the processed and deleted block counts plus the bytes released
+   * @throws IOException propagated from looking up a block's metadata;
+   *         per-block deletion failures are caught and logged instead
+   */
+  private DeleteTransactionStats deleteTransactions(
+      List<DeletedBlocksTransaction> delBlocks, Handler handler,
+      Table<String, BlockData> blockDataTable, Container container)
+      throws IOException {
+    int processed = 0;
+    int deleted = 0;
+    long released = 0;
+    for (DeletedBlocksTransaction txn : delBlocks) {
+      for (Long localId : txn.getLocalIDList()) {
+        final BlockData blockData =
+            blockDataTable.get(containerData.getBlockKey(localId));
+        LOG.debug("Deleting block {}", localId);
+        // Every block that reaches this point counts as processed,
+        // whether or not its files can actually be removed.
+        processed++;
+
+        if (blockData == null) {
+          try {
+            handler.deleteUnreferenced(container, localId);
+          } catch (IOException e) {
+            LOG.error("Failed to delete files for unreferenced block {} of" +
+                    " container {}", localId,
+                container.getContainerData().getContainerID(), e);
+          }
+          continue;
+        }
+
+        try {
+          handler.deleteBlock(container, blockData);
+        } catch (IOException e) {
+          // TODO: if deletion of certain block retries exceed the certain
+          //  number of times, service should skip deleting it,
+          //  otherwise invalid numPendingDeletionBlocks could accumulate
+          //  beyond the limit and the following deletion will stop.
+          LOG.error("Failed to delete files for block {}", localId, e);
+          continue;
+        }
+        deleted++;
+
+        try {
+          released += KeyValueContainerUtil.getBlockLength(blockData);
+        } catch (IOException e) {
+          // TODO: handle the bytesReleased correctly for the unexpected
+          //  exception.
+          LOG.error("Failed to get block length for block {}", localId, e);
+        }
+      }
+    }
+    return new DeleteTransactionStats(processed, deleted, released);
+  }
+
+ /**
+ * Returns the priority value supplied to the constructor.
+ */
+ @Override
+ public int getPriority() {
+ return priority;
+ }
+
+ /**
+ * Schema-specific removal of one delete transaction. Implementations add
+ * the deletion of the entry for {@code txnID} in {@code deleteTxnsTable}
+ * to the given batch; the key type of the table differs between schemas,
+ * hence the wildcard.
+ */
+ private interface Deleter {
+ void apply(Table<?, DeletedBlocksTransaction> deleteTxnsTable,
+ BatchOperation batch, long txnID) throws IOException;
+ }
+
+  /**
+   * Immutable holder for the outcome of deleting a batch of transactions:
+   * how many blocks were processed, how many were deleted, and how many
+   * bytes were released.
+   */
+  private static class DeleteTransactionStats {
+
+    private final int blocksProcessed;
+    private final int blocksDeleted;
+    private final long bytesReleased;
+
+    DeleteTransactionStats(int processed, int deleted, long released) {
+      this.blocksProcessed = processed;
+      this.blocksDeleted = deleted;
+      this.bytesReleased = released;
+    }
+
+    /** Returns the number of blocks processed. */
+    public int getBlocksProcessed() {
+      return this.blocksProcessed;
+    }
+
+    /** Returns the number of blocks deleted. */
+    public int getBlocksDeleted() {
+      return this.blocksDeleted;
+    }
+
+    /** Returns the number of bytes released. */
+    public long getBytesReleased() {
+      return this.bytesReleased;
+    }
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 29ac6ea240..50c406d35e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.security.token.TokenVerifier;
import
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.impl.BlockDeletingService;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
@@ -53,7 +54,6 @@ import
org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import
org.apache.hadoop.ozone.container.common.volume.StorageVolume.VolumeType;
import org.apache.hadoop.ozone.container.common.volume.StorageVolumeChecker;
-import
org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
import
org.apache.hadoop.ozone.container.keyvalue.statemachine.background.StaleRecoveringContainerScrubbingService;
import org.apache.hadoop.ozone.container.replication.ContainerImporter;
import org.apache.hadoop.ozone.container.replication.ReplicationServer;
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
index 2e49cd8882..bed5ac40f1 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
@@ -62,7 +62,7 @@ import
org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.keyvalue.impl.FilePerBlockStrategy;
import org.apache.hadoop.ozone.container.keyvalue.impl.FilePerChunkStrategy;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
-import
org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
+import org.apache.hadoop.ozone.container.common.impl.BlockDeletingService;
import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaThreeImpl;
import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl;
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDeletionChoosingPolicy.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDeletionChoosingPolicy.java
index cb86bf38e7..67338f860d 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDeletionChoosingPolicy.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDeletionChoosingPolicy.java
@@ -38,8 +38,7 @@ import
org.apache.hadoop.ozone.container.common.interfaces.ContainerDeletionChoo
import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import
org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
-import
org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService.ContainerBlockInfo;
+import
org.apache.hadoop.ozone.container.common.impl.BlockDeletingService.ContainerBlockInfo;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.Assert;
@@ -120,7 +119,7 @@ public class TestContainerDeletionChoosingPolicy {
long totPendingBlocks = 0;
for (ContainerBlockInfo pr : result0) {
- totPendingBlocks += pr.getBlocks();
+ totPendingBlocks += pr.getNumBlocksToDelete();
}
Assert.assertTrue(totPendingBlocks >= blockLimitPerInterval);
@@ -192,7 +191,7 @@ public class TestContainerDeletionChoosingPolicy {
.chooseContainerForBlockDeletion(blockLimitPerInterval,
deletionPolicy);
long totPendingBlocks = 0;
for (ContainerBlockInfo pr : result0) {
- totPendingBlocks += pr.getBlocks();
+ totPendingBlocks += pr.getNumBlocksToDelete();
}
Assert.assertTrue(totPendingBlocks >= blockLimitPerInterval);
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/testutils/BlockDeletingServiceTestImpl.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/testutils/BlockDeletingServiceTestImpl.java
index ee2bcfee6d..fdf4dd0d57 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/testutils/BlockDeletingServiceTestImpl.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/testutils/BlockDeletingServiceTestImpl.java
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import
org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
+import org.apache.hadoop.ozone.container.common.impl.BlockDeletingService;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import com.google.common.annotations.VisibleForTesting;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]