This is an automated email from the ASF dual-hosted git repository. Jackie-Jiang pushed a commit to branch hotfix_18588 in repository https://gitbox.apache.org/repos/asf/pinot.git
commit fc6efbab676dcf4880e5034cff20a9475596ae39
Author: Yash Mayya <[email protected]>
AuthorDate: Thu May 28 13:00:13 2026 -0700

    Include MSE pipeline breaker stats by default (#18601) (#585)
---
 .../tests/MultiStageEngineIntegrationTest.java     | 122 +++++++++++----------
 .../apache/pinot/spi/utils/CommonConstants.java    |  11 +-
 2 files changed, 72 insertions(+), 61 deletions(-)

diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index 34bc3ef93bc..534f810f849 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -2199,16 +2199,13 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestS
   }
 
   @Test
-  public void testStageStatsPipelineBreaker()
-      throws Exception {
-    HelixConfigScope scope =
-        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
-            .build();
-    try {
-      _helixManager.getConfigAccessor()
-          .set(scope, CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, "false");
-      String query = "select * from mytable "
-          + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+  public void testStageStatsPipelineBreaker() {
+    String query = "select * from mytable "
+        + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+    // Pipeline breaker stats are kept by default. Retry in case a sibling test that overrode the default has just
+    // finished and the reset has not yet propagated to the server.
+    String errorMsg = "Failed to verify presence of pipeline breaker stats after multiple attempts";
+    TestUtils.waitForCondition(() -> {
       JsonNode response = postQuery(query);
       assertNotNull(response.get("stageStats"), "Should have stage stats");
 
@@ -2222,6 +2219,11 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestS
       Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
       Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
 
+      if (mytableLeaf.get("children") == null) {
+        // Sibling test's reset has not yet propagated. Retry.
+        return false;
+      }
+
       JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
       Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
 
@@ -2231,29 +2233,24 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestS
       JsonNode dayOfWeekLeaf = pipelineSend.get("children").get(0);
       Assertions.assertThat(dayOfWeekLeaf.get("type").asText()).isEqualTo("LEAF");
       Assertions.assertThat(dayOfWeekLeaf.get("table").asText()).isEqualTo("daysOfWeek");
-    } finally {
-      _helixManager.getConfigAccessor()
-          .set(scope, CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, "true");
-    }
+      return true;
+    }, 100, 10_000L, errorMsg, Duration.ofSeconds(1));
   }
 
   @Test
-  public void testPipelineBreakerKeepsNumGroupsLimitReached()
-      throws Exception {
-    HelixConfigScope scope =
-        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
-            .build();
-    try {
-      _helixManager.getConfigAccessor()
-          .set(scope, CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, "false");
-      String query = ""
-          + "SET numGroupsLimit = 1;"
-          + "SELECT * FROM daysOfWeek "
-          + "WHERE dayid in ("
-          + " SELECT DayOfWeek FROM mytable"
-          + " GROUP BY DayOfWeek"
-          + ")";
+  public void testPipelineBreakerKeepsNumGroupsLimitReached() {
+    String query = ""
+        + "SET numGroupsLimit = 1;"
+        + "SELECT * FROM daysOfWeek "
+        + "WHERE dayid in ("
+        + " SELECT DayOfWeek FROM mytable"
+        + " GROUP BY DayOfWeek"
+        + ")";
 
+    // Pipeline breaker stats are kept by default. Retry in case a sibling test that overrode the default has just
+    // finished and the reset has not yet propagated to the server.
+    String errorMsg = "Failed to verify numGroupsLimitReached on a pipeline breaker after multiple attempts";
+    TestUtils.waitForCondition(() -> {
       JsonNode response = postQuery(query);
       assertNotNull(response.get("stageStats"), "Should have stage stats");
 
@@ -2267,6 +2264,11 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestS
       Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
       Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualToIgnoringCase("daysOfWeek");
 
+      if (mytableLeaf.get("children") == null) {
+        // Sibling test's reset has not yet propagated. Retry.
+        return false;
+      }
+
       JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
       Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
 
@@ -2276,36 +2278,46 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestS
       Assertions.assertThat(response.get("numGroupsLimitReached").asBoolean(false))
           .describedAs("numGroupsLimitReached should be true even when the limit is reached on a pipeline breaker")
           .isEqualTo(true);
-    } finally {
-      _helixManager.getConfigAccessor()
-          .set(scope, CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, "true");
-    }
+      return true;
+    }, 100, 10_000L, errorMsg, Duration.ofSeconds(1));
   }
 
   @Test
   public void testPipelineBreakerWithoutKeepingStats() {
-    // let's try several times to give helix time to propagate the config change
-    String errorMsg = "Failed to verify absence of pipeline breaker stats after multiple attempts after 10 attempts";
-    TestUtils.waitForCondition(() -> {
-      String query = "select * from mytable "
-          + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
-      JsonNode response = postQuery(query);
-      assertNotNull(response.get("stageStats"), "Should have stage stats");
-
-      JsonNode receiveNode = response.get("stageStats");
-      Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
-
-      JsonNode sendNode = receiveNode.get("children").get(0);
-      Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
-
-      JsonNode mytableLeaf = sendNode.get("children").get(0);
-      Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
-      Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
-
-      Assert.assertNull(mytableLeaf.get("children"), "When pipeline breaker stats are not kept, "
-          + "there should be no children under the leaf node");
-      return true;
-    }, 100, 10_000L, errorMsg, Duration.ofSeconds(1));
+    HelixConfigScope scope =
+        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
+            .build();
+    try {
+      // Pipeline breaker stats are kept by default, so explicitly skip them for this test.
+      _helixManager.getConfigAccessor()
+          .set(scope, CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, "true");
+      // let's try several times to give helix time to propagate the config change
+      String errorMsg = "Failed to verify absence of pipeline breaker stats after multiple attempts after 10 attempts";
+      TestUtils.waitForCondition(() -> {
+        String query = "select * from mytable "
+            + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+        JsonNode response = postQuery(query);
+        assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+        JsonNode receiveNode = response.get("stageStats");
+        Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+        JsonNode sendNode = receiveNode.get("children").get(0);
+        Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+        JsonNode mytableLeaf = sendNode.get("children").get(0);
+        Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+        Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
+
+        // Once the config change has propagated, there should be no children (pipeline breaker stats) under the leaf
+        // node. Return the result instead of asserting so that waitForCondition keeps retrying while the change is
+        // still propagating (AssertionError would not be caught and retried).
+        return mytableLeaf.get("children") == null;
+      }, 100, 10_000L, errorMsg, Duration.ofSeconds(1));
+    } finally {
+      _helixManager.getConfigAccessor()
+          .set(scope, CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, "false");
+    }
   }
 
   @AfterClass
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index fad1c44f4af..dfcceca260c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -2207,13 +2207,12 @@ public class CommonConstants {
     public static final String KEY_OF_SEND_STATS_MODE = "pinot.query.mse.stats.mode";
     public static final String DEFAULT_SEND_STATS_MODE = "ALWAYS";
 
-    /// Used to indicate whether MSE pipeline breaker stats should be included in the queryStats field.
-    /// This flag was introduced in 1.5.0. Before 1.5.0, MSE pipeline breaker stats were not kept. Starting from 1.5.0,
-    /// they are not included by default but can be included by setting this flag to false (upper or lower case).
-    ///
-    /// It is expected that in 1.6.0 and later, MSE pipeline breaker stats will be included by default.
+    /// Used to indicate whether MSE pipeline breaker stats should be included in the stageStats field.
+    /// This flag was introduced in 1.5.0. Before 1.5.0, MSE pipeline breaker stats were not kept. In 1.5.0 they were
+    /// not included by default but could be included by setting this flag to false (upper or lower case). Starting
+    /// from 1.6.0, they are included by default and can be excluded by setting this flag to true (upper or lower case).
     public static final String KEY_OF_SKIP_PIPELINE_BREAKER_STATS = "pinot.query.mse.skip.pipeline.breaker.stats";
-    public static final boolean DEFAULT_SKIP_PIPELINE_BREAKER_STATS = true;
+    public static final boolean DEFAULT_SKIP_PIPELINE_BREAKER_STATS = false;
 
     /// Used to indicate that MSE stats should be logged at INFO level for successful queries.
     ///


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
