This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new aee5a1f10ae Include MSE pipeline breaker stats by default (#18601)
aee5a1f10ae is described below
commit aee5a1f10ae0d82d8c806e0bf2cebd8b99b886b6
Author: Yash Mayya <[email protected]>
AuthorDate: Thu May 28 12:56:10 2026 -0700
Include MSE pipeline breaker stats by default (#18601)
---
.../tests/MultiStageEngineIntegrationTest.java | 122 +++++++++++----------
.../apache/pinot/spi/utils/CommonConstants.java | 11 +-
2 files changed, 72 insertions(+), 61 deletions(-)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index 025820debb3..e6008353e33 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -2329,16 +2329,13 @@ public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTestS
}
@Test
- public void testStageStatsPipelineBreaker()
- throws Exception {
- HelixConfigScope scope =
- new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
- .build();
- try {
- _helixManager.getConfigAccessor()
- .set(scope,
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS,
"false");
- String query = "select * from mytable "
- + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+ public void testStageStatsPipelineBreaker() {
+ String query = "select * from mytable "
+ + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+ // Pipeline breaker stats are kept by default. Retry in case a sibling
test that overrode the default has just
+ // finished and the reset has not yet propagated to the server.
+ String errorMsg = "Failed to verify presence of pipeline breaker stats
after multiple attempts";
+ TestUtils.waitForCondition(() -> {
JsonNode response = postQuery(query);
assertNotNull(response.get("stageStats"), "Should have stage stats");
@@ -2352,6 +2349,11 @@ public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTestS
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
+ if (mytableLeaf.get("children") == null) {
+ // Sibling test's reset has not yet propagated. Retry.
+ return false;
+ }
+
JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
@@ -2361,29 +2363,24 @@ public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTestS
JsonNode dayOfWeekLeaf = pipelineSend.get("children").get(0);
Assertions.assertThat(dayOfWeekLeaf.get("type").asText()).isEqualTo("LEAF");
Assertions.assertThat(dayOfWeekLeaf.get("table").asText()).isEqualTo("daysOfWeek");
- } finally {
- _helixManager.getConfigAccessor()
- .set(scope,
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS,
"true");
- }
+ return true;
+ }, 100, 10_000L, errorMsg, Duration.ofSeconds(1));
}
@Test
- public void testPipelineBreakerKeepsNumGroupsLimitReached()
- throws Exception {
- HelixConfigScope scope =
- new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
- .build();
- try {
- _helixManager.getConfigAccessor()
- .set(scope,
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS,
"false");
- String query = ""
- + "SET numGroupsLimit = 1;"
- + "SELECT * FROM daysOfWeek "
- + "WHERE dayid in ("
- + " SELECT DayOfWeek FROM mytable"
- + " GROUP BY DayOfWeek"
- + ")";
+ public void testPipelineBreakerKeepsNumGroupsLimitReached() {
+ String query = ""
+ + "SET numGroupsLimit = 1;"
+ + "SELECT * FROM daysOfWeek "
+ + "WHERE dayid in ("
+ + " SELECT DayOfWeek FROM mytable"
+ + " GROUP BY DayOfWeek"
+ + ")";
+ // Pipeline breaker stats are kept by default. Retry in case a sibling
test that overrode the default has just
+ // finished and the reset has not yet propagated to the server.
+ String errorMsg = "Failed to verify numGroupsLimitReached on a pipeline
breaker after multiple attempts";
+ TestUtils.waitForCondition(() -> {
JsonNode response = postQuery(query);
assertNotNull(response.get("stageStats"), "Should have stage stats");
@@ -2397,6 +2394,11 @@ public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTestS
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualToIgnoringCase("daysOfWeek");
+ if (mytableLeaf.get("children") == null) {
+ // Sibling test's reset has not yet propagated. Retry.
+ return false;
+ }
+
JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
@@ -2406,36 +2408,46 @@ public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTestS
Assertions.assertThat(response.get("numGroupsLimitReached").asBoolean(false))
.describedAs("numGroupsLimitReached should be true even when the
limit is reached on a pipeline breaker")
.isEqualTo(true);
- } finally {
- _helixManager.getConfigAccessor()
- .set(scope,
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS,
"true");
- }
+ return true;
+ }, 100, 10_000L, errorMsg, Duration.ofSeconds(1));
}
@Test
public void testPipelineBreakerWithoutKeepingStats() {
- // let's try several times to give helix time to propagate the config
change
- String errorMsg = "Failed to verify absence of pipeline breaker stats
after multiple attempts after 10 attempts";
- TestUtils.waitForCondition(() -> {
- String query = "select * from mytable "
- + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
- JsonNode response = postQuery(query);
- assertNotNull(response.get("stageStats"), "Should have stage stats");
-
- JsonNode receiveNode = response.get("stageStats");
-
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
-
- JsonNode sendNode = receiveNode.get("children").get(0);
-
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
-
- JsonNode mytableLeaf = sendNode.get("children").get(0);
-
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
-
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
-
- Assert.assertNull(mytableLeaf.get("children"), "When pipeline breaker
stats are not kept, "
- + "there should be no children under the leaf node");
- return true;
- }, 100, 10_000L, errorMsg, Duration.ofSeconds(1));
+ HelixConfigScope scope =
+ new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
+ .build();
+ try {
+ // Pipeline breaker stats are kept by default, so explicitly skip them
for this test.
+ _helixManager.getConfigAccessor()
+ .set(scope,
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS,
"true");
+ // let's try several times to give helix time to propagate the config
change
+ String errorMsg = "Failed to verify absence of pipeline breaker stats
after multiple attempts after 10 attempts";
+ TestUtils.waitForCondition(() -> {
+ String query = "select * from mytable "
+ + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+ JsonNode response = postQuery(query);
+ assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+ JsonNode receiveNode = response.get("stageStats");
+
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode sendNode = receiveNode.get("children").get(0);
+
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+ JsonNode mytableLeaf = sendNode.get("children").get(0);
+
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
+
+ // Once the config change has propagated, there should be no children
(pipeline breaker stats) under the leaf
+ // node. Return the result instead of asserting so that
waitForCondition keeps retrying while the change is
+ // still propagating (AssertionError would not be caught and retried).
+ return mytableLeaf.get("children") == null;
+ }, 100, 10_000L, errorMsg, Duration.ofSeconds(1));
+ } finally {
+ _helixManager.getConfigAccessor()
+ .set(scope,
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS,
"false");
+ }
}
@AfterClass
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 694f64cc4e3..20ec9305f5a 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -2526,13 +2526,12 @@ public class CommonConstants {
public static final String KEY_OF_SEND_STATS_MODE =
"pinot.query.mse.stats.mode";
public static final String DEFAULT_SEND_STATS_MODE = "ALWAYS";
- /// Used to indicate whether MSE pipeline breaker stats should be included
in the queryStats field.
- /// This flag was introduced in 1.5.0. Before 1.5.0, MSE pipeline breaker
stats were not kept. Starting from 1.5.0,
- /// they are not included by default but can be included by setting this
flag to false (upper or lower case).
- ///
- /// It is expected that in 1.6.0 and later, MSE pipeline breaker stats
will be included by default.
+ /// Used to indicate whether MSE pipeline breaker stats should be included in the stageStats field.
+ /// This flag was introduced in 1.5.0. Before 1.5.0, MSE pipeline breaker stats were not kept. In 1.5.0 they were
+ /// not included by default but could be included by setting this flag to false (upper or lower case). Starting
+ /// from 1.6.0, they are included by default and can be excluded by setting this flag to true (upper or lower case).
public static final String KEY_OF_SKIP_PIPELINE_BREAKER_STATS =
"pinot.query.mse.skip.pipeline.breaker.stats";
- public static final boolean DEFAULT_SKIP_PIPELINE_BREAKER_STATS = true;
+ public static final boolean DEFAULT_SKIP_PIPELINE_BREAKER_STATS = false;
/// Used to indicate that MSE stats should be logged at INFO level for
successful queries.
///
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]