klsince commented on code in PR #11692:
URL: https://github.com/apache/pinot/pull/11692#discussion_r1338061446


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -702,29 +674,50 @@ public void removeExpiredPrimaryKeys() {
    */
   protected abstract void doRemoveExpiredPrimaryKeys();
 
+  protected synchronized boolean startOperation() {
+    if (_stopped || _numPendingOperations == 0) {
+      return false;
+    }
+    _numPendingOperations++;
+    return true;
+  }
+
+  protected synchronized void finishOperation() {
+    _numPendingOperations--;
+    if (_numPendingOperations == 0) {
+      notifyAll();
+    }
+  }
+
   @Override
-  public void stop() {
+  public synchronized void stop() {
+    if (_stopped) {
+      _logger.warn("Metadata manager is already stopped");
+      return;
+    }
     _stopped = true;
-    int numPendingOperations = _numPendingOperations.decrementAndGet();
+    _numPendingOperations--;
     _logger.info("Stopped the metadata manager with {} pending operations, current primary key count: {}",
-        numPendingOperations, getNumPrimaryKeys());
+        _numPendingOperations, getNumPrimaryKeys());
   }
 
   @Override
-  public void close()
+  public synchronized void close()
       throws IOException {
     Preconditions.checkState(_stopped, "Must stop the metadata manager before closing it");
+    if (_closed) {
+      _logger.warn("Metadata manager is already closed");
+      return;
+    }
+    _closed = true;
     _logger.info("Closing the metadata manager");
-    synchronized (_numPendingOperations) {
-      int numPendingOperations;
-      while ((numPendingOperations = _numPendingOperations.get()) != 0) {
-        _logger.info("Waiting for {} pending operations to finish", numPendingOperations);
-        try {
-          _numPendingOperations.wait();
-        } catch (InterruptedException e) {
-          throw new RuntimeException(
-              String.format("Interrupted while waiting for %d pending operations to finish", numPendingOperations), e);
-        }
+    while (_numPendingOperations != 0) {
+      _logger.info("Waiting for {} pending operations to finish", _numPendingOperations);
+      try {
+        wait();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(
+            String.format("Interrupted while waiting for %d pending operations to finish", _numPendingOperations), e);
       }
     }
     doClose();

Review Comment:
   ah!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to