This is an automated email from the ASF dual-hosted git repository. ljain pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 431e9097b4d0e7f3d5de04ee86cc8d4ba0c05e30 Author: Aryan Gupta <[email protected]> AuthorDate: Wed Jan 6 20:15:44 2021 +0530 HDDS-4369. Datanode should store the delete transaction as is in rocksDB (#1702) --- .../java/org/apache/hadoop/ozone/OzoneConsts.java | 6 +- .../commandhandler/DeleteBlocksCommandHandler.java | 191 ++++++++------ .../background/BlockDeletingService.java | 171 +++++++++++-- .../metadata/AbstractDatanodeDBDefinition.java | 2 +- .../metadata/DatanodeSchemaOneDBDefinition.java | 5 + .../metadata/DatanodeSchemaTwoDBDefinition.java | 32 ++- .../metadata/DatanodeStoreSchemaTwoImpl.java | 14 +- ...mpl.java => DeletedBlocksTransactionCodec.java} | 35 +-- .../container/common/TestBlockDeletingService.java | 279 +++++++++++++++++---- .../hadoop/ozone/TestStorageContainerManager.java | 13 +- .../ozone/TestStorageContainerManagerHelper.java | 30 +++ 11 files changed, 604 insertions(+), 174 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java index d2d6e35..9836452 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java @@ -254,11 +254,15 @@ public final class OzoneConsts { // versions, requiring this property to be tracked on a per container basis. // V1: All data in default column family. public static final String SCHEMA_V1 = "1"; - // V2: Metadata, block data, and deleted blocks in their own column families. + // V2: Metadata, block data, and delete transactions in their own + // column families. public static final String SCHEMA_V2 = "2"; // Most recent schema version that all new containers should be created with. public static final String SCHEMA_LATEST = SCHEMA_V2; + public static final String[] SCHEMA_VERSIONS = + new String[] {SCHEMA_V1, SCHEMA_V2}; + // Supported store types. public static final String OZONE = "ozone"; public static final String S3 = "s3"; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java index 91ab4c9..10e6797 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java @@ -42,6 +42,8 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.statemachine .SCMConnectionManager; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.metadata.DatanodeStore; +import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.protocol.commands.CommandStatus; import org.apache.hadoop.ozone.protocol.commands.DeleteBlockCommandStatus; @@ -59,6 +61,8 @@ import java.util.function.Consumer; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .Result.CONTAINER_NOT_FOUND; +import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V1; +import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2; /** * Handle block deletion commands. @@ -116,6 +120,7 @@ public class DeleteBlocksCommandHandler implements CommandHandler { DeleteBlockTransactionResult.newBuilder(); txResultBuilder.setTxID(entry.getTxID()); long containerId = entry.getContainerID(); + int newDeletionBlocks = 0; try { Container cont = containerSet.getContainer(containerId); if (cont == null) { @@ -129,7 +134,16 @@ public class DeleteBlocksCommandHandler implements CommandHandler { cont.getContainerData(); cont.writeLock(); try { - deleteKeyValueContainerBlocks(containerData, entry); + if (containerData.getSchemaVersion().equals(SCHEMA_V1)) { + markBlocksForDeletionSchemaV1(containerData, entry); + } else if (containerData.getSchemaVersion().equals(SCHEMA_V2)) { + markBlocksForDeletionSchemaV2(containerData, entry, + newDeletionBlocks, entry.getTxID()); + } else { + throw new UnsupportedOperationException( + "Only schema version 1 and schema version 2 are " + + "supported."); + } } finally { cont.writeUnlock(); } @@ -187,107 +201,140 @@ public class DeleteBlocksCommandHandler implements CommandHandler { * @param delTX a block deletion transaction. * @throws IOException if I/O error occurs. */ - private void deleteKeyValueContainerBlocks( - KeyValueContainerData containerData, DeletedBlocksTransaction delTX) - throws IOException { + + private void markBlocksForDeletionSchemaV2( + KeyValueContainerData containerData, DeletedBlocksTransaction delTX, + int newDeletionBlocks, long txnID) throws IOException { long containerId = delTX.getContainerID(); - if (LOG.isDebugEnabled()) { - LOG.debug("Processing Container : {}, DB path : {}", containerId, - containerData.getMetadataPath()); + if (!isTxnIdValid(containerId, containerData, delTX)) { + return; } - - if (delTX.getTxID() < containerData.getDeleteTransactionId()) { - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Ignoring delete blocks for containerId: %d." - + " Outdated delete transactionId %d < %d", containerId, - delTX.getTxID(), containerData.getDeleteTransactionId())); + try (ReferenceCountedDB containerDB = BlockUtils + .getDB(containerData, conf)) { + DatanodeStore ds = containerDB.getStore(); + DatanodeStoreSchemaTwoImpl dnStoreTwoImpl = + (DatanodeStoreSchemaTwoImpl) ds; + Table<Long, DeletedBlocksTransaction> delTxTable = + dnStoreTwoImpl.getDeleteTransactionTable(); + try (BatchOperation batch = containerDB.getStore().getBatchHandler() + .initBatchOperation()) { + delTxTable.putWithBatch(batch, txnID, delTX); + newDeletionBlocks += delTX.getLocalIDList().size(); + updateMetaData(containerData, delTX, newDeletionBlocks, containerDB, + batch); + containerDB.getStore().getBatchHandler().commitBatchOperation(batch); } - return; } + } + private void markBlocksForDeletionSchemaV1( + KeyValueContainerData containerData, DeletedBlocksTransaction delTX) + throws IOException { + long containerId = delTX.getContainerID(); + if (!isTxnIdValid(containerId, containerData, delTX)) { + return; + } int newDeletionBlocks = 0; - try(ReferenceCountedDB containerDB = - BlockUtils.getDB(containerData, conf)) { + try (ReferenceCountedDB containerDB = BlockUtils + .getDB(containerData, conf)) { Table<String, BlockData> blockDataTable = - containerDB.getStore().getBlockDataTable(); + containerDB.getStore().getBlockDataTable(); Table<String, ChunkInfoList> deletedBlocksTable = - containerDB.getStore().getDeletedBlocksTable(); + containerDB.getStore().getDeletedBlocksTable(); - for (Long blkLong : delTX.getLocalIDList()) { - String blk = blkLong.toString(); - BlockData blkInfo = blockDataTable.get(blk); - if (blkInfo != null) { - String deletingKey = OzoneConsts.DELETING_KEY_PREFIX + blk; - - if (blockDataTable.get(deletingKey) != null - || deletedBlocksTable.get(blk) != null) { - if (LOG.isDebugEnabled()) { - LOG.debug(String.format( - "Ignoring delete for block %s in container %d." - + " Entry already added.", blk, containerId)); + try (BatchOperation batch = containerDB.getStore().getBatchHandler() + .initBatchOperation()) { + for (Long blkLong : delTX.getLocalIDList()) { + String blk = blkLong.toString(); + BlockData blkInfo = blockDataTable.get(blk); + if (blkInfo != null) { + String deletingKey = OzoneConsts.DELETING_KEY_PREFIX + blk; + if (blockDataTable.get(deletingKey) != null + || deletedBlocksTable.get(blk) != null) { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Ignoring delete for block %s in container %d." + + " Entry already added.", blk, containerId)); + } + continue; } - continue; - } - - try(BatchOperation batch = containerDB.getStore() - .getBatchHandler().initBatchOperation()) { // Found the block in container db, // use an atomic update to change its state to deleting. blockDataTable.putWithBatch(batch, deletingKey, blkInfo); blockDataTable.deleteWithBatch(batch, blk); - containerDB.getStore().getBatchHandler() - .commitBatchOperation(batch); newDeletionBlocks++; if (LOG.isDebugEnabled()) { LOG.debug("Transited Block {} to DELETING state in container {}", blk, containerId); } - } catch (IOException e) { - // if some blocks failed to delete, we fail this TX, - // without sending this ACK to SCM, SCM will resend the TX - // with a certain number of retries. - throw new IOException( - "Failed to delete blocks for TXID = " + delTX.getTxID(), e); - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Block {} not found or already under deletion in" - + " container {}, skip deleting it.", blk, containerId); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Block {} not found or already under deletion in" + + " container {}, skip deleting it.", blk, containerId); + } } } + updateMetaData(containerData, delTX, newDeletionBlocks, containerDB, + batch); + containerDB.getStore().getBatchHandler().commitBatchOperation(batch); + } catch (IOException e) { + // if some blocks failed to delete, we fail this TX, + // without sending this ACK to SCM, SCM will resend the TX + // with a certain number of retries. + throw new IOException( + "Failed to delete blocks for TXID = " + delTX.getTxID(), e); } + } + } - if (newDeletionBlocks > 0) { - // Finally commit the DB counters. - try(BatchOperation batchOperation = - containerDB.getStore().getBatchHandler().initBatchOperation()) { - Table< String, Long > metadataTable = containerDB.getStore() - .getMetadataTable(); + private void updateMetaData(KeyValueContainerData containerData, + DeletedBlocksTransaction delTX, int newDeletionBlocks, + ReferenceCountedDB containerDB, BatchOperation batchOperation) + throws IOException { + if (newDeletionBlocks > 0) { + // Finally commit the DB counters. + Table<String, Long> metadataTable = + containerDB.getStore().getMetadataTable(); - // In memory is updated only when existing delete transactionID is - // greater. - if (delTX.getTxID() > containerData.getDeleteTransactionId()) { - // Update in DB pending delete key count and delete transaction ID. - metadataTable.putWithBatch(batchOperation, - OzoneConsts.DELETE_TRANSACTION_KEY, delTX.getTxID()); - } + // In memory is updated only when existing delete transactionID is + // greater. + if (delTX.getTxID() > containerData.getDeleteTransactionId()) { + // Update in DB pending delete key count and delete transaction ID. + metadataTable + .putWithBatch(batchOperation, OzoneConsts.DELETE_TRANSACTION_KEY, + delTX.getTxID()); + } - long pendingDeleteBlocks = - containerData.getNumPendingDeletionBlocks() + newDeletionBlocks; - metadataTable.putWithBatch(batchOperation, - OzoneConsts.PENDING_DELETE_BLOCK_COUNT, pendingDeleteBlocks); + long pendingDeleteBlocks = + containerData.getNumPendingDeletionBlocks() + newDeletionBlocks; + metadataTable + .putWithBatch(batchOperation, OzoneConsts.PENDING_DELETE_BLOCK_COUNT, + pendingDeleteBlocks); - containerDB.getStore().getBatchHandler() - .commitBatchOperation(batchOperation); + // update pending deletion blocks count and delete transaction ID in + // in-memory container status + containerData.updateDeleteTransactionId(delTX.getTxID()); + containerData.incrPendingDeletionBlocks(newDeletionBlocks); + } + } - // update pending deletion blocks count and delete transaction ID in - // in-memory container status - containerData.updateDeleteTransactionId(delTX.getTxID()); + private boolean isTxnIdValid(long containerId, + KeyValueContainerData containerData, DeletedBlocksTransaction delTX) { + boolean b = true; + if (LOG.isDebugEnabled()) { + LOG.debug("Processing Container : {}, DB path : {}", containerId, + containerData.getMetadataPath()); + } - containerData.incrPendingDeletionBlocks(newDeletionBlocks); - } + if (delTX.getTxID() < containerData.getDeleteTransactionId()) { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Ignoring delete blocks for containerId: %d." + + " Outdated delete transactionId %d < %d", containerId, + delTX.getTxID(), containerData.getDeleteTransactionId())); } + b = false; } + return b; } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java index b03b7d7..3dab1fa 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java @@ -20,11 +20,12 @@ package org.apache.hadoop.ozone.container.keyvalue.statemachine.background; import java.io.File; import java.io.IOException; +import java.util.UUID; import java.util.LinkedList; +import java.util.Objects; +import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -32,14 +33,15 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; -import org.apache.hadoop.hdds.utils.BackgroundService; -import org.apache.hadoop.hdds.utils.BackgroundTask; -import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; import org.apache.hadoop.hdds.utils.BackgroundTaskResult; import org.apache.hadoop.hdds.utils.db.BatchOperation; import org.apache.hadoop.hdds.utils.MetadataKeyFilters; +import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; +import org.apache.hadoop.hdds.utils.BackgroundService; +import org.apache.hadoop.hdds.utils.BackgroundTask; import org.apache.hadoop.hdds.utils.MetadataKeyFilters.KeyPrefixFilter; import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.impl.TopNOrderedContainerDeletionChoosingPolicy; @@ -50,14 +52,22 @@ import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverSe import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; +import org.apache.hadoop.ozone.container.metadata.DatanodeStore; +import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.util.Time; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; import com.google.common.collect.Lists; + import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V1; +import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2; + import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,7 +76,7 @@ import org.slf4j.LoggerFactory; * A per-datanode container block deleting service takes in charge * of deleting staled ozone blocks. */ -// TODO: Fix BlockDeletingService to work with new StorageLayer + public class BlockDeletingService extends BackgroundService { private static final Logger LOG = @@ -244,21 +254,54 @@ public class BlockDeletingService extends BackgroundService { @Override public BackgroundTaskResult call() throws Exception { - ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult(); + ContainerBackgroundTaskResult crr; final Container container = ozoneContainer.getContainerSet() .getContainer(containerData.getContainerID()); container.writeLock(); + File dataDir = new File(containerData.getChunksPath()); long startTime = Time.monotonicNow(); // Scan container's db and get list of under deletion blocks try (ReferenceCountedDB meta = BlockUtils.getDB(containerData, conf)) { + if (containerData.getSchemaVersion().equals(SCHEMA_V1)) { + crr = deleteViaSchema1(meta, container, dataDir, startTime); + } else if (containerData.getSchemaVersion().equals(SCHEMA_V2)) { + crr = deleteViaSchema2(meta, container, dataDir, startTime); + } else { + throw new UnsupportedOperationException( + "Only schema version 1 and schema version 2 are supported."); + } + return crr; + } finally { + container.writeUnlock(); + } + } + + public boolean checkDataDir(File dataDir) { + boolean b = true; + if (!dataDir.exists() || !dataDir.isDirectory()) { + LOG.error("Invalid container data dir {} : " + + "does not exist or not a directory", dataDir.getAbsolutePath()); + b = false; + } + return b; + } + + public ContainerBackgroundTaskResult deleteViaSchema1( + ReferenceCountedDB meta, Container container, File dataDir, + long startTime) throws IOException { + ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult(); + if (!checkDataDir(dataDir)) { + return crr; + } + try { Table<String, BlockData> blockDataTable = - meta.getStore().getBlockDataTable(); + meta.getStore().getBlockDataTable(); // # of blocks to delete is throttled KeyPrefixFilter filter = MetadataKeyFilters.getDeletingKeyFilter(); List<? extends Table.KeyValue<String, BlockData>> toDeleteBlocks = blockDataTable.getSequentialRangeKVs(null, blockLimitPerTask, - filter); + filter); if (toDeleteBlocks.isEmpty()) { LOG.debug("No under deletion block found in container : {}", containerData.getContainerID()); @@ -267,12 +310,6 @@ public class BlockDeletingService extends BackgroundService { List<String> succeedBlocks = new LinkedList<>(); LOG.debug("Container : {}, To-Delete blocks : {}", containerData.getContainerID(), toDeleteBlocks.size()); - File dataDir = new File(containerData.getChunksPath()); - if (!dataDir.exists() || !dataDir.isDirectory()) { - LOG.error("Invalid container data dir {} : " - + "does not exist or not a directory", dataDir.getAbsolutePath()); - return crr; - } Handler handler = Objects.requireNonNull(ozoneContainer.getDispatcher() .getHandler(container.getContainerType())); @@ -292,7 +329,7 @@ public class BlockDeletingService extends BackgroundService { // Once blocks are deleted... remove the blockID from blockDataTable. try(BatchOperation batch = meta.getStore().getBatchHandler() - .initBatchOperation()) { + .initBatchOperation()) { for (String entry : succeedBlocks) { blockDataTable.deleteWithBatch(batch, entry); } @@ -312,8 +349,106 @@ public class BlockDeletingService extends BackgroundService { } crr.addAll(succeedBlocks); return crr; - } finally { - container.writeUnlock(); + } catch (IOException exception) { + LOG.warn( + "Deletion operation was not successful for container: " + container + .getContainerData().getContainerID(), exception); + throw exception; + } + } + + public ContainerBackgroundTaskResult deleteViaSchema2( + ReferenceCountedDB meta, Container container, File dataDir, + long startTime) throws IOException { + ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult(); + if (!checkDataDir(dataDir)) { + return crr; + } + try { + Table<String, BlockData> blockDataTable = + meta.getStore().getBlockDataTable(); + DatanodeStore ds = meta.getStore(); + DatanodeStoreSchemaTwoImpl dnStoreTwoImpl = + (DatanodeStoreSchemaTwoImpl) ds; + Table<Long, DeletedBlocksTransaction> + deleteTxns = dnStoreTwoImpl.getDeleteTransactionTable(); + List<DeletedBlocksTransaction> delBlocks = new ArrayList<>(); + int totalBlocks = 0; + try (TableIterator<Long, + ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter = + dnStoreTwoImpl.getDeleteTransactionTable().iterator()) { + while (iter.hasNext() && (totalBlocks < blockLimitPerTask)) { + DeletedBlocksTransaction delTx = iter.next().getValue(); + totalBlocks += delTx.getLocalIDList().size(); + delBlocks.add(delTx); + } + } + + if (delBlocks.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("No transaction found in container : {}", + containerData.getContainerID()); + } + return crr; + } + + LOG.debug("Container : {}, To-Delete blocks : {}", + containerData.getContainerID(), delBlocks.size()); + + Handler handler = Objects.requireNonNull(ozoneContainer.getDispatcher() + .getHandler(container.getContainerType())); + + deleteTransactions(delBlocks, handler, blockDataTable, container); + + // Once blocks are deleted... remove the blockID from blockDataTable + // and also remove the transactions from txnTable. + try(BatchOperation batch = meta.getStore().getBatchHandler() + .initBatchOperation()) { + for (DeletedBlocksTransaction delTx : delBlocks) { + deleteTxns.deleteWithBatch(batch, delTx.getTxID()); + for (Long blk : delTx.getLocalIDList()) { + String bID = blk.toString(); + meta.getStore().getBlockDataTable().deleteWithBatch(batch, bID); + } + } + meta.getStore().getBatchHandler().commitBatchOperation(batch); + containerData.updateAndCommitDBCounters(meta, batch, + totalBlocks); + // update count of pending deletion blocks and block count in + // in-memory container status. + containerData.decrPendingDeletionBlocks(totalBlocks); + containerData.decrKeyCount(totalBlocks); + } + + LOG.info("Container: {}, deleted blocks: {}, task elapsed time: {}ms", + containerData.getContainerID(), totalBlocks, + Time.monotonicNow() - startTime); + + return crr; + } catch (IOException exception) { + LOG.warn( + "Deletion operation was not successful for container: " + container + .getContainerData().getContainerID(), exception); + throw exception; + } + } + + private void deleteTransactions(List<DeletedBlocksTransaction> delBlocks, + Handler handler, Table<String, BlockData> blockDataTable, + Container container) throws IOException { + for (DeletedBlocksTransaction entry : delBlocks) { + for (Long blkLong : entry.getLocalIDList()) { + String blk = blkLong.toString(); + BlockData blkInfo = blockDataTable.get(blk); + LOG.debug("Deleting block {}", blk); + try { + handler.deleteBlock(container, blkInfo); + } catch (InvalidProtocolBufferException e) { + LOG.error("Failed to parse block info for block {}", blk, e); + } catch (IOException e) { + LOG.error("Failed to delete files for block {}", blk, e); + } + } } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java index 8895475..2fb1174 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java @@ -60,7 +60,7 @@ public abstract class AbstractDatanodeDBDefinition implements DBDefinition { @Override public DBColumnFamilyDefinition[] getColumnFamilies() { return new DBColumnFamilyDefinition[] {getBlockDataColumnFamily(), - getMetadataColumnFamily(), getDeletedBlocksColumnFamily()}; + getMetadataColumnFamily(), getDeletedBlocksColumnFamily()}; } public abstract DBColumnFamilyDefinition<String, BlockData> diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java index faf399d..7d5e053 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java @@ -88,4 +88,9 @@ public class DatanodeSchemaOneDBDefinition getDeletedBlocksColumnFamily() { return DELETED_BLOCKS; } + + public DBColumnFamilyDefinition[] getColumnFamilies() { + return new DBColumnFamilyDefinition[] {getBlockDataColumnFamily(), + getMetadataColumnFamily(), getDeletedBlocksColumnFamily() }; + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java index 2ac56f2..1fabd13 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java @@ -17,16 +17,19 @@ */ package org.apache.hadoop.ozone.container.metadata; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition; import org.apache.hadoop.hdds.utils.db.LongCodec; import org.apache.hadoop.hdds.utils.db.StringCodec; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; /** * This class defines the RocksDB structure for datanodes following schema - * version 2, where the block data, metadata, and deleted block ids are put in - * their own separate column families. + * version 2, where the block data, metadata, and transactions which are to be + * deleted are put in their own separate column families. */ public class DatanodeSchemaTwoDBDefinition extends AbstractDatanodeDBDefinition { @@ -34,7 +37,7 @@ public class DatanodeSchemaTwoDBDefinition extends public static final DBColumnFamilyDefinition<String, BlockData> BLOCK_DATA = new DBColumnFamilyDefinition<>( - "block_data", + "blockData", String.class, new StringCodec(), BlockData.class, @@ -52,17 +55,33 @@ public class DatanodeSchemaTwoDBDefinition extends public static final DBColumnFamilyDefinition<String, ChunkInfoList> DELETED_BLOCKS = new DBColumnFamilyDefinition<>( - "deleted_blocks", + "deletedBlocks", String.class, new StringCodec(), ChunkInfoList.class, new ChunkInfoListCodec()); + public static final DBColumnFamilyDefinition<Long, DeletedBlocksTransaction> + DELETE_TRANSACTION = + new DBColumnFamilyDefinition<>( + "deleteTxns", + Long.class, + new LongCodec(), + StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction.class, + new DeletedBlocksTransactionCodec()); + protected DatanodeSchemaTwoDBDefinition(String dbPath) { super(dbPath); } @Override + public DBColumnFamilyDefinition[] getColumnFamilies() { + return new DBColumnFamilyDefinition[] {getBlockDataColumnFamily(), + getMetadataColumnFamily(), getDeletedBlocksColumnFamily(), + getDeleteTransactionsColumnFamily()}; + } + + @Override public DBColumnFamilyDefinition<String, BlockData> getBlockDataColumnFamily() { return BLOCK_DATA; @@ -78,4 +97,9 @@ public class DatanodeSchemaTwoDBDefinition extends getDeletedBlocksColumnFamily() { return DELETED_BLOCKS; } + + public DBColumnFamilyDefinition<Long, DeletedBlocksTransaction> + getDeleteTransactionsColumnFamily() { + return DELETE_TRANSACTION; + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java index df9b8c0..db8fe6b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java @@ -18,6 +18,9 @@ package org.apache.hadoop.ozone.container.metadata; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.protocol.proto. + StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; +import org.apache.hadoop.hdds.utils.db.Table; import java.io.IOException; @@ -26,10 +29,13 @@ import java.io.IOException; * three column families/tables: * 1. A block data table. * 2. A metadata table. - * 3. A deleted blocks table. + * 3. A Delete Transaction Table. */ public class DatanodeStoreSchemaTwoImpl extends AbstractDatanodeStore { + private final Table<Long, DeletedBlocksTransaction> + deleteTransactionTable; + /** * Constructs the datanode store and starts the DB Services. * @@ -41,5 +47,11 @@ public class DatanodeStoreSchemaTwoImpl extends AbstractDatanodeStore { throws IOException { super(config, containerID, new DatanodeSchemaTwoDBDefinition(dbPath), openReadOnly); + this.deleteTransactionTable = new DatanodeSchemaTwoDBDefinition(dbPath) + .getDeleteTransactionsColumnFamily().getTable(getStore()); + } + + public Table<Long, DeletedBlocksTransaction> getDeleteTransactionTable() { + return deleteTransactionTable; } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DeletedBlocksTransactionCodec.java similarity index 54% copy from hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java copy to hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DeletedBlocksTransactionCodec.java index df9b8c0..90c26fe 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DeletedBlocksTransactionCodec.java @@ -17,29 +17,30 @@ */ package org.apache.hadoop.ozone.container.metadata; -import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.utils.db.Codec; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; import java.io.IOException; /** - * Constructs a datanode store in accordance with schema version 2, which uses - * three column families/tables: - * 1. A block data table. - * 2. A metadata table. - * 3. A deleted blocks table. + * Supports encoding and decoding {@link DeletedBlocksTransaction} objects. */ -public class DatanodeStoreSchemaTwoImpl extends AbstractDatanodeStore { +public class DeletedBlocksTransactionCodec + implements Codec<DeletedBlocksTransaction> { - /** - * Constructs the datanode store and starts the DB Services. - * - * @param config - Ozone Configuration. - * @throws IOException - on Failure. - */ - public DatanodeStoreSchemaTwoImpl(ConfigurationSource config, - long containerID, String dbPath, boolean openReadOnly) + @Override public byte[] toPersistedFormat( + DeletedBlocksTransaction deletedBlocksTransaction) { + return deletedBlocksTransaction.toByteArray(); + } + + @Override public DeletedBlocksTransaction fromPersistedFormat(byte[] rawData) throws IOException { - super(config, containerID, new DatanodeSchemaTwoDBDefinition(dbPath), - openReadOnly); + return DeletedBlocksTransaction.parseFrom(rawData); + } + + @Override public DeletedBlocksTransaction copyObject( + DeletedBlocksTransaction deletedBlocksTransaction) { + throw new UnsupportedOperationException(); } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java index 2eb6a39..96d4228 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java @@ -21,9 +21,10 @@ package org.apache.hadoop.ozone.container.common; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.UUID; +import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -34,9 +35,13 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.MutableConfigurationSource; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.utils.BackgroundService; import org.apache.hadoop.hdds.utils.MetadataKeyFilters; +import org.apache.hadoop.hdds.utils.db.BatchOperation; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChunkBuffer; @@ -55,7 +60,6 @@ import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB; import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; -import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; @@ -64,11 +68,14 @@ import org.apache.hadoop.ozone.container.keyvalue.impl.FilePerBlockStrategy; import org.apache.hadoop.ozone.container.keyvalue.impl.FilePerChunkStrategy; import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager; import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService; +import org.apache.hadoop.ozone.container.metadata.DatanodeStore; +import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.container.testutils.BlockDeletingServiceTestImpl; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils.LogCapturer; +import static java.util.stream.Collectors.toList; import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric; import org.junit.AfterClass; @@ -82,7 +89,11 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTA import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL; +import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_VERSIONS; +import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V1; +import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2; import static org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion.FILE_PER_BLOCK; +import static org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask.LOG; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -101,16 +112,38 @@ public class TestBlockDeletingService { private static MutableConfigurationSource conf; private final ChunkLayOutVersion layout; + private final String schemaVersion; private int blockLimitPerTask; private static VolumeSet volumeSet; - public TestBlockDeletingService(ChunkLayOutVersion layout) { - this.layout = layout; + public TestBlockDeletingService(LayoutInfo layoutInfo) { + this.layout = layoutInfo.layout; + this.schemaVersion = layoutInfo.schemaVersion; } @Parameterized.Parameters public static Iterable<Object[]> parameters() { - return ChunkLayoutTestInfo.chunkLayoutParameters(); + return LayoutInfo.layoutList.stream().map(each -> new Object[] {each}) + .collect(toList()); + } + + public static class LayoutInfo { + private final String schemaVersion; + private final ChunkLayOutVersion layout; + + public LayoutInfo(String schemaVersion, ChunkLayOutVersion layout) { + this.schemaVersion = schemaVersion; + this.layout = layout; + } + + private static List<LayoutInfo> layoutList = new ArrayList<>(); + static { + for (ChunkLayOutVersion ch : ChunkLayOutVersion.getAllVersions()) { + for (String sch : SCHEMA_VERSIONS) { + layoutList.add(new LayoutInfo(sch, ch)); + } + } + } } @BeforeClass @@ -158,6 +191,7 @@ public class TestBlockDeletingService { } byte[] arr = randomAlphanumeric(1048576).getBytes(UTF_8); ChunkBuffer buffer = ChunkBuffer.wrap(ByteBuffer.wrap(arr)); + int txnID = 0; for (int x = 0; x < numOfContainers; x++) { long containerID = ContainerTestHelper.getTestContainerID(); KeyValueContainerData data = @@ -165,55 +199,164 @@ public class TestBlockDeletingService { ContainerTestHelper.CONTAINER_MAX_SIZE, UUID.randomUUID().toString(), datanodeUuid); data.closeContainer(); + data.setSchemaVersion(schemaVersion); KeyValueContainer container = new KeyValueContainer(data, conf); container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(), scmId); containerSet.addContainer(container); data = (KeyValueContainerData) containerSet.getContainer( containerID).getContainerData(); - long chunkLength = 100; - try(ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) { - for (int j = 0; j < numOfBlocksPerContainer; j++) { - BlockID blockID = - ContainerTestHelper.getTestBlockID(containerID); - String deleteStateName = OzoneConsts.DELETING_KEY_PREFIX + - blockID.getLocalID(); - BlockData kd = new BlockData(blockID); - List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList(); - for (int k = 0; k < numOfChunksPerBlock; k++) { - final String chunkName = String.format("block.%d.chunk.%d", j, k); - final long offset = k * chunkLength; - ContainerProtos.ChunkInfo info = - ContainerProtos.ChunkInfo.newBuilder() - .setChunkName(chunkName) - .setLen(chunkLength) - .setOffset(offset) - .setChecksumData(Checksum.getNoChecksumDataProto()) - .build(); - chunks.add(info); - ChunkInfo chunkInfo = new ChunkInfo(chunkName, offset, chunkLength); - ChunkBuffer chunkData = buffer.duplicate(0, (int) chunkLength); - chunkManager.writeChunk(container, blockID, chunkInfo, chunkData, - WRITE_STAGE); - chunkManager.writeChunk(container, blockID, chunkInfo, chunkData, - COMMIT_STAGE); - } - kd.setChunks(chunks); - metadata.getStore().getBlockDataTable().put( - deleteStateName, kd); - container.getContainerData().incrPendingDeletionBlocks(1); - } - container.getContainerData().setKeyCount(numOfBlocksPerContainer); - // Set block count, bytes used and pending delete block count. - metadata.getStore().getMetadataTable().put( - OzoneConsts.BLOCK_COUNT, (long)numOfBlocksPerContainer); - metadata.getStore().getMetadataTable().put( - OzoneConsts.CONTAINER_BYTES_USED, - chunkLength * numOfChunksPerBlock * numOfBlocksPerContainer); - metadata.getStore().getMetadataTable().put( - OzoneConsts.PENDING_DELETE_BLOCK_COUNT, - (long)numOfBlocksPerContainer); + if (data.getSchemaVersion().equals(SCHEMA_V1)) { + createPendingDeleteBlocksSchema1(numOfBlocksPerContainer, data, + containerID, numOfChunksPerBlock, buffer, chunkManager, container); + } else if (data.getSchemaVersion().equals(SCHEMA_V2)) { + createPendingDeleteBlocksSchema2(numOfBlocksPerContainer, txnID, + containerID, numOfChunksPerBlock, buffer, chunkManager, container, + data); + } else { + throw new UnsupportedOperationException( + "Only schema version 1 and schema version 2 are " + + "supported."); + } + } + } + + @SuppressWarnings("checkstyle:parameternumber") + private void createPendingDeleteBlocksSchema1(int numOfBlocksPerContainer, + KeyValueContainerData data, long containerID, int numOfChunksPerBlock, + ChunkBuffer buffer, ChunkManager chunkManager, + KeyValueContainer container) { + BlockID blockID = null; + try (ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) { + for (int j = 0; j < numOfBlocksPerContainer; j++) { + blockID = ContainerTestHelper.getTestBlockID(containerID); + String deleteStateName = + OzoneConsts.DELETING_KEY_PREFIX + blockID.getLocalID(); + BlockData kd = new BlockData(blockID); + List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList(); + putChunksInBlock(numOfChunksPerBlock, j, chunks, buffer, chunkManager, + container, blockID); + kd.setChunks(chunks); + metadata.getStore().getBlockDataTable().put(deleteStateName, kd); + container.getContainerData().incrPendingDeletionBlocks(1); + } + updateMetaData(data, container, numOfBlocksPerContainer, + numOfChunksPerBlock); + } catch (IOException exception) { + LOG.info("Exception " + exception); + LOG.warn("Failed to put block: " + blockID + " in BlockDataTable."); + } + } + + @SuppressWarnings("checkstyle:parameternumber") + private void createPendingDeleteBlocksSchema2(int numOfBlocksPerContainer, + int txnID, long containerID, int numOfChunksPerBlock, ChunkBuffer buffer, + ChunkManager chunkManager, KeyValueContainer container, + KeyValueContainerData data) { + List<Long> containerBlocks = new ArrayList<>(); + int blockCount = 0; + for (int i = 0; i < numOfBlocksPerContainer; i++) { + txnID = txnID + 1; + BlockID blockID = ContainerTestHelper.getTestBlockID(containerID); + BlockData kd = new BlockData(blockID); + List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList(); + putChunksInBlock(numOfChunksPerBlock, i, chunks, buffer, chunkManager, + container, blockID); + kd.setChunks(chunks); + String bID = null; + try (ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) { + bID = blockID.getLocalID() + ""; + metadata.getStore().getBlockDataTable().put(bID, kd); + } catch (IOException exception) { + LOG.info("Exception = " + exception); + LOG.warn("Failed to put block: " + bID + " in BlockDataTable."); + } + container.getContainerData().incrPendingDeletionBlocks(1); + + // In below if statements we are checking if a single container + // consists of more blocks than 'blockLimitPerTask' then we create + // (totalBlocksInContainer / blockLimitPerTask) transactions which + // consists of blocks equal to blockLimitPerTask and last transaction + // consists of blocks equal to + // (totalBlocksInContainer % blockLimitPerTask). + containerBlocks.add(blockID.getLocalID()); + blockCount++; + if (blockCount == blockLimitPerTask || i == (numOfBlocksPerContainer + - 1)) { + createTxn(data, containerBlocks, txnID, containerID); + containerBlocks.clear(); + blockCount = 0; + } + } + updateMetaData(data, container, numOfBlocksPerContainer, + numOfChunksPerBlock); + } + + private void createTxn(KeyValueContainerData data, List<Long> containerBlocks, + int txnID, long containerID) { + try (ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) { + StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction dtx = + StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction + .newBuilder().setTxID(txnID).setContainerID(containerID) + .addAllLocalID(containerBlocks).setCount(0).build(); + try (BatchOperation batch = metadata.getStore().getBatchHandler() + .initBatchOperation()) { + DatanodeStore ds = metadata.getStore(); + DatanodeStoreSchemaTwoImpl dnStoreTwoImpl = + (DatanodeStoreSchemaTwoImpl) ds; + dnStoreTwoImpl.getDeleteTransactionTable() + .putWithBatch(batch, (long) txnID, dtx); + metadata.getStore().getBatchHandler().commitBatchOperation(batch); } + } catch (IOException exception) { + LOG.warn("Transaction creation was not successful for txnID: " + txnID + + " consisting of " + containerBlocks.size() + " blocks."); + } + } + + private void putChunksInBlock(int numOfChunksPerBlock, int i, + List<ContainerProtos.ChunkInfo> chunks, ChunkBuffer buffer, + ChunkManager chunkManager, KeyValueContainer container, BlockID blockID) { + long chunkLength = 100; + try { + for (int k = 0; k < numOfChunksPerBlock; k++) { + final String chunkName = String.format("block.%d.chunk.%d", i, k); + final long offset = k * chunkLength; + ContainerProtos.ChunkInfo info = + ContainerProtos.ChunkInfo.newBuilder().setChunkName(chunkName) + .setLen(chunkLength).setOffset(offset) + .setChecksumData(Checksum.getNoChecksumDataProto()).build(); + chunks.add(info); + ChunkInfo chunkInfo = new ChunkInfo(chunkName, offset, chunkLength); + ChunkBuffer chunkData = buffer.duplicate(0, (int) chunkLength); + chunkManager + .writeChunk(container, blockID, chunkInfo, chunkData, WRITE_STAGE); + chunkManager + .writeChunk(container, blockID, chunkInfo, chunkData, COMMIT_STAGE); + } + } catch (IOException ex) { + LOG.warn("Putting chunks in blocks was not successful for BlockID: " + + blockID); + } + } + + private void updateMetaData(KeyValueContainerData data, + KeyValueContainer container, int numOfBlocksPerContainer, + int numOfChunksPerBlock) { + long chunkLength = 100; + try (ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) { + container.getContainerData().setKeyCount(numOfBlocksPerContainer); + // Set block count, bytes used and pending delete block count. + metadata.getStore().getMetadataTable() + .put(OzoneConsts.BLOCK_COUNT, (long) numOfBlocksPerContainer); + metadata.getStore().getMetadataTable() + .put(OzoneConsts.CONTAINER_BYTES_USED, + chunkLength * numOfChunksPerBlock * numOfBlocksPerContainer); + metadata.getStore().getMetadataTable() + .put(OzoneConsts.PENDING_DELETE_BLOCK_COUNT, + (long) numOfBlocksPerContainer); + } catch (IOException exception) { + LOG.warn("Meta Data update was not successful for container: "+container); } } @@ -231,11 +374,32 @@ public class TestBlockDeletingService { * Get under deletion blocks count from DB, * note this info is parsed from container.db. */ - private int getUnderDeletionBlocksCount(ReferenceCountedDB meta) - throws IOException { - return meta.getStore().getBlockDataTable() - .getRangeKVs(null, 100, - MetadataKeyFilters.getDeletingKeyFilter()).size(); + private int getUnderDeletionBlocksCount(ReferenceCountedDB meta, + KeyValueContainerData data) throws IOException { + if (data.getSchemaVersion().equals(SCHEMA_V1)) { + return meta.getStore().getBlockDataTable() + .getRangeKVs(null, 100, MetadataKeyFilters.getDeletingKeyFilter()) + .size(); + } else if (data.getSchemaVersion().equals(SCHEMA_V2)) { + int pendingBlocks = 0; + DatanodeStore ds = meta.getStore(); + DatanodeStoreSchemaTwoImpl dnStoreTwoImpl = + (DatanodeStoreSchemaTwoImpl) ds; + try ( + TableIterator<Long, ? extends Table.KeyValue<Long, + StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction>> + iter = dnStoreTwoImpl.getDeleteTransactionTable().iterator()) { + while (iter.hasNext()) { + StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction + delTx = iter.next().getValue(); + pendingBlocks += delTx.getLocalIDList().size(); + } + } + return pendingBlocks; + } else { + throw new UnsupportedOperationException( + "Only schema version 1 and schema version 2 are supported."); + } } @@ -261,6 +425,7 @@ public class TestBlockDeletingService { // Ensure 1 container was created List<ContainerData> containerData = Lists.newArrayList(); containerSet.listContainer(0L, 1, containerData); + KeyValueContainerData data = (KeyValueContainerData) containerData.get(0); Assert.assertEquals(1, containerData.size()); try(ReferenceCountedDB meta = BlockUtils.getDB( @@ -280,7 +445,7 @@ public class TestBlockDeletingService { Assert.assertEquals(0, transactionId); // Ensure there are 3 blocks under deletion and 0 deleted blocks - Assert.assertEquals(3, getUnderDeletionBlocksCount(meta)); + Assert.assertEquals(3, getUnderDeletionBlocksCount(meta, data)); Assert.assertEquals(3, meta.getStore().getMetadataTable() .get(OzoneConsts.PENDING_DELETE_BLOCK_COUNT).longValue()); @@ -348,6 +513,9 @@ public class TestBlockDeletingService { public void testBlockDeletionTimeout() throws Exception { conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 10); conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 2); + this.blockLimitPerTask = + conf.getInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, + OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT); ContainerSet containerSet = new ContainerSet(); createToDeleteBlocks(containerSet, 1, 3, 1); ContainerMetrics metrics = ContainerMetrics.create(conf); @@ -394,7 +562,7 @@ public class TestBlockDeletingService { LogCapturer newLog = LogCapturer.captureLogs(BackgroundService.LOG); GenericTestUtils.waitFor(() -> { try { - return getUnderDeletionBlocksCount(meta) == 0; + return getUnderDeletionBlocksCount(meta, data) == 0; } catch (IOException ignored) { } return false; @@ -445,6 +613,9 @@ public class TestBlockDeletingService { TopNOrderedContainerDeletionChoosingPolicy.class.getName()); conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 1); conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 1); + this.blockLimitPerTask = + conf.getInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, + OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT); ContainerSet containerSet = new ContainerSet(); int containerCount = 2; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java index 548f073..7bccd8e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java @@ -92,11 +92,15 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY; -import static org.apache.hadoop.hdds.HddsConfigKeys.*; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION; import static org.junit.Assert.fail; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Test class that exercises the StorageContainerManager. @@ -272,8 +276,6 @@ public class TestStorageContainerManager { Map<Long, List<Long>> containerBlocks = createDeleteTXLog(delLog, keyLocations, helper); - Set<Long> containerIDs = containerBlocks.keySet(); - // Verify a few TX gets created in the TX log. Assert.assertTrue(delLog.getNumOfValidTransactions() > 0); @@ -289,8 +291,7 @@ public class TestStorageContainerManager { return false; } }, 1000, 10000); - Assert.assertTrue(helper.getAllBlocks(containerIDs).isEmpty()); - + Assert.assertTrue(helper.verifyBlocksWithTxnTable(containerBlocks)); // Continue the work, add some TXs that with known container names, // but unknown block IDs. for (Long containerID : containerBlocks.keySet()) { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java index c67fe30..fe0e075 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java @@ -17,6 +17,7 @@ package org.apache.hadoop.ozone; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; @@ -32,9 +33,14 @@ import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; +import org.apache.hadoop.ozone.container.metadata.DatanodeStore; +import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; + import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -137,6 +143,30 @@ public class TestStorageContainerManagerHelper { return allBlocks; } + public boolean verifyBlocksWithTxnTable(Map<Long, List<Long>> containerBlocks) + throws IOException { + Set<Long> containerIDs = containerBlocks.keySet(); + for (Long entry : containerIDs) { + ReferenceCountedDB meta = getContainerMetadata(entry); + DatanodeStore ds = meta.getStore(); + DatanodeStoreSchemaTwoImpl dnStoreTwoImpl = + (DatanodeStoreSchemaTwoImpl) ds; + List<? extends Table.KeyValue<Long, DeletedBlocksTransaction>> + txnsInTxnTable = dnStoreTwoImpl.getDeleteTransactionTable() + .getRangeKVs(null, Integer.MAX_VALUE, null); + List<Long> conID = new ArrayList<>(); + for (Table.KeyValue<Long, DeletedBlocksTransaction> txn : + txnsInTxnTable) { + conID.addAll(txn.getValue().getLocalIDList()); + } + if (!conID.equals(containerBlocks.get(entry))) { + return false; + } + meta.close(); + } + return true; + } + private ReferenceCountedDB getContainerMetadata(Long containerID) throws IOException { ContainerWithPipeline containerWithPipeline = cluster --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
