This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-2823 by this push:
new b328608 HDDS-3205. Handle BlockDeletingService in SCM HA (#1780)
b328608 is described below
commit b328608d2cd4de0bfabcc946c0b56025d1cbde25
Author: runzhiwang <[email protected]>
AuthorDate: Fri Jan 29 00:22:14 2021 +0800
HDDS-3205. Handle BlockDeletingService in SCM HA (#1780)
---
.../src/main/proto/SCMRatisProtocol.proto | 1 +
.../hadoop/hdds/scm/block/BlockManagerImpl.java | 7 +-
.../hadoop/hdds/scm/block/DeletedBlockLog.java | 11 -
.../hadoop/hdds/scm/block/DeletedBlockLogImpl.java | 21 --
...lockLogImpl.java => DeletedBlockLogImplV2.java} | 177 ++++++--------
.../scm/block/DeletedBlockLogStateManager.java | 45 ++++
.../scm/block/DeletedBlockLogStateManagerImpl.java | 265 +++++++++++++++++++++
.../hadoop/hdds/scm/ha/SCMDBTransactionBuffer.java | 20 +-
.../hadoop/hdds/scm/ha/SCMHAManagerImpl.java | 2 +-
.../apache/hadoop/hdds/scm/ha/SCMStateMachine.java | 10 +
.../hdds/scm/server/StorageContainerManager.java | 4 +-
.../hadoop/hdds/scm/block/TestDeletedBlockLog.java | 78 +++---
.../hadoop/ozone/TestStorageContainerManager.java | 43 +++-
.../client/rpc/TestDeleteWithSlowFollower.java | 12 +-
14 files changed, 515 insertions(+), 181 deletions(-)
diff --git a/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto
b/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto
index 8818a83..c0938e8 100644
--- a/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto
@@ -23,6 +23,7 @@ option java_generate_equals_and_hash = true;
enum RequestType {
PIPELINE = 1;
CONTAINER = 2;
+ BLOCK = 3;
}
message Method {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index fb5d5d5..8acf985 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -101,8 +101,11 @@ public class BlockManagerImpl implements BlockManager,
BlockmanagerMXBean {
mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this);
// SCM block deleting transaction log and deleting service.
- deletedBlockLog = new DeletedBlockLogImpl(conf, scm.getContainerManager(),
- scm.getScmMetadataStore());
+ deletedBlockLog = new DeletedBlockLogImplV2(conf,
scm.getContainerManager(),
+ scm.getScmHAManager().getRatisServer(),
+ scm.getScmMetadataStore().getDeletedBlocksTXTable(),
+ scm.getScmHAManager().getDBTransactionBuffer(),
+ scm.getScmContext());
Duration svcInterval = conf.getObject(
ScmConfig.class).getBlockDeletionInterval();
long serviceTimeout =
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
index 9a5d74f..ddcd2d1 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
@@ -81,17 +81,6 @@ public interface DeletedBlockLog extends Closeable {
UUID dnID);
/**
- * Creates a block deletion transaction and adds that into the log.
- *
- * @param containerID - container ID.
- * @param blocks - blocks that belong to the same container.
- *
- * @throws IOException
- */
- void addTransaction(long containerID, List<Long> blocks)
- throws IOException;
-
- /**
* Creates block deletion transactions for a set of containers,
* add into the log and persist them atomically. An object key
* might be stored in multiple containers and multiple blocks,
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 8a46b66..b61f135 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -260,27 +260,6 @@ public class DeletedBlockLogImpl
return false;
}
- /**
- * {@inheritDoc}
- *
- * @param containerID - container ID.
- * @param blocks - blocks that belong to the same container.
- * @throws IOException
- */
- @Override
- public void addTransaction(long containerID, List<Long> blocks)
- throws IOException {
- lock.lock();
- try {
- Long nextTXID = scmMetadataStore.getNextDeleteBlockTXID();
- DeletedBlocksTransaction tx =
- constructNewTransaction(nextTXID, containerID, blocks);
- scmMetadataStore.getDeletedBlocksTXTable().put(nextTXID, tx);
- } finally {
- lock.unlock();
- }
- }
-
@Override
public int getNumOfValidTransactions() throws IOException {
lock.lock();
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImplV2.java
similarity index 74%
copy from
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
copy to
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImplV2.java
index 8a46b66..179d228 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImplV2.java
@@ -41,10 +41,12 @@ import
org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
+import org.apache.hadoop.hdds.scm.ha.DBTransactionBuffer;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.UniqueId;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
@@ -52,6 +54,7 @@ import com.google.common.collect.Lists;
import static java.lang.Math.min;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,7 +68,7 @@ import org.slf4j.LoggerFactory;
* equally same chance to be retrieved which only depends on the nature
* order of the transaction ID.
*/
-public class DeletedBlockLogImpl
+public class DeletedBlockLogImplV2
implements DeletedBlockLog, EventHandler<DeleteBlockStatus> {
public static final Logger LOG =
@@ -73,18 +76,23 @@ public class DeletedBlockLogImpl
private final int maxRetry;
private final ContainerManagerV2 containerManager;
- private final SCMMetadataStore scmMetadataStore;
private final Lock lock;
// Maps txId to set of DNs which are successful in committing the transaction
private Map<Long, Set<UUID>> transactionToDNsCommitMap;
+ // The access to DeletedBlocksTXTable is protected by
+ // DeletedBlockLogStateManager.
+ private final DeletedBlockLogStateManager deletedBlockLogStateManager;
+ private final SCMContext scmContext;
- public DeletedBlockLogImpl(ConfigurationSource conf,
- ContainerManagerV2 containerManager,
- SCMMetadataStore scmMetadataStore) {
+ public DeletedBlockLogImplV2(ConfigurationSource conf,
+ ContainerManagerV2 containerManager,
+ SCMRatisServer ratisServer,
+ Table<Long, DeletedBlocksTransaction> deletedBlocksTXTable,
+ DBTransactionBuffer dbTxBuffer,
+ SCMContext scmContext) {
maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY,
OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT);
this.containerManager = containerManager;
- this.scmMetadataStore = scmMetadataStore;
this.lock = new ReentrantLock();
// transactionToDNsCommitMap is updated only when
@@ -92,6 +100,14 @@ public class DeletedBlockLogImpl
// maps transaction to dns which have committed it.
transactionToDNsCommitMap = new ConcurrentHashMap<>();
+ this.deletedBlockLogStateManager = DeletedBlockLogStateManagerImpl
+ .newBuilder()
+ .setConfiguration(conf)
+ .setDeletedBlocksTable(deletedBlocksTXTable)
+ .setRatisServer(ratisServer)
+ .setSCMDBTransactionBuffer(dbTxBuffer)
+ .build();
+ this.scmContext = scmContext;
}
@@ -103,7 +119,7 @@ public class DeletedBlockLogImpl
final List<DeletedBlocksTransaction> failedTXs = Lists.newArrayList();
try (TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
- scmMetadataStore.getDeletedBlocksTXTable().iterator()) {
+ deletedBlockLogStateManager.getReadOnlyIterator()) {
while (iter.hasNext()) {
DeletedBlocksTransaction delTX = iter.next().getValue();
if (delTX.getCount() == -1) {
@@ -125,47 +141,18 @@ public class DeletedBlockLogImpl
*/
@Override
public void incrementCount(List<Long> txIDs) throws IOException {
- for (Long txID : txIDs) {
- lock.lock();
- try {
- DeletedBlocksTransaction block =
- scmMetadataStore.getDeletedBlocksTXTable().get(txID);
- if (block == null) {
- if (LOG.isDebugEnabled()) {
- // This can occur due to race condition between retry and old
- // service task where old task removes the transaction and the new
- // task is resending
- LOG.debug("Deleted TXID {} not found.", txID);
- }
- continue;
- }
- DeletedBlocksTransaction.Builder builder = block.toBuilder();
- int currentCount = block.getCount();
- if (currentCount > -1) {
- builder.setCount(++currentCount);
- }
- // if the retry time exceeds the maxRetry value
- // then set the retry value to -1, stop retrying, admins can
- // analyze those blocks and purge them manually by SCMCli.
- if (currentCount > maxRetry) {
- builder.setCount(-1);
- }
- scmMetadataStore.getDeletedBlocksTXTable().put(txID,
- builder.build());
- } catch (IOException ex) {
- LOG.warn("Cannot increase count for txID " + txID, ex);
- // We do not throw error here, since we don't want to abort the loop.
- // Just log and continue processing the rest of txids.
- } finally {
- lock.unlock();
- }
+ lock.lock();
+ try {
+ deletedBlockLogStateManager
+ .increaseRetryCountOfTransactionInDB(new ArrayList<>(txIDs));
+ } finally {
+ lock.unlock();
}
}
- private DeletedBlocksTransaction constructNewTransaction(long txID,
- long containerID,
- List<Long> blocks) {
+ private DeletedBlocksTransaction constructNewTransaction(
+ long txID, long containerID, List<Long> blocks) {
return DeletedBlocksTransaction.newBuilder()
.setTxID(txID)
.setContainerID(containerID)
@@ -187,6 +174,7 @@ public class DeletedBlockLogImpl
List<DeleteBlockTransactionResult> transactionResults, UUID dnID) {
lock.lock();
try {
+ ArrayList<Long> txIDsToBeDeleted = new ArrayList<>();
Set<UUID> dnsWithCommittedTxn;
for (DeleteBlockTransactionResult transactionResult :
transactionResults) {
@@ -229,7 +217,7 @@ public class DeletedBlockLogImpl
if (LOG.isDebugEnabled()) {
LOG.debug("Purging txId={} from block deletion log", txID);
}
- scmMetadataStore.getDeletedBlocksTXTable().delete(txID);
+ txIDsToBeDeleted.add(txID);
}
}
if (LOG.isDebugEnabled()) {
@@ -241,6 +229,12 @@ public class DeletedBlockLogImpl
transactionResult.getTxID(), e);
}
}
+ try {
+ deletedBlockLogStateManager.removeTransactionsFromDB(txIDsToBeDeleted);
+ } catch (IOException e) {
+ LOG.warn("Could not commit delete block transactions: "
+ + txIDsToBeDeleted, e);
+ }
} finally {
lock.unlock();
}
@@ -260,27 +254,6 @@ public class DeletedBlockLogImpl
return false;
}
- /**
- * {@inheritDoc}
- *
- * @param containerID - container ID.
- * @param blocks - blocks that belong to the same container.
- * @throws IOException
- */
- @Override
- public void addTransaction(long containerID, List<Long> blocks)
- throws IOException {
- lock.lock();
- try {
- Long nextTXID = scmMetadataStore.getNextDeleteBlockTXID();
- DeletedBlocksTransaction tx =
- constructNewTransaction(nextTXID, containerID, blocks);
- scmMetadataStore.getDeletedBlocksTXTable().put(nextTXID, tx);
- } finally {
- lock.unlock();
- }
- }
-
@Override
public int getNumOfValidTransactions() throws IOException {
lock.lock();
@@ -288,7 +261,7 @@ public class DeletedBlockLogImpl
final AtomicInteger num = new AtomicInteger(0);
try (TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
- scmMetadataStore.getDeletedBlocksTXTable().iterator()) {
+ deletedBlockLogStateManager.getReadOnlyIterator()) {
while (iter.hasNext()) {
DeletedBlocksTransaction delTX = iter.next().getValue();
if (delTX.getCount() > -1) {
@@ -303,6 +276,22 @@ public class DeletedBlockLogImpl
}
/**
+ * Called in SCMStateMachine#notifyLeaderChanged when current SCM becomes
+ * leader.
+ */
+ public void clearTransactionToDNsCommitMap() {
+ transactionToDNsCommitMap.clear();
+ }
+
+ /**
+ * Called in SCMDBTransactionBuffer#flush when the cached deleting operations
+ * are flushed.
+ */
+ public void onFlush() {
+ deletedBlockLogStateManager.onFlush();
+ }
+
+ /**
* {@inheritDoc}
*
* @param containerBlocksMap a map of containerBlocks.
@@ -313,18 +302,17 @@ public class DeletedBlockLogImpl
throws IOException {
lock.lock();
try {
- try(BatchOperation batch =
- scmMetadataStore.getStore().initBatchOperation()) {
- for (Map.Entry< Long, List< Long > > entry :
- containerBlocksMap.entrySet()) {
- long nextTXID = scmMetadataStore.getNextDeleteBlockTXID();
- DeletedBlocksTransaction tx = constructNewTransaction(nextTXID,
- entry.getKey(), entry.getValue());
- scmMetadataStore.getDeletedBlocksTXTable().putWithBatch(batch,
- nextTXID, tx);
- }
- scmMetadataStore.getStore().commitBatchOperation(batch);
+ ArrayList<DeletedBlocksTransaction> txsToBeAdded = new ArrayList<>();
+ for (Map.Entry< Long, List< Long > > entry :
+ containerBlocksMap.entrySet()) {
+ // TODO(runzhiwang): Should use distributed sequence id generator
+ long nextTXID = UniqueId.next();
+ DeletedBlocksTransaction tx = constructNewTransaction(nextTXID,
+ entry.getKey(), entry.getValue());
+ txsToBeAdded.add(tx);
}
+
+ deletedBlockLogStateManager.addTransactionsToDB(txsToBeAdded);
} finally {
lock.unlock();
}
@@ -364,10 +352,9 @@ public class DeletedBlockLogImpl
new DatanodeDeletedBlockTransactions();
try (TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
- scmMetadataStore.getDeletedBlocksTXTable().iterator()) {
+ deletedBlockLogStateManager.getReadOnlyIterator()) {
int numBlocksAdded = 0;
- List<DeletedBlocksTransaction> txnsToBePurged =
- new ArrayList<>();
+ ArrayList<Long> txIDs = new ArrayList<>();
while (iter.hasNext() && numBlocksAdded < blockDeletionLimit) {
Table.KeyValue<Long, DeletedBlocksTransaction> keyValue =
iter.next();
DeletedBlocksTransaction txn = keyValue.getValue();
@@ -383,10 +370,11 @@ public class DeletedBlockLogImpl
} catch (ContainerNotFoundException ex) {
LOG.warn("Container: " + id + " was not found for the transaction:
"
+ txn);
- txnsToBePurged.add(txn);
+ txIDs.add(txn.getTxID());
}
}
- purgeTransactions(txnsToBePurged);
+
+ deletedBlockLogStateManager.removeTransactionsFromDB(txIDs);
}
return transactions;
} finally {
@@ -394,21 +382,14 @@ public class DeletedBlockLogImpl
}
}
- public void purgeTransactions(List<DeletedBlocksTransaction> txnsToBePurged)
- throws IOException {
- try (BatchOperation batch = scmMetadataStore.getBatchHandler()
- .initBatchOperation()) {
- for (int i = 0; i < txnsToBePurged.size(); i++) {
- scmMetadataStore.getDeletedBlocksTXTable()
- .deleteWithBatch(batch, txnsToBePurged.get(i).getTxID());
- }
- scmMetadataStore.getBatchHandler().commitBatchOperation(batch);
+ @Override
+ public void onMessage(
+ DeleteBlockStatus deleteBlockStatus, EventPublisher publisher) {
+ if (!scmContext.isLeader()) {
+ LOG.warn("Skip commit transactions since current SCM is not leader.");
+ return;
}
- }
- @Override
- public void onMessage(DeleteBlockStatus deleteBlockStatus,
- EventPublisher publisher) {
ContainerBlocksDeletionACKProto ackProto =
deleteBlockStatus.getCmdStatus().getBlockDeletionAck();
commitTransactions(ackProto.getResultsList(),
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManager.java
new file mode 100644
index 0000000..f152a11
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManager.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.block;
+
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.scm.metadata.Replicate;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+public interface DeletedBlockLogStateManager {
+ @Replicate
+ void addTransactionsToDB(ArrayList<DeletedBlocksTransaction> txs)
+ throws IOException;
+
+ @Replicate
+ void removeTransactionsFromDB(ArrayList<Long> txIDs)
+ throws IOException;
+
+ @Replicate
+ void increaseRetryCountOfTransactionInDB(ArrayList<Long> txIDs)
+ throws IOException;
+
+ TableIterator<Long,
+ KeyValue<Long, DeletedBlocksTransaction>> getReadOnlyIterator();
+
+ void onFlush();
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java
new file mode 100644
index 0000000..ab72c62
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.scm.ha.DBTransactionBuffer;
+import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler;
+import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.hdds.utils.db.TypedTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT;
+
+public class DeletedBlockLogStateManagerImpl
+ implements DeletedBlockLogStateManager {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(DeletedBlockLogStateManagerImpl.class);
+
+ private final Table<Long, DeletedBlocksTransaction> deletedTable;
+ private final DBTransactionBuffer transactionBuffer;
+ private final int maxRetry;
+ private Set<Long> deletingTxIDs;
+
+ public DeletedBlockLogStateManagerImpl(
+ ConfigurationSource conf,
+ Table<Long, DeletedBlocksTransaction> deletedTable,
+ DBTransactionBuffer txBuffer) {
+ this.maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY,
+ OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT);
+ this.deletedTable = deletedTable;
+ this.transactionBuffer = txBuffer;
+ this.deletingTxIDs = ConcurrentHashMap.newKeySet();
+ }
+
+ public TableIterator<Long, TypedTable.KeyValue<Long,
+ DeletedBlocksTransaction>> getReadOnlyIterator() {
+ return new TableIterator<Long, TypedTable.KeyValue<Long,
+ DeletedBlocksTransaction>>() {
+
+ private TableIterator<Long,
+ ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
+ deletedTable.iterator();
+ private TypedTable.KeyValue<Long, DeletedBlocksTransaction> nextTx;
+
+ {
+ findNext();
+ }
+
+ private void findNext() {
+ while (iter.hasNext()) {
+ TypedTable.KeyValue<Long, DeletedBlocksTransaction> next = iter
+ .next();
+ long txID;
+ try {
+ txID = next.getKey();
+ } catch (IOException e) {
+          throw new IllegalStateException(
+              "Failed to get the key of the next DeletedBlocksTransaction", e);
+ }
+
+ if (!deletingTxIDs.contains(txID)) {
+ nextTx = next;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("DeletedBlocksTransaction matching txID:{}",
+ txID);
+ }
+ return;
+ }
+ }
+ nextTx = null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return nextTx != null;
+ }
+
+ @Override
+ public TypedTable.KeyValue<Long, DeletedBlocksTransaction> next() {
+ if (nextTx == null) {
+ throw new NoSuchElementException("DeletedBlocksTransaction " +
+ "Iterator reached end");
+ }
+ TypedTable.KeyValue<Long, DeletedBlocksTransaction> returnTx = nextTx;
+ findNext();
+ return returnTx;
+ }
+
+ @Override
+ public void close() throws IOException {
+ iter.close();
+ }
+
+ @Override
+ public void seekToFirst() {
+ throw new UnsupportedOperationException("seekToFirst");
+ }
+
+ @Override
+ public void seekToLast() {
+ throw new UnsupportedOperationException("seekToLast");
+ }
+
+ @Override
+ public TypedTable.KeyValue<Long, DeletedBlocksTransaction> seek(
+ Long key) throws IOException {
+ throw new UnsupportedOperationException("seek");
+ }
+
+ @Override
+ public Long key() throws IOException {
+ throw new UnsupportedOperationException("key");
+ }
+
+ @Override
+ public TypedTable.KeyValue<Long, DeletedBlocksTransaction> value() {
+ throw new UnsupportedOperationException("value");
+ }
+
+ @Override
+ public void removeFromDB() throws IOException {
+ throw new UnsupportedOperationException("read-only");
+ }
+ };
+ }
+
+ @Override
+ public void addTransactionsToDB(ArrayList<DeletedBlocksTransaction> txs)
+ throws IOException {
+ for (DeletedBlocksTransaction tx : txs) {
+ deletedTable.putWithBatch(
+ transactionBuffer.getCurrentBatchOperation(), tx.getTxID(), tx);
+ }
+ }
+
+ @Override
+ public void removeTransactionsFromDB(ArrayList<Long> txIDs)
+ throws IOException {
+ for (Long txID : txIDs) {
+ deletedTable.deleteWithBatch(
+ transactionBuffer.getCurrentBatchOperation(), txID);
+ }
+ }
+
+ @Override
+ public void increaseRetryCountOfTransactionInDB(
+ ArrayList<Long> txIDs) throws IOException {
+ for (Long txID : txIDs) {
+ DeletedBlocksTransaction block =
+ deletedTable.get(txID);
+ if (block == null) {
+ if (LOG.isDebugEnabled()) {
+ // This can occur due to race condition between retry and old
+ // service task where old task removes the transaction and the new
+ // task is resending
+ LOG.debug("Deleted TXID {} not found.", txID);
+ }
+ continue;
+ }
+ DeletedBlocksTransaction.Builder builder = block.toBuilder();
+ int currentCount = block.getCount();
+ if (currentCount > -1) {
+ builder.setCount(++currentCount);
+ }
+ // if the retry time exceeds the maxRetry value
+ // then set the retry value to -1, stop retrying, admins can
+ // analyze those blocks and purge them manually by SCMCli.
+ if (currentCount > maxRetry) {
+ builder.setCount(-1);
+ }
+ deletedTable.putWithBatch(
+ transactionBuffer.getCurrentBatchOperation(), txID, builder.build());
+ }
+ }
+
+
+ public void onFlush() {
+ deletingTxIDs.clear();
+ LOG.info("Clear cached deletingTxIDs.");
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for ContainerStateManager.
+ */
+ public static class Builder {
+ private ConfigurationSource conf;
+ private SCMRatisServer scmRatisServer;
+ private Table<Long, DeletedBlocksTransaction> table;
+ private DBTransactionBuffer transactionBuffer;
+
+ public Builder setConfiguration(final ConfigurationSource config) {
+ conf = config;
+ return this;
+ }
+
+ public Builder setRatisServer(final SCMRatisServer ratisServer) {
+ scmRatisServer = ratisServer;
+ return this;
+ }
+
+ public Builder setDeletedBlocksTable(
+ final Table<Long, DeletedBlocksTransaction> deletedBlocksTable) {
+ table = deletedBlocksTable;
+ return this;
+ }
+
+ public Builder setSCMDBTransactionBuffer(DBTransactionBuffer buffer) {
+ this.transactionBuffer = buffer;
+ return this;
+ }
+
+ public DeletedBlockLogStateManager build() {
+ Preconditions.checkNotNull(conf);
+ Preconditions.checkNotNull(scmRatisServer);
+      Preconditions.checkNotNull(table);
+      Preconditions.checkNotNull(transactionBuffer);
+
+ final DeletedBlockLogStateManager impl =
+ new DeletedBlockLogStateManagerImpl(conf, table, transactionBuffer);
+
+ final SCMHAInvocationHandler invocationHandler =
+ new SCMHAInvocationHandler(SCMRatisProtocol.RequestType.BLOCK,
+ impl, scmRatisServer);
+
+ return (DeletedBlockLogStateManager) Proxy.newProxyInstance(
+ SCMHAInvocationHandler.class.getClassLoader(),
+ new Class<?>[]{DeletedBlockLogStateManager.class},
+ invocationHandler);
+ }
+
+ }
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBTransactionBuffer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBTransactionBuffer.java
index e5af076..451ed1b 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBTransactionBuffer.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBTransactionBuffer.java
@@ -16,7 +16,11 @@
*/
package org.apache.hadoop.hdds.scm.ha;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
+import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImplV2;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.ratis.statemachine.SnapshotInfo;
@@ -32,17 +36,21 @@ import static
org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
* operation in DB.
*/
public class SCMDBTransactionBuffer implements DBTransactionBuffer {
+ private final StorageContainerManager scm;
private final SCMMetadataStore metadataStore;
private BatchOperation currentBatchOperation;
private SCMTransactionInfo latestTrxInfo;
private SnapshotInfo latestSnapshot;
- public SCMDBTransactionBuffer(SCMMetadataStore store) throws IOException {
- this.metadataStore = store;
+ public SCMDBTransactionBuffer(StorageContainerManager scm)
+ throws IOException {
+ this.scm = scm;
+ this.metadataStore = scm.getScmMetadataStore();
// initialize a batch operation during construction time
currentBatchOperation = this.metadataStore.getStore().initBatchOperation();
- latestTrxInfo = store.getTransactionInfoTable().get(TRANSACTION_INFO_KEY);
+ latestTrxInfo = this.metadataStore.getTransactionInfoTable()
+ .get(TRANSACTION_INFO_KEY);
if (latestTrxInfo == null) {
// transaction table is empty
latestTrxInfo =
@@ -98,6 +106,12 @@ public class SCMDBTransactionBuffer implements
DBTransactionBuffer {
this.latestSnapshot = latestTrxInfo.toSnapshotInfo();
// reset batch operation
currentBatchOperation = metadataStore.getStore().initBatchOperation();
+
+ DeletedBlockLog deletedBlockLog = scm.getScmBlockManager()
+ .getDeletedBlockLog();
+ Preconditions.checkArgument(
+ deletedBlockLog instanceof DeletedBlockLogImplV2);
+ ((DeletedBlockLogImplV2) deletedBlockLog).onFlush();
}
@Override
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index b795380..1880363 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -48,7 +48,7 @@ public class SCMHAManagerImpl implements SCMHAManager {
final StorageContainerManager scm) throws IOException {
this.conf = conf;
this.transactionBuffer =
- new SCMDBTransactionBuffer(scm.getScmMetadataStore());
+ new SCMDBTransactionBuffer(scm);
this.ratisServer = new SCMRatisServerImpl(
conf.getObject(SCMHAConfiguration.class), conf, scm,
transactionBuffer);
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index a04f0d8..13c5661 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -26,7 +26,10 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
+import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImplV2;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftGroupMemberId;
@@ -156,6 +159,13 @@ public class SCMStateMachine extends BaseStateMachine {
scm.getScmContext().updateLeaderAndTerm(true, term);
scm.getSCMServiceManager().notifyStatusChanged();
+
+ DeletedBlockLog deletedBlockLog = scm.getScmBlockManager()
+ .getDeletedBlockLog();
+ Preconditions.checkArgument(
+ deletedBlockLog instanceof DeletedBlockLogImplV2);
+ ((DeletedBlockLogImplV2) deletedBlockLog)
+ .clearTransactionToDNsCommitMap();
}
@Override
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 809dda9..ce5e5f3 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -61,7 +61,7 @@ import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
-import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl;
+import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImplV2;
import org.apache.hadoop.hdds.scm.block.PendingDeleteHandler;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
@@ -362,7 +362,7 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
eventQueue
.addHandler(SCMEvents.PENDING_DELETE_STATUS, pendingDeleteHandler);
eventQueue.addHandler(SCMEvents.DELETE_BLOCK_STATUS,
- (DeletedBlockLogImpl) scmBlockManager.getDeletedBlockLog());
+ (DeletedBlockLogImplV2) scmBlockManager.getDeletedBlockLog());
eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler);
eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler);
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
index 37b2d61..ef6d102 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
@@ -28,6 +28,10 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.ha.DBTransactionBuffer;
+import org.apache.hadoop.hdds.scm.ha.MockDBTransactionBuffer;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -77,13 +81,14 @@ import static org.mockito.Mockito.when;
*/
public class TestDeletedBlockLog {
- private static DeletedBlockLogImpl deletedBlockLog;
+ private static DeletedBlockLogImplV2 deletedBlockLog;
private static final int BLOCKS_PER_TXN = 5;
private OzoneConfiguration conf;
private File testDir;
private ContainerManagerV2 containerManager;
private StorageContainerManager scm;
private List<DatanodeDetails> dnList;
+ private DBTransactionBuffer dbTransactionBuffer;
@Before
public void setup() throws Exception {
@@ -94,8 +99,12 @@ public class TestDeletedBlockLog {
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
scm = TestUtils.getScm(conf);
containerManager = Mockito.mock(ContainerManagerV2.class);
- deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager,
- scm.getScmMetadataStore());
+ dbTransactionBuffer =
+ new MockDBTransactionBuffer(scm.getScmMetadataStore().getStore());
+ deletedBlockLog = new DeletedBlockLogImplV2(conf, containerManager,
+ MockSCMHAManager.getInstance(true).getRatisServer(),
+ scm.getScmMetadataStore().getDeletedBlocksTXTable(),
+ dbTransactionBuffer, SCMContext.emptyContext());
dnList = new ArrayList<>(3);
setupContainerManager();
}
@@ -155,31 +164,48 @@ public class TestDeletedBlockLog {
return blockMap;
}
+ private void addTransactions(Map<Long, List<Long>> containerBlocksMap)
+ throws IOException {
+ dbTransactionBuffer.getCurrentBatchOperation();
+ deletedBlockLog.addTransactions(containerBlocksMap);
+ dbTransactionBuffer.flush();
+ }
+
+ private void incrementCount(List<Long> txIDs) throws IOException {
+ dbTransactionBuffer.getCurrentBatchOperation();
+ deletedBlockLog.incrementCount(txIDs);
+ dbTransactionBuffer.flush();
+ }
+
private void commitTransactions(
List<DeleteBlockTransactionResult> transactionResults,
- DatanodeDetails... dns) {
+ DatanodeDetails... dns) throws IOException {
+ dbTransactionBuffer.getCurrentBatchOperation();
for (DatanodeDetails dnDetails : dns) {
deletedBlockLog
.commitTransactions(transactionResults, dnDetails.getUuid());
}
+ dbTransactionBuffer.flush();
}
private void commitTransactions(
- List<DeleteBlockTransactionResult> transactionResults) {
+ List<DeleteBlockTransactionResult> transactionResults)
+ throws IOException {
commitTransactions(transactionResults,
dnList.toArray(new DatanodeDetails[3]));
}
private void commitTransactions(
Collection<DeletedBlocksTransaction> deletedBlocksTransactions,
- DatanodeDetails... dns) {
+ DatanodeDetails... dns) throws IOException {
commitTransactions(deletedBlocksTransactions.stream()
.map(this::createDeleteBlockTransactionResult)
.collect(Collectors.toList()), dns);
}
private void commitTransactions(
- Collection<DeletedBlocksTransaction> deletedBlocksTransactions) {
+ Collection<DeletedBlocksTransaction> deletedBlocksTransactions)
+ throws IOException {
commitTransactions(deletedBlocksTransactions.stream()
.map(this::createDeleteBlockTransactionResult)
.collect(Collectors.toList()));
@@ -210,9 +236,7 @@ public class TestDeletedBlockLog {
int maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);
// Create 30 TXs in the log.
- for (Map.Entry<Long, List<Long>> entry : generateData(30).entrySet()){
- deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
- }
+ addTransactions(generateData(30));
// This will return all TXs, total num 30.
List<DeletedBlocksTransaction> blocks =
@@ -221,12 +245,12 @@ public class TestDeletedBlockLog {
.collect(Collectors.toList());
for (int i = 0; i < maxRetry; i++) {
- deletedBlockLog.incrementCount(txIDs);
+ incrementCount(txIDs);
}
// Increment another time so it exceed the maxRetry.
// On this call, count will be set to -1 which means TX eventually fails.
- deletedBlockLog.incrementCount(txIDs);
+ incrementCount(txIDs);
blocks = getTransactions(40 * BLOCKS_PER_TXN);
for (DeletedBlocksTransaction block : blocks) {
Assert.assertEquals(-1, block.getCount());
@@ -239,9 +263,7 @@ public class TestDeletedBlockLog {
@Test
public void testCommitTransactions() throws Exception {
- for (Map.Entry<Long, List<Long>> entry : generateData(50).entrySet()){
- deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
- }
+ addTransactions(generateData(50));
List<DeletedBlocksTransaction> blocks =
getTransactions(20 * BLOCKS_PER_TXN);
// Add an invalid txn.
@@ -279,10 +301,7 @@ public class TestDeletedBlockLog {
for (int i = 0; i < 100; i++) {
int state = random.nextInt(4);
if (state == 0) {
- for (Map.Entry<Long, List<Long>> entry :
- generateData(10).entrySet()){
- deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
- }
+ addTransactions(generateData(10));
added += 10;
} else if (state == 1) {
blocks = getTransactions(20);
@@ -290,7 +309,7 @@ public class TestDeletedBlockLog {
for (DeletedBlocksTransaction block : blocks) {
txIDs.add(block.getTxID());
}
- deletedBlockLog.incrementCount(txIDs);
+ incrementCount(txIDs);
} else if (state == 2) {
commitTransactions(blocks);
committed += blocks.size();
@@ -312,14 +331,14 @@ public class TestDeletedBlockLog {
@Test
public void testPersistence() throws Exception {
- for (Map.Entry<Long, List<Long>> entry : generateData(50).entrySet()){
- deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
- }
+ addTransactions(generateData(50));
// close db and reopen it again to make sure
// transactions are stored persistently.
deletedBlockLog.close();
- deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager,
- scm.getScmMetadataStore());
+ deletedBlockLog = new DeletedBlockLogImplV2(conf, containerManager,
+ MockSCMHAManager.getInstance(true).getRatisServer(),
+ scm.getScmMetadataStore().getDeletedBlocksTXTable(),
+ dbTransactionBuffer, SCMContext.emptyContext());
List<DeletedBlocksTransaction> blocks =
getTransactions(BLOCKS_PER_TXN * 10);
Assert.assertEquals(10, blocks.size());
@@ -339,10 +358,11 @@ public class TestDeletedBlockLog {
long containerID;
// Creates {TXNum} TX in the log.
- for (Map.Entry<Long, List<Long>> entry : generateData(txNum).entrySet()) {
+ Map<Long, List<Long>> deletedBlocks = generateData(txNum);
+ addTransactions(deletedBlocks);
+ for (Map.Entry<Long, List<Long>> entry :deletedBlocks.entrySet()) {
count++;
containerID = entry.getKey();
- deletedBlockLog.addTransaction(containerID, entry.getValue());
if (count % 2 == 0) {
mockContainerInfo(containerID, dnId1);
@@ -365,7 +385,9 @@ public class TestDeletedBlockLog {
builder.setTxID(11);
builder.setContainerID(containerID);
builder.setCount(0);
- deletedBlockLog.addTransaction(containerID, new LinkedList<>());
+ Map<Long, List<Long>> deletedBlocksMap = new HashMap<>();
+ deletedBlocksMap.put(containerID, new LinkedList<>());
+ addTransactions(deletedBlocksMap);
// get should return two transactions for the same container
blocks = getTransactions(txNum);
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 635fe30..a36c275 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -41,7 +41,9 @@ import java.lang.reflect.Modifier;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -281,8 +283,9 @@ public class TestStorageContainerManager {
cluster.getStorageContainerManager());
}
- Map<Long, List<Long>> containerBlocks = createDeleteTXLog(delLog,
- keyLocations, helper);
+ Map<Long, List<Long>> containerBlocks = createDeleteTXLog(
+ cluster.getStorageContainerManager(),
+ delLog, keyLocations, helper);
Set<Long> containerIDs = containerBlocks.keySet();
// Verify a few TX gets created in the TX log.
@@ -295,6 +298,8 @@ public class TestStorageContainerManager {
// empty again.
GenericTestUtils.waitFor(() -> {
try {
+ cluster.getStorageContainerManager().getScmHAManager()
+ .getDBTransactionBuffer().flush();
return delLog.getNumOfValidTransactions() == 0;
} catch (IOException e) {
return false;
@@ -306,10 +311,13 @@ public class TestStorageContainerManager {
// but unknown block IDs.
for (Long containerID : containerBlocks.keySet()) {
// Add 2 TXs per container.
- delLog.addTransaction(containerID,
- Collections.singletonList(RandomUtils.nextLong()));
- delLog.addTransaction(containerID,
- Collections.singletonList(RandomUtils.nextLong()));
+ Map<Long, List<Long>> deletedBlocks = new HashMap<>();
+ List<Long> blocks = new ArrayList<>();
+ blocks.add(RandomUtils.nextLong());
+ blocks.add(RandomUtils.nextLong());
+ deletedBlocks.put(containerID, blocks);
+ addTransactions(cluster.getStorageContainerManager(), delLog,
+ deletedBlocks);
}
// Verify a few TX gets created in the TX log.
@@ -319,11 +327,13 @@ public class TestStorageContainerManager {
// eventually these TX will success.
GenericTestUtils.waitFor(() -> {
try {
+ cluster.getStorageContainerManager().getScmHAManager()
+ .getDBTransactionBuffer().flush();
return delLog.getFailedTransactions().size() == 0;
} catch (IOException e) {
return false;
}
- }, 1000, 10000);
+ }, 1000, 20000);
} finally {
cluster.shutdown();
}
@@ -374,7 +384,8 @@ public class TestStorageContainerManager {
cluster.getStorageContainerManager());
}
- createDeleteTXLog(delLog, keyLocations, helper);
+ createDeleteTXLog(cluster.getStorageContainerManager(),
+ delLog, keyLocations, helper);
// Verify a few TX gets created in the TX log.
Assert.assertTrue(delLog.getNumOfValidTransactions() > 0);
@@ -400,7 +411,9 @@ public class TestStorageContainerManager {
}
}
- private Map<Long, List<Long>> createDeleteTXLog(DeletedBlockLog delLog,
+ private Map<Long, List<Long>> createDeleteTXLog(
+ StorageContainerManager scm,
+ DeletedBlockLog delLog,
Map<String, OmKeyInfo> keyLocations,
TestStorageContainerManagerHelper helper) throws IOException {
// These keys will be written into a bunch of containers,
@@ -438,9 +451,7 @@ public class TestStorageContainerManager {
}
});
}
- for (Map.Entry<Long, List<Long>> tx : containerBlocks.entrySet()) {
- delLog.addTransaction(tx.getKey(), tx.getValue());
- }
+ addTransactions(scm, delLog, containerBlocks);
return containerBlocks;
}
@@ -667,6 +678,14 @@ public class TestStorageContainerManager {
}
}
+ private void addTransactions(StorageContainerManager scm,
+ DeletedBlockLog delLog,
+ Map<Long, List<Long>> containerBlocksMap)
+ throws IOException {
+ delLog.addTransactions(containerBlocksMap);
+ scm.getScmHAManager().getDBTransactionBuffer().flush();
+ }
+
@SuppressWarnings("visibilitymodifier")
static class CloseContainerCommandMatcher
extends ArgumentMatcher<CommandForDatanode> {
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
index 5a44a2c..bbbd702 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
@@ -280,9 +280,15 @@ public class TestDeleteWithSlowFollower {
client.getObjectStore().getVolume(volumeName).getBucket(bucketName).
deleteKey("ratis");
GenericTestUtils.waitFor(() -> {
- return
- dnStateMachine.getCommandDispatcher().getDeleteBlocksCommandHandler()
- .getInvocationCount() >= 1;
+ try {
+ cluster.getStorageContainerManager().getScmHAManager()
+ .getDBTransactionBuffer().flush();
+ return
+ dnStateMachine.getCommandDispatcher()
+ .getDeleteBlocksCommandHandler().getInvocationCount() >= 1;
+ } catch (IOException e) {
+ return false;
+ }
}, 500, 100000);
Assert.assertTrue(containerData.getDeleteTransactionId() > delTrxId);
Assert.assertTrue(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]