This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new e2e0cb905cf Add reasoning for choosing shardSpec to the MSQ report
(#16175)
e2e0cb905cf is described below
commit e2e0cb905cf237f6062e44fde2ec5d501d6cb834
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Tue Apr 9 11:32:02 2024 +0530
Add reasoning for choosing shardSpec to the MSQ report (#16175)
This PR logs the segment type chosen and the reason for that choice. It also
adds this information to the query report, so it can be displayed in the UI.
This PR adds a new section, segmentReport, to the reports. It contains the
segment type created if the query is an ingestion, and is null otherwise.
---
docs/api-reference/sql-ingestion-api.md | 9 ++-
.../org/apache/druid/msq/exec/ControllerImpl.java | 53 ++++++++++-----
.../msq/indexing/report/MSQSegmentReport.java | 78 ++++++++++++++++++++++
.../druid/msq/indexing/report/MSQStatusReport.java | 15 ++++-
.../org/apache/druid/msq/exec/MSQInsertTest.java | 8 +++
.../org/apache/druid/msq/exec/MSQReplaceTest.java | 21 +++++-
.../msq/indexing/report/MSQTaskReportTest.java | 6 +-
.../sql/resources/SqlStatementResourceTest.java | 2 +
.../org/apache/druid/msq/test/MSQTestBase.java | 11 +++
.../msq/util/SqlStatementResourceHelperTest.java | 8 +++
10 files changed, 189 insertions(+), 22 deletions(-)
diff --git a/docs/api-reference/sql-ingestion-api.md
b/docs/api-reference/sql-ingestion-api.md
index 988f860a85d..67a6ccf9588 100644
--- a/docs/api-reference/sql-ingestion-api.md
+++ b/docs/api-reference/sql-ingestion-api.md
@@ -299,7 +299,7 @@ The response shows an example report for a query.
},
"pendingTasks": 0,
"runningTasks": 2,
- "segmentLoadStatus": {
+ "segmentLoadWaiterStatus": {
"state": "SUCCESS",
"dataSource": "kttm_simple",
"startTime": "2022-09-14T23:12:09.266Z",
@@ -310,6 +310,10 @@ The response shows an example report for a query.
"onDemandSegments": 0,
"pendingSegments": 0,
"unknownSegments": 0
+ },
+ "segmentReport": {
+ "shardSpec": "NumberedShardSpec",
+ "details": "Cannot use RangeShardSpec, RangedShardSpec only supports
string CLUSTER BY keys. Using NumberedShardSpec instead."
}
},
"stages": [
@@ -631,6 +635,9 @@ The following table describes the response fields when you
retrieve a report for
| `multiStageQuery.payload.status.segmentLoadStatus.onDemandSegments` | The
number of segments which are not loaded on any historical, as per the load
rules. |
| `multiStageQuery.payload.status.segmentLoadStatus.pendingSegments` | The
number of segments remaining to be loaded. |
| `multiStageQuery.payload.status.segmentLoadStatus.unknownSegments` | The
number of segments whose status is unknown. |
+| `multiStageQuery.payload.status.segmentReport` | Segment report. Only
present if the query is an ingestion. |
+| `multiStageQuery.payload.status.segmentReport.shardSpec` | Contains the
shard spec chosen. |
+| `multiStageQuery.payload.status.segmentReport.details` | Contains further
reasoning about the shard spec chosen. |
| `multiStageQuery.payload.status.errorReport` | Error object. Only present if
there was an error. |
| `multiStageQuery.payload.status.errorReport.taskId` | The task that reported
the error, if known. May be a controller task or a worker task. |
| `multiStageQuery.payload.status.errorReport.host` | The hostname and port of
the task that reported the error, if known. |
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 6db998bfef2..c29259e318c 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -130,6 +130,7 @@ import org.apache.druid.msq.indexing.error.UnknownFault;
import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault;
import
org.apache.druid.msq.indexing.processor.SegmentGeneratorFrameProcessorFactory;
import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.indexing.report.MSQSegmentReport;
import org.apache.druid.msq.indexing.report.MSQStagesReport;
import org.apache.druid.msq.indexing.report.MSQStatusReport;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
@@ -313,6 +314,8 @@ public class ControllerImpl implements Controller
private final boolean isFaultToleranceEnabled;
private final boolean isFailOnEmptyInsertEnabled;
private volatile SegmentLoadStatusFetcher segmentLoadWaiter;
+ @Nullable
+ private MSQSegmentReport segmentReport;
public ControllerImpl(
final MSQControllerTask task,
@@ -565,7 +568,8 @@ public class ControllerImpl implements Controller
queryStartTime,
new Interval(queryStartTime,
DateTimes.nowUtc()).toDurationMillis(),
workerTaskLauncher,
- segmentLoadWaiter
+ segmentLoadWaiter,
+ segmentReport
),
stagesReport,
countersSnapshot,
@@ -935,7 +939,8 @@ public class ControllerImpl implements Controller
queryStartTime,
queryStartTime == null ? -1L : new
Interval(queryStartTime, DateTimes.nowUtc()).toDurationMillis(),
workerTaskLauncher,
- segmentLoadWaiter
+ segmentLoadWaiter,
+ segmentReport
),
makeStageReport(
queryDef,
@@ -1015,6 +1020,11 @@ public class ControllerImpl implements Controller
String previousSegmentId = null;
+ segmentReport = new MSQSegmentReport(
+ NumberedShardSpec.class.getSimpleName(),
+ "Using NumberedShardSpec to generate segments since the query is
inserting rows."
+ );
+
for (ClusterByPartition partitionBoundary : partitionBoundaries) {
final DateTime timestamp = getBucketDateTime(partitionBoundary,
segmentGranularity, keyReader);
final SegmentIdWithShardSpec allocation;
@@ -1099,12 +1109,16 @@ public class ControllerImpl implements Controller
final SegmentIdWithShardSpec[] retVal = new
SegmentIdWithShardSpec[partitionBoundaries.size()];
final Granularity segmentGranularity = destination.getSegmentGranularity();
final List<String> shardColumns;
-
- if (mayHaveMultiValuedClusterByFields) {
- // DimensionRangeShardSpec cannot handle multi-valued fields.
- shardColumns = Collections.emptyList();
+ final Pair<List<String>, String> shardReasonPair;
+
+ shardReasonPair = computeShardColumns(signature, clusterBy,
task.getQuerySpec().getColumnMappings(), mayHaveMultiValuedClusterByFields);
+ shardColumns = shardReasonPair.lhs;
+ String reason = shardReasonPair.rhs;
+ log.info(StringUtils.format("ShardSpec chosen: %s", reason));
+ if (shardColumns.isEmpty()) {
+ segmentReport = new
MSQSegmentReport(NumberedShardSpec.class.getSimpleName(), reason);
} else {
- shardColumns = computeShardColumns(signature, clusterBy,
task.getQuerySpec().getColumnMappings());
+ segmentReport = new
MSQSegmentReport(DimensionRangeShardSpec.class.getSimpleName(), reason);
}
// Group partition ranges by bucket (time chunk), so we can generate
shardSpecs for each bucket independently.
@@ -2039,19 +2053,24 @@ public class ControllerImpl implements Controller
* Compute shard columns for {@link DimensionRangeShardSpec}. Returns an
empty list if range-based sharding
* is not applicable.
*/
- private static List<String> computeShardColumns(
+ private static Pair<List<String>, String> computeShardColumns(
final RowSignature signature,
final ClusterBy clusterBy,
- final ColumnMappings columnMappings
+ final ColumnMappings columnMappings,
+ boolean mayHaveMultiValuedClusterByFields
)
{
+ if (mayHaveMultiValuedClusterByFields) {
+ // DimensionRangeShardSpec cannot handle multivalued fields.
+ return Pair.of(Collections.emptyList(), "Cannot use RangeShardSpec, the
fields in the CLUSTERED BY clause contains multivalued fields. Using
NumberedShardSpec instead.");
+ }
final List<KeyColumn> clusterByColumns = clusterBy.getColumns();
final List<String> shardColumns = new ArrayList<>();
final boolean boosted = isClusterByBoosted(clusterBy);
final int numShardColumns = clusterByColumns.size() -
clusterBy.getBucketByCount() - (boosted ? 1 : 0);
if (numShardColumns == 0) {
- return Collections.emptyList();
+ return Pair.of(Collections.emptyList(), "Using NumberedShardSpec as no
columns are supplied in the 'CLUSTERED BY' clause.");
}
for (int i = clusterBy.getBucketByCount(); i <
clusterBy.getBucketByCount() + numShardColumns; i++) {
@@ -2060,25 +2079,25 @@ public class ControllerImpl implements Controller
// DimensionRangeShardSpec only handles ascending order.
if (column.order() != KeyOrder.ASCENDING) {
- return Collections.emptyList();
+ return Pair.of(Collections.emptyList(), "Cannot use RangeShardSpec,
RangedShardSpec only supports ascending CLUSTER BY keys. Using
NumberedShardSpec instead.");
}
ColumnType columnType =
signature.getColumnType(column.columnName()).orElse(null);
// DimensionRangeShardSpec only handles strings.
if (!(ColumnType.STRING.equals(columnType))) {
- return Collections.emptyList();
+ return Pair.of(Collections.emptyList(), "Cannot use RangeShardSpec,
RangedShardSpec only supports string CLUSTER BY keys. Using NumberedShardSpec
instead.");
}
// DimensionRangeShardSpec only handles columns that appear as-is in the
output.
if (outputColumns.isEmpty()) {
- return Collections.emptyList();
+ return Pair.of(Collections.emptyList(), StringUtils.format("Cannot use
RangeShardSpec, Could not find output column name for column [%s]. Using
NumberedShardSpec instead.", column.columnName()));
}
shardColumns.add(columnMappings.getOutputColumnName(outputColumns.getInt(0)));
}
- return shardColumns;
+ return Pair.of(shardColumns, "Using RangeShardSpec to generate segments.");
}
/**
@@ -2354,7 +2373,8 @@ public class ControllerImpl implements Controller
@Nullable final DateTime queryStartTime,
final long queryDuration,
MSQWorkerTaskLauncher taskLauncher,
- final SegmentLoadStatusFetcher segmentLoadWaiter
+ final SegmentLoadStatusFetcher segmentLoadWaiter,
+ @Nullable MSQSegmentReport msqSegmentReport
)
{
int pendingTasks = -1;
@@ -2379,7 +2399,8 @@ public class ControllerImpl implements Controller
workerStatsMap,
pendingTasks,
runningTasks,
- status
+ status,
+ msqSegmentReport
);
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQSegmentReport.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQSegmentReport.java
new file mode 100644
index 00000000000..d045b8d7b51
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQSegmentReport.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing.report;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+public class MSQSegmentReport
+{
+ private final String shardSpec;
+ private final String details;
+
+ @JsonCreator
+ public MSQSegmentReport(@JsonProperty("shardSpec") String shardSpec,
@JsonProperty("details") String reason)
+ {
+ this.shardSpec = shardSpec;
+ this.details = reason;
+ }
+
+ @JsonProperty
+ public String getShardSpec()
+ {
+ return shardSpec;
+ }
+
+ @JsonProperty
+ public String getDetails()
+ {
+ return details;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ MSQSegmentReport that = (MSQSegmentReport) o;
+ return Objects.equals(shardSpec, that.shardSpec) &&
Objects.equals(details, that.details);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(shardSpec, details);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "MSQSegmentReport{" +
+ "shardSpec='" + shardSpec + '\'' +
+ ", details='" + details + '\'' +
+ '}';
+ }
+}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java
index 1dd7d658903..eca8998f865 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java
@@ -59,6 +59,9 @@ public class MSQStatusReport
@Nullable
private final SegmentLoadStatusFetcher.SegmentLoadWaiterStatus
segmentLoadWaiterStatus;
+ @Nullable
+ private final MSQSegmentReport segmentReport;
+
@JsonCreator
public MSQStatusReport(
@JsonProperty("status") TaskState status,
@@ -69,7 +72,8 @@ public class MSQStatusReport
@JsonProperty("workers") Map<Integer,
List<MSQWorkerTaskLauncher.WorkerStats>> workerStats,
@JsonProperty("pendingTasks") int pendingTasks,
@JsonProperty("runningTasks") int runningTasks,
- @JsonProperty("segmentLoadWaiterStatus") @Nullable
SegmentLoadStatusFetcher.SegmentLoadWaiterStatus segmentLoadWaiterStatus
+ @JsonProperty("segmentLoadWaiterStatus") @Nullable
SegmentLoadStatusFetcher.SegmentLoadWaiterStatus segmentLoadWaiterStatus,
+ @JsonProperty("segmentReport") @Nullable MSQSegmentReport segmentReport
)
{
this.status = Preconditions.checkNotNull(status, "status");
@@ -81,6 +85,7 @@ public class MSQStatusReport
this.pendingTasks = pendingTasks;
this.runningTasks = runningTasks;
this.segmentLoadWaiterStatus = segmentLoadWaiterStatus;
+ this.segmentReport = segmentReport;
}
@JsonProperty
@@ -144,6 +149,14 @@ public class MSQStatusReport
return segmentLoadWaiterStatus;
}
+ @JsonProperty("segmentReport")
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public MSQSegmentReport getSegmentReport()
+ {
+ return segmentReport;
+ }
+
@Override
public boolean equals(Object o)
{
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
index d22b2d7481e..f05e35c304c 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.msq.indexing.error.ColumnNameRestrictedFault;
import org.apache.druid.msq.indexing.error.RowTooLargeFault;
+import org.apache.druid.msq.indexing.report.MSQSegmentReport;
import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
import org.apache.druid.msq.test.CounterSnapshotMatcher;
import org.apache.druid.msq.test.MSQTestBase;
@@ -46,6 +47,7 @@ import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.hamcrest.CoreMatchers;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.jupiter.params.ParameterizedTest;
@@ -110,6 +112,12 @@ public class MSQInsertTest extends MSQTestBase
.setExpectedRowSignature(rowSignature)
.setExpectedSegment(expectedFooSegments())
.setExpectedResultRows(expectedRows)
+ .setExpectedMSQSegmentReport(
+ new MSQSegmentReport(
+ NumberedShardSpec.class.getSimpleName(),
+ "Using NumberedShardSpec to generate segments
since the query is inserting rows."
+ )
+ )
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index 77eeed05367..9a158f5aec8 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -29,6 +29,7 @@ import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.indexing.report.MSQSegmentReport;
import org.apache.druid.msq.test.CounterSnapshotMatcher;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestTaskActionClient;
@@ -37,6 +38,7 @@ import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.jupiter.params.ParameterizedTest;
@@ -45,7 +47,6 @@ import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import javax.annotation.Nonnull;
-
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -238,6 +239,12 @@ public class MSQReplaceTest extends MSQTestBase
.setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
+ .setExpectedMSQSegmentReport(
+ new MSQSegmentReport(
+ NumberedShardSpec.class.getSimpleName(),
+ "Cannot use RangeShardSpec, RangedShardSpec only
supports string CLUSTER BY keys. Using NumberedShardSpec instead."
+ )
+ )
.setExpectedSegment(ImmutableSet.of(
SegmentId.of(
"foo1",
@@ -872,6 +879,12 @@ public class MSQReplaceTest extends MSQTestBase
.setExpectedDataSource("foo1")
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedShardSpec(DimensionRangeShardSpec.class)
+ .setExpectedMSQSegmentReport(
+ new MSQSegmentReport(
+ DimensionRangeShardSpec.class.getSimpleName(),
+ "Using RangeShardSpec to generate segments."
+ )
+ )
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
.setExpectedSegment(expectedFooSegments())
@@ -993,6 +1006,12 @@ public class MSQReplaceTest extends MSQTestBase
"test",
0
)))
+ .setExpectedMSQSegmentReport(
+ new MSQSegmentReport(
+ NumberedShardSpec.class.getSimpleName(),
+ "Using NumberedShardSpec as no columns are
supplied in the 'CLUSTERED BY' clause."
+ )
+ )
.setExpectedResultRows(
ImmutableList.of(
new Object[]{946684800000L, "a"},
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
index df492a70056..f5b6b29119f 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
@@ -107,7 +107,7 @@ public class MSQTaskReportTest
final MSQTaskReport report = new MSQTaskReport(
TASK_ID,
new MSQTaskReportPayload(
- new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(),
null, 0, new HashMap<>(), 1, 2, status),
+ new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(),
null, 0, new HashMap<>(), 1, 2, status, null),
MSQStagesReport.create(
QUERY_DEFINITION,
ImmutableMap.of(),
@@ -172,7 +172,7 @@ public class MSQTaskReportTest
final MSQTaskReport report = new MSQTaskReport(
TASK_ID,
new MSQTaskReportPayload(
- new MSQStatusReport(TaskState.FAILED, errorReport, new
ArrayDeque<>(), null, 0, new HashMap<>(), 1, 2, status),
+ new MSQStatusReport(TaskState.FAILED, errorReport, new
ArrayDeque<>(), null, 0, new HashMap<>(), 1, 2, status, null),
MSQStagesReport.create(
QUERY_DEFINITION,
ImmutableMap.of(),
@@ -220,7 +220,7 @@ public class MSQTaskReportTest
final MSQTaskReport report = new MSQTaskReport(
TASK_ID,
new MSQTaskReportPayload(
- new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(),
null, 0, new HashMap<>(), 1, 2, status),
+ new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(),
null, 0, new HashMap<>(), 1, 2, status, null),
MSQStagesReport.create(
QUERY_DEFINITION,
ImmutableMap.of(),
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
index 3d41b46825a..257efa4c307 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
@@ -247,6 +247,7 @@ public class SqlStatementResourceTest extends MSQTestBase
new HashMap<>(),
1,
2,
+ null,
null
),
MSQStagesReport.create(
@@ -314,6 +315,7 @@ public class SqlStatementResourceTest extends MSQTestBase
new HashMap<>(),
1,
2,
+ null,
null
),
MSQStagesReport.create(
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index 5e8bb34abf2..7da53c4ee99 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -105,6 +105,7 @@ import org.apache.druid.msq.indexing.error.MSQFaultUtils;
import org.apache.druid.msq.indexing.error.MSQWarnings;
import org.apache.druid.msq.indexing.error.TooManyAttemptsForWorker;
import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.indexing.report.MSQSegmentReport;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.msq.kernel.StageDefinition;
@@ -861,6 +862,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
protected Matcher<Throwable> expectedExecutionErrorMatcher = null;
protected MSQFault expectedMSQFault = null;
protected Class<? extends MSQFault> expectedMSQFaultClass = null;
+ protected MSQSegmentReport expectedSegmentReport = null;
protected Map<Integer, Integer> expectedStageVsWorkerCount = new
HashMap<>();
protected final Map<Integer, Map<Integer, Map<String,
CounterSnapshotMatcher>>>
expectedStageWorkerChannelToCounters = new HashMap<>();
@@ -949,6 +951,12 @@ public class MSQTestBase extends BaseCalciteQueryTest
return asBuilder();
}
+ public Builder setExpectedMSQSegmentReport(MSQSegmentReport
expectedSegmentReport)
+ {
+ this.expectedSegmentReport = expectedSegmentReport;
+ return asBuilder();
+ }
+
public Builder setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher counterSnapshot,
int stage,
@@ -1240,6 +1248,9 @@ public class MSQTestBase extends BaseCalciteQueryTest
if (expectedTuningConfig != null) {
assertTuningConfig(expectedTuningConfig,
foundSpec.getTuningConfig());
}
+ if (expectedSegmentReport != null) {
+ Assert.assertEquals(expectedSegmentReport,
reportPayload.getStatus().getSegmentReport());
+ }
if (expectedDestinationIntervals != null) {
Assert.assertNotNull(foundSpec);
DataSourceMSQDestination destination = (DataSourceMSQDestination)
foundSpec.getDestination();
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
index 65bd004c9b5..3c14f4f1cd9 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
@@ -70,6 +70,7 @@ public class SqlStatementResourceHelperTest
new HashMap<>(),
1,
2,
+ null,
null
), MSQStagesReport.create(
MSQTaskReportTest.QUERY_DEFINITION,
@@ -109,6 +110,7 @@ public class SqlStatementResourceHelperTest
new HashMap<>(),
1,
2,
+ null,
null
), MSQStagesReport.create(
MSQTaskReportTest.QUERY_DEFINITION,
@@ -149,6 +151,7 @@ public class SqlStatementResourceHelperTest
new HashMap<>(),
1,
2,
+ null,
null
), MSQStagesReport.create(
MSQTaskReportTest.QUERY_DEFINITION,
@@ -187,6 +190,7 @@ public class SqlStatementResourceHelperTest
new HashMap<>(),
1,
2,
+ null,
null
), MSQStagesReport.create(
MSQTaskReportTest.QUERY_DEFINITION,
@@ -226,6 +230,7 @@ public class SqlStatementResourceHelperTest
new HashMap<>(),
1,
2,
+ null,
null
), MSQStagesReport.create(
MSQTaskReportTest.QUERY_DEFINITION,
@@ -265,6 +270,7 @@ public class SqlStatementResourceHelperTest
new HashMap<>(),
1,
2,
+ null,
null
),
MSQStagesReport.create(
@@ -301,6 +307,7 @@ public class SqlStatementResourceHelperTest
new HashMap<>(),
1,
2,
+ null,
null
),
MSQStagesReport.create(
@@ -339,6 +346,7 @@ public class SqlStatementResourceHelperTest
new HashMap<>(),
1,
2,
+ null,
null
),
MSQStagesReport.create(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]