LakshSingla commented on code in PR #13353: URL: https://github.com/apache/druid/pull/13353#discussion_r1051751976
########## core/src/main/java/org/apache/druid/java/util/common/function/TriConsumer.java: ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.common.function; + +import java.util.Objects; + +/** + * Based on {@link java.util.function.BiConsumer} + */ +@FunctionalInterface +public interface TriConsumer<T, U, V> +{ + /** + * Performs this operation on the given arguments. + * + * @param t the first input argument + * @param u the second input argument + * @param v the third input argument + */ + void accept(T t, U u, V v); + + /** + * Returns a composed {@code BiConsumer} that performs, in sequence, this Review Comment: I think we are returning a TriConsumer from the method and this should be updated. Same for the definition of `@return` ########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQueryHA.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testsEx.msq; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.msq.sql.SqlTaskStatus; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.testing.IntegrationTestingConfig; +import org.apache.druid.testing.clients.CoordinatorResourceTestClient; +import org.apache.druid.testing.clients.SqlResourceTestClient; +import org.apache.druid.testing.utils.DataLoaderHelper; +import org.apache.druid.testing.utils.ITRetryUtil; +import org.apache.druid.testing.utils.MsqTestQueryHelper; +import org.apache.druid.testsEx.categories.MultiStageQuery; +import org.apache.druid.testsEx.config.DruidTestRunner; +import org.apache.druid.testsEx.utils.DruidClusterAdminClient; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +@RunWith(DruidTestRunner.class) +@Category(MultiStageQuery.class) +public class ITMultiStageQueryHA Review Comment: Can you please elaborate on what `HA` at the end is? 
########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQueryHA.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testsEx.msq; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.msq.sql.SqlTaskStatus; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.testing.IntegrationTestingConfig; +import org.apache.druid.testing.clients.CoordinatorResourceTestClient; +import org.apache.druid.testing.clients.SqlResourceTestClient; +import org.apache.druid.testing.utils.DataLoaderHelper; +import org.apache.druid.testing.utils.ITRetryUtil; +import org.apache.druid.testing.utils.MsqTestQueryHelper; +import org.apache.druid.testsEx.categories.MultiStageQuery; +import org.apache.druid.testsEx.config.DruidTestRunner; +import org.apache.druid.testsEx.utils.DruidClusterAdminClient; +import 
org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +@RunWith(DruidTestRunner.class) +@Category(MultiStageQuery.class) +public class ITMultiStageQueryHA +{ + private static final Logger LOG = new Logger(ITMultiStageQueryHA.class); + @Inject + private MsqTestQueryHelper msqHelper; + + @Inject + private SqlResourceTestClient msqClient; + + @Inject + private IntegrationTestingConfig config; + + @Inject + private ObjectMapper jsonMapper; + + @Inject + private DataLoaderHelper dataLoaderHelper; + + @Inject + private CoordinatorResourceTestClient coordinatorClient; + + @Inject + private DruidClusterAdminClient druidClusterAdminClient; + + private static final String QUERY_FILE = "/multi-stage-query/wikipedia_msq_select_query_ha.json"; + + @Test + public void testMsqIngestionAndQuerying() throws Exception + { + String datasource = "dst"; + + // Clear up the datasource from the previous runs + coordinatorClient.unloadSegmentsForDataSource(datasource); + + String queryLocal = + StringUtils.format( + "INSERT INTO %s\n" + + "SELECT\n" + + " TIME_PARSE(\"timestamp\") AS __time,\n" + + " isRobot,\n" + + " diffUrl,\n" + + " added,\n" + + " countryIsoCode,\n" + + " regionName,\n" + + " channel,\n" + + " flags,\n" + + " delta,\n" + + " isUnpatrolled,\n" + + " isNew,\n" + + " deltaBucket,\n" + + " isMinor,\n" + + " isAnonymous,\n" + + " deleted,\n" + + " cityName,\n" + + " metroCode,\n" + + " namespace,\n" + + " comment,\n" + + " page,\n" + + " commentLength,\n" + + " countryName,\n" + + " user,\n" + + " regionIsoCode\n" + + "FROM TABLE(\n" + + " EXTERN(\n" + + " '{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\",\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n" + + " '{\"type\":\"json\"}',\n" + + " 
'[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"isNew\"},{\"type\":\"double\",\"name\":\"deltaBucket\"},{\"type\":\"string\",\"name\":\"isMinor\"},{\"type\":\"string\",\"name\":\"isAnonymous\"},{\"type\":\"long\",\"name\":\"deleted\"},{\"type\":\"string\",\"name\":\"cityName\"},{\"type\":\"long\",\"name\":\"metroCode\"},{\"type\":\"string\",\"name\":\"namespace\"},{\"type\":\"string\",\"name\":\"comment\"},{\"type\":\"string\",\"name\":\"page\"},{\"type\":\"long\",\"name\":\"commentLength\"},{\"type\":\"string\",\"name\":\"countryName\"},{\"type\": \"string\",\"name\":\"user\"},{\"type\":\"string\",\"name\":\"regionIsoCode\"}]'\n" + + " )\n" + + ")\n" + + "PARTITIONED BY DAY\n" + + "CLUSTERED BY \"__time\"", + datasource + ); + + // Submit the task and wait for the datasource to get loaded + SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTask( + queryLocal, + ImmutableMap.of( + MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, + "true", + MultiStageQueryContext.CTX_MAX_NUM_TASKS, + 3 + ) + ); + + if (sqlTaskStatus.getState().isFailure()) { + Assert.fail(StringUtils.format( + "Unable to start the task successfully.\nPossible exception: %s", + sqlTaskStatus.getError() + )); + } + + + String taskIdToKill = sqlTaskStatus.getTaskId() + "-worker1_0"; + killTaskAbruptly(taskIdToKill); + + msqHelper.pollTaskIdForCompletion(sqlTaskStatus.getTaskId()); + dataLoaderHelper.waitUntilDatasourceIsReady(datasource); + + msqHelper.testQueriesFromFile(QUERY_FILE, datasource); + } + + private void killTaskAbruptly(String taskIdToKill) Review Comment: Should this 
be moved to the IT Framework's core methods instead of being in a specific IT test? Also, is there a better way to do it without invoking the command? ########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQueryHA.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testsEx.msq; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.msq.sql.SqlTaskStatus; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.testing.IntegrationTestingConfig; +import org.apache.druid.testing.clients.CoordinatorResourceTestClient; +import org.apache.druid.testing.clients.SqlResourceTestClient; +import org.apache.druid.testing.utils.DataLoaderHelper; +import org.apache.druid.testing.utils.ITRetryUtil; +import org.apache.druid.testing.utils.MsqTestQueryHelper; +import org.apache.druid.testsEx.categories.MultiStageQuery; +import org.apache.druid.testsEx.config.DruidTestRunner; +import org.apache.druid.testsEx.utils.DruidClusterAdminClient; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +@RunWith(DruidTestRunner.class) +@Category(MultiStageQuery.class) +public class ITMultiStageQueryHA +{ + private static final Logger LOG = new Logger(ITMultiStageQueryHA.class); + @Inject + private MsqTestQueryHelper msqHelper; + + @Inject + private SqlResourceTestClient msqClient; + + @Inject + private IntegrationTestingConfig config; + + @Inject + private ObjectMapper jsonMapper; + + @Inject + private DataLoaderHelper dataLoaderHelper; + + @Inject + private CoordinatorResourceTestClient coordinatorClient; + + @Inject + private DruidClusterAdminClient druidClusterAdminClient; + + private static final String QUERY_FILE = "/multi-stage-query/wikipedia_msq_select_query_ha.json"; + + @Test + public void testMsqIngestionAndQuerying() throws Exception + { + String datasource = "dst"; + + // Clear up the 
datasource from the previous runs + coordinatorClient.unloadSegmentsForDataSource(datasource); + + String queryLocal = + StringUtils.format( + "INSERT INTO %s\n" + + "SELECT\n" + + " TIME_PARSE(\"timestamp\") AS __time,\n" + + " isRobot,\n" + + " diffUrl,\n" + + " added,\n" + + " countryIsoCode,\n" + + " regionName,\n" + + " channel,\n" + + " flags,\n" + + " delta,\n" + + " isUnpatrolled,\n" + + " isNew,\n" + + " deltaBucket,\n" + + " isMinor,\n" + + " isAnonymous,\n" + + " deleted,\n" + + " cityName,\n" + + " metroCode,\n" + + " namespace,\n" + + " comment,\n" + + " page,\n" + + " commentLength,\n" + + " countryName,\n" + + " user,\n" + + " regionIsoCode\n" + + "FROM TABLE(\n" + + " EXTERN(\n" + + " '{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\",\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n" + + " '{\"type\":\"json\"}',\n" + + " '[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"isNew\"},{\"type\":\"double\",\"name\":\"deltaBucket\"},{\"type\":\"string\",\"name\":\"isMinor\"},{\"type\":\"string\",\"name\":\"isAnonymous\"},{\"type\":\"long\",\"name\":\"deleted\"},{\"type\":\"string\",\"name\":\"cityName\"},{\"type\":\"long\",\"name\":\"metroCode\"},{\"type\":\"string\",\"name\":\"namespace\"},{\"type\":\"string\",\"name\":\"comment\"},{\"type\":\"string\",\"name\":\"page\"},{\"type\":\"long\",\"name\":\"commentLength\"},{\"type\":\"string\",\"name\":\"countryName\"},{\"type\": \"string\",\"name\":\"user\"},{\"type\":\"string\",\"name\":\"regionIsoCode\"}]'\n" + + " )\n" + + ")\n" 
+ + "PARTITIONED BY DAY\n" + + "CLUSTERED BY \"__time\"", + datasource + ); + + // Submit the task and wait for the datasource to get loaded + SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTask( + queryLocal, + ImmutableMap.of( + MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, + "true", + MultiStageQueryContext.CTX_MAX_NUM_TASKS, + 3 + ) + ); + + if (sqlTaskStatus.getState().isFailure()) { + Assert.fail(StringUtils.format( + "Unable to start the task successfully.\nPossible exception: %s", + sqlTaskStatus.getError() + )); + } + + + String taskIdToKill = sqlTaskStatus.getTaskId() + "-worker1_0"; + killTaskAbruptly(taskIdToKill); + + msqHelper.pollTaskIdForCompletion(sqlTaskStatus.getTaskId()); + dataLoaderHelper.waitUntilDatasourceIsReady(datasource); + + msqHelper.testQueriesFromFile(QUERY_FILE, datasource); + } + + private void killTaskAbruptly(String taskIdToKill) + { + + String command = "jps -mlv | grep -i peon | grep -i " + taskIdToKill + " |awk '{print $1}'"; + + ITRetryUtil.retryUntil(() -> { + + Pair<String, String> stdOut = druidClusterAdminClient.runCommandInMiddleManagerContainer("/bin/bash", "-c", Review Comment: I think we should rename this method, since we are running the command in a specific container but the method name doesn't indicate that. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java: ########## @@ -68,4 +68,20 @@ * Maximum size of the kernel manipulation queue in {@link org.apache.druid.msq.indexing.MSQControllerTask}. */ public static final int MAX_KERNEL_MANIPULATION_QUEUE_SIZE = 100_000; + + /** + * Maximum relaunches across all workers. + */ + public static final int TOTAL_RELAUNCH_LIMIT = 100; + + /** + * Maximum relaunches per worker. Initial run is not a relaunch. The worker will be spawned 1 + workerRelaunchLimit times before erroring out. + */ + public static final int PER_WORKER_RELAUNCH_LIMIT = 2; Review Comment: Should this be increased? 
This seems a bit low; what has been your experience with it? ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MSQTasks.java: ########## @@ -65,9 +71,26 @@ public static String controllerTaskId(@Nullable final String queryId) /** * Returns a worker task ID given a SQL query id. */ - public static String workerTaskId(final String controllerTaskId, final int workerNumber) + public static String workerTaskId(final String controllerTaskId, final int workerNumber, final int retryCount) { - return StringUtils.format("%s-worker%d", controllerTaskId, workerNumber); + return StringUtils.format("%s-worker%d_%d", controllerTaskId, workerNumber, retryCount); + } + + /** + * Extract worker from taskId or throw exception if unable to parse out the worker. + */ + public static int workerFromTaskId(final String taskId) + { + final Matcher matcher = WORKER_PATTERN.matcher(taskId); + if (matcher.matches()) { + return Integer.parseInt(matcher.group(1)); Review Comment: If we can use the named group in the regex, then we won't have to use the stray reference to `1` and the code would remain valid irrespective of regex. ```suggestion return Integer.parseInt(matcher.group("workerNumber")); ``` ########## docs/multi-stage-query/reference.md: ########## @@ -234,6 +234,9 @@ The following table lists query limits: | Number of output columns for any one stage. | 2,000 | `TooManyColumns` | | Number of workers for any one stage. | Hard limit is 1,000. Memory-dependent soft limit may be lower. | `TooManyWorkers` | | Maximum memory occupied by broadcasted tables. | 30% of each [processor memory bundle](concepts.md#memory-usage). | `BroadcastTablesTooLarge` | +| Maximum relaunch attempts per worker. Initial run is not a relaunch. The worker will be spawned 1 + workerRelaunchLimit times before erroring out. | 2 | `TooManyAttemptsForWorker` | +| Maximum relaunch attempts for a job across all workers. 
| 100 | `TooManyAttemptsForJob` | Review Comment: I think the limit here is outdated. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java: ########## @@ -58,10 +58,14 @@ public static final String CTX_FINALIZE_AGGREGATIONS = "finalizeAggregations"; private static final boolean DEFAULT_FINALIZE_AGGREGATIONS = true; - public static final String CTX_ENABLE_DURABLE_SHUFFLE_STORAGE = "durableShuffleStorage"; + public static final String CTX_DURABLE_SHUFFLE_STORAGE = "durableShuffleStorage"; Review Comment: Since we now have a fault-tolerant mode that is one step ahead of the shuffle storage, does it make sense to deprecate the durable shuffle storage mode entirely since the time benefits of just using the shuffle storage seem negligible in comparison to using the fault-tolerant mode? (My understanding is that most of the time would be spent in writing/reading from the durable storage which both cases do however with fault tolerance we also enable extra code paths which allow worker retry) ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MSQTasks.java: ########## @@ -54,6 +58,8 @@ private static final String TASK_ID_PREFIX = "query-"; + private static final Pattern WORKER_PATTERN = Pattern.compile(".*-worker([0-9]+)_[0-9]+"); Review Comment: One more question is the taskId sure not to contain a `-`, otherwise, the regex logic might not work as expected. ########## core/src/test/java/org/apache/druid/java/util/common/function/TriConsumerTest.java: ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.common.function; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; + +public class TriConsumerTest +{ + + @Test + public void sanityTest() Review Comment: I think there should be a test where the `consumerA` throws an exception after adding to the set, and then we should see if `consumerB` runs or not. ########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQueryHA.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testsEx.msq; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.msq.sql.SqlTaskStatus; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.testing.IntegrationTestingConfig; +import org.apache.druid.testing.clients.CoordinatorResourceTestClient; +import org.apache.druid.testing.clients.SqlResourceTestClient; +import org.apache.druid.testing.utils.DataLoaderHelper; +import org.apache.druid.testing.utils.ITRetryUtil; +import org.apache.druid.testing.utils.MsqTestQueryHelper; +import org.apache.druid.testsEx.categories.MultiStageQuery; +import org.apache.druid.testsEx.config.DruidTestRunner; +import org.apache.druid.testsEx.utils.DruidClusterAdminClient; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +@RunWith(DruidTestRunner.class) +@Category(MultiStageQuery.class) +public class ITMultiStageQueryHA +{ + private static final Logger LOG = new Logger(ITMultiStageQueryHA.class); + @Inject + private MsqTestQueryHelper msqHelper; + + @Inject + private SqlResourceTestClient msqClient; + + @Inject + private IntegrationTestingConfig config; + + @Inject + private ObjectMapper jsonMapper; + + @Inject + private DataLoaderHelper dataLoaderHelper; + + @Inject + private CoordinatorResourceTestClient coordinatorClient; + + @Inject + private DruidClusterAdminClient druidClusterAdminClient; + + private static final String QUERY_FILE = "/multi-stage-query/wikipedia_msq_select_query_ha.json"; + + @Test + public void testMsqIngestionAndQuerying() throws Exception + { + String datasource = "dst"; + + // Clear up the 
datasource from the previous runs + coordinatorClient.unloadSegmentsForDataSource(datasource); + + String queryLocal = + StringUtils.format( + "INSERT INTO %s\n" + + "SELECT\n" + + " TIME_PARSE(\"timestamp\") AS __time,\n" + + " isRobot,\n" + + " diffUrl,\n" + + " added,\n" + + " countryIsoCode,\n" + + " regionName,\n" + + " channel,\n" + + " flags,\n" + + " delta,\n" + + " isUnpatrolled,\n" + + " isNew,\n" + + " deltaBucket,\n" + + " isMinor,\n" + + " isAnonymous,\n" + + " deleted,\n" + + " cityName,\n" + + " metroCode,\n" + + " namespace,\n" + + " comment,\n" + + " page,\n" + + " commentLength,\n" + + " countryName,\n" + + " user,\n" + + " regionIsoCode\n" + + "FROM TABLE(\n" + + " EXTERN(\n" + + " '{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\",\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n" + + " '{\"type\":\"json\"}',\n" + + " '[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"isNew\"},{\"type\":\"double\",\"name\":\"deltaBucket\"},{\"type\":\"string\",\"name\":\"isMinor\"},{\"type\":\"string\",\"name\":\"isAnonymous\"},{\"type\":\"long\",\"name\":\"deleted\"},{\"type\":\"string\",\"name\":\"cityName\"},{\"type\":\"long\",\"name\":\"metroCode\"},{\"type\":\"string\",\"name\":\"namespace\"},{\"type\":\"string\",\"name\":\"comment\"},{\"type\":\"string\",\"name\":\"page\"},{\"type\":\"long\",\"name\":\"commentLength\"},{\"type\":\"string\",\"name\":\"countryName\"},{\"type\": \"string\",\"name\":\"user\"},{\"type\":\"string\",\"name\":\"regionIsoCode\"}]'\n" + + " )\n" + + ")\n" 
+ + "PARTITIONED BY DAY\n" + + "CLUSTERED BY \"__time\"", + datasource + ); + + // Submit the task and wait for the datasource to get loaded + SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTask( + queryLocal, + ImmutableMap.of( + MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, + "true", + MultiStageQueryContext.CTX_MAX_NUM_TASKS, + 3 + ) + ); + + if (sqlTaskStatus.getState().isFailure()) { Review Comment: Since we are trying to coordinate between the task running and the IT running, I think we should look into the `SLEEP()` function. I encountered this in `ITSQLCancelTest`, which allows the queries to run for a certain amount of time. Taken from ITSqlCancelTest: ``` /** * This query will run exactly for 15 seconds. */ private static final String QUERY = "SELECT sleep(CASE WHEN added > 0 THEN 1 ELSE 0 END) FROM wikipedia_editstream WHERE added > 0 LIMIT 15"; ``` ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MSQTasks.java: ########## @@ -54,6 +58,8 @@ private static final String TASK_ID_PREFIX = "query-"; + private static final Pattern WORKER_PATTERN = Pattern.compile(".*-worker([0-9]+)_[0-9]+"); Review Comment: nit: I think there should be an explanation behind the pattern, or sample worker patterns which it should match. Also, there can be named groups, I think those should be used to make it clearer. ```suggestion private static final Pattern WORKER_PATTERN = Pattern.compile(".*-worker(?<workerNumber>[0-9]+)_[0-9]+"); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
