This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new eb4d0e1b95 Registering pauseless FSM by default and picking this FSM
for pauseless enabled tables (#15241)
eb4d0e1b95 is described below
commit eb4d0e1b95e8f49303ffdb73062ee06c905f4125
Author: 9aman <[email protected]>
AuthorDate: Mon Mar 17 12:56:30 2025 +0530
Registering pauseless FSM by default and picking this FSM for pauseless
enabled tables (#15241)
* Registering pauseless FSM by default and picking this FSM for pauseless
enabled tables
* changing name of the fsm to ensure backward compatibility
* Fixed following
- The getDefaultFsmScheme was returning class name instead of the scheme
- re-registering the same scheme should just warn instead of impacting
controller restarts
Added tests for above
* Fixing a small nit comment for fetching FSM class names in the unit tests
---
.../core/realtime/SegmentCompletionConfig.java | 13 +-
.../core/realtime/SegmentCompletionFSMFactory.java | 11 +-
.../core/realtime/SegmentCompletionManager.java | 4 +-
.../core/realtime/SegmentCompletionConfigTest.java | 50 ++++++++
.../realtime/SegmentCompletionFSMFactoryTest.java | 136 +++++++++++++++++++++
.../tests/BasePauselessRealtimeIngestionTest.java | 8 --
...ltimeIngestionCommitEndMetadataFailureTest.java | 22 ++++
...sRealtimeIngestionSegmentCommitFailureTest.java | 5 -
8 files changed, 227 insertions(+), 22 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfig.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfig.java
index c5d0ece4c6..aecdab227d 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfig.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfig.java
@@ -29,9 +29,11 @@ public class SegmentCompletionConfig {
CommonConstants.Controller.PREFIX_OF_PINOT_CONTROLLER_SEGMENT_COMPLETION
+ ".fsm.scheme.";
public static final String DEFAULT_FSM_SCHEME_KEY =
CommonConstants.Controller.PREFIX_OF_PINOT_CONTROLLER_SEGMENT_COMPLETION
+ ".fsm.scheme.default";
+ public static final String DEFAULT_PAUSELESS_FSM_SCHEME_KEY =
+ CommonConstants.Controller.PREFIX_OF_PINOT_CONTROLLER_SEGMENT_COMPLETION
+ ".fsm.scheme.pauseless";
public static final String DEFAULT_FSM_SCHEME = "default";
+ public static final String DEFAULT_PAUSELESS_FSM_SCHEME = "pauseless";
private final Map<String, String> _fsmSchemes = new HashMap<>();
- private final String _defaultFsmScheme;
public SegmentCompletionConfig(PinotConfiguration configuration) {
// Parse properties to extract FSM schemes
@@ -43,9 +45,6 @@ public class SegmentCompletionConfig {
_fsmSchemes.put(scheme, className);
}
}
-
- // Get the default FSM scheme
- _defaultFsmScheme = configuration.getProperty(DEFAULT_FSM_SCHEME_KEY,
DEFAULT_FSM_SCHEME);
}
public Map<String, String> getFsmSchemes() {
@@ -53,6 +52,10 @@ public class SegmentCompletionConfig {
}
public String getDefaultFsmScheme() {
- return _defaultFsmScheme;
+ return DEFAULT_FSM_SCHEME;
+ }
+
+ public String getDefaultPauselessFsmScheme() {
+ return DEFAULT_PAUSELESS_FSM_SCHEME;
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactory.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactory.java
index 5ac238d626..196e928cd3 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactory.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactory.java
@@ -34,9 +34,10 @@ public class SegmentCompletionFSMFactory {
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentCompletionFSMFactory.class);
private static final Map<String, Class<? extends SegmentCompletionFSM>>
FSM_CLASS_MAP = new HashMap<>();
- // Static block to register the default FSM
+ // Static block to register the default FSM and pauseless FSM
static {
register(SegmentCompletionConfig.DEFAULT_FSM_SCHEME,
BlockingSegmentCompletionFSM.class);
+ register(SegmentCompletionConfig.DEFAULT_PAUSELESS_FSM_SCHEME,
PauselessSegmentCompletionFSM.class);
}
/**
@@ -48,8 +49,12 @@ public class SegmentCompletionFSMFactory {
public static void register(String scheme, Class<? extends
SegmentCompletionFSM> fsmClass) {
Preconditions.checkNotNull(scheme, "Scheme cannot be null");
Preconditions.checkNotNull(fsmClass, "FSM Class cannot be null");
- Preconditions.checkState(FSM_CLASS_MAP.put(scheme, fsmClass) == null,
- "FSM class already registered for scheme: " + scheme);
+
+ Class<? extends SegmentCompletionFSM> previousFsmClass =
FSM_CLASS_MAP.put(scheme, fsmClass);
+ if (previousFsmClass != null) {
+ LOGGER.warn("Replacing existing FSM: {} for scheme: {} with: {}",
+ previousFsmClass, scheme, fsmClass);
+ }
LOGGER.info("Registered SegmentCompletionFSM class {} for scheme {}",
fsmClass, scheme);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 3dbd209745..ffdbd44e05 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -28,6 +28,7 @@ import
org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.PauselessConsumptionUtils;
import org.apache.pinot.controller.LeadControllerManager;
import
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -138,7 +139,8 @@ public class SegmentCompletionManager {
}
if (factoryName == null) {
- factoryName = _segmentCompletionConfig.getDefaultFsmScheme();
+ factoryName = PauselessConsumptionUtils.isPauselessEnabled(tableConfig)
+ ? _segmentCompletionConfig.getDefaultPauselessFsmScheme() :
_segmentCompletionConfig.getDefaultFsmScheme();
}
Preconditions.checkState(SegmentCompletionFSMFactory.isFactoryTypeSupported(factoryName),
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfigTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfigTest.java
new file mode 100644
index 0000000000..f844cacef2
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionConfigTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime;
+
+import java.util.Map;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static
org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig.*;
+
+
+public class SegmentCompletionConfigTest {
+
+ private static final String DEFAULT_FSM_CLASS =
BlockingSegmentCompletionFSM.class.getName();
+ private static final String PAUSELESS_FSM_CLASS =
PauselessSegmentCompletionFSM.class.getName();
+ private static final String CUSTOM_FSM_CLASS =
+
"org.apache.pinot.controller.helix.core.realtime.CustomSegmentCompletionFSM";
+
+ @Test
+ public void testGetFsmSchemes() {
+ String customSchemeKey = FSM_SCHEME + "custom";
+ PinotConfiguration pinotConfiguration = new PinotConfiguration(
+ Map.of(DEFAULT_PAUSELESS_FSM_SCHEME_KEY, PAUSELESS_FSM_CLASS,
DEFAULT_FSM_SCHEME_KEY, DEFAULT_FSM_CLASS,
+ customSchemeKey, CUSTOM_FSM_CLASS));
+ SegmentCompletionConfig segmentCompletionConfig = new
SegmentCompletionConfig(pinotConfiguration);
+ Map<String, String> expectedFsmSchemes =
+ Map.of(DEFAULT_FSM_SCHEME, DEFAULT_FSM_CLASS,
DEFAULT_PAUSELESS_FSM_SCHEME, PAUSELESS_FSM_CLASS, "custom",
+ CUSTOM_FSM_CLASS);
+ Assert.assertEquals(expectedFsmSchemes,
segmentCompletionConfig.getFsmSchemes());
+ Assert.assertEquals(segmentCompletionConfig.getDefaultFsmScheme(),
DEFAULT_FSM_SCHEME);
+
Assert.assertEquals(segmentCompletionConfig.getDefaultPauselessFsmScheme(),
DEFAULT_PAUSELESS_FSM_SCHEME);
+ }
+}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactoryTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactoryTest.java
new file mode 100644
index 0000000000..1eda2de31d
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionFSMFactoryTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.LongMsgOffsetFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import static
org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig.DEFAULT_FSM_SCHEME_KEY;
+import static
org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig.DEFAULT_PAUSELESS_FSM_SCHEME_KEY;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class SegmentCompletionFSMFactoryTest {
+ private static final String DEFAULT_FSM_CLASS =
BlockingSegmentCompletionFSM.class.getName();
+ private static final String LLC_SEGMENT_NAME =
"tableName__0__0__20250228T1903Z";
+
+ @AfterMethod
+ public void tearDown() {
+ // Clear the factory's static state between tests
+ SegmentCompletionFSMFactory.shutdown();
+
+ // Re-register the default mappings
+
SegmentCompletionFSMFactory.register(SegmentCompletionConfig.DEFAULT_FSM_SCHEME,
+ BlockingSegmentCompletionFSM.class);
+
SegmentCompletionFSMFactory.register(SegmentCompletionConfig.DEFAULT_PAUSELESS_FSM_SCHEME,
+ PauselessSegmentCompletionFSM.class);
+ }
+
+ @Test
+ public void testCreateFSMWithDefaultFsm() {
+ PinotConfiguration pinotConfiguration = new
PinotConfiguration(Collections.emptyMap());
+ SegmentCompletionConfig segmentCompletionConfig = new
SegmentCompletionConfig(pinotConfiguration);
+ SegmentCompletionFSMFactory.init(segmentCompletionConfig);
+
+ // mock different objects that are required for creating FSM
+ SegmentCompletionManager segmentCompletionManager =
mock(SegmentCompletionManager.class);
+ LongMsgOffsetFactory longMsgOffsetFactory =
mock(LongMsgOffsetFactory.class);
+
when(segmentCompletionManager.getCurrentTimeMs()).thenReturn(System.currentTimeMillis());
+
when(segmentCompletionManager.getStreamPartitionMsgOffsetFactory(any())).thenReturn(longMsgOffsetFactory);
+ when(longMsgOffsetFactory.create(anyString())).thenReturn(new
LongMsgOffset(100));
+
+ SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class);
+ when(segmentZKMetadata.getNumReplicas()).thenReturn(3);
+ when(segmentZKMetadata.getEndOffset()).thenReturn("100");
+
+ PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager =
mock(PinotLLCRealtimeSegmentManager.class);
+
when(pinotLLCRealtimeSegmentManager.getCommitTimeoutMS(anyString())).thenReturn(System.currentTimeMillis());
+
+ // Asset that the default fsm (BlockingSegmentCompletionFSM) is picked
when the "default" scheme is passed
+ Assert.assertTrue(
+
SegmentCompletionFSMFactory.createFSM(segmentCompletionConfig.getDefaultFsmScheme(),
segmentCompletionManager,
+ pinotLLCRealtimeSegmentManager, new
LLCSegmentName(LLC_SEGMENT_NAME),
+ segmentZKMetadata) instanceof BlockingSegmentCompletionFSM);
+
+ // Asset that the default pauseless fsm (PauselessSegmentCompletionFSM) is
picked when the "pauseless" scheme is
+ // passed
+
Assert.assertTrue(SegmentCompletionFSMFactory.createFSM(segmentCompletionConfig.getDefaultPauselessFsmScheme(),
+ segmentCompletionManager, pinotLLCRealtimeSegmentManager, new
LLCSegmentName(LLC_SEGMENT_NAME),
+ segmentZKMetadata) instanceof PauselessSegmentCompletionFSM);
+ }
+
+ @Test
+ public void testCreateFSMWithCustomProvidedFsm() {
+
+ String fakeCustomeFsmClass =
+
"org.apache.pinot.controller.helix.core.realtime.SegmentCompletionFSMFactoryTest$FakeCustomFSM";
+
+ PinotConfiguration pinotConfiguration = new PinotConfiguration(
+ Map.of(DEFAULT_PAUSELESS_FSM_SCHEME_KEY, fakeCustomeFsmClass,
DEFAULT_FSM_SCHEME_KEY, DEFAULT_FSM_CLASS));
+
+ // mock different objects that are required for creating FSM
+ SegmentCompletionConfig segmentCompletionConfig = new
SegmentCompletionConfig(pinotConfiguration);
+ SegmentCompletionFSMFactory.init(segmentCompletionConfig);
+
+ SegmentCompletionManager segmentCompletionManager =
mock(SegmentCompletionManager.class);
+ LongMsgOffsetFactory longMsgOffsetFactory =
mock(LongMsgOffsetFactory.class);
+
when(segmentCompletionManager.getCurrentTimeMs()).thenReturn(System.currentTimeMillis());
+
when(segmentCompletionManager.getStreamPartitionMsgOffsetFactory(any())).thenReturn(longMsgOffsetFactory);
+ when(longMsgOffsetFactory.create(anyString())).thenReturn(new
LongMsgOffset(100));
+
+ SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class);
+ when(segmentZKMetadata.getNumReplicas()).thenReturn(3);
+ when(segmentZKMetadata.getEndOffset()).thenReturn("100");
+
+ PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager =
mock(PinotLLCRealtimeSegmentManager.class);
+
when(pinotLLCRealtimeSegmentManager.getCommitTimeoutMS(anyString())).thenReturn(System.currentTimeMillis());
+
+ // Assert that registering for the same scheme i.e. "default", works as
expected and BlockingSegmentCompletionFSM
+ // object is created
+ Assert.assertTrue(
+
SegmentCompletionFSMFactory.createFSM(segmentCompletionConfig.getDefaultFsmScheme(),
segmentCompletionManager,
+ pinotLLCRealtimeSegmentManager, new
LLCSegmentName(LLC_SEGMENT_NAME),
+ segmentZKMetadata) instanceof BlockingSegmentCompletionFSM);
+
+ // Assert that changing the default for pauseless returns the new custom
fsm.
+
Assert.assertTrue(SegmentCompletionFSMFactory.createFSM(segmentCompletionConfig.getDefaultPauselessFsmScheme(),
+ segmentCompletionManager, pinotLLCRealtimeSegmentManager, new
LLCSegmentName(LLC_SEGMENT_NAME),
+ segmentZKMetadata) instanceof FakeCustomFSM);
+ }
+
+ public static class FakeCustomFSM extends BlockingSegmentCompletionFSM {
+
+ public FakeCustomFSM(PinotLLCRealtimeSegmentManager segmentManager,
+ SegmentCompletionManager segmentCompletionManager, LLCSegmentName
segmentName,
+ SegmentZKMetadata segmentMetadata) {
+ super(segmentManager, segmentCompletionManager, segmentName,
segmentMetadata);
+ }
+ }
+}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java
index 6991ebd876..c8f860cdf2 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java
@@ -30,7 +30,6 @@ import
org.apache.pinot.common.utils.PauselessConsumptionUtils;
import org.apache.pinot.controller.BaseControllerStarter;
import org.apache.pinot.controller.ControllerConf;
import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
-import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig;
import
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingControllerStarter;
import
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingPinotLLCRealtimeSegmentManager;
import
org.apache.pinot.integration.tests.realtime.utils.PauselessRealtimeTestUtils;
@@ -48,7 +47,6 @@ import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
-import static
org.apache.pinot.spi.stream.StreamConfigProperties.SEGMENT_COMPLETION_FSM_SCHEME;
import static org.testng.Assert.assertTrue;
@@ -79,8 +77,6 @@ public abstract class BasePauselessRealtimeIngestionTest
extends BaseClusterInte
protected void overrideControllerConf(Map<String, Object> properties) {
properties.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED,
true);
properties.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT,
true);
- properties.put(SegmentCompletionConfig.FSM_SCHEME + "pauseless",
-
"org.apache.pinot.controller.helix.core.realtime.PauselessSegmentCompletionFSM");
properties.put(ControllerConf.ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS,
500);
}
@@ -158,10 +154,6 @@ public abstract class BasePauselessRealtimeIngestionTest
extends BaseClusterInte
ingestionConfig.setStreamIngestionConfig(
new
StreamIngestionConfig(List.of(tableConfig.getIndexingConfig().getStreamConfigs())));
ingestionConfig.getStreamIngestionConfig().setPauselessConsumptionEnabled(true);
- ingestionConfig.getStreamIngestionConfig()
- .getStreamConfigMaps()
- .get(0)
- .put(SEGMENT_COMPLETION_FSM_SCHEME, "pauseless");
tableConfig.getIndexingConfig().setStreamConfigs(null);
tableConfig.setIngestionConfig(ingestionConfig);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionCommitEndMetadataFailureTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionCommitEndMetadataFailureTest.java
index 0c816b6563..d6a930d164 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionCommitEndMetadataFailureTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionCommitEndMetadataFailureTest.java
@@ -22,7 +22,9 @@ import java.util.List;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.controller.helix.core.util.FailureInjectionUtils;
import
org.apache.pinot.integration.tests.realtime.utils.PauselessRealtimeTestUtils;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
import org.testng.annotations.Test;
import static org.testng.Assert.assertNull;
@@ -31,6 +33,11 @@ import static org.testng.Assert.assertNull;
public class PauselessRealtimeIngestionCommitEndMetadataFailureTest
extends BasePauselessRealtimeIngestionTest {
+ // The total number of real-time segments is NUM_REALTIME_SEGMENTS.
+ // Two segments are in the CONSUMING state, meaning they are still ingesting
data.
+ // The remaining segments (NUM_REALTIME_SEGMENTS - 2) are in the committing
state,
+ private static final int NUM_REALTIME_SEGMENTS_IN_COMMITTING_STATE = 46;
+
@Override
protected String getFailurePoint() {
return FailureInjectionUtils.FAULT_BEFORE_COMMIT_END_METADATA;
@@ -57,6 +64,21 @@ public class
PauselessRealtimeIngestionCommitEndMetadataFailureTest
String tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(getTableName());
PauselessRealtimeTestUtils.verifyIdealState(tableNameWithType,
NUM_REALTIME_SEGMENTS, _helixManager);
+ // the setup() function only waits till all the documents have been loaded
+ // this has lead to race condition in the commit protocol and validation
manager run leading to test failures
+ // checking the completion of the commit protocol i.e. segment marked
COMMITTING before triggering
+ // validation manager in the tests prevents this.
+
+ // All the segments should be in COMMITTING state for pauseless table
having commitEndMetadata failure
+ TestUtils.waitForCondition((aVoid) -> {
+ List<SegmentZKMetadata> segmentZKMetadataList =
+ _helixResourceManager.getSegmentsZKMetadata(tableNameWithType);
+ return segmentZKMetadataList.stream()
+ .filter(
+ segmentZKMetadata -> segmentZKMetadata.getStatus() ==
CommonConstants.Segment.Realtime.Status.COMMITTING)
+ .count() == NUM_REALTIME_SEGMENTS_IN_COMMITTING_STATE;
+ }, 1000, 100000, "Some segments are still IN_PROGRESS");
+
List<SegmentZKMetadata> segmentZKMetadataList =
_helixResourceManager.getSegmentsZKMetadata(tableNameWithType);
for (SegmentZKMetadata metadata : segmentZKMetadataList) {
assertNull(metadata.getDownloadUrl());
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java
index 2e706b20cd..5eb286c2a6 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java
@@ -35,7 +35,6 @@ import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.BaseControllerStarter;
import org.apache.pinot.controller.ControllerConf;
import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
-import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig;
import
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingControllerStarter;
import
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingPinotLLCRealtimeSegmentManager;
import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
@@ -54,7 +53,6 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static
org.apache.pinot.integration.tests.realtime.utils.FailureInjectingRealtimeTableDataManager.MAX_NUMBER_OF_FAILURES;
-import static
org.apache.pinot.spi.stream.StreamConfigProperties.SEGMENT_COMPLETION_FSM_SCHEME;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
@@ -72,8 +70,6 @@ public class
PauselessRealtimeIngestionSegmentCommitFailureTest extends BaseClus
protected void overrideControllerConf(Map<String, Object> properties) {
properties.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED,
true);
properties.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT,
true);
- properties.put(SegmentCompletionConfig.FSM_SCHEME + "pauseless",
-
"org.apache.pinot.controller.helix.core.realtime.PauselessSegmentCompletionFSM");
// Set the delay more than the time we sleep before triggering
RealtimeSegmentValidationManager manually, i.e.
// MAX_SEGMENT_COMPLETION_TIME_MILLIS, to ensure that the segment level
validations are performed.
properties.put(ControllerConf.ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS,
@@ -155,7 +151,6 @@ public class
PauselessRealtimeIngestionSegmentCommitFailureTest extends BaseClus
Map<String, String> streamConfigMap =
ingestionConfig.getStreamIngestionConfig()
.getStreamConfigMaps()
.get(0);
- streamConfigMap.put(SEGMENT_COMPLETION_FSM_SCHEME, "pauseless");
streamConfigMap.put("segmentDownloadTimeoutMinutes", "1");
tableConfig.getIndexingConfig().setStreamConfigs(null);
tableConfig.setIngestionConfig(ingestionConfig);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]