This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch 27.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/27.0.0 by this push:
new 6576819e31 Fixing an issue in sequential merge (#14574)
6576819e31 is described below
commit 6576819e317827e004aa9e9c5a12b11f5c93f7d9
Author: Karan Kumar <[email protected]>
AuthorDate: Wed Jul 12 22:05:30 2023 +0530
Fixing an issue in sequential merge (#14574)
* Fixing an issue in sequential merge where workers without any partial key
statistics would get stuck because the controller did not change the worker state.
* Removing the early-return check for an empty time-segment-to-worker map
* Adding an integration test (IT) for the MSQ sequential merge bug fix.
(cherry picked from commit 89aee6caaae41ecd99b6d99042bd62427ae4e6cb)
---
.../apache/druid/msq/exec/WorkerSketchFetcher.java | 35 +++++-----
.../druid/msq/exec/WorkerSketchFetcherTest.java | 23 +++----
.../msq/ITKeyStatisticsSketchMergeMode.java | 75 ++++++++++++++++++++++
...wikipedia_msq_select_query_sequential_test.json | 15 +++++
4 files changed, 116 insertions(+), 32 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
index c4ebfceac1..21b77ee151 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
@@ -39,6 +39,7 @@ import
org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
import org.apache.druid.msq.statistics.CompleteKeyStatisticsInformation;
import javax.annotation.Nullable;
+import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
@@ -244,29 +245,13 @@ public class WorkerSketchFetcher implements AutoCloseable
throw new ISE("All worker partial key information not received for
stage[%d]", stageId.getStageNumber());
}
- if
(completeKeyStatisticsInformation.getTimeSegmentVsWorkerMap().isEmpty()) {
- // No time chunks at all: skip fetching.
- kernelActions.accept(
- kernel -> {
- for (final String taskId : tasks) {
- final int workerNumber = MSQTasks.workerFromTaskId(taskId);
- kernel.mergeClusterByStatisticsCollectorForAllTimeChunks(
- stageId,
- workerNumber,
- ClusterByStatisticsSnapshot.empty()
- );
- }
- }
- );
-
- return;
- }
-
+ final Set<String> noBoundaries = new HashSet<>(tasks);
completeKeyStatisticsInformation.getTimeSegmentVsWorkerMap().forEach((timeChunk,
wks) -> {
for (String taskId : tasks) {
int workerNumber = MSQTasks.workerFromTaskId(taskId);
if (wks.contains(workerNumber)) {
+ noBoundaries.remove(taskId);
executorService.submit(() -> {
fetchStatsFromWorker(
kernelActions,
@@ -285,10 +270,24 @@ public class WorkerSketchFetcher implements AutoCloseable
),
retryOperation
);
+
});
}
}
});
+
+ // if the worker did not get any records, update the state of the worker
+ for (String taskId : noBoundaries) {
+ kernelActions.accept(
+ kernel -> {
+ final int workerNumber = MSQTasks.workerFromTaskId(taskId);
+ kernel.mergeClusterByStatisticsCollectorForAllTimeChunks(
+ stageId,
+ workerNumber,
+ ClusterByStatisticsSnapshot.empty()
+ );
+ });
+ }
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java
index bcfb692daf..592fd089ef 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java
@@ -113,8 +113,7 @@ public class WorkerSketchFetcherTest
latch.countDown();
}, stageDefinition.getId(), ImmutableSet.copyOf(TASK_IDS), ((queryKernel,
integer, msqFault) -> {}));
- latch.await(5, TimeUnit.SECONDS);
- Assert.assertEquals(0, latch.getCount());
+ Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@@ -122,7 +121,7 @@ public class WorkerSketchFetcherTest
public void test_submitFetcherTask_sequentialFetch() throws
InterruptedException
{
doReturn(true).when(completeKeyStatisticsInformation).isComplete();
- final CountDownLatch latch = new CountDownLatch(TASK_IDS.size() - 1);
+ final CountDownLatch latch = new CountDownLatch(TASK_IDS.size());
target = spy(new WorkerSketchFetcher(workerClient, workerTaskLauncher,
true));
@@ -143,8 +142,8 @@ public class WorkerSketchFetcherTest
((queryKernel, integer, msqFault) -> {})
);
- latch.await(5, TimeUnit.SECONDS);
- Assert.assertEquals(0, latch.getCount());
+ Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+
}
@Test
@@ -186,17 +185,15 @@ public class WorkerSketchFetcherTest
})
);
- latch.await(5, TimeUnit.SECONDS);
- retryLatch.await(5, TimeUnit.SECONDS);
- Assert.assertEquals(0, latch.getCount());
- Assert.assertEquals(0, retryLatch.getCount());
+ Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+ Assert.assertTrue(retryLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void test_SequentialRetryEnabled_retryInvoked() throws
InterruptedException
{
doReturn(true).when(completeKeyStatisticsInformation).isComplete();
- final CountDownLatch latch = new CountDownLatch(TASK_IDS.size() - 1);
+ final CountDownLatch latch = new CountDownLatch(TASK_IDS.size());
target = spy(new WorkerSketchFetcher(workerClient, workerTaskLauncher,
true));
@@ -217,10 +214,8 @@ public class WorkerSketchFetcherTest
})
);
- latch.await(5, TimeUnit.SECONDS);
- retryLatch.await(5, TimeUnit.SECONDS);
- Assert.assertEquals(0, latch.getCount());
- Assert.assertEquals(0, retryLatch.getCount());
+ Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+ Assert.assertTrue(retryLatch.await(5, TimeUnit.SECONDS));
}
@Test
diff --git
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITKeyStatisticsSketchMergeMode.java
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITKeyStatisticsSketchMergeMode.java
index bccde84099..b008015890 100644
---
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITKeyStatisticsSketchMergeMode.java
+++
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITKeyStatisticsSketchMergeMode.java
@@ -190,4 +190,79 @@ public class ITKeyStatisticsSketchMergeMode
msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
}
+
+
+ @Test
+ public void testMsqIngestionSequentialMergingWithEmptyStatistics() throws
Exception
+ {
+ String datasource = "dst";
+
+ // Clear up the datasource from the previous runs
+ coordinatorClient.unloadSegmentsForDataSource(datasource);
+
+ String queryLocal =
+ StringUtils.format(
+ "Replace INTO %s overwrite ALL \n"
+ + "SELECT\n"
+ + " TIME_PARSE(\"timestamp\") AS __time,\n"
+ + " isRobot,\n"
+ + " diffUrl,\n"
+ + " added,\n"
+ + " countryIsoCode,\n"
+ + " regionName,\n"
+ + " channel,\n"
+ + " flags,\n"
+ + " delta,\n"
+ + " isUnpatrolled,\n"
+ + " isNew,\n"
+ + " deltaBucket,\n"
+ + " isMinor,\n"
+ + " isAnonymous,\n"
+ + " deleted,\n"
+ + " cityName,\n"
+ + " metroCode,\n"
+ + " namespace,\n"
+ + " comment,\n"
+ + " page,\n"
+ + " commentLength,\n"
+ + " countryName,\n"
+ + " user,\n"
+ + " regionIsoCode\n"
+ + "FROM TABLE(\n"
+ + " EXTERN(\n"
+ + "
'{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\",\"/resources/data/batch_index/json/wikipedia_index_data2.json\"]}',\n"
+ + " '{\"type\":\"json\"}',\n"
+ + "
'[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"i
[...]
+ + " )\n"
+ + ")\n"
+ + "where delta=111 "
+ // we add this filter since delta=111 is only present in
wikipedia_index_data1.json. This means partitions from worker 2 will be empty.
+ + "PARTITIONED BY DAY\n"
+ + "CLUSTERED BY \"__time\"",
+ datasource
+ );
+
+ ImmutableMap<String, Object> context = ImmutableMap.of(
+ MultiStageQueryContext.CTX_CLUSTER_STATISTICS_MERGE_MODE,
+ ClusterStatisticsMergeMode.SEQUENTIAL,
+ MultiStageQueryContext.CTX_MAX_NUM_TASKS,
+ 3
+ );
+
+ // Submit the task and wait for the datasource to get loaded
+ SqlQuery sqlQuery = new SqlQuery(queryLocal, null, false, false, false,
context, null);
+ SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTask(sqlQuery);
+
+ if (sqlTaskStatus.getState().isFailure()) {
+ Assert.fail(StringUtils.format(
+ "Unable to start the task successfully.\nPossible exception: %s",
+ sqlTaskStatus.getError()
+ ));
+ }
+
+ msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
+ dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
+
+
msqHelper.testQueriesFromFile("/multi-stage-query/wikipedia_msq_select_query_sequential_test.json",
datasource);
+ }
}
diff --git
a/integration-tests-ex/cases/src/test/resources/multi-stage-query/wikipedia_msq_select_query_sequential_test.json
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/wikipedia_msq_select_query_sequential_test.json
new file mode 100644
index 0000000000..c50ea09ad2
--- /dev/null
+++
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/wikipedia_msq_select_query_sequential_test.json
@@ -0,0 +1,15 @@
+[
+ {
+ "query": "SELECT __time, isRobot, added, delta, deleted, namespace FROM
%%DATASOURCE%%",
+ "expectedResults": [
+ {
+ "__time": 1377933081000,
+ "isRobot": "",
+ "added": 123,
+ "delta": 111,
+ "deleted": 12,
+ "namespace":"article"
+ }
+ ]
+ }
+]
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]