This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ed9f219db Remove the global lock in SegmentCompletionManager (#11679)
9ed9f219db is described below

commit 9ed9f219dbb81b14e937654cc7b9c2e82190fc90
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Sep 26 12:10:52 2023 -0700

    Remove the global lock in SegmentCompletionManager (#11679)
---
 .../core/realtime/SegmentCompletionManager.java    | 78 +++++++---------------
 1 file changed, 25 insertions(+), 53 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index baab4b6ab7..8900ffef5c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -19,14 +19,13 @@
 package org.apache.pinot.controller.helix.core.realtime;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMeter;
@@ -78,8 +77,6 @@ public class SegmentCompletionManager {
   private final PinotLLCRealtimeSegmentManager _segmentManager;
   private final ControllerMetrics _controllerMetrics;
   private final LeadControllerManager _leadControllerManager;
-  private final Lock[] _fsmLocks;
-  private static final int NUM_FSM_LOCKS = 20;
 
   // Half hour max commit time for all segments
   private static final int MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS = 1800;
@@ -98,12 +95,8 @@ public class SegmentCompletionManager {
     _segmentManager = segmentManager;
     _controllerMetrics = controllerMetrics;
     _leadControllerManager = leadControllerManager;
-    SegmentCompletionProtocol
-        
.setMaxSegmentCommitTimeMs(TimeUnit.MILLISECONDS.convert(segmentCommitTimeoutSeconds,
 TimeUnit.SECONDS));
-    _fsmLocks = new Lock[NUM_FSM_LOCKS];
-    for (int i = 0; i < NUM_FSM_LOCKS; i++) {
-      _fsmLocks[i] = new ReentrantLock();
-    }
+    SegmentCompletionProtocol.setMaxSegmentCommitTimeMs(
+        TimeUnit.MILLISECONDS.convert(segmentCommitTimeoutSeconds, 
TimeUnit.SECONDS));
   }
 
   public String getControllerVipUrl() {
@@ -122,51 +115,30 @@ public class SegmentCompletionManager {
     return 
StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
   }
 
-  // We need to make sure that we never create multiple FSMs for the same 
segment
-  // Obtain locks based on segment name, so as to disallow same segment names 
entering together
-  private SegmentCompletionFSM lookupOrCreateFsm(final LLCSegmentName 
segmentName, String msgType) {
-    final String segmentNameStr = segmentName.getSegmentName();
-
-    int lockIndex = (segmentNameStr.hashCode() & Integer.MAX_VALUE) % 
NUM_FSM_LOCKS;
-    Lock lock = _fsmLocks[lockIndex];
+  private SegmentCompletionFSM lookupOrCreateFsm(LLCSegmentName 
llcSegmentName, String msgType) {
+    return _fsmMap.computeIfAbsent(llcSegmentName.getSegmentName(), k -> 
createFsm(llcSegmentName, msgType));
+  }
 
+  private SegmentCompletionFSM createFsm(LLCSegmentName llcSegmentName, String 
msgType) {
+    String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(llcSegmentName.getTableName());
+    String segmentName = llcSegmentName.getSegmentName();
+    SegmentZKMetadata segmentMetadata = 
_segmentManager.getSegmentZKMetadata(realtimeTableName, segmentName, null);
+    Preconditions.checkState(segmentMetadata != null, "Failed to find ZK 
metadata for segment: %s", segmentName);
     SegmentCompletionFSM fsm;
-    try {
-      lock.lock();
-      fsm = _fsmMap.get(segmentNameStr);
-      if (fsm == null) {
-        // Look up propertystore to see if this is a completed segment
-        // TODO if we keep a list of last few committed segments, we don't 
need to go to zk for this.
-        final String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(segmentName.getTableName());
-        SegmentZKMetadata segmentMetadata =
-            _segmentManager.getSegmentZKMetadata(realtimeTableName, 
segmentName.getSegmentName(), null);
-        if (segmentMetadata.getStatus() == 
CommonConstants.Segment.Realtime.Status.DONE) {
-          // Best to go through the state machine for this case as well, so 
that all code regarding state handling is
-          // in one place
-          // Also good for synchronization, because it is possible that 
multiple threads take this path, and we don't
-          // want
-          // multiple instances of the FSM to be created for the same commit 
sequence at the same time.
-          StreamPartitionMsgOffsetFactory factory = 
getStreamPartitionMsgOffsetFactory(segmentName);
-          final StreamPartitionMsgOffset endOffset = 
factory.create(segmentMetadata.getEndOffset());
-          fsm = SegmentCompletionFSM
-              .fsmInCommit(_segmentManager, this, segmentName, 
segmentMetadata.getNumReplicas(), endOffset);
-        } else if 
(msgType.equals(SegmentCompletionProtocol.MSG_TYPE_STOPPED_CONSUMING)) {
-          fsm = SegmentCompletionFSM
-              .fsmStoppedConsuming(_segmentManager, this, segmentName, 
segmentMetadata.getNumReplicas());
-        } else {
-          // Segment is in the process of completing, and this is the first 
one to respond. Create fsm
-          fsm = SegmentCompletionFSM.fsmInHolding(_segmentManager, this, 
segmentName, segmentMetadata.getNumReplicas());
-        }
-        LOGGER.info("Created FSM {}", fsm);
-        _fsmMap.put(segmentNameStr, fsm);
-      }
-    } catch (Exception e) {
-      // Server gone wonky. Segment does not exist in propstore
-      LOGGER.error("Exception getting FSM for segment {}", segmentNameStr, e);
-      throw new RuntimeException("Exception getting FSM for segment " + 
segmentNameStr, e);
-    } finally {
-      lock.unlock();
-    }
+    if (segmentMetadata.getStatus() == 
CommonConstants.Segment.Realtime.Status.DONE) {
+      // Segment is already committed
+      StreamPartitionMsgOffsetFactory factory = 
getStreamPartitionMsgOffsetFactory(llcSegmentName);
+      StreamPartitionMsgOffset endOffset = 
factory.create(segmentMetadata.getEndOffset());
+      fsm = SegmentCompletionFSM.fsmInCommit(_segmentManager, this, 
llcSegmentName, segmentMetadata.getNumReplicas(),
+          endOffset);
+    } else if 
(msgType.equals(SegmentCompletionProtocol.MSG_TYPE_STOPPED_CONSUMING)) {
+      fsm = SegmentCompletionFSM.fsmStoppedConsuming(_segmentManager, this, 
llcSegmentName,
+          segmentMetadata.getNumReplicas());
+    } else {
+      // Segment is in the process of completing, and this is the first one to 
respond. Create fsm
+      fsm = SegmentCompletionFSM.fsmInHolding(_segmentManager, this, 
llcSegmentName, segmentMetadata.getNumReplicas());
+    }
+    LOGGER.info("Created FSM {}", fsm);
     return fsm;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to