LakshSingla commented on code in PR #13353:
URL: https://github.com/apache/druid/pull/13353#discussion_r1051751976


##########
core/src/main/java/org/apache/druid/java/util/common/function/TriConsumer.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.function;
+
+import java.util.Objects;
+
+/**
+ * Based on {@link java.util.function.BiConsumer}
+ */
+@FunctionalInterface
+public interface TriConsumer<T, U, V>
+{
+  /**
+   * Performs this operation on the given arguments.
+   *
+   * @param t the first input argument
+   * @param u the second input argument
+   * @param v the third input argument
+   */
+  void accept(T t, U u, V v);
+
+  /**
+   * Returns a composed {@code BiConsumer} that performs, in sequence, this

Review Comment:
   I think this method returns a `TriConsumer`, so the Javadoc should be updated. Same for the `@return` description.
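
   For illustration, a corrected `andThen` might look like the following. This is a minimal sketch mirroring `BiConsumer.andThen`, not the PR's actual code; the interface is reproduced only as far as needed:

   ```java
   import java.util.Objects;

   @FunctionalInterface
   interface TriConsumer<T, U, V>
   {
     void accept(T t, U u, V v);

     // Returns a composed TriConsumer (not BiConsumer) that performs this
     // operation first, then the "after" operation, on the same arguments.
     default TriConsumer<T, U, V> andThen(TriConsumer<? super T, ? super U, ? super V> after)
     {
       Objects.requireNonNull(after);
       return (t, u, v) -> {
         accept(t, u, v);
         after.accept(t, u, v);
       };
     }
   }
   ```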



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQueryHA.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.msq;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.sql.SqlTaskStatus;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
+import org.apache.druid.testing.clients.SqlResourceTestClient;
+import org.apache.druid.testing.utils.DataLoaderHelper;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.MsqTestQueryHelper;
+import org.apache.druid.testsEx.categories.MultiStageQuery;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.apache.druid.testsEx.utils.DruidClusterAdminClient;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+@RunWith(DruidTestRunner.class)
+@Category(MultiStageQuery.class)
+public class ITMultiStageQueryHA

Review Comment:
   Can you please elaborate on what the `HA` at the end stands for? 



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQueryHA.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.msq;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.sql.SqlTaskStatus;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
+import org.apache.druid.testing.clients.SqlResourceTestClient;
+import org.apache.druid.testing.utils.DataLoaderHelper;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.MsqTestQueryHelper;
+import org.apache.druid.testsEx.categories.MultiStageQuery;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.apache.druid.testsEx.utils.DruidClusterAdminClient;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+@RunWith(DruidTestRunner.class)
+@Category(MultiStageQuery.class)
+public class ITMultiStageQueryHA
+{
+  private static final Logger LOG = new Logger(ITMultiStageQueryHA.class);
+  @Inject
+  private MsqTestQueryHelper msqHelper;
+
+  @Inject
+  private SqlResourceTestClient msqClient;
+
+  @Inject
+  private IntegrationTestingConfig config;
+
+  @Inject
+  private ObjectMapper jsonMapper;
+
+  @Inject
+  private DataLoaderHelper dataLoaderHelper;
+
+  @Inject
+  private CoordinatorResourceTestClient coordinatorClient;
+
+  @Inject
+  private DruidClusterAdminClient druidClusterAdminClient;
+
+  private static final String QUERY_FILE = "/multi-stage-query/wikipedia_msq_select_query_ha.json";
+
+  @Test
+  public void testMsqIngestionAndQuerying() throws Exception
+  {
+    String datasource = "dst";
+
+    // Clear up the datasource from the previous runs
+    coordinatorClient.unloadSegmentsForDataSource(datasource);
+
+    String queryLocal =
+        StringUtils.format(
+            "INSERT INTO %s\n"
+            + "SELECT\n"
+            + "  TIME_PARSE(\"timestamp\") AS __time,\n"
+            + "  isRobot,\n"
+            + "  diffUrl,\n"
+            + "  added,\n"
+            + "  countryIsoCode,\n"
+            + "  regionName,\n"
+            + "  channel,\n"
+            + "  flags,\n"
+            + "  delta,\n"
+            + "  isUnpatrolled,\n"
+            + "  isNew,\n"
+            + "  deltaBucket,\n"
+            + "  isMinor,\n"
+            + "  isAnonymous,\n"
+            + "  deleted,\n"
+            + "  cityName,\n"
+            + "  metroCode,\n"
+            + "  namespace,\n"
+            + "  comment,\n"
+            + "  page,\n"
+            + "  commentLength,\n"
+            + "  countryName,\n"
+            + "  user,\n"
+            + "  regionIsoCode\n"
+            + "FROM TABLE(\n"
+            + "  EXTERN(\n"
+            + "    '{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\",\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n"
+            + "    '{\"type\":\"json\"}',\n"
+            + "    '[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"isNew\"},{\"type\":\"double\",\"name\":\"deltaBucket\"},{\"type\":\"string\",\"name\":\"isMinor\"},{\"type\":\"string\",\"name\":\"isAnonymous\"},{\"type\":\"long\",\"name\":\"deleted\"},{\"type\":\"string\",\"name\":\"cityName\"},{\"type\":\"long\",\"name\":\"metroCode\"},{\"type\":\"string\",\"name\":\"namespace\"},{\"type\":\"string\",\"name\":\"comment\"},{\"type\":\"string\",\"name\":\"page\"},{\"type\":\"long\",\"name\":\"commentLength\"},{\"type\":\"string\",\"name\":\"countryName\"},{\"type\":\"string\",\"name\":\"user\"},{\"type\":\"string\",\"name\":\"regionIsoCode\"}]'\n"
+            + "  )\n"
+            + ")\n"
+            + "PARTITIONED BY DAY\n"
+            + "CLUSTERED BY \"__time\"",
+            datasource
+        );
+
+    // Submit the task and wait for the datasource to get loaded
+    SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTask(
+        queryLocal,
+        ImmutableMap.of(
+            MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE,
+            "true",
+            MultiStageQueryContext.CTX_MAX_NUM_TASKS,
+            3
+        )
+    );
+
+    if (sqlTaskStatus.getState().isFailure()) {
+      Assert.fail(StringUtils.format(
+          "Unable to start the task successfully.\nPossible exception: %s",
+          sqlTaskStatus.getError()
+      ));
+    }
+
+
+    String taskIdToKill = sqlTaskStatus.getTaskId() + "-worker1_0";
+    killTaskAbruptly(taskIdToKill);
+
+    msqHelper.pollTaskIdForCompletion(sqlTaskStatus.getTaskId());
+    dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
+
+    msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
+  }
+
+  private void killTaskAbruptly(String taskIdToKill)

Review Comment:
   Should this be moved into the IT framework's core methods instead of living in a specific IT test? Also, is there a better way to do this without invoking a shell command? 



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQueryHA.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.msq;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.sql.SqlTaskStatus;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
+import org.apache.druid.testing.clients.SqlResourceTestClient;
+import org.apache.druid.testing.utils.DataLoaderHelper;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.MsqTestQueryHelper;
+import org.apache.druid.testsEx.categories.MultiStageQuery;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.apache.druid.testsEx.utils.DruidClusterAdminClient;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+@RunWith(DruidTestRunner.class)
+@Category(MultiStageQuery.class)
+public class ITMultiStageQueryHA
+{
+  private static final Logger LOG = new Logger(ITMultiStageQueryHA.class);
+  @Inject
+  private MsqTestQueryHelper msqHelper;
+
+  @Inject
+  private SqlResourceTestClient msqClient;
+
+  @Inject
+  private IntegrationTestingConfig config;
+
+  @Inject
+  private ObjectMapper jsonMapper;
+
+  @Inject
+  private DataLoaderHelper dataLoaderHelper;
+
+  @Inject
+  private CoordinatorResourceTestClient coordinatorClient;
+
+  @Inject
+  private DruidClusterAdminClient druidClusterAdminClient;
+
+  private static final String QUERY_FILE = "/multi-stage-query/wikipedia_msq_select_query_ha.json";
+
+  @Test
+  public void testMsqIngestionAndQuerying() throws Exception
+  {
+    String datasource = "dst";
+
+    // Clear up the datasource from the previous runs
+    coordinatorClient.unloadSegmentsForDataSource(datasource);
+
+    String queryLocal =
+        StringUtils.format(
+            "INSERT INTO %s\n"
+            + "SELECT\n"
+            + "  TIME_PARSE(\"timestamp\") AS __time,\n"
+            + "  isRobot,\n"
+            + "  diffUrl,\n"
+            + "  added,\n"
+            + "  countryIsoCode,\n"
+            + "  regionName,\n"
+            + "  channel,\n"
+            + "  flags,\n"
+            + "  delta,\n"
+            + "  isUnpatrolled,\n"
+            + "  isNew,\n"
+            + "  deltaBucket,\n"
+            + "  isMinor,\n"
+            + "  isAnonymous,\n"
+            + "  deleted,\n"
+            + "  cityName,\n"
+            + "  metroCode,\n"
+            + "  namespace,\n"
+            + "  comment,\n"
+            + "  page,\n"
+            + "  commentLength,\n"
+            + "  countryName,\n"
+            + "  user,\n"
+            + "  regionIsoCode\n"
+            + "FROM TABLE(\n"
+            + "  EXTERN(\n"
+            + "    '{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\",\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n"
+            + "    '{\"type\":\"json\"}',\n"
+            + "    '[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"isNew\"},{\"type\":\"double\",\"name\":\"deltaBucket\"},{\"type\":\"string\",\"name\":\"isMinor\"},{\"type\":\"string\",\"name\":\"isAnonymous\"},{\"type\":\"long\",\"name\":\"deleted\"},{\"type\":\"string\",\"name\":\"cityName\"},{\"type\":\"long\",\"name\":\"metroCode\"},{\"type\":\"string\",\"name\":\"namespace\"},{\"type\":\"string\",\"name\":\"comment\"},{\"type\":\"string\",\"name\":\"page\"},{\"type\":\"long\",\"name\":\"commentLength\"},{\"type\":\"string\",\"name\":\"countryName\"},{\"type\":\"string\",\"name\":\"user\"},{\"type\":\"string\",\"name\":\"regionIsoCode\"}]'\n"
+            + "  )\n"
+            + ")\n"
+            + "PARTITIONED BY DAY\n"
+            + "CLUSTERED BY \"__time\"",
+            datasource
+        );
+
+    // Submit the task and wait for the datasource to get loaded
+    SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTask(
+        queryLocal,
+        ImmutableMap.of(
+            MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE,
+            "true",
+            MultiStageQueryContext.CTX_MAX_NUM_TASKS,
+            3
+        )
+    );
+
+    if (sqlTaskStatus.getState().isFailure()) {
+      Assert.fail(StringUtils.format(
+          "Unable to start the task successfully.\nPossible exception: %s",
+          sqlTaskStatus.getError()
+      ));
+    }
+
+
+    String taskIdToKill = sqlTaskStatus.getTaskId() + "-worker1_0";
+    killTaskAbruptly(taskIdToKill);
+
+    msqHelper.pollTaskIdForCompletion(sqlTaskStatus.getTaskId());
+    dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
+
+    msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
+  }
+
+  private void killTaskAbruptly(String taskIdToKill)
+  {
+
+    String command = "jps -mlv | grep -i peon | grep -i " + taskIdToKill + " |awk  '{print  $1}'";
+
+    ITRetryUtil.retryUntil(() -> {
+
+      Pair<String, String> stdOut = druidClusterAdminClient.runCommandInMiddleManagerContainer("/bin/bash", "-c",

Review Comment:
   I think we should rename this method: it runs the command in a specific container, but the method name doesn't convey that.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java:
##########
@@ -68,4 +68,20 @@
   * Maximum size of the kernel manipulation queue in {@link org.apache.druid.msq.indexing.MSQControllerTask}.
    */
   public static final int MAX_KERNEL_MANIPULATION_QUEUE_SIZE = 100_000;
+
+  /**
+   * Maximum relaunches across all workers.
+   */
+  public static final int TOTAL_RELAUNCH_LIMIT = 100;
+
+  /**
+   * Maximum relaunches per worker. Initial run is not a relaunch. The worker will be spawned 1 + workerRelaunchLimit times before erroring out.
+   */
+  public static final int PER_WORKER_RELAUNCH_LIMIT = 2;

Review Comment:
   Should this be increased? It seems a bit low; what has your experience been with it? 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MSQTasks.java:
##########
@@ -65,9 +71,26 @@ public static String controllerTaskId(@Nullable final String queryId)
   /**
    * Returns a worker task ID given a SQL query id.
    */
-  public static String workerTaskId(final String controllerTaskId, final int workerNumber)
+  public static String workerTaskId(final String controllerTaskId, final int workerNumber, final int retryCount)
   {
-    return StringUtils.format("%s-worker%d", controllerTaskId, workerNumber);
+    return StringUtils.format("%s-worker%d_%d", controllerTaskId, workerNumber, retryCount);
+  }
+
+  /**
+   * Extract worker from taskId or throw exception if unable to parse out the worker.
+   */
+  public static int workerFromTaskId(final String taskId)
+  {
+    final Matcher matcher = WORKER_PATTERN.matcher(taskId);
+    if (matcher.matches()) {
+      return Integer.parseInt(matcher.group(1));

Review Comment:
   If we can use a named group in the regex, then we won't need the stray reference to `1`, and the code would remain valid even if the regex changes.
   ```suggestion
         return Integer.parseInt(matcher.group("workerNumber"));
   ```
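
   Putting the two suggestions together (a sketch, not the PR's code — `WorkerIdParser` is a hypothetical stand-in for the method in `MSQTasks`, assuming the pattern is updated to declare the named group `workerNumber`):

   ```java
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;

   class WorkerIdParser
   {
     // The named group keeps the extraction valid even if groups are later
     // added or reordered in the pattern.
     private static final Pattern WORKER_PATTERN =
         Pattern.compile(".*-worker(?<workerNumber>[0-9]+)_[0-9]+");

     static int workerFromTaskId(String taskId)
     {
       Matcher matcher = WORKER_PATTERN.matcher(taskId);
       if (matcher.matches()) {
         return Integer.parseInt(matcher.group("workerNumber"));
       }
       throw new IllegalArgumentException("Unable to parse worker out of task id [" + taskId + "]");
     }
   }
   ```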



##########
docs/multi-stage-query/reference.md:
##########
@@ -234,6 +234,9 @@ The following table lists query limits:
 | Number of output columns for any one stage. | 2,000 | `TooManyColumns` |
 | Number of workers for any one stage. | Hard limit is 1,000. Memory-dependent soft limit may be lower. | `TooManyWorkers` |
 | Maximum memory occupied by broadcasted tables. | 30% of each [processor memory bundle](concepts.md#memory-usage). | `BroadcastTablesTooLarge` |
+| Maximum relaunch attempts per worker. Initial run is not a relaunch. The worker will be spawned 1 + workerRelaunchLimit times before erroring out. | 2 | `TooManyAttemptsForWorker` |
+| Maximum relaunch attempts for a job across all workers. | 100 | `TooManyAttemptsForJob` |

Review Comment:
   I think the limit here is outdated. 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java:
##########
@@ -58,10 +58,14 @@
   public static final String CTX_FINALIZE_AGGREGATIONS = "finalizeAggregations";
   private static final boolean DEFAULT_FINALIZE_AGGREGATIONS = true;
 
-  public static final String CTX_ENABLE_DURABLE_SHUFFLE_STORAGE = "durableShuffleStorage";
+  public static final String CTX_DURABLE_SHUFFLE_STORAGE = "durableShuffleStorage";

Review Comment:
   Since we now have a fault-tolerant mode that is one step ahead of the 
shuffle storage, does it make sense to deprecate the durable shuffle storage 
mode entirely since the time benefits of just using the shuffle storage seem 
negligible in comparison to using the fault-tolerant mode? (My understanding is 
that most of the time would be spent in writing/reading from the durable 
storage which both cases do however with fault tolerance we also enable extra 
code paths which allow worker retry)  



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MSQTasks.java:
##########
@@ -54,6 +58,8 @@
 
   private static final String TASK_ID_PREFIX = "query-";
 
+  private static final Pattern WORKER_PATTERN = Pattern.compile(".*-worker([0-9]+)_[0-9]+");

Review Comment:
   One more question: is the taskId guaranteed not to contain a `-`? Otherwise, the regex logic might not work as expected.
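
   For what it's worth, a quick standalone check (hypothetical `WorkerPatternCheck`, not code from the PR) suggests that plain `-` characters in the controller task ID are handled, because the greedy leading `.*` anchors the match on the last `-worker` occurrence:

   ```java
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;

   class WorkerPatternCheck
   {
     // Same pattern as in the diff above.
     private static final Pattern WORKER_PATTERN = Pattern.compile(".*-worker([0-9]+)_[0-9]+");

     static int extract(String taskId)
     {
       Matcher m = WORKER_PATTERN.matcher(taskId);
       if (!m.matches()) {
         throw new IllegalArgumentException("Unable to parse worker out of task id [" + taskId + "]");
       }
       // Greedy ".*" consumes up to the last "-worker", so dashes earlier in
       // the id (including a literal "-worker<n>_<m>" substring) do not shift
       // the captured group to an earlier occurrence.
       return Integer.parseInt(m.group(1));
     }
   }
   ```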



##########
core/src/test/java/org/apache/druid/java/util/common/function/TriConsumerTest.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.function;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class TriConsumerTest
+{
+
+  @Test
+  public void sanityTest()

Review Comment:
   I think there should be a test where `consumerA` throws an exception after adding to the set, and then we verify whether `consumerB` runs or not.
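
   A sketch of such a check (using a hypothetical minimal stand-in for `TriConsumer`, assuming `andThen` composes sequentially as in `BiConsumer`): since the composed consumer calls `consumerA` first, an exception there should prevent `consumerB` from running.

   ```java
   import java.util.HashSet;
   import java.util.Objects;
   import java.util.Set;

   // Minimal stand-in for the TriConsumer in this PR, for illustration only.
   @FunctionalInterface
   interface TriConsumerSketch<T, U, V>
   {
     void accept(T t, U u, V v);

     default TriConsumerSketch<T, U, V> andThen(TriConsumerSketch<? super T, ? super U, ? super V> after)
     {
       Objects.requireNonNull(after);
       return (t, u, v) -> {
         accept(t, u, v);
         after.accept(t, u, v);
       };
     }
   }

   class TriConsumerExceptionDemo
   {
     // Returns true iff consumerB ran even though consumerA threw.
     static boolean consumerBRanAfterAThrew()
     {
       Set<Integer> seen = new HashSet<>();
       TriConsumerSketch<Integer, Integer, Integer> consumerA = (a, b, c) -> {
         seen.add(a);
         throw new RuntimeException("boom");
       };
       TriConsumerSketch<Integer, Integer, Integer> consumerB = (a, b, c) -> seen.add(b);
       try {
         consumerA.andThen(consumerB).accept(1, 2, 3);
       }
       catch (RuntimeException ignored) {
         // Expected: consumerA threw after adding 1 to the set.
       }
       return seen.contains(2);
     }
   }
   ```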



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQueryHA.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.msq;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.sql.SqlTaskStatus;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
+import org.apache.druid.testing.clients.SqlResourceTestClient;
+import org.apache.druid.testing.utils.DataLoaderHelper;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.MsqTestQueryHelper;
+import org.apache.druid.testsEx.categories.MultiStageQuery;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.apache.druid.testsEx.utils.DruidClusterAdminClient;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+@RunWith(DruidTestRunner.class)
+@Category(MultiStageQuery.class)
+public class ITMultiStageQueryHA
+{
+  private static final Logger LOG = new Logger(ITMultiStageQueryHA.class);
+  @Inject
+  private MsqTestQueryHelper msqHelper;
+
+  @Inject
+  private SqlResourceTestClient msqClient;
+
+  @Inject
+  private IntegrationTestingConfig config;
+
+  @Inject
+  private ObjectMapper jsonMapper;
+
+  @Inject
+  private DataLoaderHelper dataLoaderHelper;
+
+  @Inject
+  private CoordinatorResourceTestClient coordinatorClient;
+
+  @Inject
+  private DruidClusterAdminClient druidClusterAdminClient;
+
+  private static final String QUERY_FILE = "/multi-stage-query/wikipedia_msq_select_query_ha.json";
+
+  @Test
+  public void testMsqIngestionAndQuerying() throws Exception
+  {
+    String datasource = "dst";
+
+    // Clear up the datasource from the previous runs
+    coordinatorClient.unloadSegmentsForDataSource(datasource);
+
+    String queryLocal =
+        StringUtils.format(
+            "INSERT INTO %s\n"
+            + "SELECT\n"
+            + "  TIME_PARSE(\"timestamp\") AS __time,\n"
+            + "  isRobot,\n"
+            + "  diffUrl,\n"
+            + "  added,\n"
+            + "  countryIsoCode,\n"
+            + "  regionName,\n"
+            + "  channel,\n"
+            + "  flags,\n"
+            + "  delta,\n"
+            + "  isUnpatrolled,\n"
+            + "  isNew,\n"
+            + "  deltaBucket,\n"
+            + "  isMinor,\n"
+            + "  isAnonymous,\n"
+            + "  deleted,\n"
+            + "  cityName,\n"
+            + "  metroCode,\n"
+            + "  namespace,\n"
+            + "  comment,\n"
+            + "  page,\n"
+            + "  commentLength,\n"
+            + "  countryName,\n"
+            + "  user,\n"
+            + "  regionIsoCode\n"
+            + "FROM TABLE(\n"
+            + "  EXTERN(\n"
+            + "    '{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\",\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n"
+            + "    '{\"type\":\"json\"}',\n"
+            + "    '[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"isNew\"},{\"type\":\"double\",\"name\":\"deltaBucket\"},{\"type\":\"string\",\"name\":\"isMinor\"},{\"type\":\"string\",\"name\":\"isAnonymous\"},{\"type\":\"long\",\"name\":\"deleted\"},{\"type\":\"string\",\"name\":\"cityName\"},{\"type\":\"long\",\"name\":\"metroCode\"},{\"type\":\"string\",\"name\":\"namespace\"},{\"type\":\"string\",\"name\":\"comment\"},{\"type\":\"string\",\"name\":\"page\"},{\"type\":\"long\",\"name\":\"commentLength\"},{\"type\":\"string\",\"name\":\"countryName\"},{\"type\":\"string\",\"name\":\"user\"},{\"type\":\"string\",\"name\":\"regionIsoCode\"}]'\n"
+            + "  )\n"
+            + ")\n"
+            + "PARTITIONED BY DAY\n"
+            + "CLUSTERED BY \"__time\"",
+            datasource
+        );
+
+    // Submit the task and wait for the datasource to get loaded
+    SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTask(
+        queryLocal,
+        ImmutableMap.of(
+            MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE,
+            "true",
+            MultiStageQueryContext.CTX_MAX_NUM_TASKS,
+            3
+        )
+    );
+
+    if (sqlTaskStatus.getState().isFailure()) {

Review Comment:
   Since we are trying to coordinate between the task running and the IT running, I think we should look into the `SLEEP()` function. I encountered it in `ITSqlCancelTest`, where it lets queries run for a set amount of time.
   Taken from ITSqlCancelTest:
   ```
     /**
      * This query will run exactly for 15 seconds.
      */
   
     private static final String QUERY
         = "SELECT sleep(CASE WHEN added > 0 THEN 1 ELSE 0 END) FROM wikipedia_editstream WHERE added > 0 LIMIT 15";
   ```



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MSQTasks.java:
##########
@@ -54,6 +58,8 @@
 
   private static final String TASK_ID_PREFIX = "query-";
 
+  private static final Pattern WORKER_PATTERN = Pattern.compile(".*-worker([0-9]+)_[0-9]+");

Review Comment:
   nit: I think there should be a comment explaining the pattern, or sample worker task IDs it should match. Also, named groups can be used here; I think they would make the extraction clearer.
   
   ```suggestion
     private static final Pattern WORKER_PATTERN = Pattern.compile(".*-worker(?<workerNumber>[0-9]+)_[0-9]+");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

