This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a9cd82b98 Make segment completion FSM pluggable (#14088)
2a9cd82b98 is described below

commit 2a9cd82b9849bd7e9b6db5b7403b502632b7b41d
Author: Kartik Khare <[email protected]>
AuthorDate: Fri Dec 13 18:32:19 2024 +0530

    Make segment completion FSM pluggable (#14088)
    
    * WIP: Make Completion FSM pluggable
    
    * Add factory and make all test pass
    
    * Fix test failures
    
    * Add javadoc comments
    
    * Simplify fsm factory by just storing class names instead of constructors
    
    * More refactoring plus removed hardcoded
    
    * rebase with master
    
    * Fix schemes
    
    * refactor
    
    * Refactoring
    
    * Make segment completion protocol configurable from table config
    
    * fix bug
    
    ---------
    
    Co-authored-by: Kartik Khare <[email protected]>
---
 .../pinot/controller/BaseControllerStarter.java    |   6 +-
 .../realtime/BlockingSegmentCompletionFSM.java     | 902 ++++++++++++++++++++
 .../core/realtime/SegmentCompletionConfig.java     |  58 ++
 .../helix/core/realtime/SegmentCompletionFSM.java  | 133 +++
 .../core/realtime/SegmentCompletionFSMFactory.java | 123 +++
 .../core/realtime/SegmentCompletionManager.java    | 921 +--------------------
 .../helix/core/realtime/SegmentCompletionTest.java |   4 +-
 .../pinot/spi/stream/StreamConfigProperties.java   |   5 +
 .../apache/pinot/spi/utils/CommonConstants.java    |   2 +
 9 files changed, 1272 insertions(+), 882 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 9a48b6ad6f..342413d355 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -99,6 +99,7 @@ import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManag
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.controller.helix.core.minion.TaskMetricsEmitter;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig;
 import 
org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceChecker;
 import 
org.apache.pinot.controller.helix.core.rebalance.tenant.DefaultTenantRebalancer;
@@ -489,9 +490,12 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
         new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, 
_controllerMetrics);
     // TODO: Need to put this inside HelixResourceManager when 
HelixControllerLeadershipManager is removed.
     
_helixResourceManager.registerPinotLLCRealtimeSegmentManager(_pinotLLCRealtimeSegmentManager);
+
+    SegmentCompletionConfig segmentCompletionConfig = new 
SegmentCompletionConfig(_config);
+
     _segmentCompletionManager =
         new SegmentCompletionManager(_helixParticipantManager, 
_pinotLLCRealtimeSegmentManager, _controllerMetrics,
-            _leadControllerManager, _config.getSegmentCommitTimeoutSeconds());
+            _leadControllerManager, _config.getSegmentCommitTimeoutSeconds(), 
segmentCompletionConfig);
 
     _sqlQueryExecutor = new SqlQueryExecutor(_config.generateVipUrl());
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java
new file mode 100644
index 0000000000..b119928a46
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java
@@ -0,0 +1,902 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ControllerMeter;
+import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.URIUtils;
+import 
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class implements the FSM on the controller side for each completing 
segment.
+ *
+ * An FSM is is created when we first hear about a segment (typically through 
the segmentConsumed message).
+ * When an FSM is created, it may have one of two start states (HOLDING, or 
COMMITTED), depending on the
+ * constructor used.
+ *
+ * We kick off an FSM in the COMMITTED state (rare) when we find that 
PROPERTYSTORE already has the segment
+ * with the Status set to DONE.
+ *
+ * We kick off an FSM in the HOLDING state (typical) when a sementConsumed() 
message arrives from the
+ * first server we hear from.
+ *
+ * The FSM does not have a timer. It is clocked by the servers, which, 
typically, are retransmitting their
+ * segmentConsumed() message every so often 
(SegmentCompletionProtocol.MAX_HOLD_TIME_MS).
+ *
+ * See https://github.com/linkedin/pinot/wiki/Low-level-kafka-consumers
+ */
+public class BlockingSegmentCompletionFSM implements SegmentCompletionFSM {
+  public static final Logger LOGGER = 
LoggerFactory.getLogger(BlockingSegmentCompletionFSM.class);
+
+  public enum BlockingSegmentCompletionFSMState {
+    PARTIAL_CONSUMING,  // Indicates that at least one replica has reported 
that it has stopped consuming.
+    HOLDING,          // the segment has started finalizing.
+    COMMITTER_DECIDED, // We know who the committer will be, we will let them 
know next time they call segmentConsumed()
+    COMMITTER_NOTIFIED, // we notified the committer to commit.
+    COMMITTER_UPLOADING,  // committer is uploading.
+    COMMITTING, // we are in the process of committing to zk
+    COMMITTED,    // We already committed a segment.
+    ABORTED,      // state machine is aborted. we will start a fresh one when 
the next segmentConsumed comes in.
+  }
+
+  // We will have some variation between hosts, so we add 10% to the max hold 
time to pick a winner.
+  // If there is more than 10% variation, then it is handled as an error case 
(i.e. the first few to
+  // come in will have a winner, and the later ones will just download the 
segment)
+  private static final long MAX_TIME_TO_PICK_WINNER_MS =
+      SegmentCompletionProtocol.MAX_HOLD_TIME_MS + 
(SegmentCompletionProtocol.MAX_HOLD_TIME_MS / 10);
+
+  // Once we pick a winner, the winner may get notified in the next call, so 
add one hold time plus some.
+  // It may be that the winner is not the server that we are currently 
processing a segmentConsumed()
+  // message from. In that case, we will wait for the next segmetnConsumed() 
message from the picked winner.
+  // If the winner does not come back to us within that time, we abort the 
state machine and start over.
+  private static final long MAX_TIME_TO_NOTIFY_WINNER_MS =
+      MAX_TIME_TO_PICK_WINNER_MS + SegmentCompletionProtocol.MAX_HOLD_TIME_MS 
+ (
+          SegmentCompletionProtocol.MAX_HOLD_TIME_MS / 10);
+
+  public final Logger _logger;
+
+  BlockingSegmentCompletionFSMState _state = 
BlockingSegmentCompletionFSMState.HOLDING;
+      // Typically start off in HOLDING state.
+  final long _startTimeMs;
+  private final LLCSegmentName _segmentName;
+  private final String _rawTableName;
+  private final String _realtimeTableName;
+  private final int _numReplicas;
+  private final Set<String> _excludedServerStateMap;
+  private final Map<String, StreamPartitionMsgOffset> _commitStateMap;
+  private final StreamPartitionMsgOffsetFactory 
_streamPartitionMsgOffsetFactory;
+  private StreamPartitionMsgOffset _winningOffset = null;
+  private String _winner;
+  private final PinotLLCRealtimeSegmentManager _segmentManager;
+  private final SegmentCompletionManager _segmentCompletionManager;
+  private final long _maxTimeToPickWinnerMs;
+  private final long _maxTimeToNotifyWinnerMs;
+  private final long _initialCommitTimeMs;
+  // Once the winner is notified, they are expected to commit right away. At 
this point, it is the segment build
+  // time that we need to consider.
+  // We may need to add some time here to allow for getting the lock? For now 0
+  // We may need to add some time for the committer come back to us (after the 
build)? For now 0.
+  private long _maxTimeAllowedToCommitMs;
+  private final String _controllerVipUrl;
+
+  public BlockingSegmentCompletionFSM(PinotLLCRealtimeSegmentManager 
segmentManager,
+      SegmentCompletionManager segmentCompletionManager, LLCSegmentName 
segmentName,
+      SegmentZKMetadata segmentMetadata) {
+    _segmentName = segmentName;
+    _rawTableName = _segmentName.getTableName();
+    _realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(_rawTableName);
+    _numReplicas = segmentMetadata.getNumReplicas();
+    _segmentManager = segmentManager;
+    _commitStateMap = new HashMap<>(HashUtil.getHashMapCapacity(_numReplicas));
+    _excludedServerStateMap = new HashSet<>(_numReplicas);
+    _segmentCompletionManager = segmentCompletionManager;
+    _startTimeMs = _segmentCompletionManager.getCurrentTimeMs();
+    _maxTimeToPickWinnerMs = _startTimeMs + MAX_TIME_TO_PICK_WINNER_MS;
+    _maxTimeToNotifyWinnerMs = _startTimeMs + MAX_TIME_TO_NOTIFY_WINNER_MS;
+    _streamPartitionMsgOffsetFactory = 
_segmentCompletionManager.getStreamPartitionMsgOffsetFactory(_segmentName);
+    long initialCommitTimeMs = MAX_TIME_TO_NOTIFY_WINNER_MS + 
_segmentManager.getCommitTimeoutMS(_realtimeTableName);
+    Long savedCommitTime = 
_segmentCompletionManager.getCommitTime(_rawTableName);
+    if (savedCommitTime != null && savedCommitTime > initialCommitTimeMs) {
+      initialCommitTimeMs = savedCommitTime;
+    }
+    _logger = LoggerFactory.getLogger("SegmentCompletionFSM_" + 
segmentName.getSegmentName());
+    int maxCommitTimeForAllSegmentsSeconds = 
SegmentCompletionManager.getMaxCommitTimeForAllSegmentsSeconds();
+    if (initialCommitTimeMs > maxCommitTimeForAllSegmentsSeconds * 1000L) {
+      // The table has a really high value configured for max commit time. Set 
it to a higher value than default
+      // and go from there.
+      _logger
+          .info("Configured max commit time {}s too high for table {}, 
changing to {}s", initialCommitTimeMs / 1000,
+              _realtimeTableName, maxCommitTimeForAllSegmentsSeconds);
+      initialCommitTimeMs = maxCommitTimeForAllSegmentsSeconds * 1000L;
+    }
+    _initialCommitTimeMs = initialCommitTimeMs;
+    _maxTimeAllowedToCommitMs = _startTimeMs + _initialCommitTimeMs;
+    _controllerVipUrl = segmentCompletionManager.getControllerVipUrl();
+
+    if (segmentMetadata.getStatus() == 
CommonConstants.Segment.Realtime.Status.DONE) {
+      StreamPartitionMsgOffsetFactory factory =
+          
_segmentCompletionManager.getStreamPartitionMsgOffsetFactory(_segmentName);
+      StreamPartitionMsgOffset endOffset = 
factory.create(segmentMetadata.getEndOffset());
+      _state = BlockingSegmentCompletionFSMState.COMMITTED;
+      _winningOffset = endOffset;
+      _winner = "UNKNOWN";
+    }
+  }
+
+  @Override
+  public void transitionToInitialState(String msgType) {
+    if (_state == BlockingSegmentCompletionFSMState.COMMITTED) {
+      // Already set; no need to do anything here.
+      return;
+    }
+
+    // If we receive a STOPPED_CONSUMING message before any others, switch to 
PARTIAL_CONSUMING
+    if (SegmentCompletionProtocol.MSG_TYPE_STOPPED_CONSUMING.equals(msgType)) {
+      _state = BlockingSegmentCompletionFSMState.PARTIAL_CONSUMING;
+    }
+    // Otherwise, we remain in HOLDING
+  }
+
+  @Override
+  public String toString() {
+    return "{" + _segmentName.getSegmentName() + "," + _state + "," + 
_startTimeMs + "," + _winner + ","
+        + _winningOffset + "," + _controllerVipUrl + "}";
+  }
+
+  // SegmentCompletionManager releases the FSM from the hashtable when it is 
done.
+  public boolean isDone() {
+    return _state.equals(BlockingSegmentCompletionFSMState.COMMITTED) || 
_state.equals(
+        BlockingSegmentCompletionFSMState.ABORTED);
+  }
+
+  /*
+   * We just heard from a server that it has reached completion stage, and is 
reporting the offset
+   * that the server is at. Since multiple servers can come in at the same 
time for this segment,
+   * we need to synchronize on the FSM to handle the messages. The processing 
time itself is small,
+   * so we should be OK with this synchronization.
+   */
+  @Override
+  public SegmentCompletionProtocol.Response segmentConsumed(String instanceId, 
StreamPartitionMsgOffset offset,
+      final String stopReason) {
+    final long now = _segmentCompletionManager.getCurrentTimeMs();
+    // We can synchronize the entire block for the SegmentConsumed message.
+    synchronized (this) {
+      _logger.info("Processing segmentConsumed({}, {})", instanceId, offset);
+      if (_excludedServerStateMap.contains(instanceId)) {
+        // Could be that the server was restarted, and it started consuming 
again, and somehow got to complete
+        // consumption up to this point. We will accept it.
+        _logger.info("Marking instance {} alive again", instanceId);
+        _excludedServerStateMap.remove(instanceId);
+      }
+      _commitStateMap.put(instanceId, offset);
+      switch (_state) {
+        case PARTIAL_CONSUMING:
+          return partialConsumingConsumed(instanceId, offset, now, stopReason);
+
+        case HOLDING:
+          return holdingConsumed(instanceId, offset, now, stopReason);
+
+        case COMMITTER_DECIDED: // This must be a retransmit
+          return committerDecidedConsumed(instanceId, offset, now);
+
+        case COMMITTER_NOTIFIED:
+          return committerNotifiedConsumed(instanceId, offset, now);
+
+        case COMMITTER_UPLOADING:
+          return committerUploadingConsumed(instanceId, offset, now);
+
+        case COMMITTING:
+          return committingConsumed(instanceId, offset, now);
+
+        case COMMITTED:
+          return committedConsumed(instanceId, offset);
+
+        case ABORTED:
+          // FSM has been aborted, just return HOLD
+          return hold(instanceId, offset);
+
+        default:
+          return fail(instanceId, offset);
+      }
+    }
+  }
+
+  /*
+   * A server has sent segmentConsumed() message. The caller will save the 
segment if we return
+   * COMMIT_CONTINUE. We need to verify that it is the same server that we 
notified as the winner
+   * and the offset is the same as what is coming in with the commit. We can 
then move to
+   * COMMITTER_UPLOADING and wait for the segmentCommitEnd() call.
+   *
+   * In case of discrepancy we move the state machine to ABORTED state so that 
this FSM is removed
+   * from the map, and things start over. In this case, we respond to the 
server with a 'hold' so
+   * that they re-transmit their segmentConsumed() message and start over.
+   */
+  @Override
+  public SegmentCompletionProtocol.Response segmentCommitStart(String 
instanceId, StreamPartitionMsgOffset offset) {
+    long now = _segmentCompletionManager.getCurrentTimeMs();
+    if (_excludedServerStateMap.contains(instanceId)) {
+      _logger.warn("Not accepting commit from {} since it had stoppd 
consuming", instanceId);
+      return SegmentCompletionProtocol.RESP_FAILED;
+    }
+    synchronized (this) {
+      _logger.info("Processing segmentCommitStart({}, {})", instanceId, 
offset);
+      switch (_state) {
+        case PARTIAL_CONSUMING:
+          return partialConsumingCommit(instanceId, offset, now);
+
+        case HOLDING:
+          return holdingCommit(instanceId, offset, now);
+
+        case COMMITTER_DECIDED:
+          return committerDecidedCommit(instanceId, offset, now);
+
+        case COMMITTER_NOTIFIED:
+          return committerNotifiedCommit(instanceId, offset, now);
+
+        case COMMITTER_UPLOADING:
+          return committerUploadingCommit(instanceId, offset, now);
+
+        case COMMITTING:
+          return committingCommit(instanceId, offset, now);
+
+        case COMMITTED:
+          return committedCommit(instanceId, offset);
+
+        case ABORTED:
+          return hold(instanceId, offset);
+
+        default:
+          return fail(instanceId, offset);
+      }
+    }
+  }
+
+  @Override
+  public SegmentCompletionProtocol.Response stoppedConsuming(String 
instanceId, StreamPartitionMsgOffset offset,
+      String reason) {
+    synchronized (this) {
+      _logger.info("Processing stoppedConsuming({}, {})", instanceId, offset);
+      _excludedServerStateMap.add(instanceId);
+      switch (_state) {
+        case PARTIAL_CONSUMING:
+          return partialConsumingStoppedConsuming(instanceId, offset, reason);
+
+        case HOLDING:
+          return holdingStoppedConsuming(instanceId, offset, reason);
+
+        case COMMITTER_DECIDED:
+          return committerDecidedStoppedConsuming(instanceId, offset, reason);
+
+        case COMMITTER_NOTIFIED:
+          return committerNotifiedStoppedConsuming(instanceId, offset, reason);
+
+        case COMMITTER_UPLOADING:
+          return committerUploadingStoppedConsuming(instanceId, offset, 
reason);
+
+        case COMMITTING:
+          return committingStoppedConsuming(instanceId, offset, reason);
+
+        case COMMITTED:
+          return committedStoppedConsuming(instanceId, offset, reason);
+
+        case ABORTED:
+          _logger.info("Ignoring StoppedConsuming message from {} in state 
{}", instanceId, _state);
+          return SegmentCompletionProtocol.RESP_PROCESSED;
+
+        default:
+          return fail(instanceId, offset);
+      }
+    }
+  }
+
+  @Override
+  public SegmentCompletionProtocol.Response extendBuildTime(final String 
instanceId,
+      final StreamPartitionMsgOffset offset, final int extTimeSec) {
+    final long now = _segmentCompletionManager.getCurrentTimeMs();
+    synchronized (this) {
+      _logger.info("Processing extendBuildTime({}, {}, {})", instanceId, 
offset, extTimeSec);
+      switch (_state) {
+        case PARTIAL_CONSUMING:
+        case HOLDING:
+        case COMMITTER_DECIDED:
+          return fail(instanceId, offset);
+        case COMMITTER_NOTIFIED:
+          return committerNotifiedExtendBuildTime(instanceId, offset, 
extTimeSec, now);
+        case COMMITTER_UPLOADING:
+        case COMMITTING:
+        case COMMITTED:
+        case ABORTED:
+        default:
+          return fail(instanceId, offset);
+      }
+    }
+  }
+
+  /*
+   * We can get this call only when the state is COMMITTER_UPLOADING. Also, 
the instanceId should be equal to
+   * the _winner.
+   */
+  @Override
+  public SegmentCompletionProtocol.Response 
segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams,
+      CommittingSegmentDescriptor committingSegmentDescriptor) {
+    String instanceId = reqParams.getInstanceId();
+    StreamPartitionMsgOffset offset =
+        
_streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset());
+    synchronized (this) {
+      if (_excludedServerStateMap.contains(instanceId)) {
+        _logger.warn("Not accepting commitEnd from {} since it had stoppd 
consuming", instanceId);
+        return abortAndReturnFailed();
+      }
+      _logger.info("Processing segmentCommitEnd({}, {})", instanceId, offset);
+      if 
(!_state.equals(BlockingSegmentCompletionFSMState.COMMITTER_UPLOADING) || 
!instanceId.equals(_winner)
+          || offset.compareTo(_winningOffset) != 0) {
+        // State changed while we were out of sync. return a failed commit.
+        _logger.warn("State change during upload: state={} segment={} 
winner={} winningOffset={}", _state,
+            _segmentName.getSegmentName(), _winner, _winningOffset);
+        return abortAndReturnFailed();
+      }
+      SegmentCompletionProtocol.Response response =
+          commitSegment(reqParams, committingSegmentDescriptor);
+      if (!response.equals(SegmentCompletionProtocol.RESP_COMMIT_SUCCESS)) {
+        return abortAndReturnFailed();
+      } else {
+        return response;
+      }
+    }
+  }
+
+  // Helper methods that log the current state and the response sent
+  private SegmentCompletionProtocol.Response fail(String instanceId, 
StreamPartitionMsgOffset offset) {
+    _logger.info("{}:FAIL for instance={} offset={}", _state, instanceId, 
offset);
+    return SegmentCompletionProtocol.RESP_FAILED;
+  }
+
+  private SegmentCompletionProtocol.Response commit(String instanceId, 
StreamPartitionMsgOffset offset) {
+    long allowedBuildTimeSec = (_maxTimeAllowedToCommitMs - _startTimeMs) / 
1000;
+    _logger
+        .info("{}:COMMIT for instance={} offset={} buldTimeSec={}", _state, 
instanceId, offset, allowedBuildTimeSec);
+    SegmentCompletionProtocol.Response.Params params =
+        new 
SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(offset.toString())
+            .withBuildTimeSeconds(allowedBuildTimeSec)
+            
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT)
+            .withControllerVipUrl(_controllerVipUrl);
+    return new SegmentCompletionProtocol.Response(params);
+  }
+
+  private SegmentCompletionProtocol.Response discard(String instanceId, 
StreamPartitionMsgOffset offset) {
+    _logger.warn("{}:DISCARD for instance={} offset={}", _state, instanceId, 
offset);
+    return SegmentCompletionProtocol.RESP_DISCARD;
+  }
+
+  private SegmentCompletionProtocol.Response keep(String instanceId, 
StreamPartitionMsgOffset offset) {
+    _logger.info("{}:KEEP for instance={} offset={}", _state, instanceId, 
offset);
+    return new SegmentCompletionProtocol.Response(
+        new 
SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(offset.toString())
+            
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.KEEP));
+  }
+
+  private SegmentCompletionProtocol.Response catchup(String instanceId, 
StreamPartitionMsgOffset offset) {
+    _logger.info("{}:CATCHUP for instance={} offset={}", _state, instanceId, 
offset);
+    return new SegmentCompletionProtocol.Response(
+        new 
SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(_winningOffset.toString())
+            
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP));
+  }
+
+  private SegmentCompletionProtocol.Response hold(String instanceId, 
StreamPartitionMsgOffset offset) {
+    _logger.info("{}:HOLD for instance={} offset={}", _state, instanceId, 
offset);
+    return new SegmentCompletionProtocol.Response(new 
SegmentCompletionProtocol.Response.Params()
+        .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD)
+        .withStreamPartitionMsgOffset(offset.toString()));
+  }
+
+  private SegmentCompletionProtocol.Response abortAndReturnHold(long now, 
String instanceId,
+      StreamPartitionMsgOffset offset) {
+    _state = BlockingSegmentCompletionFSMState.ABORTED;
+    _segmentCompletionManager.getControllerMetrics()
+        .addMeteredTableValue(_rawTableName, 
ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
+    return hold(instanceId, offset);
+  }
+
+  private SegmentCompletionProtocol.Response abortAndReturnFailed() {
+    _state = BlockingSegmentCompletionFSMState.ABORTED;
+    _segmentCompletionManager.getControllerMetrics()
+        .addMeteredTableValue(_rawTableName, 
ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
+    return SegmentCompletionProtocol.RESP_FAILED;
+  }
+
+  private SegmentCompletionProtocol.Response abortIfTooLateAndReturnHold(long 
now, String instanceId,
+      StreamPartitionMsgOffset offset) {
+    if (now > _maxTimeAllowedToCommitMs) {
+      _logger
+          .warn("{}:Aborting FSM (too late) instance={} offset={} now={} 
start={}", _state, instanceId, offset, now,
+              _startTimeMs);
+      return abortAndReturnHold(now, instanceId, offset);
+    }
+    return null;
+  }
+
+  private int numReplicasToLookFor() {
+    return _numReplicas - _excludedServerStateMap.size();
+  }
+
+  private SegmentCompletionProtocol.Response partialConsumingConsumed(String 
instanceId,
+      StreamPartitionMsgOffset offset, long now, final String stopReason) {
+    // This is the first time we are getting segmentConsumed() for this 
segment.
+    // Some instance thinks we can close this segment, so go to HOLDING state, 
and process as normal.
+    // We will just be looking for less replicas.
+    _state = BlockingSegmentCompletionFSMState.HOLDING;
+    return holdingConsumed(instanceId, offset, now, stopReason);
+  }
+
+  /*
+   * This is not a good state to get a commit message, but it is possible that 
the controller failed while in
+   * COMMITTER_NOTIFIED state, and the first message we got in the new 
controller was a stoppedConsuming
+   * message. As long as the committer is not the one who stopped consuming 
(which we have already checked before
+   * coming here), we will trust the server that this is a valid commit.
+   */
+  private SegmentCompletionProtocol.Response partialConsumingCommit(String 
instanceId,
+      StreamPartitionMsgOffset offset, long now) {
+    // Do the same as HOLDING__commit
+    return processCommitWhileHoldingOrPartialConsuming(instanceId, offset, 
now);
+  }
+
+  private SegmentCompletionProtocol.Response 
partialConsumingStoppedConsuming(String instanceId,
+      StreamPartitionMsgOffset offset, String reason) {
+    return processStoppedConsuming(instanceId, offset, reason, true);
+  }
+
+  /*
+   * If we have waited "enough", or we have all replicas reported, then we can 
pick a winner.
+   *
+   * Otherwise, we ask the server that is reporting to come back again later 
until one of these conditions hold.
+   *
+   * If we can pick a winner then we go to COMMITTER_DECIDED or 
COMMITTER_NOTIIFIED (if the instance
+   * in this call is the same as winner).
+   *
+   * If we can go to COMMITTER_NOTIFIED then we respond with a COMMIT message, 
otherwise with a HOLD message.
+   */
+  private SegmentCompletionProtocol.Response holdingConsumed(String 
instanceId, StreamPartitionMsgOffset offset,
+      long now, final String stopReason) {
+    SegmentCompletionProtocol.Response response;
+    // If we are past the max time to pick a winner, or we have heard from all 
replicas,
+    // we are ready to pick a winner.
+    if (isWinnerPicked(instanceId, now, stopReason)) {
+      if (_winner.equals(instanceId)) {
+        _logger.info("{}:Committer notified winner instance={} offset={}", 
_state, instanceId, offset);
+        response = commit(instanceId, offset);
+        _state = BlockingSegmentCompletionFSMState.COMMITTER_NOTIFIED;
+      } else {
+        _logger.info("{}:Committer decided winner={} offset={}", _state, 
_winner, _winningOffset);
+        response = catchup(instanceId, offset);
+        _state = BlockingSegmentCompletionFSMState.COMMITTER_DECIDED;
+      }
+    } else {
+      response = hold(instanceId, offset);
+    }
+    return response;
+  }
+
+  /*
+   * This not a good state to receive a commit message, but then it may be 
that the controller
+   * failed over while in the COMMITTER_NOTIFIED state...
+   */
+  private SegmentCompletionProtocol.Response holdingCommit(String instanceId, 
StreamPartitionMsgOffset offset,
+      long now) {
+    return processCommitWhileHoldingOrPartialConsuming(instanceId, offset, 
now);
+  }
+
+  private SegmentCompletionProtocol.Response holdingStoppedConsuming(String 
instanceId,
+      StreamPartitionMsgOffset offset, String reason) {
+    return processStoppedConsuming(instanceId, offset, reason, true);
+  }
+
+  /*
+   * We have already decided who the committer is, but have not let them know 
yet. If this is the committer that
+   * we decided, then respond back with COMMIT. Otherwise, if the offset is 
smaller, respond back with a CATCHUP.
+   * Otherwise, just have the server HOLD. Since the segment is not committed 
yet, we cannot ask them to KEEP or
+   * DISCARD etc. If the committer fails for any reason, we will need a new 
committer.
+   */
+  private SegmentCompletionProtocol.Response committerDecidedConsumed(String 
instanceId,
+      StreamPartitionMsgOffset offset, long now) {
+    if (offset.compareTo(_winningOffset) > 0) {
+      _logger.warn("{}:Aborting FSM (offset larger than winning) instance={} 
offset={} now={} winning={}", _state,
+          instanceId, offset, now, _winningOffset);
+      return abortAndReturnHold(now, instanceId, offset);
+    }
+    SegmentCompletionProtocol.Response response;
+    if (_winner.equals(instanceId)) {
+      if (_winningOffset.compareTo(offset) == 0) {
+        _logger.info("{}:Notifying winner instance={} offset={}", _state, 
instanceId, offset);
+        response = commit(instanceId, offset);
+        _state = BlockingSegmentCompletionFSMState.COMMITTER_NOTIFIED;
+      } else {
+        // Winner coming back with a different offset.
+        _logger
+            .warn("{}:Winner coming back with different offset for instance={} 
offset={} prevWinnOffset={}", _state,
+                instanceId, offset, _winningOffset);
+        response = abortAndReturnHold(now, instanceId, offset);
+      }
+    } else if (offset.compareTo(_winningOffset) == 0) {
+      // Wait until winner has posted the segment.
+      response = hold(instanceId, offset);
+    } else {
+      response = catchup(instanceId, offset);
+    }
+    if (now > _maxTimeToNotifyWinnerMs) {
+      // Winner never got back to us. Abort the completion protocol and start 
afresh.
+      // We can potentially optimize here to see if this instance has the 
highest so far, and re-elect them to
+      // be winner, but for now, we will abort it and restart
+      response = abortAndReturnHold(now, instanceId, offset);
+    }
+    return response;
+  }
+
+  /*
+   * We have already decided who the committer is, but have not let them know 
yet. So, we don't expect
+   * a commit() call here.
+   */
+  private SegmentCompletionProtocol.Response committerDecidedCommit(String 
instanceId,
+      StreamPartitionMsgOffset offset, long now) {
+    return processCommitWhileHoldingOrPartialConsuming(instanceId, offset, 
now);
+  }
+
+  private SegmentCompletionProtocol.Response 
committerDecidedStoppedConsuming(String instanceId,
+      StreamPartitionMsgOffset offset, String reason) {
+    return processStoppedConsuming(instanceId, offset, reason, false);
+  }
+
+  /*
+   * We have notified the committer. If we get a consumed message from another 
server, we can ask them to
+   * catchup (if the offset is lower). If anything else, then we pretty much 
ask them to hold.
+   */
+  private SegmentCompletionProtocol.Response committerNotifiedConsumed(String 
instanceId,
+      StreamPartitionMsgOffset offset, long now) {
+    SegmentCompletionProtocol.Response response;
+    // We have already picked a winner and notified them but we have not heard 
from them yet.
+    // Common case here is that another server is coming back to us with its 
offset. We either respond back with
+    // HOLD or CATCHUP.
+    // If the winner is coming back again, then we have some more conditions 
to look at.
+    response = abortIfTooLateAndReturnHold(now, instanceId, offset);
+    if (response != null) {
+      return response;
+    }
+    if (instanceId.equals(_winner)) {
+      // Winner is coming back to after holding. Somehow they never heard us 
return COMMIT.
+      // Allow them to be winner again, since we are still within time to pick 
a winner.
+      if (offset.compareTo(_winningOffset) == 0) {
+        response = commit(instanceId, offset);
+      } else {
+        // Something seriously wrong. Abort the FSM
+        response = discard(instanceId, offset);
+        _logger.warn("{}:Aborting for instance={} offset={}", _state, 
instanceId, offset);
+        _state = BlockingSegmentCompletionFSMState.ABORTED;
+      }
+    } else {
+      // Common case: A different instance is reporting.
+      if (offset.compareTo(_winningOffset) == 0) {
+        // Wait until winner has posted the segment before asking this server 
to KEEP the segment.
+        response = hold(instanceId, offset);
+      } else if (offset.compareTo(_winningOffset) < 0) {
+        response = catchup(instanceId, offset);
+      } else {
+        // We have not yet committed, so ask the new responder to hold. They 
may be the new leader in case the
+        // committer fails.
+        response = hold(instanceId, offset);
+      }
+    }
+    return response;
+  }
+
+  /*
+   * We have notified the committer. If we get a consumed message from another 
server, we can ask them to
+   * catchup (if the offset is lower). If anything else, then we pretty much 
ask them to hold.
+   */
+  private SegmentCompletionProtocol.Response committerNotifiedCommit(String 
instanceId,
+      StreamPartitionMsgOffset offset, long now) {
+    SegmentCompletionProtocol.Response response = null;
+    response = checkBadCommitRequest(instanceId, offset, now);
+    if (response != null) {
+      return response;
+    }
+    _logger.info("{}:Uploading for instance={} offset={}", _state, instanceId, 
offset);
+    _state = BlockingSegmentCompletionFSMState.COMMITTER_UPLOADING;
+    long commitTimeMs = now - _startTimeMs;
+    if (commitTimeMs > _initialCommitTimeMs) {
+      // We assume that the commit time holds for all partitions. It is 
possible, though, that one partition
+      // commits at a lower time than another partition, and the two 
partitions are going simultaneously,
+      // and we may not get the maximum value all the time.
+      _segmentCompletionManager.setCommitTime(_segmentName.getTableName(), 
commitTimeMs);
+    }
+    return SegmentCompletionProtocol.RESP_COMMIT_CONTINUE;
+  }
+
+  private SegmentCompletionProtocol.Response 
committerNotifiedStoppedConsuming(String instanceId,
+      StreamPartitionMsgOffset offset, String reason) {
+    return processStoppedConsuming(instanceId, offset, reason, false);
+  }
+
+  private SegmentCompletionProtocol.Response 
committerNotifiedExtendBuildTime(String instanceId,
+      StreamPartitionMsgOffset offset, int extTimeSec, long now) {
+    SegmentCompletionProtocol.Response response = 
abortIfTooLateAndReturnHold(now, instanceId, offset);
+    if (response == null) {
+      long maxTimeAllowedToCommitMs = now + extTimeSec * 1000;
+      if (maxTimeAllowedToCommitMs
+          > _startTimeMs + 
SegmentCompletionManager.getMaxCommitTimeForAllSegmentsSeconds() * 1000L) {
+        _logger.warn("Not accepting lease extension from {} startTime={} 
requestedTime={}", instanceId, _startTimeMs,
+            maxTimeAllowedToCommitMs);
+        return abortAndReturnFailed();
+      }
+      _maxTimeAllowedToCommitMs = maxTimeAllowedToCommitMs;
+      response = SegmentCompletionProtocol.RESP_PROCESSED;
+    }
+    return response;
+  }
+
+  private SegmentCompletionProtocol.Response committerUploadingConsumed(String 
instanceId,
+      StreamPartitionMsgOffset offset, long now) {
+    return processConsumedAfterCommitStart(instanceId, offset, now);
+  }
+
+  private SegmentCompletionProtocol.Response committerUploadingCommit(String 
instanceId,
+      StreamPartitionMsgOffset offset, long now) {
+    return processCommitWhileUploading(instanceId, offset, now);
+  }
+
+  private SegmentCompletionProtocol.Response 
committerUploadingStoppedConsuming(String instanceId,
+      StreamPartitionMsgOffset offset, String reason) {
+    return processStoppedConsuming(instanceId, offset, reason, false);
+  }
+
+  private SegmentCompletionProtocol.Response committingConsumed(String 
instanceId, StreamPartitionMsgOffset offset,
+      long now) {
+    return processConsumedAfterCommitStart(instanceId, offset, now);
+  }
+
+  private SegmentCompletionProtocol.Response committingCommit(String 
instanceId, StreamPartitionMsgOffset offset,
+      long now) {
+    return processCommitWhileUploading(instanceId, offset, now);
+  }
+
+  private SegmentCompletionProtocol.Response committingStoppedConsuming(String 
instanceId,
+      StreamPartitionMsgOffset offset, String reason) {
+    return processStoppedConsuming(instanceId, offset, reason, false);
+  }
+
+  private SegmentCompletionProtocol.Response committedConsumed(String 
instanceId, StreamPartitionMsgOffset offset) {
+    // Server reporting an offset on an already completed segment. Depending 
on the offset, either KEEP or DISCARD.
+    SegmentCompletionProtocol.Response response;
+    if (offset.compareTo(_winningOffset) == 0) {
+      response = keep(instanceId, offset);
+    } else {
+      // Return DISCARD. It is hard to say how long the server will take to 
complete things.
+      response = discard(instanceId, offset);
+    }
+    return response;
+  }
+
+  private SegmentCompletionProtocol.Response committedCommit(String 
instanceId, StreamPartitionMsgOffset offset) {
+    if (offset.compareTo(_winningOffset) == 0) {
+      return keep(instanceId, offset);
+    }
+    return discard(instanceId, offset);
+  }
+
+  private SegmentCompletionProtocol.Response committedStoppedConsuming(String 
instanceId,
+      StreamPartitionMsgOffset offset, String reason) {
+    return processStoppedConsuming(instanceId, offset, reason, false);
+  }
+
+  private SegmentCompletionProtocol.Response processStoppedConsuming(String 
instanceId,
+      StreamPartitionMsgOffset offset, String reason, boolean createNew) {
+    _logger
+        .info("Instance {} stopped consuming segment {} at offset {}, state 
{}, createNew: {}, reason:{}", instanceId,
+            _segmentName, offset, _state, createNew, reason);
+    try {
+      _segmentManager.segmentStoppedConsuming(_segmentName, instanceId);
+    } catch (Exception e) {
+      _logger.error("Caught exception while processing stopped CONSUMING 
segment: {} on instance: {}",
+          _segmentName.getSegmentName(), instanceId, e);
+      return SegmentCompletionProtocol.RESP_FAILED;
+    }
+    return SegmentCompletionProtocol.RESP_PROCESSED;
+  }
+
+  // A common method when the state is > COMMITTER_NOTIFIED.
+  private SegmentCompletionProtocol.Response 
processConsumedAfterCommitStart(String instanceId,
+      StreamPartitionMsgOffset offset, long now) {
+    SegmentCompletionProtocol.Response response;
+    // We have already picked a winner, and may or many not have heard from 
them.
+    // Common case here is that another server is coming back to us with its 
offset. We either respond back with
+    // HOLD or CATCHUP.
+    // It may be that we never heard from the committer, or the committer is 
taking too long to commit the segment.
+    // In that case, we abort the FSM and start afresh (i.e, return HOLD).
+    // If the winner is coming back again, then we have some more conditions 
to look at.
+    response = abortIfTooLateAndReturnHold(now, instanceId, offset);
+    if (response != null) {
+      return response;
+    }
+    if (instanceId.equals(_winner)) {
+      // The winner is coming back to report its offset. Take a decision based 
on the offset reported, and whether we
+      // already notified them
+      // Winner is supposedly already in the commit call. Something wrong.
+      LOGGER.warn(
+          "{}:Aborting FSM because winner is reporting a segment while it is 
also committing instance={} offset={} "
+              + "now={}", _state, instanceId, offset, now);
+      // Ask them to hold, just in case the committer fails for some reason..
+      return abortAndReturnHold(now, instanceId, offset);
+    } else {
+      // Common case: A different instance is reporting.
+      if (offset.compareTo(_winningOffset) == 0) {
+        // Wait until winner has posted the segment before asking this server 
to KEEP the segment.
+        response = hold(instanceId, offset);
+      } else if (offset.compareTo(_winningOffset) < 0) {
+        response = catchup(instanceId, offset);
+      } else {
+        // We have not yet committed, so ask the new responder to hold. They 
may be the new leader in case the
+        // committer fails.
+        response = hold(instanceId, offset);
+      }
+    }
+    return response;
+  }
+
+  private SegmentCompletionProtocol.Response 
commitSegment(SegmentCompletionProtocol.Request.Params reqParams,
+      CommittingSegmentDescriptor committingSegmentDescriptor) {
+    String instanceId = reqParams.getInstanceId();
+    StreamPartitionMsgOffset offset =
+        
_streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset());
+    if (!_state.equals(BlockingSegmentCompletionFSMState.COMMITTER_UPLOADING)) 
{
+      // State changed while we were out of sync. return a failed commit.
+      _logger.warn("State change during upload: state={} segment={} winner={} 
winningOffset={}", _state,
+          _segmentName.getSegmentName(), _winner, _winningOffset);
+      return SegmentCompletionProtocol.RESP_FAILED;
+    }
+    _logger.info("Committing segment {} at offset {} winner {}", 
_segmentName.getSegmentName(), offset, instanceId);
+    _state = BlockingSegmentCompletionFSMState.COMMITTING;
+    // In case of splitCommit, the segment is uploaded to a unique file name 
indicated by segmentLocation,
+    // so we need to move the segment file to its permanent location first 
before committing the metadata.
+    // The committingSegmentDescriptor is then updated with the permanent 
segment location to be saved in metadata
+    // store.
+    try {
+      _segmentManager.commitSegmentFile(_realtimeTableName, 
committingSegmentDescriptor);
+    } catch (Exception e) {
+      _logger.error("Caught exception while committing segment file for 
segment: {}", _segmentName.getSegmentName(),
+          e);
+      return SegmentCompletionProtocol.RESP_FAILED;
+    }
+    try {
+      // Convert to a controller uri if the segment location uses local file 
scheme.
+      if (CommonConstants.Segment.LOCAL_SEGMENT_SCHEME
+          
.equalsIgnoreCase(URIUtils.getUri(committingSegmentDescriptor.getSegmentLocation()).getScheme()))
 {
+        committingSegmentDescriptor.setSegmentLocation(URIUtils
+            .constructDownloadUrl(_controllerVipUrl, 
TableNameBuilder.extractRawTableName(_realtimeTableName),
+                _segmentName.getSegmentName()));
+      }
+      _segmentManager.commitSegmentMetadata(_realtimeTableName, 
committingSegmentDescriptor);
+    } catch (Exception e) {
+      _logger
+          .error("Caught exception while committing segment metadata for 
segment: {}", _segmentName.getSegmentName(),
+              e);
+      return SegmentCompletionProtocol.RESP_FAILED;
+    }
+
+    _state = BlockingSegmentCompletionFSMState.COMMITTED;
+    _logger.info("Committed segment {} at offset {} winner {}", 
_segmentName.getSegmentName(), offset, instanceId);
+    return SegmentCompletionProtocol.RESP_COMMIT_SUCCESS;
+  }
+
+  private SegmentCompletionProtocol.Response 
processCommitWhileUploading(String instanceId,
+      StreamPartitionMsgOffset offset, long now) {
+    _logger.info("Processing segmentCommit({}, {})", instanceId, offset);
+    SegmentCompletionProtocol.Response response = 
abortIfTooLateAndReturnHold(now, instanceId, offset);
+    if (response != null) {
+      return response;
+    }
+    // Another committer (or same) came in while one was uploading. Ask them 
to hold in case this one fails.
+    return new SegmentCompletionProtocol.Response(
+        new 
SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(offset.toString())
+            
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD));
+  }
+
+  private SegmentCompletionProtocol.Response checkBadCommitRequest(String 
instanceId, StreamPartitionMsgOffset offset,
+      long now) {
+    SegmentCompletionProtocol.Response response = 
abortIfTooLateAndReturnHold(now, instanceId, offset);
+    if (response != null) {
+      return response;
+    } else if (instanceId.equals(_winner) && offset.compareTo(_winningOffset) 
!= 0) {
+      // Hmm. Committer has been notified, but either a different one is 
committing, or offset is different
+      _logger.warn("{}:Aborting FSM (bad commit req) instance={} offset={} 
now={} winning={}", _state, instanceId,
+          offset, now, _winningOffset);
+      return abortAndReturnHold(now, instanceId, offset);
+    }
+    return null;
+  }
+
+  private SegmentCompletionProtocol.Response 
processCommitWhileHoldingOrPartialConsuming(String instanceId,
+      StreamPartitionMsgOffset offset, long now) {
+    _logger.info("Processing segmentCommit({}, {})", instanceId, offset);
+    SegmentCompletionProtocol.Response response = 
abortIfTooLateAndReturnHold(now, instanceId, offset);
+    if (response != null) {
+      return response;
+    }
+    // We cannot get a commit if we are in this state, so ask them to hold. 
Maybe we are starting after a failover.
+    // The server will re-send the segmentConsumed message.
+    return hold(instanceId, offset);
+  }
+
+  /**
+   * Pick a winner if we can, preferring the instance that we are handling 
right now,
+   *
+   * We accept the first server to report an offset as long as the server 
stopped consumption
+   * due to row limit. The premise is that other servers will also stop at row 
limit, and there
+   * is no need to wait for them to report an offset in order to decide on a 
winner. The state machine takes care
+   * of the cases where other servers may report different offsets (just in 
case).
+   *
+   * If the above condition is not satisfied (i.e. either this is not the 
first server, or it did not reach
+   * row limit), then we can pick a winner only if it is too late to pick a 
winner, or we have heard from all
+   * servers.
+   *
+   * Otherwise, we wait to hear from more servers.
+   *
+   * @param preferredInstance The instance that is reporting in this thread.
+   * @param now current time
+   * @param stopReason reason reported by instance for stopping consumption.
+   * @return true if winner picked, false otherwise.
+   */
+  private boolean isWinnerPicked(String preferredInstance, long now, final 
String stopReason) {
+    if ((SegmentCompletionProtocol.REASON_ROW_LIMIT.equals(stopReason)
+        || 
SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP.equals(stopReason))
+        && _commitStateMap.size() == 1) {
+      _winner = preferredInstance;
+      _winningOffset = _commitStateMap.get(preferredInstance);
+      return true;
+    } else if (now > _maxTimeToPickWinnerMs || _commitStateMap.size() == 
numReplicasToLookFor()) {
+      _logger.info("{}:Picking winner time={} size={}", _state, now - 
_startTimeMs, _commitStateMap.size());
+      StreamPartitionMsgOffset maxOffsetSoFar = null;
+      String winnerSoFar = null;
+      for (Map.Entry<String, StreamPartitionMsgOffset> entry : 
_commitStateMap.entrySet()) {
+        if (maxOffsetSoFar == null || 
entry.getValue().compareTo(maxOffsetSoFar) > 0) {
+          maxOffsetSoFar = entry.getValue();
+          winnerSoFar = entry.getKey();
+        }
+      }
+      _winningOffset = maxOffsetSoFar;
+      if (_commitStateMap.get(preferredInstance).compareTo(maxOffsetSoFar) == 
0) {
+        winnerSoFar = preferredInstance;
+      }
+      _winner = winnerSoFar;
+      return true;
+    }
+    return false;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfig.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfig.java
new file mode 100644
index 0000000000..c5d0ece4c6
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfig.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+public class SegmentCompletionConfig {
+  public static final String FSM_SCHEME =
+      CommonConstants.Controller.PREFIX_OF_PINOT_CONTROLLER_SEGMENT_COMPLETION 
+ ".fsm.scheme.";
+  public static final String DEFAULT_FSM_SCHEME_KEY =
+      CommonConstants.Controller.PREFIX_OF_PINOT_CONTROLLER_SEGMENT_COMPLETION 
+ ".fsm.scheme.default";
+  public static final String DEFAULT_FSM_SCHEME = "default";
+  private final Map<String, String> _fsmSchemes = new HashMap<>();
+  private final String _defaultFsmScheme;
+
+  public SegmentCompletionConfig(PinotConfiguration configuration) {
+    // Parse properties to extract FSM schemes
+    // Assuming properties keys are in the format scheme=className
+    for (String key : configuration.getKeys()) {
+      if (key.startsWith(FSM_SCHEME)) {
+        String scheme = key.substring(FSM_SCHEME.length());
+        String className = configuration.getProperty(key);
+        _fsmSchemes.put(scheme, className);
+      }
+    }
+
+    // Get the default FSM scheme
+    _defaultFsmScheme = configuration.getProperty(DEFAULT_FSM_SCHEME_KEY, 
DEFAULT_FSM_SCHEME);
+  }
+
+  public Map<String, String> getFsmSchemes() {
+    return _fsmSchemes;
+  }
+
+  public String getDefaultFsmScheme() {
+    return _defaultFsmScheme;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSM.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSM.java
new file mode 100644
index 0000000000..516ce4c07d
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSM.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime;
+
+import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import 
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+
+/**
+ * Interface for managing the state machine transitions related to segment 
completion
+ * in a real-time table within Apache Pinot.
+ *
+ * The segment completion process is crucial for handling real-time ingestion, 
ensuring
+ * that segments are correctly built, committed, or discarded based on the 
server's state
+ * and the protocol defined for low-level consumer (LLC) tables. This 
interface abstracts
+ * the controller-side logic for responding to server events during the 
lifecycle of a
+ * segment's completion.
+ */
+public interface SegmentCompletionFSM {
+
+  /**
+   * Initializes the FSM to its initial state based on the message type.
+   * @param msgType The message type that triggered the FSM initialization.
+   *                This is sent by the server when triggering the commit.
+   *                The current supported values are segmentConsumed, 
segmentCommit
+   *                
,segmentCommitStart,segmentUpload,segmentCommitEnd,segmentCommitEndWithMetadata
+   *                ,segmentStoppedConsuming,extendBuildTime
+   *
+   */
+  void transitionToInitialState(String msgType);
+
+  /**
+   * Checks whether the segment completion process has completed.
+   *
+   * The process is considered complete when the segment has been either 
successfully
+   * committed or marked as aborted. This method helps determine if the FSM 
can be
+   * removed from memory.
+   *
+   * @return {@code true} if the FSM has reached a terminal state, {@code 
false} otherwise.
+   */
+  boolean isDone();
+
+  /**
+   * Processes the event where a server indicates it has consumed up to a 
specified offset.
+   *
+   * This is typically triggered when a server finishes consuming data for a 
segment due
+   * to reaching a row limit, an end-of-partition signal, or another stopping 
condition.
+   * The FSM evaluates the reported offset and determines the next state or 
action for
+   * the server, such as holding, catching up, or committing the segment.
+   *
+   * @param instanceId The ID of the server instance reporting consumption.
+   * @param offset The offset up to which the server has consumed.
+   * @param stopReason The reason the server stopped consuming (e.g., row 
limit or end of partition).
+   * @return A response indicating the next action for the server (e.g., HOLD, 
CATCHUP, or COMMIT).
+   */
+  SegmentCompletionProtocol.Response segmentConsumed(String instanceId, 
StreamPartitionMsgOffset offset,
+      String stopReason);
+
+  /**
+   * Processes the start of a segment commit from a server.
+   *
+   * This occurs when a server signals its intention to commit a segment it 
has built.
+   * The FSM verifies whether the server is eligible to commit based on its 
previous
+   * state and the reported offset, and transitions to a committing state if 
appropriate.
+   *
+   * @param instanceId The ID of the server instance attempting to commit.
+   * @param offset The offset being committed by the server.
+   * @return A response indicating the next action for the server (e.g., 
CONTINUE or FAILED).
+   */
+  SegmentCompletionProtocol.Response segmentCommitStart(String instanceId, 
StreamPartitionMsgOffset offset);
+
+  /**
+   * Handles the event where a server indicates it has stopped consuming.
+   *
+   * This is triggered when a server cannot continue consuming for a segment, 
potentially
+   * due to resource constraints, errors, or a manual stop. The FSM updates 
its state
+   * and determines whether the server can participate in subsequent actions 
for the segment.
+   *
+   * @param instanceId The ID of the server instance reporting the stopped 
consumption.
+   * @param offset The offset at which the server stopped consuming.
+   * @param reason The reason for stopping consumption (e.g., resource 
constraints or errors).
+   * @return A response indicating the next action for the server (e.g., 
PROCESSED or FAILED).
+   */
+  SegmentCompletionProtocol.Response stoppedConsuming(String instanceId, 
StreamPartitionMsgOffset offset,
+      String reason);
+
+  /**
+   * Handles a request to extend the time allowed for segment building.
+   *
+   * If a server requires more time to build a segment, it can request an 
extension.
+   * The FSM evaluates the request in the context of the current state and the 
protocol's
+   * constraints, and either grants or denies the extension.
+   *
+   * @param instanceId The ID of the server instance requesting an extension.
+   * @param offset The offset at which the server is currently consuming.
+   * @param extTimeSec The additional time (in seconds) requested for segment 
building.
+   * @return A response indicating whether the extension was accepted or 
denied.
+   */
+  SegmentCompletionProtocol.Response extendBuildTime(String instanceId, 
StreamPartitionMsgOffset offset,
+      int extTimeSec);
+
+  /**
+   * Processes the end of a segment commit from a server.
+   *
+   * This method is triggered when a server has completed uploading the 
segment and
+   * signals the end of the commit process. The FSM validates the commit, 
updates metadata,
+   * and finalizes the segment's state. Depending on the outcome, the segment 
is either
+   * successfully committed or the FSM transitions to an error state.
+   *
+   * @param reqParams The parameters of the commit request.
+   * @param committingSegmentDescriptor Metadata about the segment being 
committed.
+   * @return A response indicating whether the commit was successful or failed.
+   */
+  SegmentCompletionProtocol.Response 
segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams,
+      CommittingSegmentDescriptor committingSegmentDescriptor);
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactory.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactory.java
new file mode 100644
index 0000000000..5ac238d626
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactory.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SegmentCompletionFSMFactory {
+  private SegmentCompletionFSMFactory() {
+    // Private constructor to prevent instantiation
+  }
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentCompletionFSMFactory.class);
+  private static final Map<String, Class<? extends SegmentCompletionFSM>> 
FSM_CLASS_MAP = new HashMap<>();
+
+  // Static block to register the default FSM
+  static {
+    register(SegmentCompletionConfig.DEFAULT_FSM_SCHEME, 
BlockingSegmentCompletionFSM.class);
+  }
+
+  /**
+   * Registers an FSM class with a specific scheme/type.
+   *
+   * @param scheme The scheme or type key.
+   * @param fsmClass The class for the FSM.
+   */
+  public static void register(String scheme, Class<? extends 
SegmentCompletionFSM> fsmClass) {
+    Preconditions.checkNotNull(scheme, "Scheme cannot be null");
+    Preconditions.checkNotNull(fsmClass, "FSM Class cannot be null");
+    Preconditions.checkState(FSM_CLASS_MAP.put(scheme, fsmClass) == null,
+        "FSM class already registered for scheme: " + scheme);
+    LOGGER.info("Registered SegmentCompletionFSM class {} for scheme {}", 
fsmClass, scheme);
+  }
+
+  /**
+   * Initializes the factory with configurations.
+   *
+   * @param fsmFactoryConfig The configuration object containing FSM schemes 
and classes.
+   */
+  public static void init(SegmentCompletionConfig fsmFactoryConfig) {
+    Map<String, String> schemesConfig = fsmFactoryConfig.getFsmSchemes();
+    for (Map.Entry<String, String> entry : schemesConfig.entrySet()) {
+      String scheme = entry.getKey();
+      String className = entry.getValue();
+      try {
+        LOGGER.info("Initializing SegmentCompletionFSM for scheme {}, 
classname {}", scheme, className);
+        Class<?> clazz = Class.forName(className);
+        register(scheme, (Class<? extends SegmentCompletionFSM>) clazz);
+      } catch (Exception e) {
+        LOGGER.error("Could not register FSM class for class {} with scheme 
{}", className, scheme, e);
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * Creates an FSM instance based on the scheme/type.
+   *
+   * @param scheme The scheme or type key.
+   * @param manager The SegmentCompletionManager instance.
+   * @param segmentManager The PinotLLCRealtimeSegmentManager instance.
+   * @param llcSegmentName The segment name.
+   * @param segmentMetadata The segment metadata.
+   * @return An instance of SegmentCompletionFSM.
+   */
+  public static SegmentCompletionFSM createFSM(String scheme,
+      SegmentCompletionManager manager,
+      PinotLLCRealtimeSegmentManager segmentManager,
+      LLCSegmentName llcSegmentName,
+      SegmentZKMetadata segmentMetadata) {
+    Class<? extends SegmentCompletionFSM> fsmClass = FSM_CLASS_MAP.get(scheme);
+    Preconditions.checkState(fsmClass != null, "No FSM registered for scheme: 
" + scheme);
+    try {
+      return fsmClass.getConstructor(
+          PinotLLCRealtimeSegmentManager.class,
+          SegmentCompletionManager.class,
+          LLCSegmentName.class,
+          SegmentZKMetadata.class
+      ).newInstance(segmentManager, manager, llcSegmentName, segmentMetadata);
+    } catch (Exception e) {
+      LOGGER.error("Failed to create FSM instance for scheme {}", scheme, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Checks if a scheme is supported.
+   *
+   * @param factoryType The scheme to check.
+   * @return True if supported, false otherwise.
+   */
+  public static boolean isFactoryTypeSupported(String factoryType) {
+    return FSM_CLASS_MAP.containsKey(factoryType);
+  }
+
+  /**
+   * Clears all registered FSM classes.
+   */
+  public static void shutdown() {
+    FSM_CLASS_MAP.clear();
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 92a9adc1c0..63d302f929 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -20,28 +20,22 @@ package org.apache.pinot.controller.helix.core.realtime;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
-import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.URIUtils;
 import org.apache.pinot.controller.LeadControllerManager;
 import 
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
-import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
@@ -60,17 +54,6 @@ public class SegmentCompletionManager {
   // TODO Can we log using the segment name in the log message?
   public static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentCompletionManager.class);
 
-  private enum State {
-    PARTIAL_CONSUMING,  // Indicates that at least one replica has reported 
that it has stopped consuming.
-    HOLDING,          // the segment has started finalizing.
-    COMMITTER_DECIDED, // We know who the committer will be, we will let them 
know next time they call segmentConsumed()
-    COMMITTER_NOTIFIED, // we notified the committer to commit.
-    COMMITTER_UPLOADING,  // committer is uploading.
-    COMMITTING, // we are in the process of committing to zk
-    COMMITTED,    // We already committed a segment.
-    ABORTED,      // state machine is aborted. we will start a fresh one when 
the next segmentConsumed comes in.
-  }
-
   private final HelixManager _helixManager;
   // A map that holds the FSM for each segment.
   private final Map<String, SegmentCompletionFSM> _fsmMap = new 
ConcurrentHashMap<>();
@@ -78,6 +61,8 @@ public class SegmentCompletionManager {
   private final PinotLLCRealtimeSegmentManager _segmentManager;
   private final ControllerMetrics _controllerMetrics;
   private final LeadControllerManager _leadControllerManager;
+  private final SegmentCompletionConfig _segmentCompletionConfig;
+
 
   // Half hour max commit time for all segments
   private static final int MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS = 1800;
@@ -91,13 +76,17 @@ public class SegmentCompletionManager {
 
   public SegmentCompletionManager(HelixManager helixManager, 
PinotLLCRealtimeSegmentManager segmentManager,
       ControllerMetrics controllerMetrics, LeadControllerManager 
leadControllerManager,
-      int segmentCommitTimeoutSeconds) {
+      int segmentCommitTimeoutSeconds, SegmentCompletionConfig 
segmentCompletionConfig) {
     _helixManager = helixManager;
     _segmentManager = segmentManager;
     _controllerMetrics = controllerMetrics;
     _leadControllerManager = leadControllerManager;
     SegmentCompletionProtocol.setMaxSegmentCommitTimeMs(
         TimeUnit.MILLISECONDS.convert(segmentCommitTimeoutSeconds, 
TimeUnit.SECONDS));
+    _segmentCompletionConfig = segmentCompletionConfig;
+
+    // Initialize the FSM Factory
+    SegmentCompletionFSMFactory.init(_segmentCompletionConfig);
   }
 
   public String getControllerVipUrl() {
@@ -108,6 +97,7 @@ public class SegmentCompletionManager {
     return System.currentTimeMillis();
   }
 
+
   protected StreamPartitionMsgOffsetFactory 
getStreamPartitionMsgOffsetFactory(LLCSegmentName llcSegmentName) {
     String rawTableName = llcSegmentName.getTableName();
     TableConfig tableConfig = 
_segmentManager.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(rawTableName));
@@ -116,6 +106,18 @@ public class SegmentCompletionManager {
     return 
StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
   }
 
+  public Long getCommitTime(String tableName) {
+    return _commitTimeMap.get(tableName);
+  }
+
+  public void setCommitTime(String tableName, long commitTime) {
+    _commitTimeMap.put(tableName, commitTime);
+  }
+
+  public ControllerMetrics getControllerMetrics() {
+    return _controllerMetrics;
+  }
+
   private SegmentCompletionFSM lookupOrCreateFsm(LLCSegmentName 
llcSegmentName, String msgType) {
     return _fsmMap.computeIfAbsent(llcSegmentName.getSegmentName(), k -> 
createFsm(llcSegmentName, msgType));
   }
@@ -125,20 +127,27 @@ public class SegmentCompletionManager {
     String segmentName = llcSegmentName.getSegmentName();
     SegmentZKMetadata segmentMetadata = 
_segmentManager.getSegmentZKMetadata(realtimeTableName, segmentName, null);
     Preconditions.checkState(segmentMetadata != null, "Failed to find ZK 
metadata for segment: %s", segmentName);
-    SegmentCompletionFSM fsm;
-    if (segmentMetadata.getStatus() == 
CommonConstants.Segment.Realtime.Status.DONE) {
-      // Segment is already committed
-      StreamPartitionMsgOffsetFactory factory = 
getStreamPartitionMsgOffsetFactory(llcSegmentName);
-      StreamPartitionMsgOffset endOffset = 
factory.create(segmentMetadata.getEndOffset());
-      fsm = SegmentCompletionFSM.fsmInCommit(_segmentManager, this, 
llcSegmentName, segmentMetadata.getNumReplicas(),
-          endOffset);
-    } else if 
(msgType.equals(SegmentCompletionProtocol.MSG_TYPE_STOPPED_CONSUMING)) {
-      fsm = SegmentCompletionFSM.fsmStoppedConsuming(_segmentManager, this, 
llcSegmentName,
-          segmentMetadata.getNumReplicas());
-    } else {
-      // Segment is in the process of completing, and this is the first one to 
respond. Create fsm
-      fsm = SegmentCompletionFSM.fsmInHolding(_segmentManager, this, 
llcSegmentName, segmentMetadata.getNumReplicas());
+
+    TableConfig tableConfig = 
_segmentManager.getTableConfig(realtimeTableName);
+    String factoryName = null;
+    try {
+      Map<String, String> streamConfigMap = 
IngestionConfigUtils.getStreamConfigMap(tableConfig);
+      factoryName = 
streamConfigMap.get(StreamConfigProperties.SEGMENT_COMPLETION_FSM_SCHEME);
+    } catch (Exception e) {
+      // If there is an exception, we default to the default factory.
+    }
+
+    if (factoryName == null) {
+      factoryName = _segmentCompletionConfig.getDefaultFsmScheme();
     }
+
+    
Preconditions.checkState(SegmentCompletionFSMFactory.isFactoryTypeSupported(factoryName),
+        "No FSM registered for name: " + factoryName);
+
+    SegmentCompletionFSM fsm =
+        SegmentCompletionFSMFactory.createFSM(factoryName, this, 
_segmentManager, llcSegmentName, segmentMetadata);
+    fsm.transitionToInitialState(msgType);
+
     LOGGER.info("Created FSM {}", fsm);
     return fsm;
   }
@@ -302,854 +311,6 @@ public class SegmentCompletionManager {
     return response;
   }
 
-  /**
-   * This class implements the FSM on the controller side for each completing 
segment.
-   *
-   * An FSM is is created when we first hear about a segment (typically 
through the segmentConsumed message).
-   * When an FSM is created, it may have one of two start states (HOLDING, or 
COMMITTED), depending on the
-   * constructor used.
-   *
-   * We kick off an FSM in the COMMITTED state (rare) when we find that 
PROPERTYSTORE already has the segment
-   * with the Status set to DONE.
-   *
-   * We kick off an FSM in the HOLDING state (typical) when a sementConsumed() 
message arrives from the
-   * first server we hear from.
-   *
-   * The FSM does not have a timer. It is clocked by the servers, which, 
typically, are retransmitting their
-   * segmentConsumed() message every so often 
(SegmentCompletionProtocol.MAX_HOLD_TIME_MS).
-   *
-   * See https://github.com/linkedin/pinot/wiki/Low-level-kafka-consumers
-   */
-  private static class SegmentCompletionFSM {
-    // We will have some variation between hosts, so we add 10% to the max 
hold time to pick a winner.
-    // If there is more than 10% variation, then it is handled as an error 
case (i.e. the first few to
-    // come in will have a winner, and the later ones will just download the 
segment)
-    private static final long MAX_TIME_TO_PICK_WINNER_MS =
-        SegmentCompletionProtocol.MAX_HOLD_TIME_MS + 
(SegmentCompletionProtocol.MAX_HOLD_TIME_MS / 10);
-
-    // Once we pick a winner, the winner may get notified in the next call, so 
add one hold time plus some.
-    // It may be that the winner is not the server that we are currently 
processing a segmentConsumed()
-    // message from. In that case, we will wait for the next segmetnConsumed() 
message from the picked winner.
-    // If the winner does not come back to us within that time, we abort the 
state machine and start over.
-    private static final long MAX_TIME_TO_NOTIFY_WINNER_MS =
-        MAX_TIME_TO_PICK_WINNER_MS + 
SegmentCompletionProtocol.MAX_HOLD_TIME_MS + (
-            SegmentCompletionProtocol.MAX_HOLD_TIME_MS / 10);
-
-    public final Logger _logger;
-
-    State _state = State.HOLDING;   // Typically start off in HOLDING state.
-    final long _startTimeMs;
-    private final LLCSegmentName _segmentName;
-    private final String _rawTableName;
-    private final String _realtimeTableName;
-    private final int _numReplicas;
-    private final Set<String> _excludedServerStateMap;
-    private final Map<String, StreamPartitionMsgOffset> _commitStateMap;
-    private final StreamPartitionMsgOffsetFactory 
_streamPartitionMsgOffsetFactory;
-    private StreamPartitionMsgOffset _winningOffset = null;
-    private String _winner;
-    private final PinotLLCRealtimeSegmentManager _segmentManager;
-    private final SegmentCompletionManager _segmentCompletionManager;
-    private final long _maxTimeToPickWinnerMs;
-    private final long _maxTimeToNotifyWinnerMs;
-    private final long _initialCommitTimeMs;
-    // Once the winner is notified, they are expected to commit right away. At 
this point, it is the segment build
-    // time that we need to consider.
-    // We may need to add some time here to allow for getting the lock? For 
now 0
-    // We may need to add some time for the committer come back to us (after 
the build)? For now 0.
-    private long _maxTimeAllowedToCommitMs;
-    private final String _controllerVipUrl;
-
-    public static SegmentCompletionFSM 
fsmInHolding(PinotLLCRealtimeSegmentManager segmentManager,
-        SegmentCompletionManager segmentCompletionManager, LLCSegmentName 
segmentName, int numReplicas) {
-      return new SegmentCompletionFSM(segmentManager, 
segmentCompletionManager, segmentName, numReplicas);
-    }
-
-    public static SegmentCompletionFSM 
fsmInCommit(PinotLLCRealtimeSegmentManager segmentManager,
-        SegmentCompletionManager segmentCompletionManager, LLCSegmentName 
segmentName, int numReplicas,
-        StreamPartitionMsgOffset winningOffset) {
-      return new SegmentCompletionFSM(segmentManager, 
segmentCompletionManager, segmentName, numReplicas,
-          winningOffset);
-    }
-
-    public static SegmentCompletionFSM 
fsmStoppedConsuming(PinotLLCRealtimeSegmentManager segmentManager,
-        SegmentCompletionManager segmentCompletionManager, LLCSegmentName 
segmentName, int numReplicas) {
-      SegmentCompletionFSM fsm =
-          new SegmentCompletionFSM(segmentManager, segmentCompletionManager, 
segmentName, numReplicas);
-      fsm._state = State.PARTIAL_CONSUMING;
-      return fsm;
-    }
-
-    // Ctor that starts the FSM in HOLDING state
-    private SegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManager,
-        SegmentCompletionManager segmentCompletionManager, LLCSegmentName 
segmentName, int numReplicas) {
-      _segmentName = segmentName;
-      _rawTableName = _segmentName.getTableName();
-      _realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(_rawTableName);
-      _numReplicas = numReplicas;
-      _segmentManager = segmentManager;
-      _commitStateMap = new 
HashMap<>(HashUtil.getHashMapCapacity(_numReplicas));
-      _excludedServerStateMap = new HashSet<>(_numReplicas);
-      _segmentCompletionManager = segmentCompletionManager;
-      _startTimeMs = _segmentCompletionManager.getCurrentTimeMs();
-      _maxTimeToPickWinnerMs = _startTimeMs + MAX_TIME_TO_PICK_WINNER_MS;
-      _maxTimeToNotifyWinnerMs = _startTimeMs + MAX_TIME_TO_NOTIFY_WINNER_MS;
-      _streamPartitionMsgOffsetFactory = 
_segmentCompletionManager.getStreamPartitionMsgOffsetFactory(_segmentName);
-      long initialCommitTimeMs = MAX_TIME_TO_NOTIFY_WINNER_MS + 
_segmentManager.getCommitTimeoutMS(_realtimeTableName);
-      Long savedCommitTime = 
_segmentCompletionManager._commitTimeMap.get(_rawTableName);
-      if (savedCommitTime != null && savedCommitTime > initialCommitTimeMs) {
-        initialCommitTimeMs = savedCommitTime;
-      }
-      _logger = LoggerFactory.getLogger("SegmentCompletionFSM_" + 
segmentName.getSegmentName());
-      if (initialCommitTimeMs > MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS * 
1000) {
-        // The table has a really high value configured for max commit time. 
Set it to a higher value than default
-        // and go from there.
-        _logger
-            .info("Configured max commit time {}s too high for table {}, 
changing to {}s", initialCommitTimeMs / 1000,
-                _realtimeTableName, MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS);
-        initialCommitTimeMs = MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS * 1000;
-      }
-      _initialCommitTimeMs = initialCommitTimeMs;
-      _maxTimeAllowedToCommitMs = _startTimeMs + _initialCommitTimeMs;
-      _controllerVipUrl = segmentCompletionManager.getControllerVipUrl();
-    }
-
-    // Ctor that starts the FSM in COMMITTED state
-    private SegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManager,
-        SegmentCompletionManager segmentCompletionManager, LLCSegmentName 
segmentName, int numReplicas,
-        StreamPartitionMsgOffset winningOffset) {
-      // Constructor used when we get an event after a segment is committed.
-      this(segmentManager, segmentCompletionManager, segmentName, numReplicas);
-      _state = State.COMMITTED;
-      _winningOffset = winningOffset;
-      _winner = "UNKNOWN";
-    }
-
-    @Override
-    public String toString() {
-      return "{" + _segmentName.getSegmentName() + "," + _state + "," + 
_startTimeMs + "," + _winner + ","
-          + _winningOffset + "," + _controllerVipUrl + "}";
-    }
-
-    // SegmentCompletionManager releases the FSM from the hashtable when it is 
done.
-    public boolean isDone() {
-      return _state.equals(State.COMMITTED) || _state.equals(State.ABORTED);
-    }
-
-    /*
-     * We just heard from a server that it has reached completion stage, and 
is reporting the offset
-     * that the server is at. Since multiple servers can come in at the same 
time for this segment,
-     * we need to synchronize on the FSM to handle the messages. The 
processing time itself is small,
-     * so we should be OK with this synchronization.
-     */
-    public SegmentCompletionProtocol.Response segmentConsumed(String 
instanceId, StreamPartitionMsgOffset offset,
-        final String stopReason) {
-      final long now = _segmentCompletionManager.getCurrentTimeMs();
-      // We can synchronize the entire block for the SegmentConsumed message.
-      synchronized (this) {
-        _logger.info("Processing segmentConsumed({}, {})", instanceId, offset);
-        if (_excludedServerStateMap.contains(instanceId)) {
-          // Could be that the server was restarted, and it started consuming 
again, and somehow got to complete
-          // consumption up to this point. We will accept it.
-          _logger.info("Marking instance {} alive again", instanceId);
-          _excludedServerStateMap.remove(instanceId);
-        }
-        _commitStateMap.put(instanceId, offset);
-        switch (_state) {
-          case PARTIAL_CONSUMING:
-            return partialConsumingConsumed(instanceId, offset, now, 
stopReason);
-
-          case HOLDING:
-            return holdingConsumed(instanceId, offset, now, stopReason);
-
-          case COMMITTER_DECIDED: // This must be a retransmit
-            return committerDecidedConsumed(instanceId, offset, now);
-
-          case COMMITTER_NOTIFIED:
-            return committerNotifiedConsumed(instanceId, offset, now);
-
-          case COMMITTER_UPLOADING:
-            return committerUploadingConsumed(instanceId, offset, now);
-
-          case COMMITTING:
-            return committingConsumed(instanceId, offset, now);
-
-          case COMMITTED:
-            return committedConsumed(instanceId, offset);
-
-          case ABORTED:
-            // FSM has been aborted, just return HOLD
-            return hold(instanceId, offset);
-
-          default:
-            return fail(instanceId, offset);
-        }
-      }
-    }
-
-    /*
-     * A server has sent segmentConsumed() message. The caller will save the 
segment if we return
-     * COMMIT_CONTINUE. We need to verify that it is the same server that we 
notified as the winner
-     * and the offset is the same as what is coming in with the commit. We can 
then move to
-     * COMMITTER_UPLOADING and wait for the segmentCommitEnd() call.
-     *
-     * In case of discrepancy we move the state machine to ABORTED state so 
that this FSM is removed
-     * from the map, and things start over. In this case, we respond to the 
server with a 'hold' so
-     * that they re-transmit their segmentConsumed() message and start over.
-     */
-    public SegmentCompletionProtocol.Response segmentCommitStart(String 
instanceId, StreamPartitionMsgOffset offset) {
-      long now = _segmentCompletionManager.getCurrentTimeMs();
-      if (_excludedServerStateMap.contains(instanceId)) {
-        _logger.warn("Not accepting commit from {} since it had stoppd 
consuming", instanceId);
-        return SegmentCompletionProtocol.RESP_FAILED;
-      }
-      synchronized (this) {
-        _logger.info("Processing segmentCommitStart({}, {})", instanceId, 
offset);
-        switch (_state) {
-          case PARTIAL_CONSUMING:
-            return partialConsumingCommit(instanceId, offset, now);
-
-          case HOLDING:
-            return holdingCommit(instanceId, offset, now);
-
-          case COMMITTER_DECIDED:
-            return committerDecidedCommit(instanceId, offset, now);
-
-          case COMMITTER_NOTIFIED:
-            return committerNotifiedCommit(instanceId, offset, now);
-
-          case COMMITTER_UPLOADING:
-            return committerUploadingCommit(instanceId, offset, now);
-
-          case COMMITTING:
-            return committingCommit(instanceId, offset, now);
-
-          case COMMITTED:
-            return committedCommit(instanceId, offset);
-
-          case ABORTED:
-            return hold(instanceId, offset);
-
-          default:
-            return fail(instanceId, offset);
-        }
-      }
-    }
-
-    public SegmentCompletionProtocol.Response stoppedConsuming(String 
instanceId, StreamPartitionMsgOffset offset,
-        String reason) {
-      synchronized (this) {
-        _logger.info("Processing stoppedConsuming({}, {})", instanceId, 
offset);
-        _excludedServerStateMap.add(instanceId);
-        switch (_state) {
-          case PARTIAL_CONSUMING:
-            return partialConsumingStoppedConsuming(instanceId, offset, 
reason);
-
-          case HOLDING:
-            return holdingStoppedConsuming(instanceId, offset, reason);
-
-          case COMMITTER_DECIDED:
-            return committerDecidedStoppedConsuming(instanceId, offset, 
reason);
-
-          case COMMITTER_NOTIFIED:
-            return committerNotifiedStoppedConsuming(instanceId, offset, 
reason);
-
-          case COMMITTER_UPLOADING:
-            return committerUploadingStoppedConsuming(instanceId, offset, 
reason);
-
-          case COMMITTING:
-            return committingStoppedConsuming(instanceId, offset, reason);
-
-          case COMMITTED:
-            return committedStoppedConsuming(instanceId, offset, reason);
-
-          case ABORTED:
-            _logger.info("Ignoring StoppedConsuming message from {} in state 
{}", instanceId, _state);
-            return SegmentCompletionProtocol.RESP_PROCESSED;
-
-          default:
-            return fail(instanceId, offset);
-        }
-      }
-    }
-
-    public SegmentCompletionProtocol.Response extendBuildTime(final String 
instanceId,
-        final StreamPartitionMsgOffset offset, final int extTimeSec) {
-      final long now = _segmentCompletionManager.getCurrentTimeMs();
-      synchronized (this) {
-        _logger.info("Processing extendBuildTime({}, {}, {})", instanceId, 
offset, extTimeSec);
-        switch (_state) {
-          case PARTIAL_CONSUMING:
-          case HOLDING:
-          case COMMITTER_DECIDED:
-            return fail(instanceId, offset);
-          case COMMITTER_NOTIFIED:
-            return committerNotifiedExtendBuildTime(instanceId, offset, 
extTimeSec, now);
-          case COMMITTER_UPLOADING:
-          case COMMITTING:
-          case COMMITTED:
-          case ABORTED:
-          default:
-            return fail(instanceId, offset);
-        }
-      }
-    }
-
-    /*
-     * We can get this call only when the state is COMMITTER_UPLOADING. Also, 
the instanceId should be equal to
-     * the _winner.
-     */
-    public SegmentCompletionProtocol.Response 
segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams,
-        CommittingSegmentDescriptor committingSegmentDescriptor) {
-      String instanceId = reqParams.getInstanceId();
-      StreamPartitionMsgOffset offset =
-          
_streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset());
-      synchronized (this) {
-        if (_excludedServerStateMap.contains(instanceId)) {
-          _logger.warn("Not accepting commitEnd from {} since it had stoppd 
consuming", instanceId);
-          return abortAndReturnFailed();
-        }
-        _logger.info("Processing segmentCommitEnd({}, {})", instanceId, 
offset);
-        if (!_state.equals(State.COMMITTER_UPLOADING) || 
!instanceId.equals(_winner)
-            || offset.compareTo(_winningOffset) != 0) {
-          // State changed while we were out of sync. return a failed commit.
-          _logger.warn("State change during upload: state={} segment={} 
winner={} winningOffset={}", _state,
-              _segmentName.getSegmentName(), _winner, _winningOffset);
-          return abortAndReturnFailed();
-        }
-        SegmentCompletionProtocol.Response response = commitSegment(reqParams, 
committingSegmentDescriptor);
-        if (!response.equals(SegmentCompletionProtocol.RESP_COMMIT_SUCCESS)) {
-          return abortAndReturnFailed();
-        } else {
-          return response;
-        }
-      }
-    }
-
-    // Helper methods that log the current state and the response sent
-    private SegmentCompletionProtocol.Response fail(String instanceId, 
StreamPartitionMsgOffset offset) {
-      _logger.info("{}:FAIL for instance={} offset={}", _state, instanceId, 
offset);
-      return SegmentCompletionProtocol.RESP_FAILED;
-    }
-
-    private SegmentCompletionProtocol.Response commit(String instanceId, 
StreamPartitionMsgOffset offset) {
-      long allowedBuildTimeSec = (_maxTimeAllowedToCommitMs - _startTimeMs) / 
1000;
-      _logger
-          .info("{}:COMMIT for instance={} offset={} buldTimeSec={}", _state, 
instanceId, offset, allowedBuildTimeSec);
-      SegmentCompletionProtocol.Response.Params params =
-          new 
SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(offset.toString())
-              .withBuildTimeSeconds(allowedBuildTimeSec)
-              
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT)
-              .withControllerVipUrl(_controllerVipUrl);
-      return new SegmentCompletionProtocol.Response(params);
-    }
-
-    private SegmentCompletionProtocol.Response discard(String instanceId, 
StreamPartitionMsgOffset offset) {
-      _logger.warn("{}:DISCARD for instance={} offset={}", _state, instanceId, 
offset);
-      return SegmentCompletionProtocol.RESP_DISCARD;
-    }
-
-    private SegmentCompletionProtocol.Response keep(String instanceId, 
StreamPartitionMsgOffset offset) {
-      _logger.info("{}:KEEP for instance={} offset={}", _state, instanceId, 
offset);
-      return new SegmentCompletionProtocol.Response(
-          new 
SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(offset.toString())
-              
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.KEEP));
-    }
-
-    private SegmentCompletionProtocol.Response catchup(String instanceId, 
StreamPartitionMsgOffset offset) {
-      _logger.info("{}:CATCHUP for instance={} offset={}", _state, instanceId, 
offset);
-      return new SegmentCompletionProtocol.Response(
-          new 
SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(_winningOffset.toString())
-              
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP));
-    }
-
-    private SegmentCompletionProtocol.Response hold(String instanceId, 
StreamPartitionMsgOffset offset) {
-      _logger.info("{}:HOLD for instance={} offset={}", _state, instanceId, 
offset);
-      return new SegmentCompletionProtocol.Response(new 
SegmentCompletionProtocol.Response.Params()
-          .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD)
-          .withStreamPartitionMsgOffset(offset.toString()));
-    }
-
-    private SegmentCompletionProtocol.Response abortAndReturnHold(long now, 
String instanceId,
-        StreamPartitionMsgOffset offset) {
-      _state = State.ABORTED;
-      _segmentCompletionManager._controllerMetrics
-          .addMeteredTableValue(_rawTableName, 
ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
-      return hold(instanceId, offset);
-    }
-
-    private SegmentCompletionProtocol.Response abortAndReturnFailed() {
-      _state = State.ABORTED;
-      _segmentCompletionManager._controllerMetrics
-          .addMeteredTableValue(_rawTableName, 
ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
-      return SegmentCompletionProtocol.RESP_FAILED;
-    }
-
-    private SegmentCompletionProtocol.Response 
abortIfTooLateAndReturnHold(long now, String instanceId,
-        StreamPartitionMsgOffset offset) {
-      if (now > _maxTimeAllowedToCommitMs) {
-        _logger
-            .warn("{}:Aborting FSM (too late) instance={} offset={} now={} 
start={}", _state, instanceId, offset, now,
-                _startTimeMs);
-        return abortAndReturnHold(now, instanceId, offset);
-      }
-      return null;
-    }
-
-    private int numReplicasToLookFor() {
-      return _numReplicas - _excludedServerStateMap.size();
-    }
-
-    private SegmentCompletionProtocol.Response partialConsumingConsumed(String 
instanceId,
-        StreamPartitionMsgOffset offset, long now, final String stopReason) {
-      // This is the first time we are getting segmentConsumed() for this 
segment.
-      // Some instance thinks we can close this segment, so go to HOLDING 
state, and process as normal.
-      // We will just be looking for less replicas.
-      _state = State.HOLDING;
-      return holdingConsumed(instanceId, offset, now, stopReason);
-    }
-
-    /*
-     * This is not a good state to get a commit message, but it is possible 
that the controller failed while in
-     * COMMITTER_NOTIFIED state, and the first message we got in the new 
controller was a stoppedConsuming
-     * message. As long as the committer is not the one who stopped consuming 
(which we have already checked before
-     * coming here), we will trust the server that this is a valid commit.
-     */
-    private SegmentCompletionProtocol.Response partialConsumingCommit(String 
instanceId,
-        StreamPartitionMsgOffset offset, long now) {
-      // Do the same as HOLDING__commit
-      return processCommitWhileHoldingOrPartialConsuming(instanceId, offset, 
now);
-    }
-
-    private SegmentCompletionProtocol.Response 
partialConsumingStoppedConsuming(String instanceId,
-        StreamPartitionMsgOffset offset, String reason) {
-      return processStoppedConsuming(instanceId, offset, reason, true);
-    }
-
-    /*
-     * If we have waited "enough", or we have all replicas reported, then we 
can pick a winner.
-     *
-     * Otherwise, we ask the server that is reporting to come back again later 
until one of these conditions hold.
-     *
-     * If we can pick a winner then we go to COMMITTER_DECIDED or 
COMMITTER_NOTIIFIED (if the instance
-     * in this call is the same as winner).
-     *
-     * If we can go to COMMITTER_NOTIFIED then we respond with a COMMIT 
message, otherwise with a HOLD message.
-     */
-    private SegmentCompletionProtocol.Response holdingConsumed(String 
instanceId, StreamPartitionMsgOffset offset,
-        long now, final String stopReason) {
-      SegmentCompletionProtocol.Response response;
-      // If we are past the max time to pick a winner, or we have heard from 
all replicas,
-      // we are ready to pick a winner.
-      if (isWinnerPicked(instanceId, now, stopReason)) {
-        if (_winner.equals(instanceId)) {
-          _logger.info("{}:Committer notified winner instance={} offset={}", 
_state, instanceId, offset);
-          response = commit(instanceId, offset);
-          _state = State.COMMITTER_NOTIFIED;
-        } else {
-          _logger.info("{}:Committer decided winner={} offset={}", _state, 
_winner, _winningOffset);
-          response = catchup(instanceId, offset);
-          _state = State.COMMITTER_DECIDED;
-        }
-      } else {
-        response = hold(instanceId, offset);
-      }
-      return response;
-    }
-
-    /*
-     * This not a good state to receive a commit message, but then it may be 
that the controller
-     * failed over while in the COMMITTER_NOTIFIED state...
-     */
-    private SegmentCompletionProtocol.Response holdingCommit(String 
instanceId, StreamPartitionMsgOffset offset,
-        long now) {
-      return processCommitWhileHoldingOrPartialConsuming(instanceId, offset, 
now);
-    }
-
-    private SegmentCompletionProtocol.Response holdingStoppedConsuming(String 
instanceId,
-        StreamPartitionMsgOffset offset, String reason) {
-      return processStoppedConsuming(instanceId, offset, reason, true);
-    }
-
-    /*
-     * We have already decided who the committer is, but have not let them 
know yet. If this is the committer that
-     * we decided, then respond back with COMMIT. Otherwise, if the offset is 
smaller, respond back with a CATCHUP.
-     * Otherwise, just have the server HOLD. Since the segment is not 
committed yet, we cannot ask them to KEEP or
-     * DISCARD etc. If the committer fails for any reason, we will need a new 
committer.
-     */
-    private SegmentCompletionProtocol.Response committerDecidedConsumed(String 
instanceId,
-        StreamPartitionMsgOffset offset, long now) {
-      if (offset.compareTo(_winningOffset) > 0) {
-        _logger.warn("{}:Aborting FSM (offset larger than winning) instance={} 
offset={} now={} winning={}", _state,
-            instanceId, offset, now, _winningOffset);
-        return abortAndReturnHold(now, instanceId, offset);
-      }
-      SegmentCompletionProtocol.Response response;
-      if (_winner.equals(instanceId)) {
-        if (_winningOffset.compareTo(offset) == 0) {
-          _logger.info("{}:Notifying winner instance={} offset={}", _state, 
instanceId, offset);
-          response = commit(instanceId, offset);
-          _state = State.COMMITTER_NOTIFIED;
-        } else {
-          // Winner coming back with a different offset.
-          _logger
-              .warn("{}:Winner coming back with different offset for 
instance={} offset={} prevWinnOffset={}", _state,
-                  instanceId, offset, _winningOffset);
-          response = abortAndReturnHold(now, instanceId, offset);
-        }
-      } else if (offset.compareTo(_winningOffset) == 0) {
-        // Wait until winner has posted the segment.
-        response = hold(instanceId, offset);
-      } else {
-        response = catchup(instanceId, offset);
-      }
-      if (now > _maxTimeToNotifyWinnerMs) {
-        // Winner never got back to us. Abort the completion protocol and 
start afresh.
-        // We can potentially optimize here to see if this instance has the 
highest so far, and re-elect them to
-        // be winner, but for now, we will abort it and restart
-        response = abortAndReturnHold(now, instanceId, offset);
-      }
-      return response;
-    }
-
-    /*
-     * We have already decided who the committer is, but have not let them 
know yet. So, we don't expect
-     * a commit() call here.
-     */
-    private SegmentCompletionProtocol.Response committerDecidedCommit(String 
instanceId,
-        StreamPartitionMsgOffset offset, long now) {
-      return processCommitWhileHoldingOrPartialConsuming(instanceId, offset, 
now);
-    }
-
-    private SegmentCompletionProtocol.Response 
committerDecidedStoppedConsuming(String instanceId,
-        StreamPartitionMsgOffset offset, String reason) {
-      return processStoppedConsuming(instanceId, offset, reason, false);
-    }
-
-    /*
-     * We have notified the committer. If we get a consumed message from 
another server, we can ask them to
-     * catchup (if the offset is lower). If anything else, then we pretty much 
ask them to hold.
-     */
-    private SegmentCompletionProtocol.Response 
committerNotifiedConsumed(String instanceId,
-        StreamPartitionMsgOffset offset, long now) {
-      SegmentCompletionProtocol.Response response;
-      // We have already picked a winner and notified them but we have not 
heard from them yet.
-      // Common case here is that another server is coming back to us with its 
offset. We either respond back with
-      // HOLD or CATCHUP.
-      // If the winner is coming back again, then we have some more conditions 
to look at.
-      response = abortIfTooLateAndReturnHold(now, instanceId, offset);
-      if (response != null) {
-        return response;
-      }
-      if (instanceId.equals(_winner)) {
-        // Winner is coming back to after holding. Somehow they never heard us 
return COMMIT.
-        // Allow them to be winner again, since we are still within time to 
pick a winner.
-        if (offset.compareTo(_winningOffset) == 0) {
-          response = commit(instanceId, offset);
-        } else {
-          // Something seriously wrong. Abort the FSM
-          response = discard(instanceId, offset);
-          _logger.warn("{}:Aborting for instance={} offset={}", _state, 
instanceId, offset);
-          _state = State.ABORTED;
-        }
-      } else {
-        // Common case: A different instance is reporting.
-        if (offset.compareTo(_winningOffset) == 0) {
-          // Wait until winner has posted the segment before asking this 
server to KEEP the segment.
-          response = hold(instanceId, offset);
-        } else if (offset.compareTo(_winningOffset) < 0) {
-          response = catchup(instanceId, offset);
-        } else {
-          // We have not yet committed, so ask the new responder to hold. They 
may be the new leader in case the
-          // committer fails.
-          response = hold(instanceId, offset);
-        }
-      }
-      return response;
-    }
-
-    /*
-     * We have notified the committer. If we get a consumed message from 
another server, we can ask them to
-     * catchup (if the offset is lower). If anything else, then we pretty much 
ask them to hold.
-     */
-    private SegmentCompletionProtocol.Response committerNotifiedCommit(String 
instanceId,
-        StreamPartitionMsgOffset offset, long now) {
-      SegmentCompletionProtocol.Response response = null;
-      response = checkBadCommitRequest(instanceId, offset, now);
-      if (response != null) {
-        return response;
-      }
-      _logger.info("{}:Uploading for instance={} offset={}", _state, 
instanceId, offset);
-      _state = State.COMMITTER_UPLOADING;
-      long commitTimeMs = now - _startTimeMs;
-      if (commitTimeMs > _initialCommitTimeMs) {
-        // We assume that the commit time holds for all partitions. It is 
possible, though, that one partition
-        // commits at a lower time than another partition, and the two 
partitions are going simultaneously,
-        // and we may not get the maximum value all the time.
-        
_segmentCompletionManager._commitTimeMap.put(_segmentName.getTableName(), 
commitTimeMs);
-      }
-      return SegmentCompletionProtocol.RESP_COMMIT_CONTINUE;
-    }
-
-    private SegmentCompletionProtocol.Response 
committerNotifiedStoppedConsuming(String instanceId,
-        StreamPartitionMsgOffset offset, String reason) {
-      return processStoppedConsuming(instanceId, offset, reason, false);
-    }
-
-    private SegmentCompletionProtocol.Response 
committerNotifiedExtendBuildTime(String instanceId,
-        StreamPartitionMsgOffset offset, int extTimeSec, long now) {
-      SegmentCompletionProtocol.Response response = 
abortIfTooLateAndReturnHold(now, instanceId, offset);
-      if (response == null) {
-        long maxTimeAllowedToCommitMs = now + extTimeSec * 1000;
-        if (maxTimeAllowedToCommitMs > _startTimeMs + 
MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS * 1000) {
-          _logger.warn("Not accepting lease extension from {} startTime={} 
requestedTime={}", instanceId, _startTimeMs,
-              maxTimeAllowedToCommitMs);
-          return abortAndReturnFailed();
-        }
-        _maxTimeAllowedToCommitMs = maxTimeAllowedToCommitMs;
-        response = SegmentCompletionProtocol.RESP_PROCESSED;
-      }
-      return response;
-    }
-
-    private SegmentCompletionProtocol.Response 
committerUploadingConsumed(String instanceId,
-        StreamPartitionMsgOffset offset, long now) {
-      return processConsumedAfterCommitStart(instanceId, offset, now);
-    }
-
-    private SegmentCompletionProtocol.Response committerUploadingCommit(String 
instanceId,
-        StreamPartitionMsgOffset offset, long now) {
-      return processCommitWhileUploading(instanceId, offset, now);
-    }
-
-    private SegmentCompletionProtocol.Response 
committerUploadingStoppedConsuming(String instanceId,
-        StreamPartitionMsgOffset offset, String reason) {
-      return processStoppedConsuming(instanceId, offset, reason, false);
-    }
-
-    private SegmentCompletionProtocol.Response committingConsumed(String 
instanceId, StreamPartitionMsgOffset offset,
-        long now) {
-      return processConsumedAfterCommitStart(instanceId, offset, now);
-    }
-
-    private SegmentCompletionProtocol.Response committingCommit(String 
instanceId, StreamPartitionMsgOffset offset,
-        long now) {
-      return processCommitWhileUploading(instanceId, offset, now);
-    }
-
-    private SegmentCompletionProtocol.Response 
committingStoppedConsuming(String instanceId,
-        StreamPartitionMsgOffset offset, String reason) {
-      return processStoppedConsuming(instanceId, offset, reason, false);
-    }
-
-    private SegmentCompletionProtocol.Response committedConsumed(String 
instanceId, StreamPartitionMsgOffset offset) {
-      // Server reporting an offset on an already completed segment. Depending 
on the offset, either KEEP or DISCARD.
-      SegmentCompletionProtocol.Response response;
-      if (offset.compareTo(_winningOffset) == 0) {
-        response = keep(instanceId, offset);
-      } else {
-        // Return DISCARD. It is hard to say how long the server will take to 
complete things.
-        response = discard(instanceId, offset);
-      }
-      return response;
-    }
-
-    private SegmentCompletionProtocol.Response committedCommit(String 
instanceId, StreamPartitionMsgOffset offset) {
-      if (offset.compareTo(_winningOffset) == 0) {
-        return keep(instanceId, offset);
-      }
-      return discard(instanceId, offset);
-    }
-
-    private SegmentCompletionProtocol.Response 
committedStoppedConsuming(String instanceId,
-        StreamPartitionMsgOffset offset, String reason) {
-      return processStoppedConsuming(instanceId, offset, reason, false);
-    }
-
-    private SegmentCompletionProtocol.Response processStoppedConsuming(String 
instanceId,
-        StreamPartitionMsgOffset offset, String reason, boolean createNew) {
-      _logger
-          .info("Instance {} stopped consuming segment {} at offset {}, state 
{}, createNew: {}, reason:{}", instanceId,
-              _segmentName, offset, _state, createNew, reason);
-      try {
-        _segmentManager.segmentStoppedConsuming(_segmentName, instanceId);
-      } catch (Exception e) {
-        _logger.error("Caught exception while processing stopped CONSUMING 
segment: {} on instance: {}",
-            _segmentName.getSegmentName(), instanceId, e);
-        return SegmentCompletionProtocol.RESP_FAILED;
-      }
-      return SegmentCompletionProtocol.RESP_PROCESSED;
-    }
-
-    // A common method when the state is > COMMITTER_NOTIFIED.
-    private SegmentCompletionProtocol.Response 
processConsumedAfterCommitStart(String instanceId,
-        StreamPartitionMsgOffset offset, long now) {
-      SegmentCompletionProtocol.Response response;
-      // We have already picked a winner, and may or many not have heard from 
them.
-      // Common case here is that another server is coming back to us with its 
offset. We either respond back with
-      // HOLD or CATCHUP.
-      // It may be that we never heard from the committer, or the committer is 
taking too long to commit the segment.
-      // In that case, we abort the FSM and start afresh (i.e, return HOLD).
-      // If the winner is coming back again, then we have some more conditions 
to look at.
-      response = abortIfTooLateAndReturnHold(now, instanceId, offset);
-      if (response != null) {
-        return response;
-      }
-      if (instanceId.equals(_winner)) {
-        // The winner is coming back to report its offset. Take a decision 
based on the offset reported, and whether we
-        // already notified them
-        // Winner is supposedly already in the commit call. Something wrong.
-        LOGGER.warn(
-            "{}:Aborting FSM because winner is reporting a segment while it is 
also committing instance={} offset={} "
-                + "now={}", _state, instanceId, offset, now);
-        // Ask them to hold, just in case the committer fails for some reason..
-        return abortAndReturnHold(now, instanceId, offset);
-      } else {
-        // Common case: A different instance is reporting.
-        if (offset.compareTo(_winningOffset) == 0) {
-          // Wait until winner has posted the segment before asking this 
server to KEEP the segment.
-          response = hold(instanceId, offset);
-        } else if (offset.compareTo(_winningOffset) < 0) {
-          response = catchup(instanceId, offset);
-        } else {
-          // We have not yet committed, so ask the new responder to hold. They 
may be the new leader in case the
-          // committer fails.
-          response = hold(instanceId, offset);
-        }
-      }
-      return response;
-    }
-
-    private SegmentCompletionProtocol.Response 
commitSegment(SegmentCompletionProtocol.Request.Params reqParams,
-        CommittingSegmentDescriptor committingSegmentDescriptor) {
-      String instanceId = reqParams.getInstanceId();
-      StreamPartitionMsgOffset offset =
-          
_streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset());
-      if (!_state.equals(State.COMMITTER_UPLOADING)) {
-        // State changed while we were out of sync. return a failed commit.
-        _logger.warn("State change during upload: state={} segment={} 
winner={} winningOffset={}", _state,
-            _segmentName.getSegmentName(), _winner, _winningOffset);
-        return SegmentCompletionProtocol.RESP_FAILED;
-      }
-      _logger.info("Committing segment {} at offset {} winner {}", 
_segmentName.getSegmentName(), offset, instanceId);
-      _state = State.COMMITTING;
-      // The segment is uploaded to a unique file name indicated by 
segmentLocation, so we need to move the segment file
-      // to its permanent location first before committing the metadata. The 
committingSegmentDescriptor is then updated
-      // with the permanent segment location to be saved in metadata store.
-      try {
-        _segmentManager.commitSegmentFile(_realtimeTableName, 
committingSegmentDescriptor);
-      } catch (Exception e) {
-        _logger.error("Caught exception while committing segment file for 
segment: {}", _segmentName.getSegmentName(),
-            e);
-        return SegmentCompletionProtocol.RESP_FAILED;
-      }
-      try {
-        // Convert to a controller uri if the segment location uses local file 
scheme.
-        if (CommonConstants.Segment.LOCAL_SEGMENT_SCHEME
-            
.equalsIgnoreCase(URIUtils.getUri(committingSegmentDescriptor.getSegmentLocation()).getScheme()))
 {
-          committingSegmentDescriptor.setSegmentLocation(URIUtils
-              .constructDownloadUrl(_controllerVipUrl, 
TableNameBuilder.extractRawTableName(_realtimeTableName),
-                  _segmentName.getSegmentName()));
-        }
-        _segmentManager.commitSegmentMetadata(_realtimeTableName, 
committingSegmentDescriptor);
-      } catch (Exception e) {
-        _logger
-            .error("Caught exception while committing segment metadata for 
segment: {}", _segmentName.getSegmentName(),
-                e);
-        return SegmentCompletionProtocol.RESP_FAILED;
-      }
-
-      _state = State.COMMITTED;
-      _logger.info("Committed segment {} at offset {} winner {}", 
_segmentName.getSegmentName(), offset, instanceId);
-      return SegmentCompletionProtocol.RESP_COMMIT_SUCCESS;
-    }
-
-    private SegmentCompletionProtocol.Response 
processCommitWhileUploading(String instanceId,
-        StreamPartitionMsgOffset offset, long now) {
-      _logger.info("Processing segmentCommit({}, {})", instanceId, offset);
-      SegmentCompletionProtocol.Response response = 
abortIfTooLateAndReturnHold(now, instanceId, offset);
-      if (response != null) {
-        return response;
-      }
-      // Another committer (or same) came in while one was uploading. Ask them 
to hold in case this one fails.
-      return new SegmentCompletionProtocol.Response(
-          new 
SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(offset.toString())
-              
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD));
-    }
-
-    private SegmentCompletionProtocol.Response checkBadCommitRequest(String 
instanceId, StreamPartitionMsgOffset offset,
-        long now) {
-      SegmentCompletionProtocol.Response response = 
abortIfTooLateAndReturnHold(now, instanceId, offset);
-      if (response != null) {
-        return response;
-      } else if (instanceId.equals(_winner) && 
offset.compareTo(_winningOffset) != 0) {
-        // Hmm. Committer has been notified, but either a different one is 
committing, or offset is different
-        _logger.warn("{}:Aborting FSM (bad commit req) instance={} offset={} 
now={} winning={}", _state, instanceId,
-            offset, now, _winningOffset);
-        return abortAndReturnHold(now, instanceId, offset);
-      }
-      return null;
-    }
-
-    private SegmentCompletionProtocol.Response 
processCommitWhileHoldingOrPartialConsuming(String instanceId,
-        StreamPartitionMsgOffset offset, long now) {
-      _logger.info("Processing segmentCommit({}, {})", instanceId, offset);
-      SegmentCompletionProtocol.Response response = 
abortIfTooLateAndReturnHold(now, instanceId, offset);
-      if (response != null) {
-        return response;
-      }
-      // We cannot get a commit if we are in this state, so ask them to hold. 
Maybe we are starting after a failover.
-      // The server will re-send the segmentConsumed message.
-      return hold(instanceId, offset);
-    }
-
-    /**
-     * Pick a winner if we can, preferring the instance that we are handling 
right now,
-     *
-     * We accept the first server to report an offset as long as the server 
stopped consumption
-     * due to row limit. The premise is that other servers will also stop at 
row limit, and there
-     * is no need to wait for them to report an offset in order to decide on a 
winner. The state machine takes care
-     * of the cases where other servers may report different offsets (just in 
case).
-     *
-     * If the above condition is not satisfied (i.e. either this is not the 
first server, or it did not reach
-     * row limit), then we can pick a winner only if it is too late to pick a 
winner, or we have heard from all
-     * servers.
-     *
-     * Otherwise, we wait to hear from more servers.
-     *
-     * @param preferredInstance The instance that is reporting in this thread.
-     * @param now current time
-     * @param stopReason reason reported by instance for stopping consumption.
-     * @return true if winner picked, false otherwise.
-     */
-    private boolean isWinnerPicked(String preferredInstance, long now, final 
String stopReason) {
-      if ((SegmentCompletionProtocol.REASON_ROW_LIMIT.equals(stopReason)
-          || 
SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP.equals(stopReason))
-          && _commitStateMap.size() == 1) {
-        _winner = preferredInstance;
-        _winningOffset = _commitStateMap.get(preferredInstance);
-        return true;
-      } else if (now > _maxTimeToPickWinnerMs || _commitStateMap.size() == 
numReplicasToLookFor()) {
-        _logger.info("{}:Picking winner time={} size={}", _state, now - 
_startTimeMs, _commitStateMap.size());
-        StreamPartitionMsgOffset maxOffsetSoFar = null;
-        String winnerSoFar = null;
-        for (Map.Entry<String, StreamPartitionMsgOffset> entry : 
_commitStateMap.entrySet()) {
-          if (maxOffsetSoFar == null || 
entry.getValue().compareTo(maxOffsetSoFar) > 0) {
-            maxOffsetSoFar = entry.getValue();
-            winnerSoFar = entry.getKey();
-          }
-        }
-        _winningOffset = maxOffsetSoFar;
-        if (_commitStateMap.get(preferredInstance).compareTo(maxOffsetSoFar) 
== 0) {
-          winnerSoFar = preferredInstance;
-        }
-        _winner = winnerSoFar;
-        return true;
-      }
-      return false;
-    }
-  }
-
   @VisibleForTesting
   protected boolean isLeader(String tableName) {
     return _leadControllerManager.isLeaderForTable(tableName);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index 2595d8debf..dd2d0ceee0 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -32,6 +32,7 @@ import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.LongMsgOffsetFactory;
@@ -1324,7 +1325,8 @@ public class SegmentCompletionTest {
         boolean isLeader, ControllerMetrics controllerMetrics) {
       super(helixManager, segmentManager, controllerMetrics,
           new LeadControllerManager("localhost_1234", helixManager, 
controllerMetrics),
-          SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds());
+          SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds(),
+          new SegmentCompletionConfig(new PinotConfiguration()));
       _isLeader = isLeader;
     }
 
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
index 845dfd5ffa..588aaeb949 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
@@ -130,6 +130,11 @@ public class StreamConfigProperties {
    */
   public static final String SERVER_UPLOAD_TO_DEEPSTORE = 
"realtime.segment.serverUploadToDeepStore";
 
+  /**
+   * Config used to indicate which segment commit protocol implementation 
controller should use for this table
+   */
+  public static final String SEGMENT_COMPLETION_FSM_SCHEME = 
"segment.completion.fsm.scheme";
+
   /**
    * Helper method to create a stream specific property
    */
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 1a6985084b..641fa4ef89 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -918,6 +918,8 @@ public class CommonConstants {
     //Set to true to load all services tagged and compiled with 
hk2-metadata-generator. Default to False
     public static final String CONTROLLER_SERVICE_AUTO_DISCOVERY = 
"pinot.controller.service.auto.discovery";
     public static final String CONFIG_OF_LOGGER_ROOT_DIR = 
"pinot.controller.logger.root.dir";
+    public static final String PREFIX_OF_PINOT_CONTROLLER_SEGMENT_COMPLETION =
+        "pinot.controller.segment.completion";
   }
 
   public static class Minion {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to