This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9ed9f219db Remove the global lock in SegmentCompletionManager (#11679)
9ed9f219db is described below
commit 9ed9f219dbb81b14e937654cc7b9c2e82190fc90
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Sep 26 12:10:52 2023 -0700
Remove the global lock in SegmentCompletionManager (#11679)
---
.../core/realtime/SegmentCompletionManager.java | 78 +++++++---------------
1 file changed, 25 insertions(+), 53 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index baab4b6ab7..8900ffef5c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -19,14 +19,13 @@
package org.apache.pinot.controller.helix.core.realtime;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMeter;
@@ -78,8 +77,6 @@ public class SegmentCompletionManager {
private final PinotLLCRealtimeSegmentManager _segmentManager;
private final ControllerMetrics _controllerMetrics;
private final LeadControllerManager _leadControllerManager;
- private final Lock[] _fsmLocks;
- private static final int NUM_FSM_LOCKS = 20;
// Half hour max commit time for all segments
private static final int MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS = 1800;
@@ -98,12 +95,8 @@ public class SegmentCompletionManager {
_segmentManager = segmentManager;
_controllerMetrics = controllerMetrics;
_leadControllerManager = leadControllerManager;
- SegmentCompletionProtocol
-
.setMaxSegmentCommitTimeMs(TimeUnit.MILLISECONDS.convert(segmentCommitTimeoutSeconds,
TimeUnit.SECONDS));
- _fsmLocks = new Lock[NUM_FSM_LOCKS];
- for (int i = 0; i < NUM_FSM_LOCKS; i++) {
- _fsmLocks[i] = new ReentrantLock();
- }
+ SegmentCompletionProtocol.setMaxSegmentCommitTimeMs(
+ TimeUnit.MILLISECONDS.convert(segmentCommitTimeoutSeconds,
TimeUnit.SECONDS));
}
public String getControllerVipUrl() {
@@ -122,51 +115,30 @@ public class SegmentCompletionManager {
return
StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
}
- // We need to make sure that we never create multiple FSMs for the same
segment
- // Obtain locks based on segment name, so as to disallow same segment names
entering together
- private SegmentCompletionFSM lookupOrCreateFsm(final LLCSegmentName
segmentName, String msgType) {
- final String segmentNameStr = segmentName.getSegmentName();
-
- int lockIndex = (segmentNameStr.hashCode() & Integer.MAX_VALUE) %
NUM_FSM_LOCKS;
- Lock lock = _fsmLocks[lockIndex];
+ private SegmentCompletionFSM lookupOrCreateFsm(LLCSegmentName
llcSegmentName, String msgType) {
+ return _fsmMap.computeIfAbsent(llcSegmentName.getSegmentName(), k ->
createFsm(llcSegmentName, msgType));
+ }
+ private SegmentCompletionFSM createFsm(LLCSegmentName llcSegmentName, String
msgType) {
+ String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(llcSegmentName.getTableName());
+ String segmentName = llcSegmentName.getSegmentName();
+ SegmentZKMetadata segmentMetadata =
_segmentManager.getSegmentZKMetadata(realtimeTableName, segmentName, null);
+ Preconditions.checkState(segmentMetadata != null, "Failed to find ZK
metadata for segment: %s", segmentName);
SegmentCompletionFSM fsm;
- try {
- lock.lock();
- fsm = _fsmMap.get(segmentNameStr);
- if (fsm == null) {
- // Look up propertystore to see if this is a completed segment
- // TODO if we keep a list of last few committed segments, we don't
need to go to zk for this.
- final String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(segmentName.getTableName());
- SegmentZKMetadata segmentMetadata =
- _segmentManager.getSegmentZKMetadata(realtimeTableName,
segmentName.getSegmentName(), null);
- if (segmentMetadata.getStatus() ==
CommonConstants.Segment.Realtime.Status.DONE) {
- // Best to go through the state machine for this case as well, so
that all code regarding state handling is
- // in one place
- // Also good for synchronization, because it is possible that
multiple threads take this path, and we don't
- // want
- // multiple instances of the FSM to be created for the same commit
sequence at the same time.
- StreamPartitionMsgOffsetFactory factory =
getStreamPartitionMsgOffsetFactory(segmentName);
- final StreamPartitionMsgOffset endOffset =
factory.create(segmentMetadata.getEndOffset());
- fsm = SegmentCompletionFSM
- .fsmInCommit(_segmentManager, this, segmentName,
segmentMetadata.getNumReplicas(), endOffset);
- } else if
(msgType.equals(SegmentCompletionProtocol.MSG_TYPE_STOPPED_CONSUMING)) {
- fsm = SegmentCompletionFSM
- .fsmStoppedConsuming(_segmentManager, this, segmentName,
segmentMetadata.getNumReplicas());
- } else {
- // Segment is in the process of completing, and this is the first
one to respond. Create fsm
- fsm = SegmentCompletionFSM.fsmInHolding(_segmentManager, this,
segmentName, segmentMetadata.getNumReplicas());
- }
- LOGGER.info("Created FSM {}", fsm);
- _fsmMap.put(segmentNameStr, fsm);
- }
- } catch (Exception e) {
- // Server gone wonky. Segment does not exist in propstore
- LOGGER.error("Exception getting FSM for segment {}", segmentNameStr, e);
- throw new RuntimeException("Exception getting FSM for segment " +
segmentNameStr, e);
- } finally {
- lock.unlock();
- }
+ if (segmentMetadata.getStatus() ==
CommonConstants.Segment.Realtime.Status.DONE) {
+ // Segment is already committed
+ StreamPartitionMsgOffsetFactory factory =
getStreamPartitionMsgOffsetFactory(llcSegmentName);
+ StreamPartitionMsgOffset endOffset =
factory.create(segmentMetadata.getEndOffset());
+ fsm = SegmentCompletionFSM.fsmInCommit(_segmentManager, this,
llcSegmentName, segmentMetadata.getNumReplicas(),
+ endOffset);
+ } else if
(msgType.equals(SegmentCompletionProtocol.MSG_TYPE_STOPPED_CONSUMING)) {
+ fsm = SegmentCompletionFSM.fsmStoppedConsuming(_segmentManager, this,
llcSegmentName,
+ segmentMetadata.getNumReplicas());
+ } else {
+ // Segment is in the process of completing, and this is the first one to
respond. Create fsm
+ fsm = SegmentCompletionFSM.fsmInHolding(_segmentManager, this,
llcSegmentName, segmentMetadata.getNumReplicas());
+ }
+ LOGGER.info("Created FSM {}", fsm);
return fsm;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]