This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new b831d8e6fb HDDS-8036. Unprotected flush in SCMHADBTransactionBuffer (#4319)
b831d8e6fb is described below

commit b831d8e6fb1aec8d9a50677c64b7a1361deb0944
Author: Nibiru <[email protected]>
AuthorDate: Tue Feb 28 17:19:37 2023 +0800

    HDDS-8036. Unprotected flush in SCMHADBTransactionBuffer (#4319)
---
 .../hdds/scm/ha/SCMHADBTransactionBufferImpl.java  | 87 ++++++++++++++--------
 .../hdds/scm/ha/SCMHADBTransactionBufferStub.java  | 27 +++++--
 2 files changed, 77 insertions(+), 37 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java
index 7ad66af5ea..d5171048a5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.ratis.statemachine.SnapshotInfo;
 
 import java.io.IOException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
 
@@ -42,6 +43,7 @@ public class SCMHADBTransactionBufferImpl implements SCMHADBTransactionBuffer {
   private BatchOperation currentBatchOperation;
   private TransactionInfo latestTrxInfo;
   private SnapshotInfo latestSnapshot;
+  private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
 
   public SCMHADBTransactionBufferImpl(StorageContainerManager scm)
       throws IOException {
@@ -56,13 +58,23 @@ public class SCMHADBTransactionBufferImpl implements SCMHADBTransactionBuffer {
   @Override
   public <KEY, VALUE> void addToBuffer(
       Table<KEY, VALUE> table, KEY key, VALUE value) throws IOException {
-    table.putWithBatch(getCurrentBatchOperation(), key, value);
+    rwLock.readLock().lock();
+    try {
+      table.putWithBatch(getCurrentBatchOperation(), key, value);
+    } finally {
+      rwLock.readLock().unlock();
+    }
   }
 
   @Override
   public <KEY, VALUE> void removeFromBuffer(Table<KEY, VALUE> table, KEY key)
       throws IOException {
-    table.deleteWithBatch(getCurrentBatchOperation(), key);
+    rwLock.readLock().lock();
+    try {
+      table.deleteWithBatch(getCurrentBatchOperation(), key);
+    } finally {
+      rwLock.readLock().unlock();
+    }
   }
 
   @Override
@@ -92,43 +104,54 @@ public class SCMHADBTransactionBufferImpl implements SCMHADBTransactionBuffer {
 
   @Override
   public void flush() throws IOException {
-    // write latest trx info into trx table in the same batch
-    Table<String, TransactionInfo> transactionInfoTable
-        = metadataStore.getTransactionInfoTable();
-    transactionInfoTable.putWithBatch(currentBatchOperation,
-        TRANSACTION_INFO_KEY, latestTrxInfo);
-
-    metadataStore.getStore().commitBatchOperation(currentBatchOperation);
-    currentBatchOperation.close();
-    this.latestSnapshot = latestTrxInfo.toSnapshotInfo();
-    // reset batch operation
-    currentBatchOperation = metadataStore.getStore().initBatchOperation();
-
-    DeletedBlockLog deletedBlockLog = scm.getScmBlockManager()
-        .getDeletedBlockLog();
-    Preconditions.checkArgument(
-        deletedBlockLog instanceof DeletedBlockLogImpl);
-    ((DeletedBlockLogImpl) deletedBlockLog).onFlush();
+    rwLock.writeLock().lock();
+    try {
+      // write latest trx info into trx table in the same batch
+      Table<String, TransactionInfo> transactionInfoTable
+          = metadataStore.getTransactionInfoTable();
+      transactionInfoTable.putWithBatch(currentBatchOperation,
+          TRANSACTION_INFO_KEY, latestTrxInfo);
+
+      metadataStore.getStore().commitBatchOperation(currentBatchOperation);
+      currentBatchOperation.close();
+      this.latestSnapshot = latestTrxInfo.toSnapshotInfo();
+      // reset batch operation
+      currentBatchOperation = metadataStore.getStore().initBatchOperation();
+
+      DeletedBlockLog deletedBlockLog = scm.getScmBlockManager()
+          .getDeletedBlockLog();
+      Preconditions.checkArgument(
+          deletedBlockLog instanceof DeletedBlockLogImpl);
+      ((DeletedBlockLogImpl) deletedBlockLog).onFlush();
+    } finally {
+      rwLock.writeLock().unlock();
+    }
   }
 
   @Override
   public void init() throws IOException {
     metadataStore = scm.getScmMetadataStore();
 
-    // initialize a batch operation during construction time
-    currentBatchOperation = this.metadataStore.getStore().initBatchOperation();
-    latestTrxInfo = this.metadataStore.getTransactionInfoTable()
-        .get(TRANSACTION_INFO_KEY);
-    if (latestTrxInfo == null) {
-      // transaction table is empty
-      latestTrxInfo =
-          TransactionInfo
-              .builder()
-              .setTransactionIndex(-1)
-              .setCurrentTerm(0)
-              .build();
+    rwLock.writeLock().lock();
+    try {
+      // initialize a batch operation during construction time
+      currentBatchOperation = this.metadataStore.getStore().
+          initBatchOperation();
+      latestTrxInfo = this.metadataStore.getTransactionInfoTable()
+          .get(TRANSACTION_INFO_KEY);
+      if (latestTrxInfo == null) {
+        // transaction table is empty
+        latestTrxInfo =
+            TransactionInfo
+                .builder()
+                .setTransactionIndex(-1)
+                .setCurrentTerm(0)
+                .build();
+      }
+      latestSnapshot = latestTrxInfo.toSnapshotInfo();
+    } finally {
+      rwLock.writeLock().unlock();
     }
-    latestSnapshot = latestTrxInfo.toSnapshotInfo();
   }
 
   @Override
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferStub.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferStub.java
index 10d6f29ef8..2387fcdf7f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferStub.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferStub.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.ratis.statemachine.SnapshotInfo;
 
 import java.io.IOException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 // TODO: Move this class to test package after fixing Recon
 /**
@@ -32,6 +33,7 @@ import java.io.IOException;
 public class SCMHADBTransactionBufferStub implements SCMHADBTransactionBuffer {
   private DBStore dbStore;
   private BatchOperation currentBatchOperation;
+  private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
 
   public SCMHADBTransactionBufferStub() {
   }
@@ -54,13 +56,23 @@ public class SCMHADBTransactionBufferStub implements SCMHADBTransactionBuffer {
   @Override
   public <KEY, VALUE> void addToBuffer(
       Table<KEY, VALUE> table, KEY key, VALUE value) throws IOException {
-    table.putWithBatch(getCurrentBatchOperation(), key, value);
+    rwLock.readLock().lock();
+    try {
+      table.putWithBatch(getCurrentBatchOperation(), key, value);
+    } finally {
+      rwLock.readLock().unlock();
+    }
   }
 
   @Override
   public <KEY, VALUE> void removeFromBuffer(Table<KEY, VALUE> table, KEY key)
       throws IOException {
-    table.deleteWithBatch(getCurrentBatchOperation(), key);
+    rwLock.readLock().lock();
+    try {
+      table.deleteWithBatch(getCurrentBatchOperation(), key);
+    } finally {
+      rwLock.readLock().unlock();
+    }
   }
 
   @Override
@@ -86,9 +98,14 @@ public class SCMHADBTransactionBufferStub implements SCMHADBTransactionBuffer {
   @Override
   public void flush() throws IOException {
     if (dbStore != null) {
-      dbStore.commitBatchOperation(getCurrentBatchOperation());
-      currentBatchOperation.close();
-      currentBatchOperation = null;
+      rwLock.writeLock().lock();
+      try {
+        dbStore.commitBatchOperation(getCurrentBatchOperation());
+        currentBatchOperation.close();
+        currentBatchOperation = null;
+      } finally {
+        rwLock.writeLock().unlock();
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to