This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new b831d8e6fb HDDS-8036. Unprotected flush in SCMHADBTransactionBuffer (#4319)
b831d8e6fb is described below
commit b831d8e6fb1aec8d9a50677c64b7a1361deb0944
Author: Nibiru <[email protected]>
AuthorDate: Tue Feb 28 17:19:37 2023 +0800
HDDS-8036. Unprotected flush in SCMHADBTransactionBuffer (#4319)
---
.../hdds/scm/ha/SCMHADBTransactionBufferImpl.java | 87 ++++++++++++++--------
.../hdds/scm/ha/SCMHADBTransactionBufferStub.java | 27 +++++--
2 files changed, 77 insertions(+), 37 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java
index 7ad66af5ea..d5171048a5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.ratis.statemachine.SnapshotInfo;
import java.io.IOException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
@@ -42,6 +43,7 @@ public class SCMHADBTransactionBufferImpl implements SCMHADBTransactionBuffer {
private BatchOperation currentBatchOperation;
private TransactionInfo latestTrxInfo;
private SnapshotInfo latestSnapshot;
+ private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
public SCMHADBTransactionBufferImpl(StorageContainerManager scm)
throws IOException {
@@ -56,13 +58,23 @@ public class SCMHADBTransactionBufferImpl implements SCMHADBTransactionBuffer {
@Override
public <KEY, VALUE> void addToBuffer(
Table<KEY, VALUE> table, KEY key, VALUE value) throws IOException {
- table.putWithBatch(getCurrentBatchOperation(), key, value);
+ rwLock.readLock().lock();
+ try {
+ table.putWithBatch(getCurrentBatchOperation(), key, value);
+ } finally {
+ rwLock.readLock().unlock();
+ }
}
@Override
public <KEY, VALUE> void removeFromBuffer(Table<KEY, VALUE> table, KEY key)
throws IOException {
- table.deleteWithBatch(getCurrentBatchOperation(), key);
+ rwLock.readLock().lock();
+ try {
+ table.deleteWithBatch(getCurrentBatchOperation(), key);
+ } finally {
+ rwLock.readLock().unlock();
+ }
}
@Override
@@ -92,43 +104,54 @@ public class SCMHADBTransactionBufferImpl implements SCMHADBTransactionBuffer {
@Override
public void flush() throws IOException {
- // write latest trx info into trx table in the same batch
- Table<String, TransactionInfo> transactionInfoTable
- = metadataStore.getTransactionInfoTable();
- transactionInfoTable.putWithBatch(currentBatchOperation,
- TRANSACTION_INFO_KEY, latestTrxInfo);
-
- metadataStore.getStore().commitBatchOperation(currentBatchOperation);
- currentBatchOperation.close();
- this.latestSnapshot = latestTrxInfo.toSnapshotInfo();
- // reset batch operation
- currentBatchOperation = metadataStore.getStore().initBatchOperation();
-
- DeletedBlockLog deletedBlockLog = scm.getScmBlockManager()
- .getDeletedBlockLog();
- Preconditions.checkArgument(
- deletedBlockLog instanceof DeletedBlockLogImpl);
- ((DeletedBlockLogImpl) deletedBlockLog).onFlush();
+ rwLock.writeLock().lock();
+ try {
+ // write latest trx info into trx table in the same batch
+ Table<String, TransactionInfo> transactionInfoTable
+ = metadataStore.getTransactionInfoTable();
+ transactionInfoTable.putWithBatch(currentBatchOperation,
+ TRANSACTION_INFO_KEY, latestTrxInfo);
+
+ metadataStore.getStore().commitBatchOperation(currentBatchOperation);
+ currentBatchOperation.close();
+ this.latestSnapshot = latestTrxInfo.toSnapshotInfo();
+ // reset batch operation
+ currentBatchOperation = metadataStore.getStore().initBatchOperation();
+
+ DeletedBlockLog deletedBlockLog = scm.getScmBlockManager()
+ .getDeletedBlockLog();
+ Preconditions.checkArgument(
+ deletedBlockLog instanceof DeletedBlockLogImpl);
+ ((DeletedBlockLogImpl) deletedBlockLog).onFlush();
+ } finally {
+ rwLock.writeLock().unlock();
+ }
}
@Override
public void init() throws IOException {
metadataStore = scm.getScmMetadataStore();
- // initialize a batch operation during construction time
- currentBatchOperation = this.metadataStore.getStore().initBatchOperation();
- latestTrxInfo = this.metadataStore.getTransactionInfoTable()
- .get(TRANSACTION_INFO_KEY);
- if (latestTrxInfo == null) {
- // transaction table is empty
- latestTrxInfo =
- TransactionInfo
- .builder()
- .setTransactionIndex(-1)
- .setCurrentTerm(0)
- .build();
+ rwLock.writeLock().lock();
+ try {
+ // initialize a batch operation during construction time
+ currentBatchOperation = this.metadataStore.getStore().
+ initBatchOperation();
+ latestTrxInfo = this.metadataStore.getTransactionInfoTable()
+ .get(TRANSACTION_INFO_KEY);
+ if (latestTrxInfo == null) {
+ // transaction table is empty
+ latestTrxInfo =
+ TransactionInfo
+ .builder()
+ .setTransactionIndex(-1)
+ .setCurrentTerm(0)
+ .build();
+ }
+ latestSnapshot = latestTrxInfo.toSnapshotInfo();
+ } finally {
+ rwLock.writeLock().unlock();
}
- latestSnapshot = latestTrxInfo.toSnapshotInfo();
}
@Override
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferStub.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferStub.java
index 10d6f29ef8..2387fcdf7f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferStub.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferStub.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.ratis.statemachine.SnapshotInfo;
import java.io.IOException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
// TODO: Move this class to test package after fixing Recon
/**
@@ -32,6 +33,7 @@ import java.io.IOException;
public class SCMHADBTransactionBufferStub implements SCMHADBTransactionBuffer {
private DBStore dbStore;
private BatchOperation currentBatchOperation;
+ private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
public SCMHADBTransactionBufferStub() {
}
@@ -54,13 +56,23 @@ public class SCMHADBTransactionBufferStub implements SCMHADBTransactionBuffer {
@Override
public <KEY, VALUE> void addToBuffer(
Table<KEY, VALUE> table, KEY key, VALUE value) throws IOException {
- table.putWithBatch(getCurrentBatchOperation(), key, value);
+ rwLock.readLock().lock();
+ try {
+ table.putWithBatch(getCurrentBatchOperation(), key, value);
+ } finally {
+ rwLock.readLock().unlock();
+ }
}
@Override
public <KEY, VALUE> void removeFromBuffer(Table<KEY, VALUE> table, KEY key)
throws IOException {
- table.deleteWithBatch(getCurrentBatchOperation(), key);
+ rwLock.readLock().lock();
+ try {
+ table.deleteWithBatch(getCurrentBatchOperation(), key);
+ } finally {
+ rwLock.readLock().unlock();
+ }
}
@Override
@@ -86,9 +98,14 @@ public class SCMHADBTransactionBufferStub implements SCMHADBTransactionBuffer {
@Override
public void flush() throws IOException {
if (dbStore != null) {
- dbStore.commitBatchOperation(getCurrentBatchOperation());
- currentBatchOperation.close();
- currentBatchOperation = null;
+ rwLock.writeLock().lock();
+ try {
+ dbStore.commitBatchOperation(getCurrentBatchOperation());
+ currentBatchOperation.close();
+ currentBatchOperation = null;
+ } finally {
+ rwLock.writeLock().unlock();
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]