errose28 commented on code in PR #4029:
URL: https://github.com/apache/ozone/pull/4029#discussion_r1098036622


##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBRWBatchOperation.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.utils.db;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedReadWriteBatch;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator.managed;
+
+/**
+ * Read Write Batch operation implementation for rocks db.
+ */
+public class RDBRWBatchOperation implements RWBatchOperation {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(RDBRWBatchOperation.class);
+  private final ManagedReadWriteBatch writeBatch;
+  
+  private final AtomicLong operationCount = new AtomicLong(0);
+  
+  private final AtomicBoolean isActive = new AtomicBoolean(true);
+
+  private final Object lock = new Object();
+
+  public RDBRWBatchOperation() {
+    writeBatch = new ManagedReadWriteBatch();
+  }
+
+  @Override
+  public void commit(RocksDatabase db) throws IOException {
+    db.batchWrite(writeBatch);
+  }
+
+  @Override
+  public void commit(RocksDatabase db, ManagedWriteOptions writeOptions)
+      throws IOException {
+    db.batchWrite(writeBatch, writeOptions);
+  }
+
+  @Override
+  public void close() {
+    synchronized (lock) {
+      isActive.set(false);
+    }
+
+    waitForNoOperation();
+    writeBatch.close();
+  }

Review Comment:
   Instead of spin waiting on close and holding up the caller indefinitely, can 
we make close use `operationCount` as a reference counter? Close would 
either decrement the count if it is non-zero, or close the object if it is 
zero. The count could be incremented once in the constructor, and once in each 
call to `newIteratorWithBase`. Then the underlying RocksDB object would be 
closed on the last user's call to `close`, whether that is the buffer or the 
iterator. The lock/unlock operation methods would no longer be needed since the 
class would internalize the functionality.
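   A rough sketch of the reference-counting idea; `RefCountedBatch` and its 
method names are illustrative stand-ins, not the actual Ozone classes:

```java
import java.util.concurrent.atomic.AtomicLong;

class RefCountedBatch {
  // Count starts at 1 for the batch owner (incremented in the constructor).
  private final AtomicLong refCount = new AtomicLong(1);
  private volatile boolean resourceClosed = false;

  // Each iterator handed out via newIteratorWithBase would take a reference.
  void retain() {
    if (refCount.getAndIncrement() <= 0) {
      // Roll back the speculative increment and fail fast.
      refCount.getAndDecrement();
      throw new IllegalStateException("Batch already closed");
    }
  }

  // Whichever user calls close() last releases the underlying resource;
  // no spin wait is needed.
  void close() {
    if (refCount.decrementAndGet() == 0) {
      resourceClosed = true; // stand-in for writeBatch.close()
    }
  }

  boolean isResourceClosed() {
    return resourceClosed;
  }
}
```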



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RWBatchOperation.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.utils.db;
+
+import java.io.IOException;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
+import org.rocksdb.ColumnFamilyHandle;
+
+/**
+ * Class represents a read write batch operation,
+ * collecting multiple db operations.
+ */
+public interface RWBatchOperation extends BatchOperation {
+  ManagedRocksIterator newIteratorWithBase(
+      ColumnFamilyHandle handle, ManagedRocksIterator newIterator)
+      throws IOException;
+
+  void lockOperation() throws IOException;
+  
+  void releaseOperation();

Review Comment:
   When iterating with the batch, does the error happen just when the batch is 
closed, or does it also happen if the batch is committed but not closed? 
Currently `SCMHADBTransactionBufferImpl#flush` does not block the flush if 
iteration is ongoing. It only blocks the close.



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBRWBatchOperation.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.utils.db;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedReadWriteBatch;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator.managed;
+
+/**
+ * Read Write Batch operation implementation for rocks db.
+ */
+public class RDBRWBatchOperation implements RWBatchOperation {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(RDBRWBatchOperation.class);
+  private final ManagedReadWriteBatch writeBatch;
+  
+  private final AtomicLong operationCount = new AtomicLong(0);
+  
+  private final AtomicBoolean isActive = new AtomicBoolean(true);
+
+  private final Object lock = new Object();
+
+  public RDBRWBatchOperation() {
+    writeBatch = new ManagedReadWriteBatch();
+  }
+
+  @Override
+  public void commit(RocksDatabase db) throws IOException {
+    db.batchWrite(writeBatch);
+  }
+
+  @Override
+  public void commit(RocksDatabase db, ManagedWriteOptions writeOptions)
+      throws IOException {
+    db.batchWrite(writeBatch, writeOptions);
+  }
+
+  @Override
+  public void close() {
+    synchronized (lock) {
+      isActive.set(false);
+    }
+
+    waitForNoOperation();
+    writeBatch.close();
+  }
+
+  private void waitForNoOperation() {
+    while (operationCount.get() > 0) {
+      try {
+        Thread.sleep(1);
+      } catch (InterruptedException ex) {
+        LOG.error("RWBatch Interrupted exception while wait", ex);
+        return;
+      }
+    }
+  }
+
+  @Override
+  public void delete(ColumnFamily family, byte[] key) throws IOException {
+    family.batchDelete(writeBatch, key);
+  }
+
+  @Override
+  public void put(ColumnFamily family, byte[] key, byte[] value)
+      throws IOException {
+    family.batchPut(writeBatch, key, value);
+  }
+
+  @Override
+  public ManagedRocksIterator newIteratorWithBase(
+      ColumnFamilyHandle handle, ManagedRocksIterator newIterator)
+      throws IOException {
+    synchronized (lock) {
+      if (!isActive.get()) {
+        throw new IOException("RWBatch is closed, retry");
+      }
+      return managed(writeBatch.newIteratorWithBase(handle,
+          newIterator.get()));
+    }
+  }
+
+  @Override
+  public void lockOperation() throws IOException {
+    synchronized (lock) {
+      if (!isActive.get()) {
+        throw new IOException("RWBatch is closed, retry");
+      }

Review Comment:
   I don't see retry logic implemented further up the stack, so this would 
probably just crash SCM. If the intent is to crash, since this is a programming 
error, then a `Preconditions` check with a corresponding error message might be 
better.
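   A minimal sketch of the fail-fast alternative; `BatchState` is an 
illustrative stand-in, and the plain `IllegalStateException` below is the 
equivalent of Guava's `Preconditions.checkState`, shown without the Guava 
dependency:

```java
import java.util.concurrent.atomic.AtomicBoolean;

class BatchState {
  private final AtomicBoolean isActive = new AtomicBoolean(true);

  // Fail fast with an unchecked exception instead of throwing an
  // IOException that asks callers (which have no retry logic) to retry.
  private void checkActive() {
    if (!isActive.get()) {
      throw new IllegalStateException(
          "RWBatch used after close; this indicates a caller bug");
    }
  }

  void lockOperation() {
    checkActive();
    // ... perform the operation under the batch lock ...
  }

  void close() {
    isActive.set(false);
  }
}
```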



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBRWStoreIterator.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.utils.db;
+
+import java.io.IOException;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
+
+/**
+ * RocksDB ReadWrite store iterator.
+ */
+public class RDBRWStoreIterator extends RDBStoreIterator {
+  private RWBatchOperation rwBatch;
+
+  public RDBRWStoreIterator(
+      ManagedRocksIterator iterator, RWBatchOperation rwBatch,
+      RDBTable table) throws IOException {
+    super();
+    initIterator(iterator, table);

Review Comment:
   Can we use the `RDBStoreIterator` constructor that takes an iterator and a 
table instance instead of this new method?
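   A toy illustration of the suggested constructor chaining; `StoreIterator` 
and `RWStoreIterator` are stand-ins for `RDBStoreIterator` and 
`RDBRWStoreIterator`, with `String` fields standing in for the real iterator 
and table types:

```java
class StoreIterator {
  protected final String iterator;
  protected final String table;

  StoreIterator(String iterator, String table) {
    this.iterator = iterator;
    this.table = table;
  }
}

class RWStoreIterator extends StoreIterator {
  private final Object rwBatch;

  RWStoreIterator(String iterator, Object rwBatch, String table) {
    // Delegate to the existing two-argument constructor rather than
    // calling a no-arg super() followed by a separate init method.
    super(iterator, table);
    this.rwBatch = rwBatch;
  }

  String describe() {
    return iterator + "/" + table;
  }
}
```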



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

