This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e2e0cb905cf Add reasoning for choosing shardSpec to the MSQ report 
(#16175)
e2e0cb905cf is described below

commit e2e0cb905cf237f6062e44fde2ec5d501d6cb834
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Tue Apr 9 11:32:02 2024 +0530

    Add reasoning for choosing shardSpec to the MSQ report (#16175)
    
    This PR logs the shard spec type chosen for the generated segments and the
    reason it was chosen. It also adds this information to the query report,
    to be displayed in the UI.
    
    This PR adds a new section to the reports, segmentReport. It contains the
    shard spec used for the created segments, along with details on why it was
    chosen, if the query is an ingestion, and is null otherwise.
---
 docs/api-reference/sql-ingestion-api.md            |  9 ++-
 .../org/apache/druid/msq/exec/ControllerImpl.java  | 53 ++++++++++-----
 .../msq/indexing/report/MSQSegmentReport.java      | 78 ++++++++++++++++++++++
 .../druid/msq/indexing/report/MSQStatusReport.java | 15 ++++-
 .../org/apache/druid/msq/exec/MSQInsertTest.java   |  8 +++
 .../org/apache/druid/msq/exec/MSQReplaceTest.java  | 21 +++++-
 .../msq/indexing/report/MSQTaskReportTest.java     |  6 +-
 .../sql/resources/SqlStatementResourceTest.java    |  2 +
 .../org/apache/druid/msq/test/MSQTestBase.java     | 11 +++
 .../msq/util/SqlStatementResourceHelperTest.java   |  8 +++
 10 files changed, 189 insertions(+), 22 deletions(-)

diff --git a/docs/api-reference/sql-ingestion-api.md 
b/docs/api-reference/sql-ingestion-api.md
index 988f860a85d..67a6ccf9588 100644
--- a/docs/api-reference/sql-ingestion-api.md
+++ b/docs/api-reference/sql-ingestion-api.md
@@ -299,7 +299,7 @@ The response shows an example report for a query.
         },
         "pendingTasks": 0,
         "runningTasks": 2,
-        "segmentLoadStatus": {
+        "segmentLoadWaiterStatus": {
           "state": "SUCCESS",
           "dataSource": "kttm_simple",
           "startTime": "2022-09-14T23:12:09.266Z",
@@ -310,6 +310,10 @@ The response shows an example report for a query.
           "onDemandSegments": 0,
           "pendingSegments": 0,
           "unknownSegments": 0
+        },
+        "segmentReport": {
+          "shardSpec": "NumberedShardSpec",
+          "details": "Cannot use RangeShardSpec, RangedShardSpec only supports 
string CLUSTER BY keys. Using NumberedShardSpec instead."
         }
       },
       "stages": [
@@ -631,6 +635,9 @@ The following table describes the response fields when you 
retrieve a report for
 | `multiStageQuery.payload.status.segmentLoadStatus.onDemandSegments` | The 
number of segments which are not loaded on any historical, as per the load 
rules. |
 | `multiStageQuery.payload.status.segmentLoadStatus.pendingSegments` | The 
number of segments remaining to be loaded. |
 | `multiStageQuery.payload.status.segmentLoadStatus.unknownSegments` | The 
number of segments whose status is unknown. |
+| `multiStageQuery.payload.status.segmentReport` | Segment report. Only 
present if the query is an ingestion. |
+| `multiStageQuery.payload.status.segmentReport.shardSpec` | Contains the 
shard spec chosen. |
+| `multiStageQuery.payload.status.segmentReport.details` | Contains further 
reasoning about the shard spec chosen. |
 | `multiStageQuery.payload.status.errorReport` | Error object. Only present if 
there was an error. |
 | `multiStageQuery.payload.status.errorReport.taskId` | The task that reported 
the error, if known. May be a controller task or a worker task. |
 | `multiStageQuery.payload.status.errorReport.host` | The hostname and port of 
the task that reported the error, if known. |
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 6db998bfef2..c29259e318c 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -130,6 +130,7 @@ import org.apache.druid.msq.indexing.error.UnknownFault;
 import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault;
 import 
org.apache.druid.msq.indexing.processor.SegmentGeneratorFrameProcessorFactory;
 import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.indexing.report.MSQSegmentReport;
 import org.apache.druid.msq.indexing.report.MSQStagesReport;
 import org.apache.druid.msq.indexing.report.MSQStatusReport;
 import org.apache.druid.msq.indexing.report.MSQTaskReport;
@@ -313,6 +314,8 @@ public class ControllerImpl implements Controller
   private final boolean isFaultToleranceEnabled;
   private final boolean isFailOnEmptyInsertEnabled;
   private volatile SegmentLoadStatusFetcher segmentLoadWaiter;
+  @Nullable
+  private MSQSegmentReport segmentReport;
 
   public ControllerImpl(
       final MSQControllerTask task,
@@ -565,7 +568,8 @@ public class ControllerImpl implements Controller
               queryStartTime,
               new Interval(queryStartTime, 
DateTimes.nowUtc()).toDurationMillis(),
               workerTaskLauncher,
-              segmentLoadWaiter
+              segmentLoadWaiter,
+              segmentReport
           ),
           stagesReport,
           countersSnapshot,
@@ -935,7 +939,8 @@ public class ControllerImpl implements Controller
                     queryStartTime,
                     queryStartTime == null ? -1L : new 
Interval(queryStartTime, DateTimes.nowUtc()).toDurationMillis(),
                     workerTaskLauncher,
-                    segmentLoadWaiter
+                    segmentLoadWaiter,
+                    segmentReport
                 ),
                 makeStageReport(
                     queryDef,
@@ -1015,6 +1020,11 @@ public class ControllerImpl implements Controller
 
     String previousSegmentId = null;
 
+    segmentReport = new MSQSegmentReport(
+        NumberedShardSpec.class.getSimpleName(),
+        "Using NumberedShardSpec to generate segments since the query is 
inserting rows."
+    );
+
     for (ClusterByPartition partitionBoundary : partitionBoundaries) {
       final DateTime timestamp = getBucketDateTime(partitionBoundary, 
segmentGranularity, keyReader);
       final SegmentIdWithShardSpec allocation;
@@ -1099,12 +1109,16 @@ public class ControllerImpl implements Controller
     final SegmentIdWithShardSpec[] retVal = new 
SegmentIdWithShardSpec[partitionBoundaries.size()];
     final Granularity segmentGranularity = destination.getSegmentGranularity();
     final List<String> shardColumns;
-
-    if (mayHaveMultiValuedClusterByFields) {
-      // DimensionRangeShardSpec cannot handle multi-valued fields.
-      shardColumns = Collections.emptyList();
+    final Pair<List<String>, String> shardReasonPair;
+
+    shardReasonPair = computeShardColumns(signature, clusterBy, 
task.getQuerySpec().getColumnMappings(), mayHaveMultiValuedClusterByFields);
+    shardColumns = shardReasonPair.lhs;
+    String reason = shardReasonPair.rhs;
+    log.info(StringUtils.format("ShardSpec chosen: %s", reason));
+    if (shardColumns.isEmpty()) {
+      segmentReport = new 
MSQSegmentReport(NumberedShardSpec.class.getSimpleName(), reason);
     } else {
-      shardColumns = computeShardColumns(signature, clusterBy, 
task.getQuerySpec().getColumnMappings());
+      segmentReport = new 
MSQSegmentReport(DimensionRangeShardSpec.class.getSimpleName(), reason);
     }
 
     // Group partition ranges by bucket (time chunk), so we can generate 
shardSpecs for each bucket independently.
@@ -2039,19 +2053,24 @@ public class ControllerImpl implements Controller
    * Compute shard columns for {@link DimensionRangeShardSpec}. Returns an 
empty list if range-based sharding
    * is not applicable.
    */
-  private static List<String> computeShardColumns(
+  private static Pair<List<String>, String> computeShardColumns(
       final RowSignature signature,
       final ClusterBy clusterBy,
-      final ColumnMappings columnMappings
+      final ColumnMappings columnMappings,
+      boolean mayHaveMultiValuedClusterByFields
   )
   {
+    if (mayHaveMultiValuedClusterByFields) {
+      // DimensionRangeShardSpec cannot handle multivalued fields.
+      return Pair.of(Collections.emptyList(), "Cannot use RangeShardSpec, the 
fields in the CLUSTERED BY clause contains multivalued fields. Using 
NumberedShardSpec instead.");
+    }
     final List<KeyColumn> clusterByColumns = clusterBy.getColumns();
     final List<String> shardColumns = new ArrayList<>();
     final boolean boosted = isClusterByBoosted(clusterBy);
     final int numShardColumns = clusterByColumns.size() - 
clusterBy.getBucketByCount() - (boosted ? 1 : 0);
 
     if (numShardColumns == 0) {
-      return Collections.emptyList();
+      return Pair.of(Collections.emptyList(), "Using NumberedShardSpec as no 
columns are supplied in the 'CLUSTERED BY' clause.");
     }
 
     for (int i = clusterBy.getBucketByCount(); i < 
clusterBy.getBucketByCount() + numShardColumns; i++) {
@@ -2060,25 +2079,25 @@ public class ControllerImpl implements Controller
 
       // DimensionRangeShardSpec only handles ascending order.
       if (column.order() != KeyOrder.ASCENDING) {
-        return Collections.emptyList();
+        return Pair.of(Collections.emptyList(), "Cannot use RangeShardSpec, 
RangedShardSpec only supports ascending CLUSTER BY keys. Using 
NumberedShardSpec instead.");
       }
 
       ColumnType columnType = 
signature.getColumnType(column.columnName()).orElse(null);
 
       // DimensionRangeShardSpec only handles strings.
       if (!(ColumnType.STRING.equals(columnType))) {
-        return Collections.emptyList();
+        return Pair.of(Collections.emptyList(), "Cannot use RangeShardSpec, 
RangedShardSpec only supports string CLUSTER BY keys. Using NumberedShardSpec 
instead.");
       }
 
       // DimensionRangeShardSpec only handles columns that appear as-is in the 
output.
       if (outputColumns.isEmpty()) {
-        return Collections.emptyList();
+        return Pair.of(Collections.emptyList(), StringUtils.format("Cannot use 
RangeShardSpec, Could not find output column name for column [%s]. Using 
NumberedShardSpec instead.", column.columnName()));
       }
 
       
shardColumns.add(columnMappings.getOutputColumnName(outputColumns.getInt(0)));
     }
 
-    return shardColumns;
+    return Pair.of(shardColumns, "Using RangeShardSpec to generate segments.");
   }
 
   /**
@@ -2354,7 +2373,8 @@ public class ControllerImpl implements Controller
       @Nullable final DateTime queryStartTime,
       final long queryDuration,
       MSQWorkerTaskLauncher taskLauncher,
-      final SegmentLoadStatusFetcher segmentLoadWaiter
+      final SegmentLoadStatusFetcher segmentLoadWaiter,
+      @Nullable MSQSegmentReport msqSegmentReport
   )
   {
     int pendingTasks = -1;
@@ -2379,7 +2399,8 @@ public class ControllerImpl implements Controller
         workerStatsMap,
         pendingTasks,
         runningTasks,
-        status
+        status,
+        msqSegmentReport
     );
   }
 
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQSegmentReport.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQSegmentReport.java
new file mode 100644
index 00000000000..d045b8d7b51
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQSegmentReport.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing.report;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+public class MSQSegmentReport
+{
+  private final String shardSpec;
+  private final String details;
+
+  @JsonCreator
+  public MSQSegmentReport(@JsonProperty("shardSpec") String shardSpec, 
@JsonProperty("details") String reason)
+  {
+    this.shardSpec = shardSpec;
+    this.details = reason;
+  }
+
+  @JsonProperty
+  public String getShardSpec()
+  {
+    return shardSpec;
+  }
+
+  @JsonProperty
+  public String getDetails()
+  {
+    return details;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    MSQSegmentReport that = (MSQSegmentReport) o;
+    return Objects.equals(shardSpec, that.shardSpec) && 
Objects.equals(details, that.details);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(shardSpec, details);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "MSQSegmentReport{" +
+           "shardSpec='" + shardSpec + '\'' +
+           ", details='" + details + '\'' +
+           '}';
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java
index 1dd7d658903..eca8998f865 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java
@@ -59,6 +59,9 @@ public class MSQStatusReport
   @Nullable
   private final SegmentLoadStatusFetcher.SegmentLoadWaiterStatus 
segmentLoadWaiterStatus;
 
+  @Nullable
+  private final MSQSegmentReport segmentReport;
+
   @JsonCreator
   public MSQStatusReport(
       @JsonProperty("status") TaskState status,
@@ -69,7 +72,8 @@ public class MSQStatusReport
       @JsonProperty("workers") Map<Integer, 
List<MSQWorkerTaskLauncher.WorkerStats>> workerStats,
       @JsonProperty("pendingTasks") int pendingTasks,
       @JsonProperty("runningTasks") int runningTasks,
-      @JsonProperty("segmentLoadWaiterStatus") @Nullable 
SegmentLoadStatusFetcher.SegmentLoadWaiterStatus segmentLoadWaiterStatus
+      @JsonProperty("segmentLoadWaiterStatus") @Nullable 
SegmentLoadStatusFetcher.SegmentLoadWaiterStatus segmentLoadWaiterStatus,
+      @JsonProperty("segmentReport") @Nullable MSQSegmentReport segmentReport
   )
   {
     this.status = Preconditions.checkNotNull(status, "status");
@@ -81,6 +85,7 @@ public class MSQStatusReport
     this.pendingTasks = pendingTasks;
     this.runningTasks = runningTasks;
     this.segmentLoadWaiterStatus = segmentLoadWaiterStatus;
+    this.segmentReport = segmentReport;
   }
 
   @JsonProperty
@@ -144,6 +149,14 @@ public class MSQStatusReport
     return segmentLoadWaiterStatus;
   }
 
+  @JsonProperty("segmentReport")
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  @Nullable
+  public MSQSegmentReport getSegmentReport()
+  {
+    return segmentReport;
+  }
+
   @Override
   public boolean equals(Object o)
   {
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
index d22b2d7481e..f05e35c304c 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.msq.indexing.error.ColumnNameRestrictedFault;
 import org.apache.druid.msq.indexing.error.RowTooLargeFault;
+import org.apache.druid.msq.indexing.report.MSQSegmentReport;
 import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
 import org.apache.druid.msq.test.CounterSnapshotMatcher;
 import org.apache.druid.msq.test.MSQTestBase;
@@ -46,6 +47,7 @@ import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.hamcrest.CoreMatchers;
 import org.junit.internal.matchers.ThrowableMessageMatcher;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -110,6 +112,12 @@ public class MSQInsertTest extends MSQTestBase
                      .setExpectedRowSignature(rowSignature)
                      .setExpectedSegment(expectedFooSegments())
                      .setExpectedResultRows(expectedRows)
+                     .setExpectedMSQSegmentReport(
+                         new MSQSegmentReport(
+                             NumberedShardSpec.class.getSimpleName(),
+                             "Using NumberedShardSpec to generate segments 
since the query is inserting rows."
+                         )
+                     )
                      .setExpectedCountersForStageWorkerChannel(
                          CounterSnapshotMatcher
                              .with().totalFiles(1),
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index 77eeed05367..9a158f5aec8 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -29,6 +29,7 @@ import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.indexing.report.MSQSegmentReport;
 import org.apache.druid.msq.test.CounterSnapshotMatcher;
 import org.apache.druid.msq.test.MSQTestBase;
 import org.apache.druid.msq.test.MSQTestTaskActionClient;
@@ -37,6 +38,7 @@ import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.EasyMock;
 import org.joda.time.Interval;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -45,7 +47,6 @@ import org.mockito.ArgumentMatchers;
 import org.mockito.Mockito;
 
 import javax.annotation.Nonnull;
-
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -238,6 +239,12 @@ public class MSQReplaceTest extends MSQTestBase
                      .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
                      .setExpectedRowSignature(rowSignature)
                      .setQueryContext(context)
+                     .setExpectedMSQSegmentReport(
+                         new MSQSegmentReport(
+                             NumberedShardSpec.class.getSimpleName(),
+                             "Cannot use RangeShardSpec, RangedShardSpec only 
supports string CLUSTER BY keys. Using NumberedShardSpec instead."
+                         )
+                     )
                      .setExpectedSegment(ImmutableSet.of(
                                              SegmentId.of(
                                                  "foo1",
@@ -872,6 +879,12 @@ public class MSQReplaceTest extends MSQTestBase
                      .setExpectedDataSource("foo1")
                      .setQueryContext(DEFAULT_MSQ_CONTEXT)
                      .setExpectedShardSpec(DimensionRangeShardSpec.class)
+                     .setExpectedMSQSegmentReport(
+                         new MSQSegmentReport(
+                             DimensionRangeShardSpec.class.getSimpleName(),
+                             "Using RangeShardSpec to generate segments."
+                         )
+                     )
                      .setExpectedRowSignature(rowSignature)
                      .setQueryContext(context)
                      .setExpectedSegment(expectedFooSegments())
@@ -993,6 +1006,12 @@ public class MSQReplaceTest extends MSQTestBase
                          "test",
                          0
                      )))
+                     .setExpectedMSQSegmentReport(
+                         new MSQSegmentReport(
+                             NumberedShardSpec.class.getSimpleName(),
+                             "Using NumberedShardSpec as no columns are 
supplied in the 'CLUSTERED BY' clause."
+                         )
+                     )
                      .setExpectedResultRows(
                          ImmutableList.of(
                              new Object[]{946684800000L, "a"},
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
index df492a70056..f5b6b29119f 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
@@ -107,7 +107,7 @@ public class MSQTaskReportTest
     final MSQTaskReport report = new MSQTaskReport(
         TASK_ID,
         new MSQTaskReportPayload(
-            new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), 
null, 0, new HashMap<>(), 1, 2, status),
+            new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), 
null, 0, new HashMap<>(), 1, 2, status, null),
             MSQStagesReport.create(
                 QUERY_DEFINITION,
                 ImmutableMap.of(),
@@ -172,7 +172,7 @@ public class MSQTaskReportTest
     final MSQTaskReport report = new MSQTaskReport(
         TASK_ID,
         new MSQTaskReportPayload(
-            new MSQStatusReport(TaskState.FAILED, errorReport, new 
ArrayDeque<>(), null, 0, new HashMap<>(), 1, 2, status),
+            new MSQStatusReport(TaskState.FAILED, errorReport, new 
ArrayDeque<>(), null, 0, new HashMap<>(), 1, 2, status, null),
             MSQStagesReport.create(
                 QUERY_DEFINITION,
                 ImmutableMap.of(),
@@ -220,7 +220,7 @@ public class MSQTaskReportTest
     final MSQTaskReport report = new MSQTaskReport(
         TASK_ID,
         new MSQTaskReportPayload(
-            new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), 
null, 0, new HashMap<>(), 1, 2, status),
+            new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), 
null, 0, new HashMap<>(), 1, 2, status, null),
             MSQStagesReport.create(
                 QUERY_DEFINITION,
                 ImmutableMap.of(),
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
index 3d41b46825a..257efa4c307 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
@@ -247,6 +247,7 @@ public class SqlStatementResourceTest extends MSQTestBase
               new HashMap<>(),
               1,
               2,
+              null,
               null
           ),
           MSQStagesReport.create(
@@ -314,6 +315,7 @@ public class SqlStatementResourceTest extends MSQTestBase
               new HashMap<>(),
               1,
               2,
+              null,
               null
           ),
           MSQStagesReport.create(
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index 5e8bb34abf2..7da53c4ee99 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -105,6 +105,7 @@ import org.apache.druid.msq.indexing.error.MSQFaultUtils;
 import org.apache.druid.msq.indexing.error.MSQWarnings;
 import org.apache.druid.msq.indexing.error.TooManyAttemptsForWorker;
 import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.indexing.report.MSQSegmentReport;
 import org.apache.druid.msq.indexing.report.MSQTaskReport;
 import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
 import org.apache.druid.msq.kernel.StageDefinition;
@@ -861,6 +862,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
     protected Matcher<Throwable> expectedExecutionErrorMatcher = null;
     protected MSQFault expectedMSQFault = null;
     protected Class<? extends MSQFault> expectedMSQFaultClass = null;
+    protected MSQSegmentReport expectedSegmentReport = null;
     protected Map<Integer, Integer> expectedStageVsWorkerCount = new 
HashMap<>();
     protected final Map<Integer, Map<Integer, Map<String, 
CounterSnapshotMatcher>>>
         expectedStageWorkerChannelToCounters = new HashMap<>();
@@ -949,6 +951,12 @@ public class MSQTestBase extends BaseCalciteQueryTest
       return asBuilder();
     }
 
+    public Builder setExpectedMSQSegmentReport(MSQSegmentReport 
expectedSegmentReport)
+    {
+      this.expectedSegmentReport = expectedSegmentReport;
+      return asBuilder();
+    }
+
     public Builder setExpectedCountersForStageWorkerChannel(
         CounterSnapshotMatcher counterSnapshot,
         int stage,
@@ -1240,6 +1248,9 @@ public class MSQTestBase extends BaseCalciteQueryTest
         if (expectedTuningConfig != null) {
           assertTuningConfig(expectedTuningConfig, 
foundSpec.getTuningConfig());
         }
+        if (expectedSegmentReport != null) {
+          Assert.assertEquals(expectedSegmentReport, 
reportPayload.getStatus().getSegmentReport());
+        }
         if (expectedDestinationIntervals != null) {
           Assert.assertNotNull(foundSpec);
           DataSourceMSQDestination destination = (DataSourceMSQDestination) 
foundSpec.getDestination();
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
index 65bd004c9b5..3c14f4f1cd9 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
@@ -70,6 +70,7 @@ public class SqlStatementResourceHelperTest
         new HashMap<>(),
         1,
         2,
+        null,
         null
     ), MSQStagesReport.create(
         MSQTaskReportTest.QUERY_DEFINITION,
@@ -109,6 +110,7 @@ public class SqlStatementResourceHelperTest
         new HashMap<>(),
         1,
         2,
+        null,
         null
     ), MSQStagesReport.create(
         MSQTaskReportTest.QUERY_DEFINITION,
@@ -149,6 +151,7 @@ public class SqlStatementResourceHelperTest
         new HashMap<>(),
         1,
         2,
+        null,
         null
     ), MSQStagesReport.create(
         MSQTaskReportTest.QUERY_DEFINITION,
@@ -187,6 +190,7 @@ public class SqlStatementResourceHelperTest
         new HashMap<>(),
         1,
         2,
+        null,
         null
     ), MSQStagesReport.create(
         MSQTaskReportTest.QUERY_DEFINITION,
@@ -226,6 +230,7 @@ public class SqlStatementResourceHelperTest
         new HashMap<>(),
         1,
         2,
+        null,
         null
     ), MSQStagesReport.create(
         MSQTaskReportTest.QUERY_DEFINITION,
@@ -265,6 +270,7 @@ public class SqlStatementResourceHelperTest
           new HashMap<>(),
           1,
           2,
+          null,
           null
         ),
         MSQStagesReport.create(
@@ -301,6 +307,7 @@ public class SqlStatementResourceHelperTest
             new HashMap<>(),
             1,
             2,
+            null,
             null
         ),
         MSQStagesReport.create(
@@ -339,6 +346,7 @@ public class SqlStatementResourceHelperTest
             new HashMap<>(),
             1,
             2,
+            null,
             null
         ),
         MSQStagesReport.create(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to