This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new eb4d0e1b95 Registering pauseless FSM by default and picking this FSM 
for pauseless enabled tables (#15241)
eb4d0e1b95 is described below

commit eb4d0e1b95e8f49303ffdb73062ee06c905f4125
Author: 9aman <[email protected]>
AuthorDate: Mon Mar 17 12:56:30 2025 +0530

    Registering pauseless FSM by default and picking this FSM for pauseless 
enabled tables (#15241)
    
    * Registering pauseless FSM by default and picking this FSM for pauseless 
enabled tables
    
    * changing name of the fsm to ensure backward compatibility
    
    * Fixed the following:
    - getDefaultFsmScheme was returning the class name instead of the scheme
    - Re-registering the same scheme should just warn instead of impacting 
controller restarts
    
    Added tests for above
    
    * Fixing a small nit comment for fetching FSM class names in the unit tests
---
 .../core/realtime/SegmentCompletionConfig.java     |  13 +-
 .../core/realtime/SegmentCompletionFSMFactory.java |  11 +-
 .../core/realtime/SegmentCompletionManager.java    |   4 +-
 .../core/realtime/SegmentCompletionConfigTest.java |  50 ++++++++
 .../realtime/SegmentCompletionFSMFactoryTest.java  | 136 +++++++++++++++++++++
 .../tests/BasePauselessRealtimeIngestionTest.java  |   8 --
 ...ltimeIngestionCommitEndMetadataFailureTest.java |  22 ++++
 ...sRealtimeIngestionSegmentCommitFailureTest.java |   5 -
 8 files changed, 227 insertions(+), 22 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfig.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfig.java
index c5d0ece4c6..aecdab227d 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfig.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfig.java
@@ -29,9 +29,11 @@ public class SegmentCompletionConfig {
       CommonConstants.Controller.PREFIX_OF_PINOT_CONTROLLER_SEGMENT_COMPLETION 
+ ".fsm.scheme.";
   public static final String DEFAULT_FSM_SCHEME_KEY =
       CommonConstants.Controller.PREFIX_OF_PINOT_CONTROLLER_SEGMENT_COMPLETION 
+ ".fsm.scheme.default";
+  public static final String DEFAULT_PAUSELESS_FSM_SCHEME_KEY =
+      CommonConstants.Controller.PREFIX_OF_PINOT_CONTROLLER_SEGMENT_COMPLETION 
+ ".fsm.scheme.pauseless";
   public static final String DEFAULT_FSM_SCHEME = "default";
+  public static final String DEFAULT_PAUSELESS_FSM_SCHEME = "pauseless";
   private final Map<String, String> _fsmSchemes = new HashMap<>();
-  private final String _defaultFsmScheme;
 
   public SegmentCompletionConfig(PinotConfiguration configuration) {
     // Parse properties to extract FSM schemes
@@ -43,9 +45,6 @@ public class SegmentCompletionConfig {
         _fsmSchemes.put(scheme, className);
       }
     }
-
-    // Get the default FSM scheme
-    _defaultFsmScheme = configuration.getProperty(DEFAULT_FSM_SCHEME_KEY, 
DEFAULT_FSM_SCHEME);
   }
 
   public Map<String, String> getFsmSchemes() {
@@ -53,6 +52,10 @@ public class SegmentCompletionConfig {
   }
 
   public String getDefaultFsmScheme() {
-    return _defaultFsmScheme;
+    return DEFAULT_FSM_SCHEME;
+  }
+
+  public String getDefaultPauselessFsmScheme() {
+    return DEFAULT_PAUSELESS_FSM_SCHEME;
   }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactory.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactory.java
index 5ac238d626..196e928cd3 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactory.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactory.java
@@ -34,9 +34,10 @@ public class SegmentCompletionFSMFactory {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentCompletionFSMFactory.class);
   private static final Map<String, Class<? extends SegmentCompletionFSM>> 
FSM_CLASS_MAP = new HashMap<>();
 
-  // Static block to register the default FSM
+  // Static block to register the default FSM and pauseless FSM
   static {
     register(SegmentCompletionConfig.DEFAULT_FSM_SCHEME, 
BlockingSegmentCompletionFSM.class);
+    register(SegmentCompletionConfig.DEFAULT_PAUSELESS_FSM_SCHEME, 
PauselessSegmentCompletionFSM.class);
   }
 
   /**
@@ -48,8 +49,12 @@ public class SegmentCompletionFSMFactory {
   public static void register(String scheme, Class<? extends 
SegmentCompletionFSM> fsmClass) {
     Preconditions.checkNotNull(scheme, "Scheme cannot be null");
     Preconditions.checkNotNull(fsmClass, "FSM Class cannot be null");
-    Preconditions.checkState(FSM_CLASS_MAP.put(scheme, fsmClass) == null,
-        "FSM class already registered for scheme: " + scheme);
+
+    Class<? extends SegmentCompletionFSM> previousFsmClass = 
FSM_CLASS_MAP.put(scheme, fsmClass);
+    if (previousFsmClass != null) {
+      LOGGER.warn("Replacing existing FSM: {} for scheme: {} with: {}",
+          previousFsmClass, scheme, fsmClass);
+    }
     LOGGER.info("Registered SegmentCompletionFSM class {} for scheme {}", 
fsmClass, scheme);
   }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 3dbd209745..ffdbd44e05 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -28,6 +28,7 @@ import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.PauselessConsumptionUtils;
 import org.apache.pinot.controller.LeadControllerManager;
 import 
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -138,7 +139,8 @@ public class SegmentCompletionManager {
     }
 
     if (factoryName == null) {
-      factoryName = _segmentCompletionConfig.getDefaultFsmScheme();
+      factoryName = PauselessConsumptionUtils.isPauselessEnabled(tableConfig)
+          ? _segmentCompletionConfig.getDefaultPauselessFsmScheme() : 
_segmentCompletionConfig.getDefaultFsmScheme();
     }
 
     
Preconditions.checkState(SegmentCompletionFSMFactory.isFactoryTypeSupported(factoryName),
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfigTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfigTest.java
new file mode 100644
index 0000000000..f844cacef2
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfigTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime;
+
+import java.util.Map;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static 
org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig.*;
+
+
+public class SegmentCompletionConfigTest {
+
+  private static final String DEFAULT_FSM_CLASS = 
BlockingSegmentCompletionFSM.class.getName();
+  private static final String PAUSELESS_FSM_CLASS = 
PauselessSegmentCompletionFSM.class.getName();
+  private static final String CUSTOM_FSM_CLASS =
+      
"org.apache.pinot.controller.helix.core.realtime.CustomSegmentCompletionFSM";
+
+  @Test
+  public void testGetFsmSchemes() {
+    String customSchemeKey = FSM_SCHEME + "custom";
+    PinotConfiguration pinotConfiguration = new PinotConfiguration(
+        Map.of(DEFAULT_PAUSELESS_FSM_SCHEME_KEY, PAUSELESS_FSM_CLASS, 
DEFAULT_FSM_SCHEME_KEY, DEFAULT_FSM_CLASS,
+            customSchemeKey, CUSTOM_FSM_CLASS));
+    SegmentCompletionConfig segmentCompletionConfig = new 
SegmentCompletionConfig(pinotConfiguration);
+    Map<String, String> expectedFsmSchemes =
+        Map.of(DEFAULT_FSM_SCHEME, DEFAULT_FSM_CLASS, 
DEFAULT_PAUSELESS_FSM_SCHEME, PAUSELESS_FSM_CLASS, "custom",
+            CUSTOM_FSM_CLASS);
+    Assert.assertEquals(expectedFsmSchemes, 
segmentCompletionConfig.getFsmSchemes());
+    Assert.assertEquals(segmentCompletionConfig.getDefaultFsmScheme(), 
DEFAULT_FSM_SCHEME);
+    
Assert.assertEquals(segmentCompletionConfig.getDefaultPauselessFsmScheme(), 
DEFAULT_PAUSELESS_FSM_SCHEME);
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactoryTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactoryTest.java
new file mode 100644
index 0000000000..1eda2de31d
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactoryTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.LongMsgOffsetFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import static 
org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig.DEFAULT_FSM_SCHEME_KEY;
+import static 
org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig.DEFAULT_PAUSELESS_FSM_SCHEME_KEY;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class SegmentCompletionFSMFactoryTest {
+  private static final String DEFAULT_FSM_CLASS = 
BlockingSegmentCompletionFSM.class.getName();
+  private static final String LLC_SEGMENT_NAME = 
"tableName__0__0__20250228T1903Z";
+
+  @AfterMethod
+  public void tearDown() {
+    // Clear the factory's static state between tests
+    SegmentCompletionFSMFactory.shutdown();
+
+    // Re-register the default mappings
+    
SegmentCompletionFSMFactory.register(SegmentCompletionConfig.DEFAULT_FSM_SCHEME,
+        BlockingSegmentCompletionFSM.class);
+    
SegmentCompletionFSMFactory.register(SegmentCompletionConfig.DEFAULT_PAUSELESS_FSM_SCHEME,
+        PauselessSegmentCompletionFSM.class);
+  }
+
+  @Test
+  public void testCreateFSMWithDefaultFsm() {
+    PinotConfiguration pinotConfiguration = new 
PinotConfiguration(Collections.emptyMap());
+    SegmentCompletionConfig segmentCompletionConfig = new 
SegmentCompletionConfig(pinotConfiguration);
+    SegmentCompletionFSMFactory.init(segmentCompletionConfig);
+
+    // mock different objects that are required for creating FSM
+    SegmentCompletionManager segmentCompletionManager = 
mock(SegmentCompletionManager.class);
+    LongMsgOffsetFactory longMsgOffsetFactory = 
mock(LongMsgOffsetFactory.class);
+    
when(segmentCompletionManager.getCurrentTimeMs()).thenReturn(System.currentTimeMillis());
+    
when(segmentCompletionManager.getStreamPartitionMsgOffsetFactory(any())).thenReturn(longMsgOffsetFactory);
+    when(longMsgOffsetFactory.create(anyString())).thenReturn(new 
LongMsgOffset(100));
+
+    SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class);
+    when(segmentZKMetadata.getNumReplicas()).thenReturn(3);
+    when(segmentZKMetadata.getEndOffset()).thenReturn("100");
+
+    PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager = 
mock(PinotLLCRealtimeSegmentManager.class);
+    
when(pinotLLCRealtimeSegmentManager.getCommitTimeoutMS(anyString())).thenReturn(System.currentTimeMillis());
+
+    // Assert that the default fsm (BlockingSegmentCompletionFSM) is picked 
when the "default" scheme is passed
+    Assert.assertTrue(
+        
SegmentCompletionFSMFactory.createFSM(segmentCompletionConfig.getDefaultFsmScheme(),
 segmentCompletionManager,
+            pinotLLCRealtimeSegmentManager, new 
LLCSegmentName(LLC_SEGMENT_NAME),
+            segmentZKMetadata) instanceof BlockingSegmentCompletionFSM);
+
+    // Assert that the default pauseless fsm (PauselessSegmentCompletionFSM) is 
picked when the "pauseless" scheme is
+    // passed
+    
Assert.assertTrue(SegmentCompletionFSMFactory.createFSM(segmentCompletionConfig.getDefaultPauselessFsmScheme(),
+        segmentCompletionManager, pinotLLCRealtimeSegmentManager, new 
LLCSegmentName(LLC_SEGMENT_NAME),
+        segmentZKMetadata) instanceof PauselessSegmentCompletionFSM);
+  }
+
+  @Test
+  public void testCreateFSMWithCustomProvidedFsm() {
+
+    String fakeCustomeFsmClass =
+        
"org.apache.pinot.controller.helix.core.realtime.SegmentCompletionFSMFactoryTest$FakeCustomFSM";
+
+    PinotConfiguration pinotConfiguration = new PinotConfiguration(
+        Map.of(DEFAULT_PAUSELESS_FSM_SCHEME_KEY, fakeCustomeFsmClass, 
DEFAULT_FSM_SCHEME_KEY, DEFAULT_FSM_CLASS));
+
+    // mock different objects that are required for creating FSM
+    SegmentCompletionConfig segmentCompletionConfig = new 
SegmentCompletionConfig(pinotConfiguration);
+    SegmentCompletionFSMFactory.init(segmentCompletionConfig);
+
+    SegmentCompletionManager segmentCompletionManager = 
mock(SegmentCompletionManager.class);
+    LongMsgOffsetFactory longMsgOffsetFactory = 
mock(LongMsgOffsetFactory.class);
+    
when(segmentCompletionManager.getCurrentTimeMs()).thenReturn(System.currentTimeMillis());
+    
when(segmentCompletionManager.getStreamPartitionMsgOffsetFactory(any())).thenReturn(longMsgOffsetFactory);
+    when(longMsgOffsetFactory.create(anyString())).thenReturn(new 
LongMsgOffset(100));
+
+    SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class);
+    when(segmentZKMetadata.getNumReplicas()).thenReturn(3);
+    when(segmentZKMetadata.getEndOffset()).thenReturn("100");
+
+    PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager = 
mock(PinotLLCRealtimeSegmentManager.class);
+    
when(pinotLLCRealtimeSegmentManager.getCommitTimeoutMS(anyString())).thenReturn(System.currentTimeMillis());
+
+    // Assert that registering for the same scheme i.e. "default", works as 
expected and BlockingSegmentCompletionFSM
+    // object is created
+    Assert.assertTrue(
+        
SegmentCompletionFSMFactory.createFSM(segmentCompletionConfig.getDefaultFsmScheme(),
 segmentCompletionManager,
+            pinotLLCRealtimeSegmentManager, new 
LLCSegmentName(LLC_SEGMENT_NAME),
+            segmentZKMetadata) instanceof BlockingSegmentCompletionFSM);
+
+    // Assert that changing the default for pauseless returns the new custom 
fsm.
+    
Assert.assertTrue(SegmentCompletionFSMFactory.createFSM(segmentCompletionConfig.getDefaultPauselessFsmScheme(),
+        segmentCompletionManager, pinotLLCRealtimeSegmentManager, new 
LLCSegmentName(LLC_SEGMENT_NAME),
+        segmentZKMetadata) instanceof FakeCustomFSM);
+  }
+
+  public static class FakeCustomFSM extends BlockingSegmentCompletionFSM {
+
+    public FakeCustomFSM(PinotLLCRealtimeSegmentManager segmentManager,
+        SegmentCompletionManager segmentCompletionManager, LLCSegmentName 
segmentName,
+        SegmentZKMetadata segmentMetadata) {
+      super(segmentManager, segmentCompletionManager, segmentName, 
segmentMetadata);
+    }
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java
index 6991ebd876..c8f860cdf2 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java
@@ -30,7 +30,6 @@ import 
org.apache.pinot.common.utils.PauselessConsumptionUtils;
 import org.apache.pinot.controller.BaseControllerStarter;
 import org.apache.pinot.controller.ControllerConf;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
-import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig;
 import 
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingControllerStarter;
 import 
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingPinotLLCRealtimeSegmentManager;
 import 
org.apache.pinot.integration.tests.realtime.utils.PauselessRealtimeTestUtils;
@@ -48,7 +47,6 @@ import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 
-import static 
org.apache.pinot.spi.stream.StreamConfigProperties.SEGMENT_COMPLETION_FSM_SCHEME;
 import static org.testng.Assert.assertTrue;
 
 
@@ -79,8 +77,6 @@ public abstract class BasePauselessRealtimeIngestionTest 
extends BaseClusterInte
   protected void overrideControllerConf(Map<String, Object> properties) {
     
properties.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED,
 true);
     
properties.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT,
 true);
-    properties.put(SegmentCompletionConfig.FSM_SCHEME + "pauseless",
-        
"org.apache.pinot.controller.helix.core.realtime.PauselessSegmentCompletionFSM");
     
properties.put(ControllerConf.ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS,
         500);
   }
@@ -158,10 +154,6 @@ public abstract class BasePauselessRealtimeIngestionTest 
extends BaseClusterInte
     ingestionConfig.setStreamIngestionConfig(
         new 
StreamIngestionConfig(List.of(tableConfig.getIndexingConfig().getStreamConfigs())));
     
ingestionConfig.getStreamIngestionConfig().setPauselessConsumptionEnabled(true);
-    ingestionConfig.getStreamIngestionConfig()
-        .getStreamConfigMaps()
-        .get(0)
-        .put(SEGMENT_COMPLETION_FSM_SCHEME, "pauseless");
     tableConfig.getIndexingConfig().setStreamConfigs(null);
     tableConfig.setIngestionConfig(ingestionConfig);
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionCommitEndMetadataFailureTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionCommitEndMetadataFailureTest.java
index 0c816b6563..d6a930d164 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionCommitEndMetadataFailureTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionCommitEndMetadataFailureTest.java
@@ -22,7 +22,9 @@ import java.util.List;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.controller.helix.core.util.FailureInjectionUtils;
 import 
org.apache.pinot.integration.tests.realtime.utils.PauselessRealtimeTestUtils;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertNull;
@@ -31,6 +33,11 @@ import static org.testng.Assert.assertNull;
 public class PauselessRealtimeIngestionCommitEndMetadataFailureTest
     extends BasePauselessRealtimeIngestionTest {
 
+  // The total number of real-time segments is NUM_REALTIME_SEGMENTS.
+  // Two segments are in the CONSUMING state, meaning they are still ingesting 
data.
+  // The remaining segments (NUM_REALTIME_SEGMENTS - 2) are in the COMMITTING 
state.
+  private static final int NUM_REALTIME_SEGMENTS_IN_COMMITTING_STATE = 46;
+
   @Override
   protected String getFailurePoint() {
     return FailureInjectionUtils.FAULT_BEFORE_COMMIT_END_METADATA;
@@ -57,6 +64,21 @@ public class 
PauselessRealtimeIngestionCommitEndMetadataFailureTest
     String tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(getTableName());
     PauselessRealtimeTestUtils.verifyIdealState(tableNameWithType, 
NUM_REALTIME_SEGMENTS, _helixManager);
 
+    // The setup() function only waits until all the documents have been loaded.
+    // This has led to a race between the commit protocol and the validation 
manager run, causing test failures.
+    // Checking that the commit protocol has completed (i.e. segments are marked 
COMMITTING) before triggering
+    // the validation manager in the tests prevents this.
+
+    // All segments except the two CONSUMING ones should be in COMMITTING state for a pauseless table 
with a commitEndMetadata failure
+    TestUtils.waitForCondition((aVoid) -> {
+      List<SegmentZKMetadata> segmentZKMetadataList =
+          _helixResourceManager.getSegmentsZKMetadata(tableNameWithType);
+      return segmentZKMetadataList.stream()
+          .filter(
+              segmentZKMetadata -> segmentZKMetadata.getStatus() == 
CommonConstants.Segment.Realtime.Status.COMMITTING)
+          .count() == NUM_REALTIME_SEGMENTS_IN_COMMITTING_STATE;
+    }, 1000, 100000, "Some segments are still IN_PROGRESS");
+
     List<SegmentZKMetadata> segmentZKMetadataList = 
_helixResourceManager.getSegmentsZKMetadata(tableNameWithType);
     for (SegmentZKMetadata metadata : segmentZKMetadataList) {
       assertNull(metadata.getDownloadUrl());
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java
index 2e706b20cd..5eb286c2a6 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java
@@ -35,7 +35,6 @@ import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.controller.BaseControllerStarter;
 import org.apache.pinot.controller.ControllerConf;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
-import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig;
 import 
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingControllerStarter;
 import 
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingPinotLLCRealtimeSegmentManager;
 import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
@@ -54,7 +53,6 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static 
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingRealtimeTableDataManager.MAX_NUMBER_OF_FAILURES;
-import static 
org.apache.pinot.spi.stream.StreamConfigProperties.SEGMENT_COMPLETION_FSM_SCHEME;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
@@ -72,8 +70,6 @@ public class 
PauselessRealtimeIngestionSegmentCommitFailureTest extends BaseClus
   protected void overrideControllerConf(Map<String, Object> properties) {
     
properties.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED,
 true);
     
properties.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT,
 true);
-    properties.put(SegmentCompletionConfig.FSM_SCHEME + "pauseless",
-        
"org.apache.pinot.controller.helix.core.realtime.PauselessSegmentCompletionFSM");
     // Set the delay more than the time we sleep before triggering 
RealtimeSegmentValidationManager manually, i.e.
     // MAX_SEGMENT_COMPLETION_TIME_MILLIS, to ensure that the segment level 
validations are performed.
     
properties.put(ControllerConf.ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS,
@@ -155,7 +151,6 @@ public class 
PauselessRealtimeIngestionSegmentCommitFailureTest extends BaseClus
     Map<String, String> streamConfigMap = 
ingestionConfig.getStreamIngestionConfig()
         .getStreamConfigMaps()
         .get(0);
-    streamConfigMap.put(SEGMENT_COMPLETION_FSM_SCHEME, "pauseless");
     streamConfigMap.put("segmentDownloadTimeoutMinutes", "1");
     tableConfig.getIndexingConfig().setStreamConfigs(null);
     tableConfig.setIngestionConfig(ingestionConfig);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to