Jackie-Jiang commented on code in PR #14741:
URL: https://github.com/apache/pinot/pull/14741#discussion_r1903693007


##########
pinot-common/src/main/java/org/apache/pinot/common/utils/PauselessConsumptionUtils.java:
##########
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import java.util.Optional;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
+
+
+public class PauselessConsumptionUtils {
+  public static final String PAUSELESS_CONSUMPTION_ENABLED = 
"pauselessConsumptionEnabled";
+
+  private PauselessConsumptionUtils() {
+    // Private constructor to prevent instantiation of utility class
+  }
+
+  /**
+   * Checks if pauseless consumption is enabled for the given table 
configuration.
+   * Returns false if any configuration component is missing or if the flag is 
not set to true.
+   *
+   * @param tableConfig The table configuration to check. Must not be null.
+   * @return true if pauseless consumption is explicitly enabled, false 
otherwise
+   * @throws NullPointerException if tableConfig is null
+   */
+  public static boolean isPauselessEnabled(@NotNull TableConfig tableConfig) {

Review Comment:
   Since this config is newly added, we don't need to add it in two places.
Let's keep it only within the `StreamIngestionConfig`.



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/PauselessConsumptionUtils.java:
##########
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import java.util.Optional;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
+
+
+public class PauselessConsumptionUtils {
+  public static final String PAUSELESS_CONSUMPTION_ENABLED = 
"pauselessConsumptionEnabled";

Review Comment:
   This constant seems unused.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PauselessSegmentCompletionFSM.java:
##########
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime;
+
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.URIUtils;
+import 
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+
+public class PauselessSegmentCompletionFSM extends 
BlockingSegmentCompletionFSM {
+  public PauselessSegmentCompletionFSM(PinotLLCRealtimeSegmentManager 
segmentManager,
+      SegmentCompletionManager segmentCompletionManager, LLCSegmentName 
segmentName,
+      SegmentZKMetadata segmentMetadata) {
+    super(segmentManager, segmentCompletionManager, segmentName, 
segmentMetadata);
+    if (segmentMetadata.getStatus() == 
CommonConstants.Segment.Realtime.Status.COMMITTING) {
+      StreamPartitionMsgOffsetFactory factory =
+          
_segmentCompletionManager.getStreamPartitionMsgOffsetFactory(_segmentName);
+      StreamPartitionMsgOffset endOffset = 
factory.create(segmentMetadata.getEndOffset());
+      _state = BlockingSegmentCompletionFSMState.COMMITTED;
+      _winningOffset = endOffset;
+      _winner = "UNKNOWN";
+    }
+  }
+
+  /*
+   * A server has sent segmentConsumed() message. The caller will save the 
segment if we return
+   * COMMIT_CONTINUE. We need to verify that it is the same server that we 
notified as the winner
+   * and the offset is the same as what is coming in with the commit. We can 
then move to
+   * COMMITTER_UPLOADING and wait for the segmentCommitEnd() call.
+   *
+   * In case of discrepancy we move the state machine to ABORTED state so that 
this FSM is removed
+   * from the map, and things start over. In this case, we respond to the 
server with a 'hold' so
+   * that they re-transmit their segmentConsumed() message and start over.
+   */
+  @Override

Review Comment:
   The overridden methods in this class are almost identical to the original 
ones. Suggest isolating the modified part as a separate method to improve the 
readability.
   For this method, can we override `committerNotifiedCommit()`?



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -544,6 +557,55 @@ private void doAddConsumingSegment(String segmentName)
     _logger.info("Added new CONSUMING segment: {}", segmentName);
   }
 
+  @Override
+  public File downloadSegment(SegmentZKMetadata zkMetadata)
+      throws Exception {
+    if (!PauselessConsumptionUtils.isPauselessEnabled(_tableConfig)) {

Review Comment:
   Let's rely on segment status instead of table config to decide how to 
download the segment. There is no guarantee that the table config is in sync 
with the segment (e.g. user changes table config in the middle)



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PauselessSegmentCompletionFSM.java:
##########
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime;
+
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.URIUtils;
+import 
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+
+public class PauselessSegmentCompletionFSM extends 
BlockingSegmentCompletionFSM {
+  public PauselessSegmentCompletionFSM(PinotLLCRealtimeSegmentManager 
segmentManager,
+      SegmentCompletionManager segmentCompletionManager, LLCSegmentName 
segmentName,
+      SegmentZKMetadata segmentMetadata) {
+    super(segmentManager, segmentCompletionManager, segmentName, 
segmentMetadata);
+    if (segmentMetadata.getStatus() == 
CommonConstants.Segment.Realtime.Status.COMMITTING) {
+      StreamPartitionMsgOffsetFactory factory =
+          
_segmentCompletionManager.getStreamPartitionMsgOffsetFactory(_segmentName);
+      StreamPartitionMsgOffset endOffset = 
factory.create(segmentMetadata.getEndOffset());
+      _state = BlockingSegmentCompletionFSMState.COMMITTED;
+      _winningOffset = endOffset;
+      _winner = "UNKNOWN";
+    }
+  }
+
+  /*
+   * A server has sent segmentConsumed() message. The caller will save the 
segment if we return
+   * COMMIT_CONTINUE. We need to verify that it is the same server that we 
notified as the winner
+   * and the offset is the same as what is coming in with the commit. We can 
then move to
+   * COMMITTER_UPLOADING and wait for the segmentCommitEnd() call.
+   *
+   * In case of discrepancy we move the state machine to ABORTED state so that 
this FSM is removed
+   * from the map, and things start over. In this case, we respond to the 
server with a 'hold' so
+   * that they re-transmit their segmentConsumed() message and start over.
+   */
+  @Override
+  public SegmentCompletionProtocol.Response 
segmentCommitStart(SegmentCompletionProtocol.Request.Params reqParams) {
+    String instanceId = reqParams.getInstanceId();
+    StreamPartitionMsgOffset offset = 
_streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset());
+    long now = _segmentCompletionManager.getCurrentTimeMs();
+    if (_excludedServerStateMap.contains(instanceId)) {
+      _logger.warn("Not accepting commit from {} since it had stoppd 
consuming", instanceId);
+      return SegmentCompletionProtocol.RESP_FAILED;
+    }
+    synchronized (this) {
+      _logger.info("Processing segmentCommitStart({}, {})", instanceId, 
offset);
+      switch (_state) {
+        case PARTIAL_CONSUMING:
+          return partialConsumingCommit(instanceId, offset, now);
+
+        case HOLDING:
+          return holdingCommit(instanceId, offset, now);
+
+        case COMMITTER_DECIDED:
+          return committerDecidedCommit(instanceId, offset, now);
+
+        case COMMITTER_NOTIFIED:
+          SegmentCompletionProtocol.Response response = 
committerNotifiedCommit(instanceId, offset, now);
+          try {
+            if (response == SegmentCompletionProtocol.RESP_COMMIT_CONTINUE) {
+              CommittingSegmentDescriptor committingSegmentDescriptor =
+                  
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
+              LOGGER.info(
+                  "Starting to commit changes to ZK and ideal state for the 
segment:{} as the leader has been selected",
+                  _segmentName);
+              _segmentManager.commitSegmentStartMetadata(
+                  
TableNameBuilder.REALTIME.tableNameWithType(_segmentName.getTableName()),
+                  committingSegmentDescriptor);
+            }
+          } catch (Exception e) {
+            // this aims to handle the failures during 
commitSegmentStartMetadata
+            // we abort the state machine to allow commit protocol to start 
from the beginning
+            // the server would then retry the commit protocol from the start
+            return abortAndReturnFailed();

Review Comment:
   Do we need to explicitly catch and handle the exception here? It seems we
are not catching exceptions in other places.



##########
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java:
##########
@@ -44,6 +44,8 @@ public class SegmentZKMetadata implements ZKMetadata {
   private boolean _endTimeMsCached;
   private long _endTimeMs;
 
+  public static final long DEFAULT_CRC_VALUE = -1;

Review Comment:
   It seems this is introduced to identify the committing segment. Suggest
checking the segment status (`getStatus()`) instead.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PauselessSegmentCommitter.java:
##########
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import java.io.File;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.slf4j.Logger;
+
+
+public class PauselessSegmentCommitter extends SplitSegmentCommitter {
+  public PauselessSegmentCommitter(Logger segmentLogger, 
ServerSegmentCompletionProtocolHandler protocolHandler,
+      SegmentCompletionProtocol.Request.Params params, SegmentUploader 
segmentUploader,
+      @Nullable String peerDownloadScheme) {
+    super(segmentLogger, protocolHandler, params, segmentUploader, 
peerDownloadScheme);
+  }
+
+  public PauselessSegmentCommitter(Logger segmentLogger, 
ServerSegmentCompletionProtocolHandler protocolHandler,

Review Comment:
   (nit) This constructor is not needed



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -544,6 +557,55 @@ private void doAddConsumingSegment(String segmentName)
     _logger.info("Added new CONSUMING segment: {}", segmentName);
   }
 
+  @Override
+  public File downloadSegment(SegmentZKMetadata zkMetadata)
+      throws Exception {
+    if (!PauselessConsumptionUtils.isPauselessEnabled(_tableConfig)) {
+      return super.downloadSegment(zkMetadata);
+    }
+
+    final long startTime = System.currentTimeMillis();
+
+    while (System.currentTimeMillis() - startTime < TIMEOUT_MS) {
+      // ZK Metadata may change during segment download process; fetch it on 
every retry.
+      zkMetadata = fetchZKMetadata(zkMetadata.getSegmentName());
+
+      if (zkMetadata.getDownloadUrl() != null) {
+        // The downloadSegment() will throw an exception in case there are 
some genuine issues.
+        // We don't want to retry in those scenarios and will throw an 
exception
+        return downloadSegmentFromDeepStore(zkMetadata);
+      }
+
+      if (_peerDownloadScheme != null) {
+        _logger.info("Peer download is enabled for the segment: {}", 
zkMetadata.getSegmentName());
+        try {
+          return downloadSegmentFromPeers(zkMetadata);

Review Comment:
   We should check external view to find the `ONLINE` replica (i.e. call 
`PeerServerSegmentFinder.getOnlineServersFromExternalView()`), then start 
downloading only if we can find one server online. This way we can prevent the 
exponential backoff within the method



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -119,6 +121,10 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
 
   public static final long READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS = 
TimeUnit.SECONDS.toMillis(5);
 
+  public static final long TIMEOUT_MINUTES = 5;
+  public static final long TIMEOUT_MS = TIMEOUT_MINUTES * 60 * 1000;
+  public static final long SLEEP_INTERVAL_MS = 30000; // 30 seconds sleep 
interval

Review Comment:
   +1



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -845,6 +846,23 @@ public void run() {
               //       CONSUMING -> ONLINE state transition.
               segmentLock.lockInterruptibly();
               try {
+                // For tables with pauseless consumption enabled we want to 
start the commit protocol that
+                // 1. Updates the endOffset in the ZK metadata for the 
committing segment
+                // 2. Creates ZK metadata for the new consuming segment
+                // 3. Updates the IdealState for committing and new consuming 
segment to ONLINE and CONSUMING
+                // respectively.
+                // See design doc for the new commit protocol:
+                // 
https://docs.google.com/document/d/1d-xttk7sXFIOqfyZvYw5W_KeGS6Ztmi8eYevBcCrT_c

Review Comment:
   Let's link the PR instead of the design doc



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -907,6 +925,22 @@ public void run() {
     }
   }
 
+  boolean startSegmentCommit() {

Review Comment:
   ```suggestion
     private boolean startSegmentCommit() {
   ```



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -544,6 +557,55 @@ private void doAddConsumingSegment(String segmentName)
     _logger.info("Added new CONSUMING segment: {}", segmentName);
   }
 
+  @Override
+  public File downloadSegment(SegmentZKMetadata zkMetadata)
+      throws Exception {
+    if (!PauselessConsumptionUtils.isPauselessEnabled(_tableConfig)) {
+      return super.downloadSegment(zkMetadata);
+    }
+
+    final long startTime = System.currentTimeMillis();
+
+    while (System.currentTimeMillis() - startTime < TIMEOUT_MS) {
+      // ZK Metadata may change during segment download process; fetch it on 
every retry.
+      zkMetadata = fetchZKMetadata(zkMetadata.getSegmentName());
+
+      if (zkMetadata.getDownloadUrl() != null) {
+        // The downloadSegment() will throw an exception in case there are 
some genuine issues.
+        // We don't want to retry in those scenarios and will throw an 
exception
+        return downloadSegmentFromDeepStore(zkMetadata);
+      }
+
+      if (_peerDownloadScheme != null) {
+        _logger.info("Peer download is enabled for the segment: {}", 
zkMetadata.getSegmentName());
+        try {
+          return downloadSegmentFromPeers(zkMetadata);
+        } catch (Exception e) {
+          // TODO :in this case we just retry as some of the other servers 
might be trying to build the
+          //  segment
+          _logger.warn("Could not download segment: {} from peer", 
zkMetadata.getSegmentName(), e);
+        }
+      }
+
+      long timeElapsed = System.currentTimeMillis() - startTime;
+      long timeRemaining = TIMEOUT_MS - timeElapsed;
+
+      if (timeRemaining <= 0) {
+        break;
+      }
+
+      _logger.info("Sleeping for 30 seconds as the segment url is missing. 
Time remaining: {} minutes",
+          Math.round(timeRemaining / 60000.0));
+
+      // Sleep for the shorter of our normal interval or remaining time
+      Thread.sleep(Math.min(SLEEP_INTERVAL_MS, timeRemaining));
+    }
+
+// If we exit the loop without returning, throw an exception

Review Comment:
   (nit) Fix the formatting: this comment line is not indented.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java:
##########
@@ -459,4 +460,12 @@ public Set<String> getAllReferencedColumns() {
     }
     return allColumns;
   }
+
+  public boolean isPauselessConsumptionEnabled() {

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to