This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch 27.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/27.0.0 by this push:
     new 6576819e31 Fixing an issue in sequential merge  (#14574)
6576819e31 is described below

commit 6576819e317827e004aa9e9c5a12b11f5c93f7d9
Author: Karan Kumar <[email protected]>
AuthorDate: Wed Jul 12 22:05:30 2023 +0530

    Fixing an issue in sequential merge  (#14574)
    
    * Fixing an issue in sequential merge where workers without any partial key statistics would get stuck because controller did not change the worker state.
    
    * Removing empty check
    
    * Adding IT for MSQ sequential bug fix.
    
    (cherry picked from commit 89aee6caaae41ecd99b6d99042bd62427ae4e6cb)
---
 .../apache/druid/msq/exec/WorkerSketchFetcher.java | 35 +++++-----
 .../druid/msq/exec/WorkerSketchFetcherTest.java    | 23 +++----
 .../msq/ITKeyStatisticsSketchMergeMode.java        | 75 ++++++++++++++++++++++
 ...wikipedia_msq_select_query_sequential_test.json | 15 +++++
 4 files changed, 116 insertions(+), 32 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
index c4ebfceac1..21b77ee151 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
@@ -39,6 +39,7 @@ import 
org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
 import org.apache.druid.msq.statistics.CompleteKeyStatisticsInformation;
 
 import javax.annotation.Nullable;
+import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
@@ -244,29 +245,13 @@ public class WorkerSketchFetcher implements AutoCloseable
       throw new ISE("All worker partial key information not received for 
stage[%d]", stageId.getStageNumber());
     }
 
-    if 
(completeKeyStatisticsInformation.getTimeSegmentVsWorkerMap().isEmpty()) {
-      // No time chunks at all: skip fetching.
-      kernelActions.accept(
-          kernel -> {
-            for (final String taskId : tasks) {
-              final int workerNumber = MSQTasks.workerFromTaskId(taskId);
-              kernel.mergeClusterByStatisticsCollectorForAllTimeChunks(
-                  stageId,
-                  workerNumber,
-                  ClusterByStatisticsSnapshot.empty()
-              );
-            }
-          }
-      );
-
-      return;
-    }
-
+    final Set<String> noBoundaries = new HashSet<>(tasks);
     
completeKeyStatisticsInformation.getTimeSegmentVsWorkerMap().forEach((timeChunk,
 wks) -> {
 
       for (String taskId : tasks) {
         int workerNumber = MSQTasks.workerFromTaskId(taskId);
         if (wks.contains(workerNumber)) {
+          noBoundaries.remove(taskId);
           executorService.submit(() -> {
             fetchStatsFromWorker(
                 kernelActions,
@@ -285,10 +270,24 @@ public class WorkerSketchFetcher implements AutoCloseable
                 ),
                 retryOperation
             );
+
           });
         }
       }
     });
+
+    // if the worker did not get any records, update the state of the worker
+    for (String taskId : noBoundaries) {
+      kernelActions.accept(
+          kernel -> {
+            final int workerNumber = MSQTasks.workerFromTaskId(taskId);
+            kernel.mergeClusterByStatisticsCollectorForAllTimeChunks(
+                stageId,
+                workerNumber,
+                ClusterByStatisticsSnapshot.empty()
+            );
+          });
+    }
   }
 
 
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java
index bcfb692daf..592fd089ef 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java
@@ -113,8 +113,7 @@ public class WorkerSketchFetcherTest
       latch.countDown();
     }, stageDefinition.getId(), ImmutableSet.copyOf(TASK_IDS), ((queryKernel, 
integer, msqFault) -> {}));
 
-    latch.await(5, TimeUnit.SECONDS);
-    Assert.assertEquals(0, latch.getCount());
+    Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
 
   }
 
@@ -122,7 +121,7 @@ public class WorkerSketchFetcherTest
   public void test_submitFetcherTask_sequentialFetch() throws 
InterruptedException
   {
     doReturn(true).when(completeKeyStatisticsInformation).isComplete();
-    final CountDownLatch latch = new CountDownLatch(TASK_IDS.size() - 1);
+    final CountDownLatch latch = new CountDownLatch(TASK_IDS.size());
 
     target = spy(new WorkerSketchFetcher(workerClient, workerTaskLauncher, 
true));
 
@@ -143,8 +142,8 @@ public class WorkerSketchFetcherTest
         ((queryKernel, integer, msqFault) -> {})
     );
 
-    latch.await(5, TimeUnit.SECONDS);
-    Assert.assertEquals(0, latch.getCount());
+    Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+
   }
 
   @Test
@@ -186,17 +185,15 @@ public class WorkerSketchFetcherTest
         })
     );
 
-    latch.await(5, TimeUnit.SECONDS);
-    retryLatch.await(5, TimeUnit.SECONDS);
-    Assert.assertEquals(0, latch.getCount());
-    Assert.assertEquals(0, retryLatch.getCount());
+    Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+    Assert.assertTrue(retryLatch.await(5, TimeUnit.SECONDS));
   }
 
   @Test
   public void test_SequentialRetryEnabled_retryInvoked() throws 
InterruptedException
   {
     doReturn(true).when(completeKeyStatisticsInformation).isComplete();
-    final CountDownLatch latch = new CountDownLatch(TASK_IDS.size() - 1);
+    final CountDownLatch latch = new CountDownLatch(TASK_IDS.size());
 
     target = spy(new WorkerSketchFetcher(workerClient, workerTaskLauncher, 
true));
 
@@ -217,10 +214,8 @@ public class WorkerSketchFetcherTest
         })
     );
 
-    latch.await(5, TimeUnit.SECONDS);
-    retryLatch.await(5, TimeUnit.SECONDS);
-    Assert.assertEquals(0, latch.getCount());
-    Assert.assertEquals(0, retryLatch.getCount());
+    Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+    Assert.assertTrue(retryLatch.await(5, TimeUnit.SECONDS));
   }
 
   @Test
diff --git 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITKeyStatisticsSketchMergeMode.java
 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITKeyStatisticsSketchMergeMode.java
index bccde84099..b008015890 100644
--- 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITKeyStatisticsSketchMergeMode.java
+++ 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITKeyStatisticsSketchMergeMode.java
@@ -190,4 +190,79 @@ public class ITKeyStatisticsSketchMergeMode
 
     msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
   }
+
+
+  @Test
+  public void testMsqIngestionSequentialMergingWithEmptyStatistics() throws 
Exception
+  {
+    String datasource = "dst";
+
+    // Clear up the datasource from the previous runs
+    coordinatorClient.unloadSegmentsForDataSource(datasource);
+
+    String queryLocal =
+        StringUtils.format(
+            "Replace INTO %s overwrite ALL \n"
+            + "SELECT\n"
+            + "  TIME_PARSE(\"timestamp\") AS __time,\n"
+            + "  isRobot,\n"
+            + "  diffUrl,\n"
+            + "  added,\n"
+            + "  countryIsoCode,\n"
+            + "  regionName,\n"
+            + "  channel,\n"
+            + "  flags,\n"
+            + "  delta,\n"
+            + "  isUnpatrolled,\n"
+            + "  isNew,\n"
+            + "  deltaBucket,\n"
+            + "  isMinor,\n"
+            + "  isAnonymous,\n"
+            + "  deleted,\n"
+            + "  cityName,\n"
+            + "  metroCode,\n"
+            + "  namespace,\n"
+            + "  comment,\n"
+            + "  page,\n"
+            + "  commentLength,\n"
+            + "  countryName,\n"
+            + "  user,\n"
+            + "  regionIsoCode\n"
+            + "FROM TABLE(\n"
+            + "  EXTERN(\n"
+            + "    
'{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\",\"/resources/data/batch_index/json/wikipedia_index_data2.json\"]}',\n"
+            + "    '{\"type\":\"json\"}',\n"
+            + "    
'[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"i
 [...]
+            + "  )\n"
+            + ")\n"
+            + "where delta=111 "
+            // we add this filter since delta=111 is only present in 
wikipedia_index_data1.json. This means partitions from worker 2 will be empty.
+            + "PARTITIONED BY DAY\n"
+            + "CLUSTERED BY \"__time\"",
+            datasource
+        );
+
+    ImmutableMap<String, Object> context = ImmutableMap.of(
+        MultiStageQueryContext.CTX_CLUSTER_STATISTICS_MERGE_MODE,
+        ClusterStatisticsMergeMode.SEQUENTIAL,
+        MultiStageQueryContext.CTX_MAX_NUM_TASKS,
+        3
+    );
+
+    // Submit the task and wait for the datasource to get loaded
+    SqlQuery sqlQuery = new SqlQuery(queryLocal, null, false, false, false, 
context, null);
+    SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTask(sqlQuery);
+
+    if (sqlTaskStatus.getState().isFailure()) {
+      Assert.fail(StringUtils.format(
+          "Unable to start the task successfully.\nPossible exception: %s",
+          sqlTaskStatus.getError()
+      ));
+    }
+
+    msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
+    dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
+
+    
msqHelper.testQueriesFromFile("/multi-stage-query/wikipedia_msq_select_query_sequential_test.json",
 datasource);
+  }
 }
diff --git 
a/integration-tests-ex/cases/src/test/resources/multi-stage-query/wikipedia_msq_select_query_sequential_test.json
 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/wikipedia_msq_select_query_sequential_test.json
new file mode 100644
index 0000000000..c50ea09ad2
--- /dev/null
+++ 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/wikipedia_msq_select_query_sequential_test.json
@@ -0,0 +1,15 @@
+[
+  {
+    "query": "SELECT __time, isRobot, added, delta, deleted, namespace FROM 
%%DATASOURCE%%",
+    "expectedResults": [
+      {
+        "__time": 1377933081000,
+        "isRobot": "",
+        "added": 123,
+        "delta": 111,
+        "deleted": 12,
+        "namespace":"article"
+      }
+    ]
+  }
+]
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to