This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2a9cd82b98 Make segment completion FSM pluggable (#14088)
2a9cd82b98 is described below
commit 2a9cd82b9849bd7e9b6db5b7403b502632b7b41d
Author: Kartik Khare <[email protected]>
AuthorDate: Fri Dec 13 18:32:19 2024 +0530
Make segment completion FSM pluggable (#14088)
* WIP: Make Completion FSM pluggable
* Add factory and make all test pass
* Fix test failures
* Add javadoc comments
* Simplify fsm factory by just storing class names instead of constructors
* More refactoring plus removed hardcoded
* rebase with master
* Fix schemes
* refactor
* Refactoring
* Make segment completion protocol configurable from table config
* fix bug
---------
Co-authored-by: Kartik Khare <[email protected]>
---
.../pinot/controller/BaseControllerStarter.java | 6 +-
.../realtime/BlockingSegmentCompletionFSM.java | 902 ++++++++++++++++++++
.../core/realtime/SegmentCompletionConfig.java | 58 ++
.../helix/core/realtime/SegmentCompletionFSM.java | 133 +++
.../core/realtime/SegmentCompletionFSMFactory.java | 123 +++
.../core/realtime/SegmentCompletionManager.java | 921 +--------------------
.../helix/core/realtime/SegmentCompletionTest.java | 4 +-
.../pinot/spi/stream/StreamConfigProperties.java | 5 +
.../apache/pinot/spi/utils/CommonConstants.java | 2 +
9 files changed, 1272 insertions(+), 882 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 9a48b6ad6f..342413d355 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -99,6 +99,7 @@ import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManag
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.helix.core.minion.TaskMetricsEmitter;
import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig;
import
org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceChecker;
import
org.apache.pinot.controller.helix.core.rebalance.tenant.DefaultTenantRebalancer;
@@ -489,9 +490,12 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config,
_controllerMetrics);
// TODO: Need to put this inside HelixResourceManager when
HelixControllerLeadershipManager is removed.
_helixResourceManager.registerPinotLLCRealtimeSegmentManager(_pinotLLCRealtimeSegmentManager);
+
+ SegmentCompletionConfig segmentCompletionConfig = new
SegmentCompletionConfig(_config);
+
_segmentCompletionManager =
new SegmentCompletionManager(_helixParticipantManager,
_pinotLLCRealtimeSegmentManager, _controllerMetrics,
- _leadControllerManager, _config.getSegmentCommitTimeoutSeconds());
+ _leadControllerManager, _config.getSegmentCommitTimeoutSeconds(),
segmentCompletionConfig);
_sqlQueryExecutor = new SqlQueryExecutor(_config.generateVipUrl());
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java
new file mode 100644
index 0000000000..b119928a46
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java
@@ -0,0 +1,902 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ControllerMeter;
+import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.URIUtils;
+import
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class implements the FSM on the controller side for each completing
segment.
+ *
+ * An FSM is created when we first hear about a segment (typically through the segmentConsumed message).
+ * When an FSM is created, it starts in HOLDING, or in COMMITTED if the segment's ZK metadata already has
+ * status DONE.
+ *
+ * We kick off an FSM in the COMMITTED state (rare) when we find that
PROPERTYSTORE already has the segment
+ * with the Status set to DONE.
+ *
+ * We kick off an FSM in the HOLDING state (typical) when a segmentConsumed() message arrives from the
+ * first server we hear from.
+ *
+ * The FSM does not have a timer. It is clocked by the servers, which,
typically, are retransmitting their
+ * segmentConsumed() message every so often
(SegmentCompletionProtocol.MAX_HOLD_TIME_MS).
+ *
+ * See https://github.com/linkedin/pinot/wiki/Low-level-kafka-consumers
+ */
+public class BlockingSegmentCompletionFSM implements SegmentCompletionFSM {
+  // Class-level logger. Per-segment messages in this class go through _logger, which is named after the segment.
+  public static final Logger LOGGER = LoggerFactory.getLogger(BlockingSegmentCompletionFSM.class);
+
+  /**
+   * States of the blocking completion FSM. A new FSM starts in HOLDING, moves to PARTIAL_CONSUMING if the first
+   * message seen is a stoppedConsuming, or starts in COMMITTED when ZK metadata already marks the segment DONE.
+   */
+  public enum BlockingSegmentCompletionFSMState {
+    /** At least one replica has reported that it has stopped consuming. */
+    PARTIAL_CONSUMING,
+    /** The segment has started finalizing. */
+    HOLDING,
+    /** The committer is chosen; it is told to commit the next time it calls segmentConsumed(). */
+    COMMITTER_DECIDED,
+    /** The committer has been told to commit. */
+    COMMITTER_NOTIFIED,
+    /** The committer is uploading. */
+    COMMITTER_UPLOADING,
+    /** The commit to ZK is in progress. */
+    COMMITTING,
+    /** The segment has already been committed. */
+    COMMITTED,
+    /** The state machine was aborted; a fresh one is started when the next segmentConsumed comes in. */
+    ABORTED
+  }
+
+  // We will have some variation between hosts, so we add 10% to the max hold time to pick a winner.
+  // If there is more than 10% variation, then it is handled as an error case (i.e. the first few to
+  // come in will have a winner, and the later ones will just download the segment)
+  private static final long MAX_TIME_TO_PICK_WINNER_MS =
+      SegmentCompletionProtocol.MAX_HOLD_TIME_MS + (SegmentCompletionProtocol.MAX_HOLD_TIME_MS / 10);
+
+  // Once we pick a winner, the winner may get notified in the next call, so add one hold time plus 10%.
+  // It may be that the winner is not the server that we are currently processing a segmentConsumed()
+  // message from. In that case, we will wait for the next segmentConsumed() message from the picked winner.
+  // If the winner does not come back to us within that time, we abort the state machine and start over.
+  private static final long MAX_TIME_TO_NOTIFY_WINNER_MS =
+      MAX_TIME_TO_PICK_WINNER_MS + SegmentCompletionProtocol.MAX_HOLD_TIME_MS + (
+      SegmentCompletionProtocol.MAX_HOLD_TIME_MS / 10);
+
+  // Per-segment logger named "SegmentCompletionFSM_<segmentName>", assigned in the constructor.
+  public final Logger _logger;
+
+  // Current state. Typically starts off in HOLDING; the constructor may set COMMITTED and
+  // transitionToInitialState() may set PARTIAL_CONSUMING.
+  BlockingSegmentCompletionFSMState _state = BlockingSegmentCompletionFSMState.HOLDING;
+  // Controller time (ms) at FSM creation; the absolute deadlines below are computed from it.
+  final long _startTimeMs;
+  private final LLCSegmentName _segmentName;
+  private final String _rawTableName;
+  private final String _realtimeTableName;
+  // Replica count taken from the segment's ZK metadata.
+  private final int _numReplicas;
+  // Despite the name this is a Set: instances that reported stoppedConsuming. They are not waited for
+  // (see numReplicasToLookFor()) and are removed again if they later send segmentConsumed().
+  private final Set<String> _excludedServerStateMap;
+  // Latest offset reported by each instance through segmentConsumed().
+  private final Map<String, StreamPartitionMsgOffset> _commitStateMap;
+  private final StreamPartitionMsgOffsetFactory _streamPartitionMsgOffsetFactory;
+  // Offset the committer must commit at. The constructor sets it for segments already DONE in ZK;
+  // NOTE(review): presumably set by isWinnerPicked() otherwise (not visible here) — confirm.
+  private StreamPartitionMsgOffset _winningOffset = null;
+  // Instance chosen to commit; "UNKNOWN" when the FSM starts in COMMITTED.
+  private String _winner;
+  private final PinotLLCRealtimeSegmentManager _segmentManager;
+  private final SegmentCompletionManager _segmentCompletionManager;
+  // Absolute deadlines (ms): _startTimeMs + MAX_TIME_TO_PICK_WINNER_MS / MAX_TIME_TO_NOTIFY_WINNER_MS.
+  private final long _maxTimeToPickWinnerMs;
+  private final long _maxTimeToNotifyWinnerMs;
+  // Commit window length (ms) as computed in the constructor.
+  private final long _initialCommitTimeMs;
+  // Absolute commit deadline (ms), initially _startTimeMs + _initialCommitTimeMs.
+  // Once the winner is notified, they are expected to commit right away. At this point, it is the segment build
+  // time that we need to consider.
+  // We may need to add some time here to allow for getting the lock? For now 0
+  // We may need to add some time for the committer come back to us (after the build)? For now 0.
+  // Not final: NOTE(review) presumably pushed out when a build-time extension is granted — confirm.
+  private long _maxTimeAllowedToCommitMs;
+  private final String _controllerVipUrl;
+
+  /**
+   * Creates the FSM for one completing segment.
+   *
+   * <p>The FSM starts in HOLDING, unless the segment's ZK metadata already has status DONE. In that case it
+   * starts in COMMITTED, with the persisted end offset as the winning offset and an "UNKNOWN" winner.
+   *
+   * <p>The commit window starts as the winner-notification time plus the table's commit timeout. It is raised to
+   * the table's saved commit time if that is larger, and capped at the global maximum commit time.
+   *
+   * @param segmentManager LLC segment manager, consulted for the table's commit timeout
+   * @param segmentCompletionManager owning manager (clock, offset factory, saved commit time, controller VIP URL)
+   * @param segmentName name of the completing segment
+   * @param segmentMetadata ZK metadata of the segment (replica count, status, end offset)
+   */
+  public BlockingSegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManager,
+      SegmentCompletionManager segmentCompletionManager, LLCSegmentName segmentName,
+      SegmentZKMetadata segmentMetadata) {
+    _segmentName = segmentName;
+    _rawTableName = _segmentName.getTableName();
+    _realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(_rawTableName);
+    _numReplicas = segmentMetadata.getNumReplicas();
+    _segmentManager = segmentManager;
+    _commitStateMap = new HashMap<>(HashUtil.getHashMapCapacity(_numReplicas));
+    _excludedServerStateMap = new HashSet<>(_numReplicas);
+    _segmentCompletionManager = segmentCompletionManager;
+    _startTimeMs = _segmentCompletionManager.getCurrentTimeMs();
+    _maxTimeToPickWinnerMs = _startTimeMs + MAX_TIME_TO_PICK_WINNER_MS;
+    _maxTimeToNotifyWinnerMs = _startTimeMs + MAX_TIME_TO_NOTIFY_WINNER_MS;
+    _streamPartitionMsgOffsetFactory = _segmentCompletionManager.getStreamPartitionMsgOffsetFactory(_segmentName);
+    long initialCommitTimeMs = MAX_TIME_TO_NOTIFY_WINNER_MS + _segmentManager.getCommitTimeoutMS(_realtimeTableName);
+    Long savedCommitTime = _segmentCompletionManager.getCommitTime(_rawTableName);
+    if (savedCommitTime != null && savedCommitTime > initialCommitTimeMs) {
+      initialCommitTimeMs = savedCommitTime;
+    }
+    _logger = LoggerFactory.getLogger("SegmentCompletionFSM_" + segmentName.getSegmentName());
+    int maxCommitTimeForAllSegmentsSeconds = SegmentCompletionManager.getMaxCommitTimeForAllSegmentsSeconds();
+    if (initialCommitTimeMs > maxCommitTimeForAllSegmentsSeconds * 1000L) {
+      // The table has a really high value configured for max commit time. Cap it at the global maximum
+      // and go from there.
+      _logger.info("Configured max commit time {}s too high for table {}, changing to {}s",
+          initialCommitTimeMs / 1000, _realtimeTableName, maxCommitTimeForAllSegmentsSeconds);
+      initialCommitTimeMs = maxCommitTimeForAllSegmentsSeconds * 1000L;
+    }
+    _initialCommitTimeMs = initialCommitTimeMs;
+    _maxTimeAllowedToCommitMs = _startTimeMs + _initialCommitTimeMs;
+    _controllerVipUrl = segmentCompletionManager.getControllerVipUrl();
+
+    if (segmentMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
+      // Already committed in ZK: start in COMMITTED at the persisted end offset. Reuse the offset factory
+      // resolved above instead of looking it up a second time.
+      StreamPartitionMsgOffset endOffset = _streamPartitionMsgOffsetFactory.create(segmentMetadata.getEndOffset());
+      _state = BlockingSegmentCompletionFSMState.COMMITTED;
+      _winningOffset = endOffset;
+      _winner = "UNKNOWN";
+    }
+  }
+
+ @Override
+ public void transitionToInitialState(String msgType) {
+ if (_state == BlockingSegmentCompletionFSMState.COMMITTED) {
+ // Already set; no need to do anything here.
+ return;
+ }
+
+ // If we receive a STOPPED_CONSUMING message before any others, switch to
PARTIAL_CONSUMING
+ if (SegmentCompletionProtocol.MSG_TYPE_STOPPED_CONSUMING.equals(msgType)) {
+ _state = BlockingSegmentCompletionFSMState.PARTIAL_CONSUMING;
+ }
+ // Otherwise, we remain in HOLDING
+ }
+
+  /** Renders {segment,state,startTimeMs,winner,winningOffset,controllerVipUrl}. */
+  @Override
+  public String toString() {
+    return new StringBuilder("{")
+        .append(_segmentName.getSegmentName()).append(',')
+        .append(_state).append(',')
+        .append(_startTimeMs).append(',')
+        .append(_winner).append(',')
+        .append(_winningOffset).append(',')
+        .append(_controllerVipUrl)
+        .append('}')
+        .toString();
+  }
+
+ // SegmentCompletionManager releases the FSM from the hashtable when it is
done.
+ public boolean isDone() {
+ return _state.equals(BlockingSegmentCompletionFSMState.COMMITTED) ||
_state.equals(
+ BlockingSegmentCompletionFSMState.ABORTED);
+ }
+
+  /*
+   * A server reports that it has reached the completion stage at the given offset. Any earlier exclusion of that
+   * server (from stoppedConsuming) is lifted, its offset is recorded, and the current state picks the response.
+   * Several servers may report at once, so the handling runs while holding this FSM's monitor; the clock is read
+   * before taking it. The work done under the lock is small.
+   */
+  @Override
+  public SegmentCompletionProtocol.Response segmentConsumed(String instanceId, StreamPartitionMsgOffset offset,
+      final String stopReason) {
+    final long nowMs = _segmentCompletionManager.getCurrentTimeMs();
+    synchronized (this) {
+      _logger.info("Processing segmentConsumed({}, {})", instanceId, offset);
+      // The server may have restarted, resumed consuming and caught up to this point. We accept it back.
+      if (_excludedServerStateMap.remove(instanceId)) {
+        _logger.info("Marking instance {} alive again", instanceId);
+      }
+      _commitStateMap.put(instanceId, offset);
+      switch (_state) {
+        case PARTIAL_CONSUMING:
+          return partialConsumingConsumed(instanceId, offset, nowMs, stopReason);
+        case HOLDING:
+          return holdingConsumed(instanceId, offset, nowMs, stopReason);
+        case COMMITTER_DECIDED:
+          // This must be a retransmit
+          return committerDecidedConsumed(instanceId, offset, nowMs);
+        case COMMITTER_NOTIFIED:
+          return committerNotifiedConsumed(instanceId, offset, nowMs);
+        case COMMITTER_UPLOADING:
+          return committerUploadingConsumed(instanceId, offset, nowMs);
+        case COMMITTING:
+          return committingConsumed(instanceId, offset, nowMs);
+        case COMMITTED:
+          return committedConsumed(instanceId, offset);
+        case ABORTED:
+          // An aborted FSM only asks the server to HOLD.
+          return hold(instanceId, offset);
+        default:
+          return fail(instanceId, offset);
+      }
+    }
+  }
+
+ /*
+ * A server has sent segmentConsumed() message. The caller will save the
segment if we return
+ * COMMIT_CONTINUE. We need to verify that it is the same server that we
notified as the winner
+ * and the offset is the same as what is coming in with the commit. We can
then move to
+ * COMMITTER_UPLOADING and wait for the segmentCommitEnd() call.
+ *
+ * In case of discrepancy we move the state machine to ABORTED state so that
this FSM is removed
+ * from the map, and things start over. In this case, we respond to the
server with a 'hold' so
+ * that they re-transmit their segmentConsumed() message and start over.
+ */
+ @Override
+ public SegmentCompletionProtocol.Response segmentCommitStart(String
instanceId, StreamPartitionMsgOffset offset) {
+ long now = _segmentCompletionManager.getCurrentTimeMs();
+ if (_excludedServerStateMap.contains(instanceId)) {
+ _logger.warn("Not accepting commit from {} since it had stoppd
consuming", instanceId);
+ return SegmentCompletionProtocol.RESP_FAILED;
+ }
+ synchronized (this) {
+ _logger.info("Processing segmentCommitStart({}, {})", instanceId,
offset);
+ switch (_state) {
+ case PARTIAL_CONSUMING:
+ return partialConsumingCommit(instanceId, offset, now);
+
+ case HOLDING:
+ return holdingCommit(instanceId, offset, now);
+
+ case COMMITTER_DECIDED:
+ return committerDecidedCommit(instanceId, offset, now);
+
+ case COMMITTER_NOTIFIED:
+ return committerNotifiedCommit(instanceId, offset, now);
+
+ case COMMITTER_UPLOADING:
+ return committerUploadingCommit(instanceId, offset, now);
+
+ case COMMITTING:
+ return committingCommit(instanceId, offset, now);
+
+ case COMMITTED:
+ return committedCommit(instanceId, offset);
+
+ case ABORTED:
+ return hold(instanceId, offset);
+
+ default:
+ return fail(instanceId, offset);
+ }
+ }
+ }
+
+ @Override
+ public SegmentCompletionProtocol.Response stoppedConsuming(String
instanceId, StreamPartitionMsgOffset offset,
+ String reason) {
+ synchronized (this) {
+ _logger.info("Processing stoppedConsuming({}, {})", instanceId, offset);
+ _excludedServerStateMap.add(instanceId);
+ switch (_state) {
+ case PARTIAL_CONSUMING:
+ return partialConsumingStoppedConsuming(instanceId, offset, reason);
+
+ case HOLDING:
+ return holdingStoppedConsuming(instanceId, offset, reason);
+
+ case COMMITTER_DECIDED:
+ return committerDecidedStoppedConsuming(instanceId, offset, reason);
+
+ case COMMITTER_NOTIFIED:
+ return committerNotifiedStoppedConsuming(instanceId, offset, reason);
+
+ case COMMITTER_UPLOADING:
+ return committerUploadingStoppedConsuming(instanceId, offset,
reason);
+
+ case COMMITTING:
+ return committingStoppedConsuming(instanceId, offset, reason);
+
+ case COMMITTED:
+ return committedStoppedConsuming(instanceId, offset, reason);
+
+ case ABORTED:
+ _logger.info("Ignoring StoppedConsuming message from {} in state
{}", instanceId, _state);
+ return SegmentCompletionProtocol.RESP_PROCESSED;
+
+ default:
+ return fail(instanceId, offset);
+ }
+ }
+ }
+
+  /**
+   * Handles a request to extend the segment build time. Only COMMITTER_NOTIFIED handles it; every other state
+   * answers FAILED. The clock is read before taking this FSM's monitor.
+   */
+  @Override
+  public SegmentCompletionProtocol.Response extendBuildTime(final String instanceId,
+      final StreamPartitionMsgOffset offset, final int extTimeSec) {
+    final long nowMs = _segmentCompletionManager.getCurrentTimeMs();
+    synchronized (this) {
+      _logger.info("Processing extendBuildTime({}, {}, {})", instanceId, offset, extTimeSec);
+      if (_state != BlockingSegmentCompletionFSMState.COMMITTER_NOTIFIED) {
+        return fail(instanceId, offset);
+      }
+      return committerNotifiedExtendBuildTime(instanceId, offset, extTimeSec, nowMs);
+    }
+  }
+
+ /*
+ * We can get this call only when the state is COMMITTER_UPLOADING. Also,
the instanceId should be equal to
+ * the _winner.
+ */
+ @Override
+ public SegmentCompletionProtocol.Response
segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams,
+ CommittingSegmentDescriptor committingSegmentDescriptor) {
+ String instanceId = reqParams.getInstanceId();
+ StreamPartitionMsgOffset offset =
+
_streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset());
+ synchronized (this) {
+ if (_excludedServerStateMap.contains(instanceId)) {
+ _logger.warn("Not accepting commitEnd from {} since it had stoppd
consuming", instanceId);
+ return abortAndReturnFailed();
+ }
+ _logger.info("Processing segmentCommitEnd({}, {})", instanceId, offset);
+ if
(!_state.equals(BlockingSegmentCompletionFSMState.COMMITTER_UPLOADING) ||
!instanceId.equals(_winner)
+ || offset.compareTo(_winningOffset) != 0) {
+ // State changed while we were out of sync. return a failed commit.
+ _logger.warn("State change during upload: state={} segment={}
winner={} winningOffset={}", _state,
+ _segmentName.getSegmentName(), _winner, _winningOffset);
+ return abortAndReturnFailed();
+ }
+ SegmentCompletionProtocol.Response response =
+ commitSegment(reqParams, committingSegmentDescriptor);
+ if (!response.equals(SegmentCompletionProtocol.RESP_COMMIT_SUCCESS)) {
+ return abortAndReturnFailed();
+ } else {
+ return response;
+ }
+ }
+ }
+
+ // Helper methods that log the current state and the response sent
+ private SegmentCompletionProtocol.Response fail(String instanceId,
StreamPartitionMsgOffset offset) {
+ _logger.info("{}:FAIL for instance={} offset={}", _state, instanceId,
offset);
+ return SegmentCompletionProtocol.RESP_FAILED;
+ }
+
+ private SegmentCompletionProtocol.Response commit(String instanceId,
StreamPartitionMsgOffset offset) {
+ long allowedBuildTimeSec = (_maxTimeAllowedToCommitMs - _startTimeMs) /
1000;
+ _logger
+ .info("{}:COMMIT for instance={} offset={} buldTimeSec={}", _state,
instanceId, offset, allowedBuildTimeSec);
+ SegmentCompletionProtocol.Response.Params params =
+ new
SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(offset.toString())
+ .withBuildTimeSeconds(allowedBuildTimeSec)
+
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT)
+ .withControllerVipUrl(_controllerVipUrl);
+ return new SegmentCompletionProtocol.Response(params);
+ }
+
+ private SegmentCompletionProtocol.Response discard(String instanceId,
StreamPartitionMsgOffset offset) {
+ _logger.warn("{}:DISCARD for instance={} offset={}", _state, instanceId,
offset);
+ return SegmentCompletionProtocol.RESP_DISCARD;
+ }
+
+  /** Logs and builds a KEEP response echoing the reported {@code offset}. */
+  private SegmentCompletionProtocol.Response keep(String instanceId, StreamPartitionMsgOffset offset) {
+    _logger.info("{}:KEEP for instance={} offset={}", _state, instanceId, offset);
+    SegmentCompletionProtocol.Response.Params keepParams = new SegmentCompletionProtocol.Response.Params()
+        .withStreamPartitionMsgOffset(offset.toString())
+        .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.KEEP);
+    return new SegmentCompletionProtocol.Response(keepParams);
+  }
+
+  /**
+   * Logs and builds a CATCH_UP response. The target offset is the winning offset, not the reported one;
+   * {@code offset} is only logged.
+   */
+  private SegmentCompletionProtocol.Response catchup(String instanceId, StreamPartitionMsgOffset offset) {
+    _logger.info("{}:CATCHUP for instance={} offset={}", _state, instanceId, offset);
+    SegmentCompletionProtocol.Response.Params catchupParams = new SegmentCompletionProtocol.Response.Params()
+        .withStreamPartitionMsgOffset(_winningOffset.toString())
+        .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
+    return new SegmentCompletionProtocol.Response(catchupParams);
+  }
+
+  /** Logs and builds a HOLD response echoing the reported {@code offset}. */
+  private SegmentCompletionProtocol.Response hold(String instanceId, StreamPartitionMsgOffset offset) {
+    _logger.info("{}:HOLD for instance={} offset={}", _state, instanceId, offset);
+    SegmentCompletionProtocol.Response.Params holdParams = new SegmentCompletionProtocol.Response.Params()
+        .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD)
+        .withStreamPartitionMsgOffset(offset.toString());
+    return new SegmentCompletionProtocol.Response(holdParams);
+  }
+
+  /**
+   * Moves the FSM to ABORTED, bumps the table's LLC_STATE_MACHINE_ABORTS meter and answers HOLD. Because the state
+   * is already ABORTED, the hold is logged as "ABORTED:HOLD". {@code now} is currently unused.
+   */
+  private SegmentCompletionProtocol.Response abortAndReturnHold(long now, String instanceId,
+      StreamPartitionMsgOffset offset) {
+    _state = BlockingSegmentCompletionFSMState.ABORTED;
+    _segmentCompletionManager.getControllerMetrics().addMeteredTableValue(_rawTableName,
+        ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
+    return hold(instanceId, offset);
+  }
+
+ private SegmentCompletionProtocol.Response abortAndReturnFailed() {
+ _state = BlockingSegmentCompletionFSMState.ABORTED;
+ _segmentCompletionManager.getControllerMetrics()
+ .addMeteredTableValue(_rawTableName,
ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
+ return SegmentCompletionProtocol.RESP_FAILED;
+ }
+
+  /**
+   * Aborts and answers HOLD when {@code now} is past the commit deadline. Otherwise returns {@code null},
+   * meaning the caller should carry on.
+   */
+  private SegmentCompletionProtocol.Response abortIfTooLateAndReturnHold(long now, String instanceId,
+      StreamPartitionMsgOffset offset) {
+    if (now <= _maxTimeAllowedToCommitMs) {
+      return null;
+    }
+    _logger.warn("{}:Aborting FSM (too late) instance={} offset={} now={} start={}", _state, instanceId, offset, now,
+        _startTimeMs);
+    return abortAndReturnHold(now, instanceId, offset);
+  }
+
+ private int numReplicasToLookFor() {
+ return _numReplicas - _excludedServerStateMap.size();
+ }
+
+ private SegmentCompletionProtocol.Response partialConsumingConsumed(String
instanceId,
+ StreamPartitionMsgOffset offset, long now, final String stopReason) {
+ // This is the first time we are getting segmentConsumed() for this
segment.
+ // Some instance thinks we can close this segment, so go to HOLDING state,
and process as normal.
+ // We will just be looking for less replicas.
+ _state = BlockingSegmentCompletionFSMState.HOLDING;
+ return holdingConsumed(instanceId, offset, now, stopReason);
+ }
+
+ /*
+ * This is not a good state to get a commit message, but it is possible that
the controller failed while in
+ * COMMITTER_NOTIFIED state, and the first message we got in the new
controller was a stoppedConsuming
+ * message. As long as the committer is not the one who stopped consuming
(which we have already checked before
+ * coming here), we will trust the server that this is a valid commit.
+ */
+ private SegmentCompletionProtocol.Response partialConsumingCommit(String
instanceId,
+ StreamPartitionMsgOffset offset, long now) {
+ // Do the same as HOLDING__commit
+ return processCommitWhileHoldingOrPartialConsuming(instanceId, offset,
now);
+ }
+
+ private SegmentCompletionProtocol.Response
partialConsumingStoppedConsuming(String instanceId,
+ StreamPartitionMsgOffset offset, String reason) {
+ return processStoppedConsuming(instanceId, offset, reason, true);
+ }
+
+ /*
+ * If we have waited "enough", or we have all replicas reported, then we can
pick a winner.
+ *
+ * Otherwise, we ask the server that is reporting to come back again later
until one of these conditions hold.
+ *
+ * If we can pick a winner then we go to COMMITTER_DECIDED or
COMMITTER_NOTIIFIED (if the instance
+ * in this call is the same as winner).
+ *
+ * If we can go to COMMITTER_NOTIFIED then we respond with a COMMIT message,
otherwise with a HOLD message.
+ */
+ private SegmentCompletionProtocol.Response holdingConsumed(String
instanceId, StreamPartitionMsgOffset offset,
+ long now, final String stopReason) {
+ SegmentCompletionProtocol.Response response;
+ // If we are past the max time to pick a winner, or we have heard from all
replicas,
+ // we are ready to pick a winner.
+ if (isWinnerPicked(instanceId, now, stopReason)) {
+ if (_winner.equals(instanceId)) {
+ _logger.info("{}:Committer notified winner instance={} offset={}",
_state, instanceId, offset);
+ response = commit(instanceId, offset);
+ _state = BlockingSegmentCompletionFSMState.COMMITTER_NOTIFIED;
+ } else {
+ _logger.info("{}:Committer decided winner={} offset={}", _state,
_winner, _winningOffset);
+ response = catchup(instanceId, offset);
+ _state = BlockingSegmentCompletionFSMState.COMMITTER_DECIDED;
+ }
+ } else {
+ response = hold(instanceId, offset);
+ }
+ return response;
+ }
+
+ /*
+ * This not a good state to receive a commit message, but then it may be
that the controller
+ * failed over while in the COMMITTER_NOTIFIED state...
+ */
+ private SegmentCompletionProtocol.Response holdingCommit(String instanceId,
StreamPartitionMsgOffset offset,
+ long now) {
+ return processCommitWhileHoldingOrPartialConsuming(instanceId, offset,
now);
+ }
+
+ private SegmentCompletionProtocol.Response holdingStoppedConsuming(String
instanceId,
+ StreamPartitionMsgOffset offset, String reason) {
+ return processStoppedConsuming(instanceId, offset, reason, true);
+ }
+
+  /*
+   * We have already decided who the committer is, but have not let them know yet. If this is the committer that
+   * we decided, then respond back with COMMIT. Otherwise, if the offset is smaller, respond back with a CATCHUP.
+   * Otherwise, just have the server HOLD. Since the segment is not committed yet, we cannot ask them to KEEP or
+   * DISCARD etc. If the committer fails for any reason, we will need a new committer.
+   *
+   * If the winner has not come back within the notification window, we abort and start afresh. That check is done
+   * first, so an abort is counted only once in LLC_STATE_MACHINE_ABORTS. It also avoids building a COMMIT (and
+   * moving to COMMITTER_NOTIFIED) that would immediately be thrown away. The final state and response are the
+   * same as before: ABORTED and HOLD.
+   */
+  private SegmentCompletionProtocol.Response committerDecidedConsumed(String instanceId,
+      StreamPartitionMsgOffset offset, long now) {
+    if (now > _maxTimeToNotifyWinnerMs) {
+      // Winner never got back to us. We could re-elect the highest reporter here, but for now we restart.
+      _logger.warn("{}:Aborting FSM (winner not notified in time) instance={} offset={} now={} start={}", _state,
+          instanceId, offset, now, _startTimeMs);
+      return abortAndReturnHold(now, instanceId, offset);
+    }
+    if (offset.compareTo(_winningOffset) > 0) {
+      _logger.warn("{}:Aborting FSM (offset larger than winning) instance={} offset={} now={} winning={}", _state,
+          instanceId, offset, now, _winningOffset);
+      return abortAndReturnHold(now, instanceId, offset);
+    }
+    if (_winner.equals(instanceId)) {
+      if (_winningOffset.compareTo(offset) == 0) {
+        _logger.info("{}:Notifying winner instance={} offset={}", _state, instanceId, offset);
+        SegmentCompletionProtocol.Response response = commit(instanceId, offset);
+        _state = BlockingSegmentCompletionFSMState.COMMITTER_NOTIFIED;
+        return response;
+      }
+      // Winner coming back with a different offset.
+      _logger.warn("{}:Winner coming back with different offset for instance={} offset={} prevWinnOffset={}", _state,
+          instanceId, offset, _winningOffset);
+      return abortAndReturnHold(now, instanceId, offset);
+    }
+    if (offset.compareTo(_winningOffset) == 0) {
+      // Wait until winner has posted the segment.
+      return hold(instanceId, offset);
+    }
+    return catchup(instanceId, offset);
+  }
+
+ /*
+ * We have already decided who the committer is, but have not let them know
yet. So, we don't expect
+ * a commit() call here.
+ */
+ private SegmentCompletionProtocol.Response committerDecidedCommit(String
instanceId,
+ StreamPartitionMsgOffset offset, long now) {
+ return processCommitWhileHoldingOrPartialConsuming(instanceId, offset,
now);
+ }
+
+ private SegmentCompletionProtocol.Response
committerDecidedStoppedConsuming(String instanceId,
+ StreamPartitionMsgOffset offset, String reason) {
+ return processStoppedConsuming(instanceId, offset, reason, false);
+ }
+
+  /*
+   * We have notified the committer but not heard back from them yet. Past the commit deadline we abort and answer
+   * HOLD.
+   *
+   * If the winner reports again (it may never have received our COMMIT), it gets COMMIT again when its offset
+   * still matches. A different offset means something is seriously wrong: we answer DISCARD and abort the FSM.
+   * That abort is now counted in LLC_STATE_MACHINE_ABORTS like every other abort path.
+   *
+   * Any other server is asked to CATCH_UP when behind the winning offset. Otherwise it is asked to HOLD: at the
+   * winning offset, until the winner posts the segment; above it, because nothing is committed yet and this server
+   * may become the committer if the current one fails.
+   */
+  private SegmentCompletionProtocol.Response committerNotifiedConsumed(String instanceId,
+      StreamPartitionMsgOffset offset, long now) {
+    SegmentCompletionProtocol.Response response = abortIfTooLateAndReturnHold(now, instanceId, offset);
+    if (response != null) {
+      return response;
+    }
+    if (instanceId.equals(_winner)) {
+      if (offset.compareTo(_winningOffset) == 0) {
+        return commit(instanceId, offset);
+      }
+      // Something seriously wrong. Abort the FSM
+      response = discard(instanceId, offset);
+      _logger.warn("{}:Aborting for instance={} offset={}", _state, instanceId, offset);
+      _state = BlockingSegmentCompletionFSMState.ABORTED;
+      _segmentCompletionManager.getControllerMetrics()
+          .addMeteredTableValue(_rawTableName, ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
+      return response;
+    }
+    // Common case: a different instance is reporting.
+    return offset.compareTo(_winningOffset) < 0 ? catchup(instanceId, offset) : hold(instanceId, offset);
+  }
+
+ /*
+ * We have notified the committer. If we get a consumed message from another
server, we can ask them to
+ * catchup (if the offset is lower). If anything else, then we pretty much
ask them to hold.
+ */
+ private SegmentCompletionProtocol.Response committerNotifiedCommit(String
instanceId,
+ StreamPartitionMsgOffset offset, long now) {
+ SegmentCompletionProtocol.Response response = null;
+ response = checkBadCommitRequest(instanceId, offset, now);
+ if (response != null) {
+ return response;
+ }
+ _logger.info("{}:Uploading for instance={} offset={}", _state, instanceId,
offset);
+ _state = BlockingSegmentCompletionFSMState.COMMITTER_UPLOADING;
+ long commitTimeMs = now - _startTimeMs;
+ if (commitTimeMs > _initialCommitTimeMs) {
+ // We assume that the commit time holds for all partitions. It is
possible, though, that one partition
+ // commits at a lower time than another partition, and the two
partitions are going simultaneously,
+ // and we may not get the maximum value all the time.
+ _segmentCompletionManager.setCommitTime(_segmentName.getTableName(),
commitTimeMs);
+ }
+ return SegmentCompletionProtocol.RESP_COMMIT_CONTINUE;
+ }
+
+ private SegmentCompletionProtocol.Response
committerNotifiedStoppedConsuming(String instanceId,
+ StreamPartitionMsgOffset offset, String reason) {
+ return processStoppedConsuming(instanceId, offset, reason, false);
+ }
+
+ private SegmentCompletionProtocol.Response
committerNotifiedExtendBuildTime(String instanceId,
+ StreamPartitionMsgOffset offset, int extTimeSec, long now) {
+ SegmentCompletionProtocol.Response response =
abortIfTooLateAndReturnHold(now, instanceId, offset);
+ if (response == null) {
+ long maxTimeAllowedToCommitMs = now + extTimeSec * 1000;
+ if (maxTimeAllowedToCommitMs
+ > _startTimeMs +
SegmentCompletionManager.getMaxCommitTimeForAllSegmentsSeconds() * 1000L) {
+ _logger.warn("Not accepting lease extension from {} startTime={}
requestedTime={}", instanceId, _startTimeMs,
+ maxTimeAllowedToCommitMs);
+ return abortAndReturnFailed();
+ }
+ _maxTimeAllowedToCommitMs = maxTimeAllowedToCommitMs;
+ response = SegmentCompletionProtocol.RESP_PROCESSED;
+ }
+ return response;
+ }
+
+ private SegmentCompletionProtocol.Response committerUploadingConsumed(String
instanceId,
+ StreamPartitionMsgOffset offset, long now) {
+ return processConsumedAfterCommitStart(instanceId, offset, now);
+ }
+
+ private SegmentCompletionProtocol.Response committerUploadingCommit(String
instanceId,
+ StreamPartitionMsgOffset offset, long now) {
+ return processCommitWhileUploading(instanceId, offset, now);
+ }
+
+ private SegmentCompletionProtocol.Response
committerUploadingStoppedConsuming(String instanceId,
+ StreamPartitionMsgOffset offset, String reason) {
+ return processStoppedConsuming(instanceId, offset, reason, false);
+ }
+
+ private SegmentCompletionProtocol.Response committingConsumed(String
instanceId, StreamPartitionMsgOffset offset,
+ long now) {
+ return processConsumedAfterCommitStart(instanceId, offset, now);
+ }
+
+ private SegmentCompletionProtocol.Response committingCommit(String
instanceId, StreamPartitionMsgOffset offset,
+ long now) {
+ return processCommitWhileUploading(instanceId, offset, now);
+ }
+
+ private SegmentCompletionProtocol.Response committingStoppedConsuming(String
instanceId,
+ StreamPartitionMsgOffset offset, String reason) {
+ return processStoppedConsuming(instanceId, offset, reason, false);
+ }
+
+ private SegmentCompletionProtocol.Response committedConsumed(String
instanceId, StreamPartitionMsgOffset offset) {
+ // Server reporting an offset on an already completed segment. Depending
on the offset, either KEEP or DISCARD.
+ SegmentCompletionProtocol.Response response;
+ if (offset.compareTo(_winningOffset) == 0) {
+ response = keep(instanceId, offset);
+ } else {
+ // Return DISCARD. It is hard to say how long the server will take to
complete things.
+ response = discard(instanceId, offset);
+ }
+ return response;
+ }
+
+ private SegmentCompletionProtocol.Response committedCommit(String
instanceId, StreamPartitionMsgOffset offset) {
+ if (offset.compareTo(_winningOffset) == 0) {
+ return keep(instanceId, offset);
+ }
+ return discard(instanceId, offset);
+ }
+
+ private SegmentCompletionProtocol.Response committedStoppedConsuming(String
instanceId,
+ StreamPartitionMsgOffset offset, String reason) {
+ return processStoppedConsuming(instanceId, offset, reason, false);
+ }
+
+ private SegmentCompletionProtocol.Response processStoppedConsuming(String
instanceId,
+ StreamPartitionMsgOffset offset, String reason, boolean createNew) {
+ _logger
+ .info("Instance {} stopped consuming segment {} at offset {}, state
{}, createNew: {}, reason:{}", instanceId,
+ _segmentName, offset, _state, createNew, reason);
+ try {
+ _segmentManager.segmentStoppedConsuming(_segmentName, instanceId);
+ } catch (Exception e) {
+ _logger.error("Caught exception while processing stopped CONSUMING
segment: {} on instance: {}",
+ _segmentName.getSegmentName(), instanceId, e);
+ return SegmentCompletionProtocol.RESP_FAILED;
+ }
+ return SegmentCompletionProtocol.RESP_PROCESSED;
+ }
+
+ // A common method when the state is > COMMITTER_NOTIFIED.
+ private SegmentCompletionProtocol.Response
processConsumedAfterCommitStart(String instanceId,
+ StreamPartitionMsgOffset offset, long now) {
+ SegmentCompletionProtocol.Response response;
+ // We have already picked a winner, and may or many not have heard from
them.
+ // Common case here is that another server is coming back to us with its
offset. We either respond back with
+ // HOLD or CATCHUP.
+ // It may be that we never heard from the committer, or the committer is
taking too long to commit the segment.
+ // In that case, we abort the FSM and start afresh (i.e, return HOLD).
+ // If the winner is coming back again, then we have some more conditions
to look at.
+ response = abortIfTooLateAndReturnHold(now, instanceId, offset);
+ if (response != null) {
+ return response;
+ }
+ if (instanceId.equals(_winner)) {
+ // The winner is coming back to report its offset. Take a decision based
on the offset reported, and whether we
+ // already notified them
+ // Winner is supposedly already in the commit call. Something wrong.
+ LOGGER.warn(
+ "{}:Aborting FSM because winner is reporting a segment while it is
also committing instance={} offset={} "
+ + "now={}", _state, instanceId, offset, now);
+ // Ask them to hold, just in case the committer fails for some reason..
+ return abortAndReturnHold(now, instanceId, offset);
+ } else {
+ // Common case: A different instance is reporting.
+ if (offset.compareTo(_winningOffset) == 0) {
+ // Wait until winner has posted the segment before asking this server
to KEEP the segment.
+ response = hold(instanceId, offset);
+ } else if (offset.compareTo(_winningOffset) < 0) {
+ response = catchup(instanceId, offset);
+ } else {
+ // We have not yet committed, so ask the new responder to hold. They
may be the new leader in case the
+ // committer fails.
+ response = hold(instanceId, offset);
+ }
+ }
+ return response;
+ }
+
/**
 * Final step of the commit protocol, run after the committer has uploaded the segment.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Return FAILED unless the FSM is still in COMMITTER_UPLOADING.</li>
 *   <li>Move to COMMITTING and commit the segment file via the segment manager.</li>
 *   <li>If the segment location uses the local segment scheme, replace it with a controller download URL.</li>
 *   <li>Commit the segment metadata via the segment manager and move to COMMITTED.</li>
 * </ol>
 *
 * @param reqParams request parameters; supplies the committing instance id and the stream offset
 *        (the offset is only used for logging here)
 * @param committingSegmentDescriptor descriptor of the segment being committed; its segment location may be
 *        rewritten to a controller download URL before the metadata is committed
 * @return {@code RESP_COMMIT_SUCCESS} on success, {@code RESP_FAILED} otherwise
 */
private SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProtocol.Request.Params reqParams,
    CommittingSegmentDescriptor committingSegmentDescriptor) {
  String instanceId = reqParams.getInstanceId();
  // Parsed only for the log messages below.
  StreamPartitionMsgOffset offset =
      _streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset());
  if (!_state.equals(BlockingSegmentCompletionFSMState.COMMITTER_UPLOADING)) {
    // State changed while we were out of sync; return a failed commit.
    _logger.warn("State change during upload: state={} segment={} winner={} winningOffset={}", _state,
        _segmentName.getSegmentName(), _winner, _winningOffset);
    return SegmentCompletionProtocol.RESP_FAILED;
  }
  _logger.info("Committing segment {} at offset {} winner {}", _segmentName.getSegmentName(), offset, instanceId);
  _state = BlockingSegmentCompletionFSMState.COMMITTING;
  // In case of splitCommit, the segment is uploaded to a unique file name indicated by segmentLocation,
  // so we need to move the segment file to its permanent location first before committing the metadata.
  // The committingSegmentDescriptor is then updated with the permanent segment location to be saved in metadata
  // store.
  try {
    _segmentManager.commitSegmentFile(_realtimeTableName, committingSegmentDescriptor);
  } catch (Exception e) {
    // NOTE(review): _state remains COMMITTING on this failure path; it is not reset or aborted here.
    _logger.error("Caught exception while committing segment file for segment: {}", _segmentName.getSegmentName(),
        e);
    return SegmentCompletionProtocol.RESP_FAILED;
  }
  try {
    // Convert to a controller uri if the segment location uses local file scheme.
    if (CommonConstants.Segment.LOCAL_SEGMENT_SCHEME
        .equalsIgnoreCase(URIUtils.getUri(committingSegmentDescriptor.getSegmentLocation()).getScheme())) {
      committingSegmentDescriptor.setSegmentLocation(URIUtils
          .constructDownloadUrl(_controllerVipUrl, TableNameBuilder.extractRawTableName(_realtimeTableName),
              _segmentName.getSegmentName()));
    }
    _segmentManager.commitSegmentMetadata(_realtimeTableName, committingSegmentDescriptor);
  } catch (Exception e) {
    // NOTE(review): as above, _state remains COMMITTING when the metadata commit fails.
    _logger
        .error("Caught exception while committing segment metadata for segment: {}", _segmentName.getSegmentName(),
            e);
    return SegmentCompletionProtocol.RESP_FAILED;
  }

  _state = BlockingSegmentCompletionFSMState.COMMITTED;
  _logger.info("Committed segment {} at offset {} winner {}", _segmentName.getSegmentName(), offset, instanceId);
  return SegmentCompletionProtocol.RESP_COMMIT_SUCCESS;
}
+
+ private SegmentCompletionProtocol.Response
processCommitWhileUploading(String instanceId,
+ StreamPartitionMsgOffset offset, long now) {
+ _logger.info("Processing segmentCommit({}, {})", instanceId, offset);
+ SegmentCompletionProtocol.Response response =
abortIfTooLateAndReturnHold(now, instanceId, offset);
+ if (response != null) {
+ return response;
+ }
+ // Another committer (or same) came in while one was uploading. Ask them
to hold in case this one fails.
+ return new SegmentCompletionProtocol.Response(
+ new
SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(offset.toString())
+
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD));
+ }
+
+ private SegmentCompletionProtocol.Response checkBadCommitRequest(String
instanceId, StreamPartitionMsgOffset offset,
+ long now) {
+ SegmentCompletionProtocol.Response response =
abortIfTooLateAndReturnHold(now, instanceId, offset);
+ if (response != null) {
+ return response;
+ } else if (instanceId.equals(_winner) && offset.compareTo(_winningOffset)
!= 0) {
+ // Hmm. Committer has been notified, but either a different one is
committing, or offset is different
+ _logger.warn("{}:Aborting FSM (bad commit req) instance={} offset={}
now={} winning={}", _state, instanceId,
+ offset, now, _winningOffset);
+ return abortAndReturnHold(now, instanceId, offset);
+ }
+ return null;
+ }
+
+ private SegmentCompletionProtocol.Response
processCommitWhileHoldingOrPartialConsuming(String instanceId,
+ StreamPartitionMsgOffset offset, long now) {
+ _logger.info("Processing segmentCommit({}, {})", instanceId, offset);
+ SegmentCompletionProtocol.Response response =
abortIfTooLateAndReturnHold(now, instanceId, offset);
+ if (response != null) {
+ return response;
+ }
+ // We cannot get a commit if we are in this state, so ask them to hold.
Maybe we are starting after a failover.
+ // The server will re-send the segmentConsumed message.
+ return hold(instanceId, offset);
+ }
+
+ /**
+ * Pick a winner if we can, preferring the instance that we are handling
right now,
+ *
+ * We accept the first server to report an offset as long as the server
stopped consumption
+ * due to row limit. The premise is that other servers will also stop at row
limit, and there
+ * is no need to wait for them to report an offset in order to decide on a
winner. The state machine takes care
+ * of the cases where other servers may report different offsets (just in
case).
+ *
+ * If the above condition is not satisfied (i.e. either this is not the
first server, or it did not reach
+ * row limit), then we can pick a winner only if it is too late to pick a
winner, or we have heard from all
+ * servers.
+ *
+ * Otherwise, we wait to hear from more servers.
+ *
+ * @param preferredInstance The instance that is reporting in this thread.
+ * @param now current time
+ * @param stopReason reason reported by instance for stopping consumption.
+ * @return true if winner picked, false otherwise.
+ */
+ private boolean isWinnerPicked(String preferredInstance, long now, final
String stopReason) {
+ if ((SegmentCompletionProtocol.REASON_ROW_LIMIT.equals(stopReason)
+ ||
SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP.equals(stopReason))
+ && _commitStateMap.size() == 1) {
+ _winner = preferredInstance;
+ _winningOffset = _commitStateMap.get(preferredInstance);
+ return true;
+ } else if (now > _maxTimeToPickWinnerMs || _commitStateMap.size() ==
numReplicasToLookFor()) {
+ _logger.info("{}:Picking winner time={} size={}", _state, now -
_startTimeMs, _commitStateMap.size());
+ StreamPartitionMsgOffset maxOffsetSoFar = null;
+ String winnerSoFar = null;
+ for (Map.Entry<String, StreamPartitionMsgOffset> entry :
_commitStateMap.entrySet()) {
+ if (maxOffsetSoFar == null ||
entry.getValue().compareTo(maxOffsetSoFar) > 0) {
+ maxOffsetSoFar = entry.getValue();
+ winnerSoFar = entry.getKey();
+ }
+ }
+ _winningOffset = maxOffsetSoFar;
+ if (_commitStateMap.get(preferredInstance).compareTo(maxOffsetSoFar) ==
0) {
+ winnerSoFar = preferredInstance;
+ }
+ _winner = winnerSoFar;
+ return true;
+ }
+ return false;
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfig.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfig.java
new file mode 100644
index 0000000000..c5d0ece4c6
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfig.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+public class SegmentCompletionConfig {
+ public static final String FSM_SCHEME =
+ CommonConstants.Controller.PREFIX_OF_PINOT_CONTROLLER_SEGMENT_COMPLETION
+ ".fsm.scheme.";
+ public static final String DEFAULT_FSM_SCHEME_KEY =
+ CommonConstants.Controller.PREFIX_OF_PINOT_CONTROLLER_SEGMENT_COMPLETION
+ ".fsm.scheme.default";
+ public static final String DEFAULT_FSM_SCHEME = "default";
+ private final Map<String, String> _fsmSchemes = new HashMap<>();
+ private final String _defaultFsmScheme;
+
+ public SegmentCompletionConfig(PinotConfiguration configuration) {
+ // Parse properties to extract FSM schemes
+ // Assuming properties keys are in the format scheme=className
+ for (String key : configuration.getKeys()) {
+ if (key.startsWith(FSM_SCHEME)) {
+ String scheme = key.substring(FSM_SCHEME.length());
+ String className = configuration.getProperty(key);
+ _fsmSchemes.put(scheme, className);
+ }
+ }
+
+ // Get the default FSM scheme
+ _defaultFsmScheme = configuration.getProperty(DEFAULT_FSM_SCHEME_KEY,
DEFAULT_FSM_SCHEME);
+ }
+
+ public Map<String, String> getFsmSchemes() {
+ return _fsmSchemes;
+ }
+
+ public String getDefaultFsmScheme() {
+ return _defaultFsmScheme;
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSM.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSM.java
new file mode 100644
index 0000000000..516ce4c07d
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSM.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime;
+
+import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+
/**
 * Controller-side state machine that drives the completion of one segment of a real-time (LLC) table.
 *
 * <p>Servers report events during a segment's completion (segment consumed, commit start, commit end, stopped
 * consuming, build-time extension). The FSM answers each one with a {@link SegmentCompletionProtocol.Response}
 * that tells the server what to do next, so that segments are correctly built, committed, or discarded.
 * Instances are created through {@link SegmentCompletionFSMFactory}, which selects the implementation by scheme.
 */
public interface SegmentCompletionFSM {

  /**
   * Initializes the FSM to its initial state based on the message type that caused it to be created.
   *
   * @param msgType the message type sent by the server that triggered FSM creation. Currently one of
   *                {@code segmentConsumed}, {@code segmentCommit}, {@code segmentCommitStart},
   *                {@code segmentUpload}, {@code segmentCommitEnd}, {@code segmentCommitEndWithMetadata},
   *                {@code segmentStoppedConsuming}, {@code extendBuildTime}.
   */
  void transitionToInitialState(String msgType);

  /**
   * Checks whether the segment completion process has finished.
   *
   * <p>The process is considered finished once the segment has been committed or the FSM has been aborted.
   * This helps decide whether the FSM can be removed from memory.
   *
   * @return {@code true} if the FSM has reached a terminal state, {@code false} otherwise
   */
  boolean isDone();

  /**
   * Processes a report from a server that it has consumed up to a given offset and stopped.
   *
   * <p>The FSM evaluates the reported offset and tells the server what to do next, for example hold, catch up,
   * or commit the segment.
   *
   * @param instanceId the server instance reporting consumption
   * @param offset the offset up to which the server has consumed
   * @param stopReason why the server stopped consuming (e.g. row limit or end of partition)
   * @return the next action for the server (e.g. HOLD, CATCHUP, or COMMIT)
   */
  SegmentCompletionProtocol.Response segmentConsumed(String instanceId, StreamPartitionMsgOffset offset,
      String stopReason);

  /**
   * Processes the start of a segment commit from a server.
   *
   * <p>The FSM checks, based on its current state and the reported offset, whether this server may commit, and
   * moves to a committing state if appropriate.
   *
   * @param instanceId the server instance attempting to commit
   * @param offset the offset being committed
   * @return the next action for the server (e.g. CONTINUE or FAILED)
   */
  SegmentCompletionProtocol.Response segmentCommitStart(String instanceId, StreamPartitionMsgOffset offset);

  /**
   * Handles a report from a server that it has stopped consuming this segment.
   *
   * <p>The FSM records the event and decides whether the server can take part in later actions for the segment.
   *
   * @param instanceId the server instance that stopped consuming
   * @param offset the offset at which the server stopped consuming
   * @param reason the reason reported by the server for stopping
   * @return the result of processing the report (e.g. PROCESSED or FAILED)
   */
  SegmentCompletionProtocol.Response stoppedConsuming(String instanceId, StreamPartitionMsgOffset offset,
      String reason);

  /**
   * Handles a request from a server for more time to build the segment.
   *
   * <p>The FSM grants or denies the extension based on its current state and the protocol's limits.
   *
   * @param instanceId the server instance requesting the extension
   * @param offset the offset reported by the server
   * @param extTimeSec the additional time, in seconds, requested for building the segment
   * @return a response indicating whether the extension was accepted
   */
  SegmentCompletionProtocol.Response extendBuildTime(String instanceId, StreamPartitionMsgOffset offset,
      int extTimeSec);

  /**
   * Processes the end of a segment commit, after the server has finished uploading the segment.
   *
   * <p>The FSM validates the commit, updates metadata, and finalizes the segment's state. Depending on the
   * outcome, the segment is either committed or the commit fails.
   *
   * @param reqParams the parameters of the commit request
   * @param committingSegmentDescriptor metadata about the segment being committed
   * @return a response indicating whether the commit succeeded or failed
   */
  SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams,
      CommittingSegmentDescriptor committingSegmentDescriptor);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactory.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactory.java
new file mode 100644
index 0000000000..5ac238d626
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactory.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SegmentCompletionFSMFactory {
+ private SegmentCompletionFSMFactory() {
+ // Private constructor to prevent instantiation
+ }
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentCompletionFSMFactory.class);
+ private static final Map<String, Class<? extends SegmentCompletionFSM>>
FSM_CLASS_MAP = new HashMap<>();
+
+ // Static block to register the default FSM
+ static {
+ register(SegmentCompletionConfig.DEFAULT_FSM_SCHEME,
BlockingSegmentCompletionFSM.class);
+ }
+
+ /**
+ * Registers an FSM class with a specific scheme/type.
+ *
+ * @param scheme The scheme or type key.
+ * @param fsmClass The class for the FSM.
+ */
+ public static void register(String scheme, Class<? extends
SegmentCompletionFSM> fsmClass) {
+ Preconditions.checkNotNull(scheme, "Scheme cannot be null");
+ Preconditions.checkNotNull(fsmClass, "FSM Class cannot be null");
+ Preconditions.checkState(FSM_CLASS_MAP.put(scheme, fsmClass) == null,
+ "FSM class already registered for scheme: " + scheme);
+ LOGGER.info("Registered SegmentCompletionFSM class {} for scheme {}",
fsmClass, scheme);
+ }
+
+ /**
+ * Initializes the factory with configurations.
+ *
+ * @param fsmFactoryConfig The configuration object containing FSM schemes
and classes.
+ */
+ public static void init(SegmentCompletionConfig fsmFactoryConfig) {
+ Map<String, String> schemesConfig = fsmFactoryConfig.getFsmSchemes();
+ for (Map.Entry<String, String> entry : schemesConfig.entrySet()) {
+ String scheme = entry.getKey();
+ String className = entry.getValue();
+ try {
+ LOGGER.info("Initializing SegmentCompletionFSM for scheme {},
classname {}", scheme, className);
+ Class<?> clazz = Class.forName(className);
+ register(scheme, (Class<? extends SegmentCompletionFSM>) clazz);
+ } catch (Exception e) {
+ LOGGER.error("Could not register FSM class for class {} with scheme
{}", className, scheme, e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Creates an FSM instance based on the scheme/type.
+ *
+ * @param scheme The scheme or type key.
+ * @param manager The SegmentCompletionManager instance.
+ * @param segmentManager The PinotLLCRealtimeSegmentManager instance.
+ * @param llcSegmentName The segment name.
+ * @param segmentMetadata The segment metadata.
+ * @return An instance of SegmentCompletionFSM.
+ */
+ public static SegmentCompletionFSM createFSM(String scheme,
+ SegmentCompletionManager manager,
+ PinotLLCRealtimeSegmentManager segmentManager,
+ LLCSegmentName llcSegmentName,
+ SegmentZKMetadata segmentMetadata) {
+ Class<? extends SegmentCompletionFSM> fsmClass = FSM_CLASS_MAP.get(scheme);
+ Preconditions.checkState(fsmClass != null, "No FSM registered for scheme:
" + scheme);
+ try {
+ return fsmClass.getConstructor(
+ PinotLLCRealtimeSegmentManager.class,
+ SegmentCompletionManager.class,
+ LLCSegmentName.class,
+ SegmentZKMetadata.class
+ ).newInstance(segmentManager, manager, llcSegmentName, segmentMetadata);
+ } catch (Exception e) {
+ LOGGER.error("Failed to create FSM instance for scheme {}", scheme, e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Checks if a scheme is supported.
+ *
+ * @param factoryType The scheme to check.
+ * @return True if supported, false otherwise.
+ */
+ public static boolean isFactoryTypeSupported(String factoryType) {
+ return FSM_CLASS_MAP.containsKey(factoryType);
+ }
+
+ /**
+ * Clears all registered FSM classes.
+ */
+ public static void shutdown() {
+ FSM_CLASS_MAP.clear();
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 92a9adc1c0..63d302f929 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -20,28 +20,22 @@ package org.apache.pinot.controller.helix.core.realtime;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
-import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.controller.LeadControllerManager;
import
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
-import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
@@ -60,17 +54,6 @@ public class SegmentCompletionManager {
// TODO Can we log using the segment name in the log message?
public static final Logger LOGGER =
LoggerFactory.getLogger(SegmentCompletionManager.class);
- private enum State {
- PARTIAL_CONSUMING, // Indicates that at least one replica has reported
that it has stopped consuming.
- HOLDING, // the segment has started finalizing.
- COMMITTER_DECIDED, // We know who the committer will be, we will let them
know next time they call segmentConsumed()
- COMMITTER_NOTIFIED, // we notified the committer to commit.
- COMMITTER_UPLOADING, // committer is uploading.
- COMMITTING, // we are in the process of committing to zk
- COMMITTED, // We already committed a segment.
- ABORTED, // state machine is aborted. we will start a fresh one when
the next segmentConsumed comes in.
- }
-
private final HelixManager _helixManager;
// A map that holds the FSM for each segment.
private final Map<String, SegmentCompletionFSM> _fsmMap = new
ConcurrentHashMap<>();
@@ -78,6 +61,8 @@ public class SegmentCompletionManager {
private final PinotLLCRealtimeSegmentManager _segmentManager;
private final ControllerMetrics _controllerMetrics;
private final LeadControllerManager _leadControllerManager;
+ private final SegmentCompletionConfig _segmentCompletionConfig;
+
// Half hour max commit time for all segments
private static final int MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS = 1800;
@@ -91,13 +76,17 @@ public class SegmentCompletionManager {
public SegmentCompletionManager(HelixManager helixManager,
PinotLLCRealtimeSegmentManager segmentManager,
ControllerMetrics controllerMetrics, LeadControllerManager
leadControllerManager,
- int segmentCommitTimeoutSeconds) {
+ int segmentCommitTimeoutSeconds, SegmentCompletionConfig
segmentCompletionConfig) {
_helixManager = helixManager;
_segmentManager = segmentManager;
_controllerMetrics = controllerMetrics;
_leadControllerManager = leadControllerManager;
SegmentCompletionProtocol.setMaxSegmentCommitTimeMs(
TimeUnit.MILLISECONDS.convert(segmentCommitTimeoutSeconds,
TimeUnit.SECONDS));
+ _segmentCompletionConfig = segmentCompletionConfig;
+
+ // Initialize the FSM Factory
+ SegmentCompletionFSMFactory.init(_segmentCompletionConfig);
}
public String getControllerVipUrl() {
@@ -108,6 +97,7 @@ public class SegmentCompletionManager {
return System.currentTimeMillis();
}
+
protected StreamPartitionMsgOffsetFactory
getStreamPartitionMsgOffsetFactory(LLCSegmentName llcSegmentName) {
String rawTableName = llcSegmentName.getTableName();
TableConfig tableConfig =
_segmentManager.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(rawTableName));
@@ -116,6 +106,18 @@ public class SegmentCompletionManager {
return
StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
}
+ public Long getCommitTime(String tableName) {
+ return _commitTimeMap.get(tableName);
+ }
+
+ public void setCommitTime(String tableName, long commitTime) {
+ _commitTimeMap.put(tableName, commitTime);
+ }
+
+ public ControllerMetrics getControllerMetrics() {
+ return _controllerMetrics;
+ }
+
private SegmentCompletionFSM lookupOrCreateFsm(LLCSegmentName
llcSegmentName, String msgType) {
return _fsmMap.computeIfAbsent(llcSegmentName.getSegmentName(), k ->
createFsm(llcSegmentName, msgType));
}
@@ -125,20 +127,27 @@ public class SegmentCompletionManager {
String segmentName = llcSegmentName.getSegmentName();
SegmentZKMetadata segmentMetadata =
_segmentManager.getSegmentZKMetadata(realtimeTableName, segmentName, null);
Preconditions.checkState(segmentMetadata != null, "Failed to find ZK
metadata for segment: %s", segmentName);
- SegmentCompletionFSM fsm;
- if (segmentMetadata.getStatus() ==
CommonConstants.Segment.Realtime.Status.DONE) {
- // Segment is already committed
- StreamPartitionMsgOffsetFactory factory =
getStreamPartitionMsgOffsetFactory(llcSegmentName);
- StreamPartitionMsgOffset endOffset =
factory.create(segmentMetadata.getEndOffset());
- fsm = SegmentCompletionFSM.fsmInCommit(_segmentManager, this,
llcSegmentName, segmentMetadata.getNumReplicas(),
- endOffset);
- } else if
(msgType.equals(SegmentCompletionProtocol.MSG_TYPE_STOPPED_CONSUMING)) {
- fsm = SegmentCompletionFSM.fsmStoppedConsuming(_segmentManager, this,
llcSegmentName,
- segmentMetadata.getNumReplicas());
- } else {
- // Segment is in the process of completing, and this is the first one to
respond. Create fsm
- fsm = SegmentCompletionFSM.fsmInHolding(_segmentManager, this,
llcSegmentName, segmentMetadata.getNumReplicas());
+
+ TableConfig tableConfig =
_segmentManager.getTableConfig(realtimeTableName);
+ String factoryName = null;
+ try {
+ Map<String, String> streamConfigMap =
IngestionConfigUtils.getStreamConfigMap(tableConfig);
+ factoryName =
streamConfigMap.get(StreamConfigProperties.SEGMENT_COMPLETION_FSM_SCHEME);
+ } catch (Exception e) {
+ // If there is an exception, we default to the default factory.
+ }
+
+ if (factoryName == null) {
+ factoryName = _segmentCompletionConfig.getDefaultFsmScheme();
}
+
+
Preconditions.checkState(SegmentCompletionFSMFactory.isFactoryTypeSupported(factoryName),
+ "No FSM registered for name: " + factoryName);
+
+ SegmentCompletionFSM fsm =
+ SegmentCompletionFSMFactory.createFSM(factoryName, this,
_segmentManager, llcSegmentName, segmentMetadata);
+ fsm.transitionToInitialState(msgType);
+
LOGGER.info("Created FSM {}", fsm);
return fsm;
}
@@ -302,854 +311,6 @@ public class SegmentCompletionManager {
return response;
}
- /**
- * This class implements the FSM on the controller side for each completing
segment.
- *
- * An FSM is is created when we first hear about a segment (typically
through the segmentConsumed message).
- * When an FSM is created, it may have one of two start states (HOLDING, or
COMMITTED), depending on the
- * constructor used.
- *
- * We kick off an FSM in the COMMITTED state (rare) when we find that
PROPERTYSTORE already has the segment
- * with the Status set to DONE.
- *
- * We kick off an FSM in the HOLDING state (typical) when a sementConsumed()
message arrives from the
- * first server we hear from.
- *
- * The FSM does not have a timer. It is clocked by the servers, which,
typically, are retransmitting their
- * segmentConsumed() message every so often
(SegmentCompletionProtocol.MAX_HOLD_TIME_MS).
- *
- * See https://github.com/linkedin/pinot/wiki/Low-level-kafka-consumers
- */
- private static class SegmentCompletionFSM {
- // We will have some variation between hosts, so we add 10% to the max
hold time to pick a winner.
- // If there is more than 10% variation, then it is handled as an error
case (i.e. the first few to
- // come in will have a winner, and the later ones will just download the
segment)
- private static final long MAX_TIME_TO_PICK_WINNER_MS =
- SegmentCompletionProtocol.MAX_HOLD_TIME_MS +
(SegmentCompletionProtocol.MAX_HOLD_TIME_MS / 10);
-
- // Once we pick a winner, the winner may get notified in the next call, so
add one hold time plus some.
- // It may be that the winner is not the server that we are currently
processing a segmentConsumed()
- // message from. In that case, we will wait for the next segmetnConsumed()
message from the picked winner.
- // If the winner does not come back to us within that time, we abort the
state machine and start over.
- private static final long MAX_TIME_TO_NOTIFY_WINNER_MS =
- MAX_TIME_TO_PICK_WINNER_MS +
SegmentCompletionProtocol.MAX_HOLD_TIME_MS + (
- SegmentCompletionProtocol.MAX_HOLD_TIME_MS / 10);
-
- public final Logger _logger;
-
- State _state = State.HOLDING; // Typically start off in HOLDING state.
- final long _startTimeMs;
- private final LLCSegmentName _segmentName;
- private final String _rawTableName;
- private final String _realtimeTableName;
- private final int _numReplicas;
- private final Set<String> _excludedServerStateMap;
- private final Map<String, StreamPartitionMsgOffset> _commitStateMap;
- private final StreamPartitionMsgOffsetFactory
_streamPartitionMsgOffsetFactory;
- private StreamPartitionMsgOffset _winningOffset = null;
- private String _winner;
- private final PinotLLCRealtimeSegmentManager _segmentManager;
- private final SegmentCompletionManager _segmentCompletionManager;
- private final long _maxTimeToPickWinnerMs;
- private final long _maxTimeToNotifyWinnerMs;
- private final long _initialCommitTimeMs;
- // Once the winner is notified, they are expected to commit right away. At
this point, it is the segment build
- // time that we need to consider.
- // We may need to add some time here to allow for getting the lock? For
now 0
- // We may need to add some time for the committer come back to us (after
the build)? For now 0.
- private long _maxTimeAllowedToCommitMs;
- private final String _controllerVipUrl;
-
- public static SegmentCompletionFSM
fsmInHolding(PinotLLCRealtimeSegmentManager segmentManager,
- SegmentCompletionManager segmentCompletionManager, LLCSegmentName
segmentName, int numReplicas) {
- return new SegmentCompletionFSM(segmentManager,
segmentCompletionManager, segmentName, numReplicas);
- }
-
- public static SegmentCompletionFSM
fsmInCommit(PinotLLCRealtimeSegmentManager segmentManager,
- SegmentCompletionManager segmentCompletionManager, LLCSegmentName
segmentName, int numReplicas,
- StreamPartitionMsgOffset winningOffset) {
- return new SegmentCompletionFSM(segmentManager,
segmentCompletionManager, segmentName, numReplicas,
- winningOffset);
- }
-
- public static SegmentCompletionFSM
fsmStoppedConsuming(PinotLLCRealtimeSegmentManager segmentManager,
- SegmentCompletionManager segmentCompletionManager, LLCSegmentName
segmentName, int numReplicas) {
- SegmentCompletionFSM fsm =
- new SegmentCompletionFSM(segmentManager, segmentCompletionManager,
segmentName, numReplicas);
- fsm._state = State.PARTIAL_CONSUMING;
- return fsm;
- }
-
- // Ctor that starts the FSM in HOLDING state
- private SegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManager,
- SegmentCompletionManager segmentCompletionManager, LLCSegmentName
segmentName, int numReplicas) {
- _segmentName = segmentName;
- _rawTableName = _segmentName.getTableName();
- _realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(_rawTableName);
- _numReplicas = numReplicas;
- _segmentManager = segmentManager;
- _commitStateMap = new
HashMap<>(HashUtil.getHashMapCapacity(_numReplicas));
- _excludedServerStateMap = new HashSet<>(_numReplicas);
- _segmentCompletionManager = segmentCompletionManager;
- _startTimeMs = _segmentCompletionManager.getCurrentTimeMs();
- _maxTimeToPickWinnerMs = _startTimeMs + MAX_TIME_TO_PICK_WINNER_MS;
- _maxTimeToNotifyWinnerMs = _startTimeMs + MAX_TIME_TO_NOTIFY_WINNER_MS;
- _streamPartitionMsgOffsetFactory =
_segmentCompletionManager.getStreamPartitionMsgOffsetFactory(_segmentName);
- long initialCommitTimeMs = MAX_TIME_TO_NOTIFY_WINNER_MS +
_segmentManager.getCommitTimeoutMS(_realtimeTableName);
- Long savedCommitTime =
_segmentCompletionManager._commitTimeMap.get(_rawTableName);
- if (savedCommitTime != null && savedCommitTime > initialCommitTimeMs) {
- initialCommitTimeMs = savedCommitTime;
- }
- _logger = LoggerFactory.getLogger("SegmentCompletionFSM_" +
segmentName.getSegmentName());
- if (initialCommitTimeMs > MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS *
1000) {
- // The table has a really high value configured for max commit time.
Set it to a higher value than default
- // and go from there.
- _logger
- .info("Configured max commit time {}s too high for table {},
changing to {}s", initialCommitTimeMs / 1000,
- _realtimeTableName, MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS);
- initialCommitTimeMs = MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS * 1000;
- }
- _initialCommitTimeMs = initialCommitTimeMs;
- _maxTimeAllowedToCommitMs = _startTimeMs + _initialCommitTimeMs;
- _controllerVipUrl = segmentCompletionManager.getControllerVipUrl();
- }
-
- // Ctor that starts the FSM in COMMITTED state
- private SegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManager,
- SegmentCompletionManager segmentCompletionManager, LLCSegmentName
segmentName, int numReplicas,
- StreamPartitionMsgOffset winningOffset) {
- // Constructor used when we get an event after a segment is committed.
- this(segmentManager, segmentCompletionManager, segmentName, numReplicas);
- _state = State.COMMITTED;
- _winningOffset = winningOffset;
- _winner = "UNKNOWN";
- }
-
- @Override
- public String toString() {
- return "{" + _segmentName.getSegmentName() + "," + _state + "," +
_startTimeMs + "," + _winner + ","
- + _winningOffset + "," + _controllerVipUrl + "}";
- }
-
- // SegmentCompletionManager releases the FSM from the hashtable when it is
done.
- public boolean isDone() {
- return _state.equals(State.COMMITTED) || _state.equals(State.ABORTED);
- }
-
- /*
- * We just heard from a server that it has reached completion stage, and
is reporting the offset
- * that the server is at. Since multiple servers can come in at the same
time for this segment,
- * we need to synchronize on the FSM to handle the messages. The
processing time itself is small,
- * so we should be OK with this synchronization.
- */
- public SegmentCompletionProtocol.Response segmentConsumed(String
instanceId, StreamPartitionMsgOffset offset,
- final String stopReason) {
- final long now = _segmentCompletionManager.getCurrentTimeMs();
- // We can synchronize the entire block for the SegmentConsumed message.
- synchronized (this) {
- _logger.info("Processing segmentConsumed({}, {})", instanceId, offset);
- if (_excludedServerStateMap.contains(instanceId)) {
- // Could be that the server was restarted, and it started consuming
again, and somehow got to complete
- // consumption up to this point. We will accept it.
- _logger.info("Marking instance {} alive again", instanceId);
- _excludedServerStateMap.remove(instanceId);
- }
- _commitStateMap.put(instanceId, offset);
- switch (_state) {
- case PARTIAL_CONSUMING:
- return partialConsumingConsumed(instanceId, offset, now,
stopReason);
-
- case HOLDING:
- return holdingConsumed(instanceId, offset, now, stopReason);
-
- case COMMITTER_DECIDED: // This must be a retransmit
- return committerDecidedConsumed(instanceId, offset, now);
-
- case COMMITTER_NOTIFIED:
- return committerNotifiedConsumed(instanceId, offset, now);
-
- case COMMITTER_UPLOADING:
- return committerUploadingConsumed(instanceId, offset, now);
-
- case COMMITTING:
- return committingConsumed(instanceId, offset, now);
-
- case COMMITTED:
- return committedConsumed(instanceId, offset);
-
- case ABORTED:
- // FSM has been aborted, just return HOLD
- return hold(instanceId, offset);
-
- default:
- return fail(instanceId, offset);
- }
- }
- }
-
- /*
- * A server has sent segmentConsumed() message. The caller will save the
segment if we return
- * COMMIT_CONTINUE. We need to verify that it is the same server that we
notified as the winner
- * and the offset is the same as what is coming in with the commit. We can
then move to
- * COMMITTER_UPLOADING and wait for the segmentCommitEnd() call.
- *
- * In case of discrepancy we move the state machine to ABORTED state so
that this FSM is removed
- * from the map, and things start over. In this case, we respond to the
server with a 'hold' so
- * that they re-transmit their segmentConsumed() message and start over.
- */
- public SegmentCompletionProtocol.Response segmentCommitStart(String
instanceId, StreamPartitionMsgOffset offset) {
- long now = _segmentCompletionManager.getCurrentTimeMs();
- if (_excludedServerStateMap.contains(instanceId)) {
- _logger.warn("Not accepting commit from {} since it had stoppd
consuming", instanceId);
- return SegmentCompletionProtocol.RESP_FAILED;
- }
- synchronized (this) {
- _logger.info("Processing segmentCommitStart({}, {})", instanceId,
offset);
- switch (_state) {
- case PARTIAL_CONSUMING:
- return partialConsumingCommit(instanceId, offset, now);
-
- case HOLDING:
- return holdingCommit(instanceId, offset, now);
-
- case COMMITTER_DECIDED:
- return committerDecidedCommit(instanceId, offset, now);
-
- case COMMITTER_NOTIFIED:
- return committerNotifiedCommit(instanceId, offset, now);
-
- case COMMITTER_UPLOADING:
- return committerUploadingCommit(instanceId, offset, now);
-
- case COMMITTING:
- return committingCommit(instanceId, offset, now);
-
- case COMMITTED:
- return committedCommit(instanceId, offset);
-
- case ABORTED:
- return hold(instanceId, offset);
-
- default:
- return fail(instanceId, offset);
- }
- }
- }
-
- public SegmentCompletionProtocol.Response stoppedConsuming(String
instanceId, StreamPartitionMsgOffset offset,
- String reason) {
- synchronized (this) {
- _logger.info("Processing stoppedConsuming({}, {})", instanceId,
offset);
- _excludedServerStateMap.add(instanceId);
- switch (_state) {
- case PARTIAL_CONSUMING:
- return partialConsumingStoppedConsuming(instanceId, offset,
reason);
-
- case HOLDING:
- return holdingStoppedConsuming(instanceId, offset, reason);
-
- case COMMITTER_DECIDED:
- return committerDecidedStoppedConsuming(instanceId, offset,
reason);
-
- case COMMITTER_NOTIFIED:
- return committerNotifiedStoppedConsuming(instanceId, offset,
reason);
-
- case COMMITTER_UPLOADING:
- return committerUploadingStoppedConsuming(instanceId, offset,
reason);
-
- case COMMITTING:
- return committingStoppedConsuming(instanceId, offset, reason);
-
- case COMMITTED:
- return committedStoppedConsuming(instanceId, offset, reason);
-
- case ABORTED:
- _logger.info("Ignoring StoppedConsuming message from {} in state
{}", instanceId, _state);
- return SegmentCompletionProtocol.RESP_PROCESSED;
-
- default:
- return fail(instanceId, offset);
- }
- }
- }
-
- public SegmentCompletionProtocol.Response extendBuildTime(final String
instanceId,
- final StreamPartitionMsgOffset offset, final int extTimeSec) {
- final long now = _segmentCompletionManager.getCurrentTimeMs();
- synchronized (this) {
- _logger.info("Processing extendBuildTime({}, {}, {})", instanceId,
offset, extTimeSec);
- switch (_state) {
- case PARTIAL_CONSUMING:
- case HOLDING:
- case COMMITTER_DECIDED:
- return fail(instanceId, offset);
- case COMMITTER_NOTIFIED:
- return committerNotifiedExtendBuildTime(instanceId, offset,
extTimeSec, now);
- case COMMITTER_UPLOADING:
- case COMMITTING:
- case COMMITTED:
- case ABORTED:
- default:
- return fail(instanceId, offset);
- }
- }
- }
-
- /*
- * We can get this call only when the state is COMMITTER_UPLOADING. Also,
the instanceId should be equal to
- * the _winner.
- */
- public SegmentCompletionProtocol.Response
segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams,
- CommittingSegmentDescriptor committingSegmentDescriptor) {
- String instanceId = reqParams.getInstanceId();
- StreamPartitionMsgOffset offset =
-
_streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset());
- synchronized (this) {
- if (_excludedServerStateMap.contains(instanceId)) {
- _logger.warn("Not accepting commitEnd from {} since it had stoppd
consuming", instanceId);
- return abortAndReturnFailed();
- }
- _logger.info("Processing segmentCommitEnd({}, {})", instanceId,
offset);
- if (!_state.equals(State.COMMITTER_UPLOADING) ||
!instanceId.equals(_winner)
- || offset.compareTo(_winningOffset) != 0) {
- // State changed while we were out of sync. return a failed commit.
- _logger.warn("State change during upload: state={} segment={}
winner={} winningOffset={}", _state,
- _segmentName.getSegmentName(), _winner, _winningOffset);
- return abortAndReturnFailed();
- }
- SegmentCompletionProtocol.Response response = commitSegment(reqParams,
committingSegmentDescriptor);
- if (!response.equals(SegmentCompletionProtocol.RESP_COMMIT_SUCCESS)) {
- return abortAndReturnFailed();
- } else {
- return response;
- }
- }
- }
-
- // Helper methods that log the current state and the response sent
- private SegmentCompletionProtocol.Response fail(String instanceId,
StreamPartitionMsgOffset offset) {
- _logger.info("{}:FAIL for instance={} offset={}", _state, instanceId,
offset);
- return SegmentCompletionProtocol.RESP_FAILED;
- }
-
- private SegmentCompletionProtocol.Response commit(String instanceId,
StreamPartitionMsgOffset offset) {
- long allowedBuildTimeSec = (_maxTimeAllowedToCommitMs - _startTimeMs) /
1000;
- _logger
- .info("{}:COMMIT for instance={} offset={} buldTimeSec={}", _state,
instanceId, offset, allowedBuildTimeSec);
- SegmentCompletionProtocol.Response.Params params =
- new
SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(offset.toString())
- .withBuildTimeSeconds(allowedBuildTimeSec)
-
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT)
- .withControllerVipUrl(_controllerVipUrl);
- return new SegmentCompletionProtocol.Response(params);
- }
-
- private SegmentCompletionProtocol.Response discard(String instanceId,
StreamPartitionMsgOffset offset) {
- _logger.warn("{}:DISCARD for instance={} offset={}", _state, instanceId,
offset);
- return SegmentCompletionProtocol.RESP_DISCARD;
- }
-
- private SegmentCompletionProtocol.Response keep(String instanceId,
StreamPartitionMsgOffset offset) {
- _logger.info("{}:KEEP for instance={} offset={}", _state, instanceId,
offset);
- return new SegmentCompletionProtocol.Response(
- new
SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(offset.toString())
-
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.KEEP));
- }
-
- private SegmentCompletionProtocol.Response catchup(String instanceId,
StreamPartitionMsgOffset offset) {
- _logger.info("{}:CATCHUP for instance={} offset={}", _state, instanceId,
offset);
- return new SegmentCompletionProtocol.Response(
- new
SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(_winningOffset.toString())
-
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP));
- }
-
- private SegmentCompletionProtocol.Response hold(String instanceId,
StreamPartitionMsgOffset offset) {
- _logger.info("{}:HOLD for instance={} offset={}", _state, instanceId,
offset);
- return new SegmentCompletionProtocol.Response(new
SegmentCompletionProtocol.Response.Params()
- .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD)
- .withStreamPartitionMsgOffset(offset.toString()));
- }
-
- private SegmentCompletionProtocol.Response abortAndReturnHold(long now,
String instanceId,
- StreamPartitionMsgOffset offset) {
- _state = State.ABORTED;
- _segmentCompletionManager._controllerMetrics
- .addMeteredTableValue(_rawTableName,
ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
- return hold(instanceId, offset);
- }
-
- private SegmentCompletionProtocol.Response abortAndReturnFailed() {
- _state = State.ABORTED;
- _segmentCompletionManager._controllerMetrics
- .addMeteredTableValue(_rawTableName,
ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
- return SegmentCompletionProtocol.RESP_FAILED;
- }
-
- private SegmentCompletionProtocol.Response
abortIfTooLateAndReturnHold(long now, String instanceId,
- StreamPartitionMsgOffset offset) {
- if (now > _maxTimeAllowedToCommitMs) {
- _logger
- .warn("{}:Aborting FSM (too late) instance={} offset={} now={}
start={}", _state, instanceId, offset, now,
- _startTimeMs);
- return abortAndReturnHold(now, instanceId, offset);
- }
- return null;
- }
-
- private int numReplicasToLookFor() {
- return _numReplicas - _excludedServerStateMap.size();
- }
-
- private SegmentCompletionProtocol.Response partialConsumingConsumed(String
instanceId,
- StreamPartitionMsgOffset offset, long now, final String stopReason) {
- // This is the first time we are getting segmentConsumed() for this
segment.
- // Some instance thinks we can close this segment, so go to HOLDING
state, and process as normal.
- // We will just be looking for less replicas.
- _state = State.HOLDING;
- return holdingConsumed(instanceId, offset, now, stopReason);
- }
-
- /*
- * This is not a good state to get a commit message, but it is possible
that the controller failed while in
- * COMMITTER_NOTIFIED state, and the first message we got in the new
controller was a stoppedConsuming
- * message. As long as the committer is not the one who stopped consuming
(which we have already checked before
- * coming here), we will trust the server that this is a valid commit.
- */
- private SegmentCompletionProtocol.Response partialConsumingCommit(String
instanceId,
- StreamPartitionMsgOffset offset, long now) {
- // Do the same as HOLDING__commit
- return processCommitWhileHoldingOrPartialConsuming(instanceId, offset,
now);
- }
-
- private SegmentCompletionProtocol.Response
partialConsumingStoppedConsuming(String instanceId,
- StreamPartitionMsgOffset offset, String reason) {
- return processStoppedConsuming(instanceId, offset, reason, true);
- }
-
- /*
- * If we have waited "enough", or we have all replicas reported, then we
can pick a winner.
- *
- * Otherwise, we ask the server that is reporting to come back again later
until one of these conditions hold.
- *
- * If we can pick a winner then we go to COMMITTER_DECIDED or
COMMITTER_NOTIIFIED (if the instance
- * in this call is the same as winner).
- *
- * If we can go to COMMITTER_NOTIFIED then we respond with a COMMIT
message, otherwise with a HOLD message.
- */
- private SegmentCompletionProtocol.Response holdingConsumed(String
instanceId, StreamPartitionMsgOffset offset,
- long now, final String stopReason) {
- SegmentCompletionProtocol.Response response;
- // If we are past the max time to pick a winner, or we have heard from
all replicas,
- // we are ready to pick a winner.
- if (isWinnerPicked(instanceId, now, stopReason)) {
- if (_winner.equals(instanceId)) {
- _logger.info("{}:Committer notified winner instance={} offset={}",
_state, instanceId, offset);
- response = commit(instanceId, offset);
- _state = State.COMMITTER_NOTIFIED;
- } else {
- _logger.info("{}:Committer decided winner={} offset={}", _state,
_winner, _winningOffset);
- response = catchup(instanceId, offset);
- _state = State.COMMITTER_DECIDED;
- }
- } else {
- response = hold(instanceId, offset);
- }
- return response;
- }
-
- /*
- * This not a good state to receive a commit message, but then it may be
that the controller
- * failed over while in the COMMITTER_NOTIFIED state...
- */
- private SegmentCompletionProtocol.Response holdingCommit(String
instanceId, StreamPartitionMsgOffset offset,
- long now) {
- return processCommitWhileHoldingOrPartialConsuming(instanceId, offset,
now);
- }
-
- private SegmentCompletionProtocol.Response holdingStoppedConsuming(String
instanceId,
- StreamPartitionMsgOffset offset, String reason) {
- return processStoppedConsuming(instanceId, offset, reason, true);
- }
-
- /*
- * We have already decided who the committer is, but have not let them
know yet. If this is the committer that
- * we decided, then respond back with COMMIT. Otherwise, if the offset is
smaller, respond back with a CATCHUP.
- * Otherwise, just have the server HOLD. Since the segment is not
committed yet, we cannot ask them to KEEP or
- * DISCARD etc. If the committer fails for any reason, we will need a new
committer.
- */
- private SegmentCompletionProtocol.Response committerDecidedConsumed(String
instanceId,
- StreamPartitionMsgOffset offset, long now) {
- if (offset.compareTo(_winningOffset) > 0) {
- _logger.warn("{}:Aborting FSM (offset larger than winning) instance={}
offset={} now={} winning={}", _state,
- instanceId, offset, now, _winningOffset);
- return abortAndReturnHold(now, instanceId, offset);
- }
- SegmentCompletionProtocol.Response response;
- if (_winner.equals(instanceId)) {
- if (_winningOffset.compareTo(offset) == 0) {
- _logger.info("{}:Notifying winner instance={} offset={}", _state,
instanceId, offset);
- response = commit(instanceId, offset);
- _state = State.COMMITTER_NOTIFIED;
- } else {
- // Winner coming back with a different offset.
- _logger
- .warn("{}:Winner coming back with different offset for
instance={} offset={} prevWinnOffset={}", _state,
- instanceId, offset, _winningOffset);
- response = abortAndReturnHold(now, instanceId, offset);
- }
- } else if (offset.compareTo(_winningOffset) == 0) {
- // Wait until winner has posted the segment.
- response = hold(instanceId, offset);
- } else {
- response = catchup(instanceId, offset);
- }
- if (now > _maxTimeToNotifyWinnerMs) {
- // Winner never got back to us. Abort the completion protocol and
start afresh.
- // We can potentially optimize here to see if this instance has the
highest so far, and re-elect them to
- // be winner, but for now, we will abort it and restart
- response = abortAndReturnHold(now, instanceId, offset);
- }
- return response;
- }
-
- /*
- * We have already decided who the committer is, but have not let them
know yet. So, we don't expect
- * a commit() call here.
- */
- private SegmentCompletionProtocol.Response committerDecidedCommit(String
instanceId,
- StreamPartitionMsgOffset offset, long now) {
- return processCommitWhileHoldingOrPartialConsuming(instanceId, offset,
now);
- }
-
- private SegmentCompletionProtocol.Response
committerDecidedStoppedConsuming(String instanceId,
- StreamPartitionMsgOffset offset, String reason) {
- return processStoppedConsuming(instanceId, offset, reason, false);
- }
-
- /*
- * We have notified the committer. If we get a consumed message from
another server, we can ask them to
- * catchup (if the offset is lower). If anything else, then we pretty much
ask them to hold.
- */
- private SegmentCompletionProtocol.Response
committerNotifiedConsumed(String instanceId,
- StreamPartitionMsgOffset offset, long now) {
- SegmentCompletionProtocol.Response response;
- // We have already picked a winner and notified them but we have not
heard from them yet.
- // Common case here is that another server is coming back to us with its
offset. We either respond back with
- // HOLD or CATCHUP.
- // If the winner is coming back again, then we have some more conditions
to look at.
- response = abortIfTooLateAndReturnHold(now, instanceId, offset);
- if (response != null) {
- return response;
- }
- if (instanceId.equals(_winner)) {
- // Winner is coming back to after holding. Somehow they never heard us
return COMMIT.
- // Allow them to be winner again, since we are still within time to
pick a winner.
- if (offset.compareTo(_winningOffset) == 0) {
- response = commit(instanceId, offset);
- } else {
- // Something seriously wrong. Abort the FSM
- response = discard(instanceId, offset);
- _logger.warn("{}:Aborting for instance={} offset={}", _state,
instanceId, offset);
- _state = State.ABORTED;
- }
- } else {
- // Common case: A different instance is reporting.
- if (offset.compareTo(_winningOffset) == 0) {
- // Wait until winner has posted the segment before asking this
server to KEEP the segment.
- response = hold(instanceId, offset);
- } else if (offset.compareTo(_winningOffset) < 0) {
- response = catchup(instanceId, offset);
- } else {
- // We have not yet committed, so ask the new responder to hold. They
may be the new leader in case the
- // committer fails.
- response = hold(instanceId, offset);
- }
- }
- return response;
- }
-
- /*
- * We have notified the committer. If we get a consumed message from
another server, we can ask them to
- * catchup (if the offset is lower). If anything else, then we pretty much
ask them to hold.
- */
- private SegmentCompletionProtocol.Response committerNotifiedCommit(String
instanceId,
- StreamPartitionMsgOffset offset, long now) {
- SegmentCompletionProtocol.Response response = null;
- response = checkBadCommitRequest(instanceId, offset, now);
- if (response != null) {
- return response;
- }
- _logger.info("{}:Uploading for instance={} offset={}", _state,
instanceId, offset);
- _state = State.COMMITTER_UPLOADING;
- long commitTimeMs = now - _startTimeMs;
- if (commitTimeMs > _initialCommitTimeMs) {
- // We assume that the commit time holds for all partitions. It is
possible, though, that one partition
- // commits at a lower time than another partition, and the two
partitions are going simultaneously,
- // and we may not get the maximum value all the time.
-
_segmentCompletionManager._commitTimeMap.put(_segmentName.getTableName(),
commitTimeMs);
- }
- return SegmentCompletionProtocol.RESP_COMMIT_CONTINUE;
- }
-
- private SegmentCompletionProtocol.Response
committerNotifiedStoppedConsuming(String instanceId,
- StreamPartitionMsgOffset offset, String reason) {
- return processStoppedConsuming(instanceId, offset, reason, false);
- }
-
- private SegmentCompletionProtocol.Response
committerNotifiedExtendBuildTime(String instanceId,
- StreamPartitionMsgOffset offset, int extTimeSec, long now) {
- SegmentCompletionProtocol.Response response =
abortIfTooLateAndReturnHold(now, instanceId, offset);
- if (response == null) {
- long maxTimeAllowedToCommitMs = now + extTimeSec * 1000;
- if (maxTimeAllowedToCommitMs > _startTimeMs +
MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS * 1000) {
- _logger.warn("Not accepting lease extension from {} startTime={}
requestedTime={}", instanceId, _startTimeMs,
- maxTimeAllowedToCommitMs);
- return abortAndReturnFailed();
- }
- _maxTimeAllowedToCommitMs = maxTimeAllowedToCommitMs;
- response = SegmentCompletionProtocol.RESP_PROCESSED;
- }
- return response;
- }
-
- private SegmentCompletionProtocol.Response
committerUploadingConsumed(String instanceId,
- StreamPartitionMsgOffset offset, long now) {
- return processConsumedAfterCommitStart(instanceId, offset, now);
- }
-
- private SegmentCompletionProtocol.Response committerUploadingCommit(String
instanceId,
- StreamPartitionMsgOffset offset, long now) {
- return processCommitWhileUploading(instanceId, offset, now);
- }
-
- private SegmentCompletionProtocol.Response
committerUploadingStoppedConsuming(String instanceId,
- StreamPartitionMsgOffset offset, String reason) {
- return processStoppedConsuming(instanceId, offset, reason, false);
- }
-
- private SegmentCompletionProtocol.Response committingConsumed(String
instanceId, StreamPartitionMsgOffset offset,
- long now) {
- return processConsumedAfterCommitStart(instanceId, offset, now);
- }
-
- private SegmentCompletionProtocol.Response committingCommit(String
instanceId, StreamPartitionMsgOffset offset,
- long now) {
- return processCommitWhileUploading(instanceId, offset, now);
- }
-
- private SegmentCompletionProtocol.Response
committingStoppedConsuming(String instanceId,
- StreamPartitionMsgOffset offset, String reason) {
- return processStoppedConsuming(instanceId, offset, reason, false);
- }
-
- private SegmentCompletionProtocol.Response committedConsumed(String
instanceId, StreamPartitionMsgOffset offset) {
- // Server reporting an offset on an already completed segment. Depending
on the offset, either KEEP or DISCARD.
- SegmentCompletionProtocol.Response response;
- if (offset.compareTo(_winningOffset) == 0) {
- response = keep(instanceId, offset);
- } else {
- // Return DISCARD. It is hard to say how long the server will take to
complete things.
- response = discard(instanceId, offset);
- }
- return response;
- }
-
- private SegmentCompletionProtocol.Response committedCommit(String
instanceId, StreamPartitionMsgOffset offset) {
- if (offset.compareTo(_winningOffset) == 0) {
- return keep(instanceId, offset);
- }
- return discard(instanceId, offset);
- }
-
- private SegmentCompletionProtocol.Response
committedStoppedConsuming(String instanceId,
- StreamPartitionMsgOffset offset, String reason) {
- return processStoppedConsuming(instanceId, offset, reason, false);
- }
-
- private SegmentCompletionProtocol.Response processStoppedConsuming(String
instanceId,
- StreamPartitionMsgOffset offset, String reason, boolean createNew) {
- _logger
- .info("Instance {} stopped consuming segment {} at offset {}, state
{}, createNew: {}, reason:{}", instanceId,
- _segmentName, offset, _state, createNew, reason);
- try {
- _segmentManager.segmentStoppedConsuming(_segmentName, instanceId);
- } catch (Exception e) {
- _logger.error("Caught exception while processing stopped CONSUMING
segment: {} on instance: {}",
- _segmentName.getSegmentName(), instanceId, e);
- return SegmentCompletionProtocol.RESP_FAILED;
- }
- return SegmentCompletionProtocol.RESP_PROCESSED;
- }
-
- // A common method when the state is > COMMITTER_NOTIFIED.
- private SegmentCompletionProtocol.Response
processConsumedAfterCommitStart(String instanceId,
- StreamPartitionMsgOffset offset, long now) {
- SegmentCompletionProtocol.Response response;
- // We have already picked a winner, and may or many not have heard from
them.
- // Common case here is that another server is coming back to us with its
offset. We either respond back with
- // HOLD or CATCHUP.
- // It may be that we never heard from the committer, or the committer is
taking too long to commit the segment.
- // In that case, we abort the FSM and start afresh (i.e, return HOLD).
- // If the winner is coming back again, then we have some more conditions
to look at.
- response = abortIfTooLateAndReturnHold(now, instanceId, offset);
- if (response != null) {
- return response;
- }
- if (instanceId.equals(_winner)) {
- // The winner is coming back to report its offset. Take a decision
based on the offset reported, and whether we
- // already notified them
- // Winner is supposedly already in the commit call. Something wrong.
- LOGGER.warn(
- "{}:Aborting FSM because winner is reporting a segment while it is
also committing instance={} offset={} "
- + "now={}", _state, instanceId, offset, now);
- // Ask them to hold, just in case the committer fails for some reason..
- return abortAndReturnHold(now, instanceId, offset);
- } else {
- // Common case: A different instance is reporting.
- if (offset.compareTo(_winningOffset) == 0) {
- // Wait until winner has posted the segment before asking this
server to KEEP the segment.
- response = hold(instanceId, offset);
- } else if (offset.compareTo(_winningOffset) < 0) {
- response = catchup(instanceId, offset);
- } else {
- // We have not yet committed, so ask the new responder to hold. They
may be the new leader in case the
- // committer fails.
- response = hold(instanceId, offset);
- }
- }
- return response;
- }
-
- private SegmentCompletionProtocol.Response
commitSegment(SegmentCompletionProtocol.Request.Params reqParams,
- CommittingSegmentDescriptor committingSegmentDescriptor) {
- String instanceId = reqParams.getInstanceId();
- StreamPartitionMsgOffset offset =
-
_streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset());
- if (!_state.equals(State.COMMITTER_UPLOADING)) {
- // State changed while we were out of sync. return a failed commit.
- _logger.warn("State change during upload: state={} segment={}
winner={} winningOffset={}", _state,
- _segmentName.getSegmentName(), _winner, _winningOffset);
- return SegmentCompletionProtocol.RESP_FAILED;
- }
- _logger.info("Committing segment {} at offset {} winner {}",
_segmentName.getSegmentName(), offset, instanceId);
- _state = State.COMMITTING;
- // The segment is uploaded to a unique file name indicated by
segmentLocation, so we need to move the segment file
- // to its permanent location first before committing the metadata. The
committingSegmentDescriptor is then updated
- // with the permanent segment location to be saved in metadata store.
- try {
- _segmentManager.commitSegmentFile(_realtimeTableName,
committingSegmentDescriptor);
- } catch (Exception e) {
- _logger.error("Caught exception while committing segment file for
segment: {}", _segmentName.getSegmentName(),
- e);
- return SegmentCompletionProtocol.RESP_FAILED;
- }
- try {
- // Convert to a controller uri if the segment location uses local file
scheme.
- if (CommonConstants.Segment.LOCAL_SEGMENT_SCHEME
-
.equalsIgnoreCase(URIUtils.getUri(committingSegmentDescriptor.getSegmentLocation()).getScheme()))
{
- committingSegmentDescriptor.setSegmentLocation(URIUtils
- .constructDownloadUrl(_controllerVipUrl,
TableNameBuilder.extractRawTableName(_realtimeTableName),
- _segmentName.getSegmentName()));
- }
- _segmentManager.commitSegmentMetadata(_realtimeTableName,
committingSegmentDescriptor);
- } catch (Exception e) {
- _logger
- .error("Caught exception while committing segment metadata for
segment: {}", _segmentName.getSegmentName(),
- e);
- return SegmentCompletionProtocol.RESP_FAILED;
- }
-
- _state = State.COMMITTED;
- _logger.info("Committed segment {} at offset {} winner {}",
_segmentName.getSegmentName(), offset, instanceId);
- return SegmentCompletionProtocol.RESP_COMMIT_SUCCESS;
- }
-
- private SegmentCompletionProtocol.Response
processCommitWhileUploading(String instanceId,
- StreamPartitionMsgOffset offset, long now) {
- _logger.info("Processing segmentCommit({}, {})", instanceId, offset);
- SegmentCompletionProtocol.Response response =
abortIfTooLateAndReturnHold(now, instanceId, offset);
- if (response != null) {
- return response;
- }
- // Another committer (or same) came in while one was uploading. Ask them
to hold in case this one fails.
- return new SegmentCompletionProtocol.Response(
- new
SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(offset.toString())
-
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD));
- }
-
- private SegmentCompletionProtocol.Response checkBadCommitRequest(String
instanceId, StreamPartitionMsgOffset offset,
- long now) {
- SegmentCompletionProtocol.Response response =
abortIfTooLateAndReturnHold(now, instanceId, offset);
- if (response != null) {
- return response;
- } else if (instanceId.equals(_winner) &&
offset.compareTo(_winningOffset) != 0) {
- // Hmm. Committer has been notified, but either a different one is
committing, or offset is different
- _logger.warn("{}:Aborting FSM (bad commit req) instance={} offset={}
now={} winning={}", _state, instanceId,
- offset, now, _winningOffset);
- return abortAndReturnHold(now, instanceId, offset);
- }
- return null;
- }
-
- private SegmentCompletionProtocol.Response
processCommitWhileHoldingOrPartialConsuming(String instanceId,
- StreamPartitionMsgOffset offset, long now) {
- _logger.info("Processing segmentCommit({}, {})", instanceId, offset);
- SegmentCompletionProtocol.Response response =
abortIfTooLateAndReturnHold(now, instanceId, offset);
- if (response != null) {
- return response;
- }
- // We cannot get a commit if we are in this state, so ask them to hold.
Maybe we are starting after a failover.
- // The server will re-send the segmentConsumed message.
- return hold(instanceId, offset);
- }
-
- /**
- * Pick a winner if we can, preferring the instance that we are handling
right now,
- *
- * We accept the first server to report an offset as long as the server
stopped consumption
- * due to row limit. The premise is that other servers will also stop at
row limit, and there
- * is no need to wait for them to report an offset in order to decide on a
winner. The state machine takes care
- * of the cases where other servers may report different offsets (just in
case).
- *
- * If the above condition is not satisfied (i.e. either this is not the
first server, or it did not reach
- * row limit), then we can pick a winner only if it is too late to pick a
winner, or we have heard from all
- * servers.
- *
- * Otherwise, we wait to hear from more servers.
- *
- * @param preferredInstance The instance that is reporting in this thread.
- * @param now current time
- * @param stopReason reason reported by instance for stopping consumption.
- * @return true if winner picked, false otherwise.
- */
- private boolean isWinnerPicked(String preferredInstance, long now, final
String stopReason) {
- if ((SegmentCompletionProtocol.REASON_ROW_LIMIT.equals(stopReason)
- ||
SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP.equals(stopReason))
- && _commitStateMap.size() == 1) {
- _winner = preferredInstance;
- _winningOffset = _commitStateMap.get(preferredInstance);
- return true;
- } else if (now > _maxTimeToPickWinnerMs || _commitStateMap.size() ==
numReplicasToLookFor()) {
- _logger.info("{}:Picking winner time={} size={}", _state, now -
_startTimeMs, _commitStateMap.size());
- StreamPartitionMsgOffset maxOffsetSoFar = null;
- String winnerSoFar = null;
- for (Map.Entry<String, StreamPartitionMsgOffset> entry :
_commitStateMap.entrySet()) {
- if (maxOffsetSoFar == null ||
entry.getValue().compareTo(maxOffsetSoFar) > 0) {
- maxOffsetSoFar = entry.getValue();
- winnerSoFar = entry.getKey();
- }
- }
- _winningOffset = maxOffsetSoFar;
- if (_commitStateMap.get(preferredInstance).compareTo(maxOffsetSoFar)
== 0) {
- winnerSoFar = preferredInstance;
- }
- _winner = winnerSoFar;
- return true;
- }
- return false;
- }
- }
-
@VisibleForTesting
protected boolean isLeader(String tableName) {
return _leadControllerManager.isLeaderForTable(tableName);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index 2595d8debf..dd2d0ceee0 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -32,6 +32,7 @@ import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.LongMsgOffsetFactory;
@@ -1324,7 +1325,8 @@ public class SegmentCompletionTest {
boolean isLeader, ControllerMetrics controllerMetrics) {
super(helixManager, segmentManager, controllerMetrics,
new LeadControllerManager("localhost_1234", helixManager,
controllerMetrics),
- SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds());
+ SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds(),
+ new SegmentCompletionConfig(new PinotConfiguration()));
_isLeader = isLeader;
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
index 845dfd5ffa..588aaeb949 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
@@ -130,6 +130,11 @@ public class StreamConfigProperties {
*/
public static final String SERVER_UPLOAD_TO_DEEPSTORE =
"realtime.segment.serverUploadToDeepStore";
+ /**
+ * Config used to indicate which segment commit protocol implementation
controller should use for this table
+ */
+ public static final String SEGMENT_COMPLETION_FSM_SCHEME =
"segment.completion.fsm.scheme";
+
/**
* Helper method to create a stream specific property
*/
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 1a6985084b..641fa4ef89 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -918,6 +918,8 @@ public class CommonConstants {
//Set to true to load all services tagged and compiled with
hk2-metadata-generator. Default to False
public static final String CONTROLLER_SERVICE_AUTO_DISCOVERY =
"pinot.controller.service.auto.discovery";
public static final String CONFIG_OF_LOGGER_ROOT_DIR =
"pinot.controller.logger.root.dir";
+ public static final String PREFIX_OF_PINOT_CONTROLLER_SEGMENT_COMPLETION =
+ "pinot.controller.segment.completion";
}
public static class Minion {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]