yashmayya commented on code in PR #17576:
URL: https://github.com/apache/pinot/pull/17576#discussion_r2743166324


##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java:
##########
@@ -2149,6 +2148,112 @@ public void testNaturalJoinWithNoVirtualColumns()
     assertNotNull(response.get("resultTable"), "Should have result table");
   }
 
+  @Test
+  public void testStageStatsPipelineBreaker()
+      throws Exception {
+    String query = "select * from mytable "
+        + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+    JsonNode response = postQuery(query);
+    assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+    JsonNode receiveNode = response.get("stageStats");
+    
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+    JsonNode sendNode = receiveNode.get("children").get(0);
+    
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+    JsonNode mytableLeaf = sendNode.get("children").get(0);
+    Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+    
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
+
+    JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
+    
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+    JsonNode pipelineSend = pipelineReceive.get("children").get(0);
+    
Assertions.assertThat(pipelineSend.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+    JsonNode dayOfWeekLeaf = pipelineSend.get("children").get(0);
+    
Assertions.assertThat(dayOfWeekLeaf.get("type").asText()).isEqualTo("LEAF");
+    
Assertions.assertThat(dayOfWeekLeaf.get("table").asText()).isEqualTo("daysOfWeek");
+  }
+
+  @Test
+  public void testPipelineBreakerKeepsNumGroupsLimitReached()
+      throws Exception {
+    String query = ""
+        + "SET numGroupsLimit = 1;"
+        + "SELECT * FROM daysOfWeek "
+        + "WHERE dayid in ("
+        + " SELECT DayOfWeek FROM mytable"
+        + " GROUP BY DayOfWeek"
+        + ")";
+
+    JsonNode response = postQuery(query);
+    assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+    JsonNode receiveNode = response.get("stageStats");
+    
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+    JsonNode sendNode = receiveNode.get("children").get(0);
+    
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+    JsonNode mytableLeaf = sendNode.get("children").get(0);
+    Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+    
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualToIgnoringCase("daysOfWeek");
+
+    JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
+    
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+    JsonNode pipelineSend = pipelineReceive.get("children").get(0);
+    
Assertions.assertThat(pipelineSend.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+    
Assertions.assertThat(response.get("numGroupsLimitReached").asBoolean(false))
+        .describedAs("numGroupsLimitReached should be true even when the limit 
is reached on a pipeline breaker")
+        .isEqualTo(true);
+  }
+
+  @Test
+  public void testPipelineBreakerWithoutKeepingStats()
+      throws Exception {
+    HelixConfigScope scope =
+        new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
+            .build();
+    _helixManager.getConfigAccessor()
+        .set(scope, 
CommonConstants.MultiStageQueryRunner.KEY_OF_KEEP_PIPELINE_BREAKER_STATS, 
"false");
+    try {
+      // lets try several times to give helix time to propagate the config 
change
+      for (int i = 0; i < 10; i++) {
+        try {
+          String query = "select * from mytable "
+              + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+          JsonNode response = postQuery(query);
+          assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+          JsonNode receiveNode = response.get("stageStats");
+          
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+          JsonNode sendNode = receiveNode.get("children").get(0);
+          
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+          JsonNode mytableLeaf = sendNode.get("children").get(0);
+          
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+          
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
+
+          Assert.assertNull(mytableLeaf.get("children"), "When pipeline 
breaker stats are not kept, "
+              + "there should be no children under the leaf node");
+          System.out.println("Successfully verified absence of pipeline 
breaker stats on attempt " + (i + 1));

Review Comment:
   Let's remove this?



##########
compatibility-verifier/compCheck.sh:
##########
@@ -527,6 +527,7 @@ if [ -f "${SERVER_CONF_2}" ]; then
   stopService server2
   startService server2 "$oldTargetDir" "$SERVER_CONF_2"
   waitForServer2Ready
+  sleep 300

Review Comment:
   What's this for?



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java:
##########
@@ -2149,6 +2148,112 @@ public void testNaturalJoinWithNoVirtualColumns()
     assertNotNull(response.get("resultTable"), "Should have result table");
   }
 
+  @Test
+  public void testStageStatsPipelineBreaker()
+      throws Exception {
+    String query = "select * from mytable "
+        + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+    JsonNode response = postQuery(query);
+    assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+    JsonNode receiveNode = response.get("stageStats");
+    
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+    JsonNode sendNode = receiveNode.get("children").get(0);
+    
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+    JsonNode mytableLeaf = sendNode.get("children").get(0);

Review Comment:
   So if IIUC, the structure would look like `LEAF <- MAILBOX_RECEIVE <- 
MAILBOX_SEND <- LEAF`? And we don't specifically have a `PIPELINE_BREAKER` node 
in the stats? 



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java:
##########
@@ -2149,6 +2148,112 @@ public void testNaturalJoinWithNoVirtualColumns()
     assertNotNull(response.get("resultTable"), "Should have result table");
   }
 
+  @Test
+  public void testStageStatsPipelineBreaker()
+      throws Exception {
+    String query = "select * from mytable "
+        + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+    JsonNode response = postQuery(query);
+    assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+    JsonNode receiveNode = response.get("stageStats");
+    
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+    JsonNode sendNode = receiveNode.get("children").get(0);
+    
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+    JsonNode mytableLeaf = sendNode.get("children").get(0);
+    Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+    
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
+
+    JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
+    
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+    JsonNode pipelineSend = pipelineReceive.get("children").get(0);
+    
Assertions.assertThat(pipelineSend.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+    JsonNode dayOfWeekLeaf = pipelineSend.get("children").get(0);
+    
Assertions.assertThat(dayOfWeekLeaf.get("type").asText()).isEqualTo("LEAF");
+    
Assertions.assertThat(dayOfWeekLeaf.get("table").asText()).isEqualTo("daysOfWeek");
+  }
+
+  @Test
+  public void testPipelineBreakerKeepsNumGroupsLimitReached()
+      throws Exception {
+    String query = ""
+        + "SET numGroupsLimit = 1;"
+        + "SELECT * FROM daysOfWeek "
+        + "WHERE dayid in ("
+        + " SELECT DayOfWeek FROM mytable"
+        + " GROUP BY DayOfWeek"
+        + ")";
+
+    JsonNode response = postQuery(query);
+    assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+    JsonNode receiveNode = response.get("stageStats");
+    
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+    JsonNode sendNode = receiveNode.get("children").get(0);
+    
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+    JsonNode mytableLeaf = sendNode.get("children").get(0);
+    Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+    
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualToIgnoringCase("daysOfWeek");
+
+    JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
+    
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+    JsonNode pipelineSend = pipelineReceive.get("children").get(0);
+    
Assertions.assertThat(pipelineSend.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+    
Assertions.assertThat(response.get("numGroupsLimitReached").asBoolean(false))
+        .describedAs("numGroupsLimitReached should be true even when the limit 
is reached on a pipeline breaker")
+        .isEqualTo(true);
+  }
+
+  @Test
+  public void testPipelineBreakerWithoutKeepingStats()
+      throws Exception {
+    HelixConfigScope scope =
+        new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
+            .build();
+    _helixManager.getConfigAccessor()
+        .set(scope, 
CommonConstants.MultiStageQueryRunner.KEY_OF_KEEP_PIPELINE_BREAKER_STATS, 
"false");
+    try {
+      // lets try several times to give helix time to propagate the config 
change
+      for (int i = 0; i < 10; i++) {

Review Comment:
   Can we use the `waitForCondition` pattern from `TestUtils` instead?



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java:
##########
@@ -2149,6 +2148,122 @@ public void testNaturalJoinWithNoVirtualColumns()
     assertNotNull(response.get("resultTable"), "Should have result table");
   }
 
+  @Test
+  public void testStageStatsPipelineBreaker()
+      throws Exception {
+    HelixConfigScope scope =
+        new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
+            .build();
+    try {
+      _helixManager.getConfigAccessor()
+          .set(scope, 
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, 
"false");
+      String query = "select * from mytable "
+          + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+      JsonNode response = postQuery(query);
+      assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+      JsonNode receiveNode = response.get("stageStats");
+      
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+      JsonNode sendNode = receiveNode.get("children").get(0);
+      
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+      JsonNode mytableLeaf = sendNode.get("children").get(0);
+      
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+      
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
+
+      JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
+      
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+      JsonNode pipelineSend = pipelineReceive.get("children").get(0);
+      
Assertions.assertThat(pipelineSend.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+      JsonNode dayOfWeekLeaf = pipelineSend.get("children").get(0);
+      
Assertions.assertThat(dayOfWeekLeaf.get("type").asText()).isEqualTo("LEAF");
+      
Assertions.assertThat(dayOfWeekLeaf.get("table").asText()).isEqualTo("daysOfWeek");
+    } finally {
+      _helixManager.getConfigAccessor()
+          .set(scope, 
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, 
"true");
+    }
+  }
+
+  @Test
+  public void testPipelineBreakerKeepsNumGroupsLimitReached()
+      throws Exception {
+    HelixConfigScope scope =
+        new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
+            .build();
+    try {
+      _helixManager.getConfigAccessor()
+          .set(scope, 
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, 
"false");
+      String query = ""
+          + "SET numGroupsLimit = 1;"
+          + "SELECT * FROM daysOfWeek "
+          + "WHERE dayid in ("
+          + " SELECT DayOfWeek FROM mytable"
+          + " GROUP BY DayOfWeek"
+          + ")";
+
+      JsonNode response = postQuery(query);
+      assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+      JsonNode receiveNode = response.get("stageStats");
+      
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+      JsonNode sendNode = receiveNode.get("children").get(0);
+      
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+      JsonNode mytableLeaf = sendNode.get("children").get(0);
+      
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+      
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualToIgnoringCase("daysOfWeek");
+
+      JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
+      
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+      JsonNode pipelineSend = pipelineReceive.get("children").get(0);
+      
Assertions.assertThat(pipelineSend.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+      
Assertions.assertThat(response.get("numGroupsLimitReached").asBoolean(false))
+          .describedAs("numGroupsLimitReached should be true even when the 
limit is reached on a pipeline breaker")
+          .isEqualTo(true);
+    } finally {
+      _helixManager.getConfigAccessor()
+          .set(scope, 
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, 
"true");
+    }
+  }
+
+  @Test
+  public void testPipelineBreakerWithoutKeepingStats()
+      throws Exception {
+    // lets try several times to give helix time to propagate the config change
+    for (int i = 0; i < 10; i++) {
+      try {
+        String query = "select * from mytable "
+            + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+        JsonNode response = postQuery(query);
+        assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+        JsonNode receiveNode = response.get("stageStats");
+        
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+        JsonNode sendNode = receiveNode.get("children").get(0);
+        
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+        JsonNode mytableLeaf = sendNode.get("children").get(0);
+        
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+        
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
+
+        Assert.assertNull(mytableLeaf.get("children"), "When pipeline breaker 
stats are not kept, "
+            + "there should be no children under the leaf node");
+        System.out.println("Successfully verified absence of pipeline breaker 
stats on attempt " + (i + 1));

Review Comment:
   Let's remove this?



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java:
##########
@@ -2149,6 +2148,122 @@ public void testNaturalJoinWithNoVirtualColumns()
     assertNotNull(response.get("resultTable"), "Should have result table");
   }
 
+  @Test
+  public void testStageStatsPipelineBreaker()
+      throws Exception {
+    HelixConfigScope scope =
+        new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
+            .build();
+    try {
+      _helixManager.getConfigAccessor()
+          .set(scope, 
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, 
"false");
+      String query = "select * from mytable "
+          + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+      JsonNode response = postQuery(query);
+      assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+      JsonNode receiveNode = response.get("stageStats");
+      
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+      JsonNode sendNode = receiveNode.get("children").get(0);
+      
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+      JsonNode mytableLeaf = sendNode.get("children").get(0);
+      
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+      
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
+
+      JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
+      
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+      JsonNode pipelineSend = pipelineReceive.get("children").get(0);
+      
Assertions.assertThat(pipelineSend.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+      JsonNode dayOfWeekLeaf = pipelineSend.get("children").get(0);
+      
Assertions.assertThat(dayOfWeekLeaf.get("type").asText()).isEqualTo("LEAF");
+      
Assertions.assertThat(dayOfWeekLeaf.get("table").asText()).isEqualTo("daysOfWeek");
+    } finally {
+      _helixManager.getConfigAccessor()
+          .set(scope, 
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, 
"true");
+    }
+  }
+
+  @Test
+  public void testPipelineBreakerKeepsNumGroupsLimitReached()
+      throws Exception {
+    HelixConfigScope scope =
+        new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
+            .build();
+    try {
+      _helixManager.getConfigAccessor()
+          .set(scope, 
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, 
"false");
+      String query = ""
+          + "SET numGroupsLimit = 1;"
+          + "SELECT * FROM daysOfWeek "
+          + "WHERE dayid in ("
+          + " SELECT DayOfWeek FROM mytable"
+          + " GROUP BY DayOfWeek"
+          + ")";
+
+      JsonNode response = postQuery(query);
+      assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+      JsonNode receiveNode = response.get("stageStats");
+      
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+      JsonNode sendNode = receiveNode.get("children").get(0);
+      
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+      JsonNode mytableLeaf = sendNode.get("children").get(0);
+      
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+      
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualToIgnoringCase("daysOfWeek");
+
+      JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
+      
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+      JsonNode pipelineSend = pipelineReceive.get("children").get(0);
+      
Assertions.assertThat(pipelineSend.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+      
Assertions.assertThat(response.get("numGroupsLimitReached").asBoolean(false))
+          .describedAs("numGroupsLimitReached should be true even when the 
limit is reached on a pipeline breaker")
+          .isEqualTo(true);
+    } finally {
+      _helixManager.getConfigAccessor()
+          .set(scope, 
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, 
"true");
+    }
+  }
+
+  @Test
+  public void testPipelineBreakerWithoutKeepingStats()
+      throws Exception {
+    // lets try several times to give helix time to propagate the config change
+    for (int i = 0; i < 10; i++) {

Review Comment:
   Can we use the `waitForCondition` pattern from `TestUtils` instead?



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java:
##########
@@ -2149,6 +2148,122 @@ public void testNaturalJoinWithNoVirtualColumns()
     assertNotNull(response.get("resultTable"), "Should have result table");
   }
 
+  @Test
+  public void testStageStatsPipelineBreaker()
+      throws Exception {
+    HelixConfigScope scope =
+        new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
+            .build();
+    try {
+      _helixManager.getConfigAccessor()
+          .set(scope, 
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, 
"false");
+      String query = "select * from mytable "
+          + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+      JsonNode response = postQuery(query);
+      assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+      JsonNode receiveNode = response.get("stageStats");
+      
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+      JsonNode sendNode = receiveNode.get("children").get(0);
+      
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+      JsonNode mytableLeaf = sendNode.get("children").get(0);
+      
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+      
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
+
+      JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
+      
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+      JsonNode pipelineSend = pipelineReceive.get("children").get(0);
+      
Assertions.assertThat(pipelineSend.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+      JsonNode dayOfWeekLeaf = pipelineSend.get("children").get(0);
+      
Assertions.assertThat(dayOfWeekLeaf.get("type").asText()).isEqualTo("LEAF");
+      
Assertions.assertThat(dayOfWeekLeaf.get("table").asText()).isEqualTo("daysOfWeek");
+    } finally {
+      _helixManager.getConfigAccessor()
+          .set(scope, 
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, 
"true");
+    }
+  }
+
+  @Test
+  public void testPipelineBreakerKeepsNumGroupsLimitReached()
+      throws Exception {
+    HelixConfigScope scope =
+        new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
+            .build();
+    try {
+      _helixManager.getConfigAccessor()
+          .set(scope, 
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, 
"false");
+      String query = ""
+          + "SET numGroupsLimit = 1;"
+          + "SELECT * FROM daysOfWeek "
+          + "WHERE dayid in ("
+          + " SELECT DayOfWeek FROM mytable"
+          + " GROUP BY DayOfWeek"
+          + ")";
+
+      JsonNode response = postQuery(query);
+      assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+      JsonNode receiveNode = response.get("stageStats");
+      
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+      JsonNode sendNode = receiveNode.get("children").get(0);
+      
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+      JsonNode mytableLeaf = sendNode.get("children").get(0);
+      
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+      
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualToIgnoringCase("daysOfWeek");
+
+      JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
+      
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+      JsonNode pipelineSend = pipelineReceive.get("children").get(0);
+      
Assertions.assertThat(pipelineSend.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+      
Assertions.assertThat(response.get("numGroupsLimitReached").asBoolean(false))
+          .describedAs("numGroupsLimitReached should be true even when the 
limit is reached on a pipeline breaker")
+          .isEqualTo(true);
+    } finally {
+      _helixManager.getConfigAccessor()
+          .set(scope, 
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, 
"true");
+    }
+  }
+
+  @Test
+  public void testPipelineBreakerWithoutKeepingStats()
+      throws Exception {
+    // lets try several times to give helix time to propagate the config change
+    for (int i = 0; i < 10; i++) {
+      try {
+        String query = "select * from mytable "
+            + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+        JsonNode response = postQuery(query);
+        assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+        JsonNode receiveNode = response.get("stageStats");
+        
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+        JsonNode sendNode = receiveNode.get("children").get(0);
+        
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+        JsonNode mytableLeaf = sendNode.get("children").get(0);
+        
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+        
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
+
+        Assert.assertNull(mytableLeaf.get("children"), "When pipeline breaker 
stats are not kept, "

Review Comment:
   So if IIUC, the structure would look like `LEAF` <- `MAILBOX_RECEIVE` <- 
`MAILBOX_SEND` <- `LEAF`? And we don't specifically have a `PIPELINE_BREAKER` 
node in the stats?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to