Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 013c36f3c -> f6b937810
HDFS-12443. Ozone: Improve SCM block deletion throttling algorithm. Contributed by Yiqun Lin.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f6b93781
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f6b93781
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f6b93781

Branch: refs/heads/HDFS-7240
Commit: f6b937810228a21b0695faccbfd4372dffaf2cc6
Parents: 013c36f
Author: Yiqun Lin <[email protected]>
Authored: Mon Nov 6 20:21:51 2017 +0800
Committer: Yiqun Lin <[email protected]>
Committed: Mon Nov 6 20:21:51 2017 +0800

----------------------------------------------------------------------
 .../DeleteBlocksCommandHandler.java             |   4 +-
 .../ozone/scm/StorageContainerManager.java      |   3 +-
 .../hadoop/ozone/scm/block/BlockManager.java    |   5 +
 .../ozone/scm/block/BlockManagerImpl.java       |   7 +-
 .../block/DatanodeDeletedBlockTransactions.java | 130 ++++++++++++
 .../hadoop/ozone/scm/block/DeletedBlockLog.java |  10 +
 .../ozone/scm/block/DeletedBlockLogImpl.java    |  22 +++
 .../scm/block/SCMBlockDeletingService.java      | 197 +++++++------------
 .../ozone/TestStorageContainerManager.java      | 161 +++++++++++----
 .../ozone/scm/block/TestDeletedBlockLog.java    | 102 ++++++++++
 10 files changed, 470 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6b93781/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
index f16a4ff..1c859ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -165,7 +165,7 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
       try {
         containerDB.writeBatch(batch);
         newDeletionBlocks++;
-        LOG.info("Transited Block {} to DELETING state in container {}",
+        LOG.debug("Transited Block {} to DELETING state in container {}",
             blk, containerId);
       } catch (IOException e) {
         // if some blocks failed to delete, we fail this TX,
@@ -175,7 +175,7 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
             "Failed to delete blocks for TXID = " + delTX.getTxID(), e);
       }
     } else {
-      LOG.info("Block {} not found or already under deletion in"
+      LOG.debug("Block {} not found or already under deletion in"
           + " container {}, skip deleting it.", blk, containerId);
     }
   }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6b93781/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
index d341e2c..b6f5239 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
@@ -811,7 +811,8 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
             + "success={}", result.getTxID(), result.getSuccess());
       }
       if (result.getSuccess()) {
-        LOG.info("Purging TXID={} from block deletion log", result.getTxID());
+        LOG.debug("Purging TXID={} from block deletion log",
+            result.getTxID());
         this.getScmBlockManager().getDeletedBlockLog()
             .commitTransactions(Collections.singletonList(result.getTxID()));
       } else {


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6b93781/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java
index da13f4a..b0f4da6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java
@@ -78,4 +78,9 @@ public interface BlockManager extends Closeable {
    * @throws IOException
    */
   void stop() throws IOException;
+
+  /**
+   * @return the block deleting service executed in SCM.
+   */
+  SCMBlockDeletingService getSCMBlockDeletingService();
 }


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6b93781/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
index a6b5a5f..ff138e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
@@ -148,7 +148,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
     blockDeletingService =
         new SCMBlockDeletingService(
             deletedBlockLog, containerManager, nodeManager, svcInterval,
-            serviceTimeout);
+            serviceTimeout, conf);
   }
 
   /**
@@ -525,4 +525,9 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
     // factor. Hence returning 0 for now.
     // containers.get(OzoneProtos.LifeCycleState.OPEN).size();
   }
+
+  @Override
+  public SCMBlockDeletingService getSCMBlockDeletingService() {
+    return this.blockDeletingService;
+  }
 }


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6b93781/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DatanodeDeletedBlockTransactions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DatanodeDeletedBlockTransactions.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DatanodeDeletedBlockTransactions.java
new file mode 100644
index 0000000..31c61dd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DatanodeDeletedBlockTransactions.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.scm.block;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.ozone.scm.container.Mapping;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
+
+import com.google.common.collect.ArrayListMultimap;
+
+/**
+ * A wrapper class to hold info about datanode and all deleted block
+ * transactions that will be sent to this datanode.
+ */
+public class DatanodeDeletedBlockTransactions {
+  private int nodeNum;
+  // The throttle size for each datanode.
+  private int maximumAllowedTXNum;
+  // Current counter of inserted TX.
+  private int currentTXNum;
+  private Mapping mappingService;
+  // A list of TXs mapped to a certain datanode ID.
+  private final ArrayListMultimap<DatanodeID, DeletedBlocksTransaction>
+      transactions;
+
+  DatanodeDeletedBlockTransactions(Mapping mappingService,
+      int maximumAllowedTXNum, int nodeNum) {
+    this.transactions = ArrayListMultimap.create();
+    this.mappingService = mappingService;
+    this.maximumAllowedTXNum = maximumAllowedTXNum;
+    this.nodeNum = nodeNum;
+  }
+
+  public void addTransaction(DeletedBlocksTransaction tx) throws IOException {
+    ContainerInfo info = null;
+    try {
+      info = mappingService.getContainer(tx.getContainerName());
+    } catch (IOException e) {
+      SCMBlockDeletingService.LOG.warn("Got container info error.", e);
+    }
+
+    if (info == null) {
+      SCMBlockDeletingService.LOG.warn(
+          "Container {} not found, continue to process next",
+          tx.getContainerName());
+      return;
+    }
+
+    for (DatanodeID dnID : info.getPipeline().getMachines()) {
+      if (transactions.containsKey(dnID)) {
+        List<DeletedBlocksTransaction> txs = transactions.get(dnID);
+        if (txs != null && txs.size() < maximumAllowedTXNum) {
+          boolean hasContained = false;
+          for (DeletedBlocksTransaction t : txs) {
+            if (t.getContainerName().equals(tx.getContainerName())) {
+              hasContained = true;
+              break;
+            }
+          }
+
+          if (!hasContained) {
+            txs.add(tx);
+            currentTXNum++;
+          }
+        }
+      } else {
+        currentTXNum++;
+        transactions.put(dnID, tx);
+      }
+      SCMBlockDeletingService.LOG.debug("Transaction added: {} <- TX({})", dnID,
+          tx.getTxID());
+    }
+  }
+
+  Set<DatanodeID> getDatanodes() {
+    return transactions.keySet();
+  }
+
+  boolean isEmpty() {
+    return transactions.isEmpty();
+  }
+
+  boolean hasTransactions(DatanodeID dnID) {
+    return transactions.containsKey(dnID) && !transactions.get(dnID).isEmpty();
+  }
+
+  List<DeletedBlocksTransaction> getDatanodeTransactions(
+      DatanodeID dnID) {
+    return transactions.get(dnID);
+  }
+
+  List<String> getTransactionIDList(DatanodeID dnID) {
+    if (hasTransactions(dnID)) {
+      return transactions.get(dnID).stream()
+          .map(DeletedBlocksTransaction::getTxID).map(String::valueOf)
+          .collect(Collectors.toList());
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  boolean isFull() {
+    return currentTXNum >= maximumAllowedTXNum * nodeNum;
+  }
+
+  int getTXNum() {
+    return currentTXNum;
+  }
+}
\ No newline at end of file


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6b93781/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
index bcbbe15..d8af853 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
@@ -46,6 +46,16 @@ public interface DeletedBlockLog extends Closeable {
       throws IOException;
 
   /**
+   * Scan the entire log once and add TXs to DatanodeDeletedBlockTransactions.
+   * Once DatanodeDeletedBlockTransactions is full, the scan behavior will
+   * stop.
+   * @param transactions the holder that scanned TXs will be added into.
+   * @throws IOException
+   */
+  void getTransactions(DatanodeDeletedBlockTransactions transactions)
+      throws IOException;
+
+  /**
    * Return all failed transactions in the log. A transaction is considered
    * to be failed if it has been sent more than MAX_RETRY limit and its
    * count is reset to -1.
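
[Editor's note] For readers tracing the new throttle bookkeeping in DatanodeDeletedBlockTransactions above, here is a minimal standalone sketch of the same idea: a per-node cap, de-duplication by container name, and a full-check over cap * nodeNum. Class and method names are hypothetical illustrations, not part of this patch.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch of the per-datanode throttle: each node accepts at most
    // `cap` transactions, duplicates per container are skipped, and the
    // holder reports full once every node has reached its cap.
    class ThrottleSketch {
      private final Map<String, List<String>> txsByNode = new HashMap<>();
      private final int cap;      // per-node limit (assumed positive)
      private final int nodeNum;  // number of healthy datanodes
      private int total;

      ThrottleSketch(int cap, int nodeNum) {
        this.cap = cap;
        this.nodeNum = nodeNum;
      }

      // Returns true when the TX was accepted for the given node.
      boolean add(String nodeId, String containerName) {
        List<String> txs =
            txsByNode.computeIfAbsent(nodeId, k -> new ArrayList<>());
        if (txs.size() >= cap || txs.contains(containerName)) {
          return false; // node saturated, or container already queued
        }
        txs.add(containerName);
        total++;
        return true;
      }

      boolean isFull() {
        return total >= cap * nodeNum;
      }
    }

With cap=5 and two nodes, add() stops accepting TXs for a node after five distinct containers, which matches what the new TestDeletedBlockLog case at the end of this patch asserts.
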
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6b93781/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
index e7e92d1..d14da62 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
@@ -326,4 +326,26 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
       deletedStore.close();
     }
   }
+
+  @Override
+  public void getTransactions(DatanodeDeletedBlockTransactions transactions)
+      throws IOException {
+    lock.lock();
+    try {
+      deletedStore.iterate(null, (key, value) -> {
+        if (!Arrays.equals(LATEST_TXID, key)) {
+          DeletedBlocksTransaction block = DeletedBlocksTransaction
+              .parseFrom(value);
+
+          if (block.getCount() > -1 && block.getCount() <= maxRetry) {
+            transactions.addTransaction(block);
+          }
+          return !transactions.isFull();
+        }
+        return true;
+      });
+    } finally {
+      lock.unlock();
+    }
+  }
 }
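
[Editor's note] The iterate callback in getTransactions() above relies on an early-exit contract: returning false from the consumer stops the store scan as soon as the holder fills up. A self-contained sketch of that contract, with a TreeMap standing in for the key-value store and all names hypothetical:

    import java.util.Map;
    import java.util.TreeMap;
    import java.util.function.BiPredicate;

    // Sketch of the early-exit scan used by getTransactions(): the
    // consumer returns false to stop iteration, which is how the scan
    // halts once DatanodeDeletedBlockTransactions is full.
    class ScanSketch {
      static void iterate(TreeMap<Long, String> store,
          BiPredicate<Long, String> consumer) {
        for (Map.Entry<Long, String> e : store.entrySet()) {
          if (!consumer.test(e.getKey(), e.getValue())) {
            return; // consumer asked to stop scanning
          }
        }
      }

      public static void main(String[] args) {
        TreeMap<Long, String> log = new TreeMap<>();
        for (long txID = 1; txID <= 10; txID++) {
          log.put(txID, "container-" + txID);
        }
        int[] taken = {0};
        int limit = 3;
        iterate(log, (txID, container) -> {
          taken[0]++;
          return taken[0] < limit; // stop once the holder is "full"
        });
        System.out.println("Scanned " + taken[0] + " TXs"); // Scanned 3 TXs
      }
    }
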
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6b93781/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/SCMBlockDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/SCMBlockDeletingService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/SCMBlockDeletingService.java
index 3058b1e..a723c2d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/SCMBlockDeletingService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/SCMBlockDeletingService.java
@@ -16,14 +16,16 @@
  */
 package org.apache.hadoop.ozone.scm.block;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.ozone.scm.container.Mapping;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
-import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.utils.BackgroundService;
 import org.apache.hadoop.utils.BackgroundTask;
@@ -32,13 +34,12 @@ import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT;
+
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 /**
  * A background service running in SCM to delete blocks. This service scans
@@ -49,7 +50,7 @@ import java.util.stream.Collectors;
  */
 public class SCMBlockDeletingService extends BackgroundService {
 
-  private static final Logger LOG =
+  static final Logger LOG =
       LoggerFactory.getLogger(SCMBlockDeletingService.class);
 
   // ThreadPoolSize=2, 1 for scheduler and the other for the scanner.
@@ -58,28 +59,36 @@ public class SCMBlockDeletingService extends BackgroundService {
   private final Mapping mappingService;
   private final NodeManager nodeManager;
 
-  // Default container size is 5G and block size is 256MB, a full container
-  // at most contains 20 blocks. At most each TX contains 20 blocks.
-  // When SCM sends block deletion TXs to datanode, each command we allow
-  // at most 50 containers so that will limit number of to be deleted blocks
-  // less than 1000.
-  // TODO - a better throttle algorithm
-  // Note, this is not an accurate limit of blocks. When we scan
-  // the log, worst case we may get 50 TX for 50 different datanodes,
-  // that will cause the deletion message sent by SCM extremely small.
-  // As a result, the deletion will be slow. An improvement is to scan
-  // log multiple times until we get enough TXs for each datanode, or
-  // the entire log is scanned.
-  private static final int BLOCK_DELETE_TX_PER_REQUEST_LIMIT = 50;
+  // The block delete limit size is dynamically calculated from the container
+  // delete limit size (ozone.block.deleting.container.limit.per.interval)
+  // that is configured for the datanode. To ensure the DN does not wait for
+  // delete commands, we multiply this value by a factor of 2 as the final
+  // TX limit size for each node.
+  // Currently we implement a throttle algorithm that throttles delete blocks
+  // for each datanode. Each node is limited to the calculated size. First,
+  // current node info is fetched from the node manager, then the entire
+  // delLog is scanned from beginning to end. If one node reaches the maximum,
+  // its records are skipped; if not, scanning continues until it reaches the
+  // maximum. Once all nodes are full, the scan stops.
+  private int blockDeleteLimitSize;
 
   public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog,
       Mapping mapper, NodeManager nodeManager,
-      int interval, long serviceTimeout) {
+      int interval, long serviceTimeout, Configuration conf) {
     super("SCMBlockDeletingService", interval, TimeUnit.MILLISECONDS,
         BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout);
     this.deletedBlockLog = deletedBlockLog;
     this.mappingService = mapper;
     this.nodeManager = nodeManager;
+
+    int containerLimit = conf.getInt(
+        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL,
+        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT);
+    Preconditions.checkArgument(containerLimit > 0,
+        "Container limit size should be positive.");
+    // Use the container limit multiplied by a factor of 2 to ensure the DN
+    // does not wait for orders.
+    this.blockDeleteLimitSize = containerLimit * 2;
   }
 
   @Override
@@ -104,126 +113,60 @@ public class SCMBlockDeletingService extends BackgroundService {
       // Scan SCM DB in HB interval and collect a throttled list of
       // to delete blocks.
LOG.debug("Running DeletedBlockTransactionScanner"); - DatanodeDeletedBlockTransactions transactions = - getToDeleteContainerBlocks(); + DatanodeDeletedBlockTransactions transactions = null; + List<DatanodeID> datanodes = nodeManager.getNodes(NodeState.HEALTHY); + if (datanodes != null) { + transactions = new DatanodeDeletedBlockTransactions(mappingService, + blockDeleteLimitSize, datanodes.size()); + try { + deletedBlockLog.getTransactions(transactions); + } catch (IOException e) { + // We may tolerant a number of failures for sometime + // but if it continues to fail, at some point we need to raise + // an exception and probably fail the SCM ? At present, it simply + // continues to retry the scanning. + LOG.error("Failed to get block deletion transactions from delTX log", + e); + } + LOG.debug("Scanned deleted blocks log and got {} delTX to process.", + transactions.getTXNum()); + } + if (transactions != null && !transactions.isEmpty()) { for (DatanodeID datanodeID : transactions.getDatanodes()) { List<DeletedBlocksTransaction> dnTXs = transactions .getDatanodeTransactions(datanodeID); - dnTxCount += dnTXs.size(); - // TODO commandQueue needs a cap. - // We should stop caching new commands if num of un-processed - // command is bigger than a limit, e.g 50. In case datanode goes - // offline for sometime, the cached commands be flooded. - nodeManager.addDatanodeCommand(datanodeID, - new DeleteBlocksCommand(dnTXs)); - LOG.debug( - "Added delete block command for datanode {} in the queue," - + " number of delete block transactions: {}, TxID list: {}", - datanodeID, dnTXs.size(), - String.join(",", transactions.getTransactionIDList(datanodeID))); + if (dnTXs != null && !dnTXs.isEmpty()) { + dnTxCount += dnTXs.size(); + // TODO commandQueue needs a cap. + // We should stop caching new commands if num of un-processed + // command is bigger than a limit, e.g 50. In case datanode goes + // offline for sometime, the cached commands be flooded. + nodeManager.addDatanodeCommand(datanodeID, + new DeleteBlocksCommand(dnTXs)); + LOG.debug( + "Added delete block command for datanode {} in the queue," + + " number of delete block transactions: {}, TxID list: {}", + datanodeID, dnTXs.size(), String.join(",", + transactions.getTransactionIDList(datanodeID))); + } } } if (dnTxCount > 0) { - LOG.info("Totally added {} delete blocks command for" - + " {} datanodes, task elapsed time: {}ms", + LOG.info( + "Totally added {} delete blocks command for" + + " {} datanodes, task elapsed time: {}ms", dnTxCount, transactions.getDatanodes().size(), Time.monotonicNow() - startTime); } return EmptyTaskResult.newResult(); } - - // Scan deleteBlocks.db to get a number of to-delete blocks. - // this is going to be properly throttled. - private DatanodeDeletedBlockTransactions getToDeleteContainerBlocks() { - DatanodeDeletedBlockTransactions dnTXs = - new DatanodeDeletedBlockTransactions(); - List<DeletedBlocksTransaction> txs = null; - try { - // Get a limited number of TXs to send via HB at a time. - txs = deletedBlockLog - .getTransactions(BLOCK_DELETE_TX_PER_REQUEST_LIMIT); - LOG.debug("Scanned deleted blocks log and got {} delTX to process", - txs.size()); - } catch (IOException e) { - // We may tolerant a number of failures for sometime - // but if it continues to fail, at some point we need to raise - // an exception and probably fail the SCM ? At present, it simply - // continues to retry the scanning. 
-        LOG.error("Failed to get block deletion transactions from delTX log",
-            e);
-      }
-
-      if (txs != null) {
-        for (DeletedBlocksTransaction tx : txs) {
-          try {
-            ContainerInfo info = mappingService
-                .getContainer(tx.getContainerName());
-            // Find out the datanode where this TX is supposed to send to.
-            info.getPipeline().getMachines()
-                .forEach(entry -> dnTXs.addTransaction(entry, tx));
-          } catch (IOException e) {
-            LOG.warn("Container {} not found, continue to process next",
-                tx.getContainerName(), e);
-          }
-        }
-      }
-      return dnTXs;
-    }
   }
 
-  /**
-   * A wrapper class to hold info about datanode and all deleted block
-   * transactions that will be sent to this datanode.
-   */
-  private static class DatanodeDeletedBlockTransactions {
-
-    // A list of TXs mapped to a certain datanode ID.
-    private final Map<DatanodeID, List<DeletedBlocksTransaction>> transactions;
-
-    DatanodeDeletedBlockTransactions() {
-      this.transactions = Maps.newHashMap();
-    }
-
-    void addTransaction(DatanodeID dnID, DeletedBlocksTransaction tx) {
-      if (transactions.containsKey(dnID)) {
-        transactions.get(dnID).add(tx);
-      } else {
-        List<DeletedBlocksTransaction> first = Lists.newArrayList();
-        first.add(tx);
-        transactions.put(dnID, first);
-      }
-      LOG.debug("Transaction added: {} <- TX({})", dnID, tx.getTxID());
-    }
-
-    Set<DatanodeID> getDatanodes() {
-      return transactions.keySet();
-    }
-
-    boolean isEmpty() {
-      return transactions.isEmpty();
-    }
-
-    boolean hasTransactions(DatanodeID dnID) {
-      return transactions.containsKey(dnID) &&
-          !transactions.get(dnID).isEmpty();
-    }
-
-    List<DeletedBlocksTransaction> getDatanodeTransactions(DatanodeID dnID) {
-      return transactions.get(dnID);
-    }
-
-    List<String> getTransactionIDList(DatanodeID dnID) {
-      if (hasTransactions(dnID)) {
-        return transactions.get(dnID).stream()
-            .map(DeletedBlocksTransaction::getTxID)
-            .map(String::valueOf)
-            .collect(Collectors.toList());
-      } else {
-        return Collections.emptyList();
-      }
-    }
+  @VisibleForTesting
+  public void setBlockDeleteTXNum(int numTXs) {
+    blockDeleteLimitSize = numTXs;
   }
 }
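
[Editor's note] The arithmetic behind the new limit is small but worth spelling out. A sketch of how the per-node cap and the global stop condition relate, following the comments in SCMBlockDeletingService above (the containerLimit value of 10 is an arbitrary illustration, not the Ozone default):

    // Sketch: how the per-node TX limit and the global stop condition
    // relate in the new throttling scheme.
    class LimitSketch {
      public static void main(String[] args) {
        // Assumption: stands in for the value read from
        // ozone.block.deleting.container.limit.per.interval.
        int containerLimit = 10;
        // Factor of 2 so the datanode does not starve between intervals.
        int blockDeleteLimitSize = containerLimit * 2;

        int healthyNodes = 3;
        // The log scan stops once currentTXNum reaches this product.
        int globalCap = blockDeleteLimitSize * healthyNodes;
        System.out.println("per-node cap = " + blockDeleteLimitSize
            + ", scan stops at " + globalCap + " TXs");
      }
    }
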


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6b93781/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 6e6cc8d..251b949 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -22,9 +22,17 @@
 import java.io.IOException;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
 import org.apache.hadoop.ozone.scm.StorageContainerManager;
 import org.apache.hadoop.ozone.scm.block.DeletedBlockLog;
+import org.apache.hadoop.ozone.scm.block.SCMBlockDeletingService;
+import org.apache.hadoop.ozone.scm.node.NodeManager;
 import org.apache.hadoop.scm.XceiverClientManager;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.junit.Rule;
@@ -166,11 +174,17 @@ public class TestStorageContainerManager {
 
   @Test
   public void testBlockDeletionTransactions() throws Exception {
+    int numKeys = 5;
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 5);
     conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 3000);
     conf.setInt(ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 5);
     conf.setInt(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS, 1000);
+    // Reset container provision size, otherwise only one container
+    // is created by default.
+    conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
+        numKeys);
+
     MiniOzoneCluster cluster =
         new MiniOzoneCluster.Builder(conf).numDataNodes(1)
         .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
@@ -180,48 +194,14 @@ public class TestStorageContainerManager {
         .getScmBlockManager().getDeletedBlockLog();
     Assert.assertEquals(0, delLog.getNumOfValidTransactions());
 
-    // Create 20 random names keys.
+    // Create {numKeys} randomly named keys.
     TestStorageContainerManagerHelper helper =
         new TestStorageContainerManagerHelper(cluster, conf);
-    Map<String, KsmKeyInfo> keyLocations = helper.createKeys(20, 4096);
-
-    // These keys will be written into a bunch of containers,
-    // gets a set of container names, verify container containerBlocks
-    // on datanodes.
-    Set<String> containerNames = new HashSet<>();
-    for (Map.Entry<String, KsmKeyInfo> entry : keyLocations.entrySet()) {
-      entry.getValue().getKeyLocationList()
-          .forEach(loc -> containerNames.add(loc.getContainerName()));
-    }
+    Map<String, KsmKeyInfo> keyLocations = helper.createKeys(numKeys, 4096);
 
-    // Total number of containerBlocks of these containers should be equal to
-    // total number of containerBlocks via creation call.
-    int totalCreatedBlocks = 0;
-    for (KsmKeyInfo info : keyLocations.values()) {
-      totalCreatedBlocks += info.getKeyLocationList().size();
-    }
-    Assert.assertTrue(totalCreatedBlocks > 0);
-    Assert.assertEquals(totalCreatedBlocks,
-        helper.getAllBlocks(containerNames).size());
-
-    // Create a deletion TX for each key.
-    Map<String, List<String>> containerBlocks = Maps.newHashMap();
-    for (KsmKeyInfo info : keyLocations.values()) {
-      List<KsmKeyLocationInfo> list = info.getKeyLocationList();
-      list.forEach(location -> {
-        if (containerBlocks.containsKey(location.getContainerName())) {
-          containerBlocks.get(location.getContainerName())
-              .add(location.getBlockID());
-        } else {
-          List<String> blks = Lists.newArrayList();
-          blks.add(location.getBlockID());
-          containerBlocks.put(location.getContainerName(), blks);
-        }
-      });
-    }
-    for (Map.Entry<String, List<String>> tx : containerBlocks.entrySet()) {
-      delLog.addTransaction(tx.getKey(), tx.getValue());
-    }
+    Map<String, List<String>> containerBlocks = createDeleteTXLog(delLog,
+        keyLocations, helper);
+    Set<String> containerNames = containerBlocks.keySet();
 
     // Verify a few TX gets created in the TX log.
     Assert.assertTrue(delLog.getNumOfValidTransactions() > 0);
@@ -268,4 +248,105 @@ public class TestStorageContainerManager {
       }
     }
   }
-}
+
+  @Test
+  public void testBlockDeletingThrottling() throws Exception {
+    int numKeys = 15;
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 5);
+    conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 3000);
+    conf.setInt(ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 5);
+    conf.setInt(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS, 1000);
+    conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
+        numKeys);
+
+    MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf)
+        .numDataNodes(1).setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
+        .build();
+
+    DeletedBlockLog delLog = cluster.getStorageContainerManager()
+        .getScmBlockManager().getDeletedBlockLog();
+    Assert.assertEquals(0, delLog.getNumOfValidTransactions());
+
+    int limitSize = 1;
+    // Reset the limit to 1, so that only one TX is processed per
+    // datanode at a time.
+    SCMBlockDeletingService delService = cluster.getStorageContainerManager()
+        .getScmBlockManager().getSCMBlockDeletingService();
+    delService.setBlockDeleteTXNum(limitSize);
+
+    // Create {numKeys} randomly named keys.
+    TestStorageContainerManagerHelper helper =
+        new TestStorageContainerManagerHelper(cluster, conf);
+    Map<String, KsmKeyInfo> keyLocations = helper.createKeys(numKeys, 4096);
+
+    createDeleteTXLog(delLog, keyLocations, helper);
+    // Verify a few TX gets created in the TX log.
+    Assert.assertTrue(delLog.getNumOfValidTransactions() > 0);
+
+    // Verify the size in delete commands is expected.
+    GenericTestUtils.waitFor(() -> {
+      NodeManager nodeManager = cluster.getStorageContainerManager()
+          .getScmNodeManager();
+      ReportState reportState = ReportState.newBuilder()
+          .setState(ReportState.states.noContainerReports).setCount(0).build();
+      List<SCMCommand> commands = nodeManager.sendHeartbeat(
+          nodeManager.getNodes(NodeState.HEALTHY).get(0), null, reportState);
+
+      if (commands != null) {
+        for (SCMCommand cmd : commands) {
+          if (cmd.getType() == Type.deleteBlocksCommand) {
+            List<DeletedBlocksTransaction> deletedTXs =
+                ((DeleteBlocksCommand) cmd).blocksTobeDeleted();
+            return deletedTXs != null && deletedTXs.size() == limitSize;
+          }
+        }
+      }
+      return false;
+    }, 500, 10000);
+  }
+
+  private Map<String, List<String>> createDeleteTXLog(DeletedBlockLog delLog,
+      Map<String, KsmKeyInfo> keyLocations,
+      TestStorageContainerManagerHelper helper) throws IOException {
+    // These keys will be written into a bunch of containers. Get the
+    // set of container names and verify the container blocks
+    // on datanodes.
+    Set<String> containerNames = new HashSet<>();
+    for (Map.Entry<String, KsmKeyInfo> entry : keyLocations.entrySet()) {
+      entry.getValue().getKeyLocationList()
+          .forEach(loc -> containerNames.add(loc.getContainerName()));
+    }
+
+    // The total number of blocks in these containers should be equal to
+    // the total number of blocks reported via the creation call.
+    int totalCreatedBlocks = 0;
+    for (KsmKeyInfo info : keyLocations.values()) {
+      totalCreatedBlocks += info.getKeyLocationList().size();
+    }
+    Assert.assertTrue(totalCreatedBlocks > 0);
+    Assert.assertEquals(totalCreatedBlocks,
+        helper.getAllBlocks(containerNames).size());
+
+    // Create a deletion TX for each key.
+    Map<String, List<String>> containerBlocks = Maps.newHashMap();
+    for (KsmKeyInfo info : keyLocations.values()) {
+      List<KsmKeyLocationInfo> list = info.getKeyLocationList();
+      list.forEach(location -> {
+        if (containerBlocks.containsKey(location.getContainerName())) {
+          containerBlocks.get(location.getContainerName())
+              .add(location.getBlockID());
+        } else {
+          List<String> blks = Lists.newArrayList();
+          blks.add(location.getBlockID());
+          containerBlocks.put(location.getContainerName(), blks);
+        }
+      });
+    }
+    for (Map.Entry<String, List<String>> tx : containerBlocks.entrySet()) {
+      delLog.addTransaction(tx.getKey(), tx.getValue());
+    }
+
+    return containerBlocks;
+  }
+}
\ No newline at end of file
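
[Editor's note] testBlockDeletingThrottling above polls via GenericTestUtils.waitFor until a heartbeat carries a delete command of exactly limitSize transactions. For reference, a dependency-free sketch of that poll-until-true pattern (plain Java standing in for the Hadoop test utility; the toy condition is hypothetical):

    import java.util.concurrent.TimeoutException;
    import java.util.function.BooleanSupplier;

    // Sketch of the poll-until-true pattern behind GenericTestUtils.waitFor:
    // re-check a condition every checkEveryMillis until it holds or the
    // overall timeout elapses.
    class WaitForSketch {
      static void waitFor(BooleanSupplier check, long checkEveryMillis,
          long waitForMillis) throws TimeoutException, InterruptedException {
        long deadline = System.currentTimeMillis() + waitForMillis;
        while (System.currentTimeMillis() < deadline) {
          if (check.getAsBoolean()) {
            return; // condition met, e.g. delete command reached limitSize
          }
          Thread.sleep(checkEveryMillis);
        }
        throw new TimeoutException(
            "Condition not met in " + waitForMillis + "ms");
      }

      public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        // Toy condition: becomes true after one second.
        waitFor(() -> System.currentTimeMillis() - start > 1000, 100, 5000);
        System.out.println("condition satisfied");
      }
    }
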

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6b93781/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java
index 0aea9b8..c4006e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java
@@ -19,9 +19,14 @@ package org.apache.hadoop.ozone.scm.block;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.ozone.scm.container.ContainerMapping;
+import org.apache.hadoop.ozone.scm.container.Mapping;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.utils.MetadataKeyFilters;
 import org.apache.hadoop.utils.MetadataStore;
@@ -29,11 +34,14 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -42,6 +50,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS;
 import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
+import static org.mockito.Mockito.mock;
 
 /**
  * Tests for DeletedBlockLog.
@@ -237,4 +246,97 @@ public class TestDeletedBlockLog {
     blocks = deletedBlockLog.getTransactions(10);
     Assert.assertEquals(10, blocks.size());
   }
+
+  @Test
+  public void testDeletedBlockTransactions() throws IOException {
+    int txNum = 10;
+    int maximumAllowedTXNum = 5;
+    List<DeletedBlocksTransaction> blocks = null;
+    List<String> containerNames = new LinkedList<>();
+
+    int count = 0;
+    String containerName = null;
+    DatanodeID dnID1 = new DatanodeID(null, null, "node1", 0, 0, 0, 0);
+    DatanodeID dnID2 = new DatanodeID(null, null, "node2", 0, 0, 0, 0);
+    Mapping mappingService = mock(ContainerMapping.class);
+    // Create {txNum} TXs in the log.
+    for (Map.Entry<String, List<String>> entry : generateData(txNum)
+        .entrySet()) {
+      count++;
+      containerName = entry.getKey();
+      containerNames.add(containerName);
+      deletedBlockLog.addTransaction(containerName, entry.getValue());
+
+      // Make TX[1-6] for datanode1 and TX[7-10] for datanode2.
+      if (count <= (maximumAllowedTXNum + 1)) {
+        mockContainerInfo(mappingService, containerName, dnID1);
+      } else {
+        mockContainerInfo(mappingService, containerName, dnID2);
+      }
+    }
+
+    DatanodeDeletedBlockTransactions transactions =
+        new DatanodeDeletedBlockTransactions(mappingService,
+            maximumAllowedTXNum, 2);
+    deletedBlockLog.getTransactions(transactions);
+
+    List<Long> txIDs = new LinkedList<>();
+    for (DatanodeID dnID : transactions.getDatanodes()) {
+      List<DeletedBlocksTransaction> txs = transactions
+          .getDatanodeTransactions(dnID);
+      for (DeletedBlocksTransaction tx : txs) {
+        txIDs.add(tx.getTxID());
+      }
+    }
+
+    // Commit the scanned TX IDs.
+    deletedBlockLog.commitTransactions(txIDs);
+    blocks = deletedBlockLog.getTransactions(txNum);
+    // One TX should remain since dnID1 has reached
+    // the maximum value (5).
+    Assert.assertEquals(1, blocks.size());
+
+    Assert.assertFalse(transactions.isFull());
+    // The number of TXs for dnID1 won't exceed the maximum value.
+    Assert.assertEquals(maximumAllowedTXNum,
+        transactions.getDatanodeTransactions(dnID1).size());
+
+    int size = transactions.getDatanodeTransactions(dnID2).size();
+    // Add a TX for a duplicated container to dnID2; this should fail.
+    DeletedBlocksTransaction.Builder builder =
+        DeletedBlocksTransaction.newBuilder();
+    builder.setTxID(11);
+    builder.setContainerName(containerName);
+    builder.setCount(0);
+    transactions.addTransaction(builder.build());
+
+    // The number of TXs for dnID2 should not change.
+    Assert.assertEquals(size,
+        transactions.getDatanodeTransactions(dnID2).size());
+
+    // Add a new TX for dnID2, after which dnID2 reaches the maximum value.
+    containerName = "newContainer";
+    builder = DeletedBlocksTransaction.newBuilder();
+    builder.setTxID(12);
+    builder.setContainerName(containerName);
+    builder.setCount(0);
+    mockContainerInfo(mappingService, containerName, dnID2);
+    transactions.addTransaction(builder.build());
+    // Since all nodes are full, transactions is full as well.
+    Assert.assertTrue(transactions.isFull());
+  }
+
+  private void mockContainerInfo(Mapping mappingService, String containerName,
+      DatanodeID dnID) throws IOException {
+    Pipeline pipeline = new Pipeline("fake");
+    pipeline.addMember(dnID);
+
+    ContainerInfo.Builder builder = new ContainerInfo.Builder();
+    builder.setPipeline(pipeline);
+    builder.setContainerName(containerName);
+
+    ContainerInfo containerInfo = builder.build();
+    Mockito.doReturn(containerInfo).when(mappingService)
+        .getContainer(containerName);
+  }
 }
