This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new d87a84dd90f MSQ: Controller result batching, consolidated lifecycle.
(#19043)
d87a84dd90f is described below
commit d87a84dd90f92308bfef1ed66f2fecb7b0eba7c7
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Feb 23 11:15:09 2026 -0800
MSQ: Controller result batching, consolidated lifecycle. (#19043)
Main changes:
1) QueryListener is changed to deal in RowsAndColumns rather than
individual rows.
2) Logic for converting Frames to SQL-formatted rows is moved into
TaskReportQueryListener, out of QueryResultsReader.
3) DartQueryMaker's streaming mode (fullReport = false) is adjusted
to use SequenceQueryListener + FrameChannelSequence.
4) Controller register/deregister and execution lifecycle related logic
is consolidated and moved into ControllerHolder#runAsync, to simplify
DartQueryMaker.
---
.../msq/dart/controller/ControllerHolder.java | 188 +++++--
.../msq/dart/controller/ControllerThreadPool.java | 52 ++
.../msq/dart/controller/sql/DartQueryMaker.java | 541 ++++++---------------
.../msq/dart/controller/sql/DartSqlEngine.java | 37 +-
.../druid/msq/dart/guice/DartControllerModule.java | 18 +
.../druid/msq/exec/CaptureReportQueryListener.java | 24 +-
.../org/apache/druid/msq/exec/ControllerImpl.java | 3 -
.../msq/exec/ControllerQueryResultsReader.java | 64 +--
.../org/apache/druid/msq/exec/QueryListener.java | 26 +-
.../druid/msq/exec/SequenceQueryListener.java | 122 +++++
.../druid/msq/indexing/MSQControllerTask.java | 5 +-
.../msq/indexing/TaskReportQueryListener.java | 99 +++-
.../druid/msq/util/SqlStatementResourceHelper.java | 18 +-
.../dart/controller/http/DartSqlResourceTest.java | 63 ++-
.../msq/indexing/TaskReportQueryListenerTest.java | 112 ++++-
.../msq/test/MSQTestOverlordServiceClient.java | 49 +-
.../apache/druid/msq/test/NoopQueryListener.java | 11 +-
.../frame/channel/BlockingQueueFrameChannel.java | 2 +
.../druid/frame/channel/FrameChannelSequence.java | 24 +-
19 files changed, 812 insertions(+), 646 deletions(-)
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerHolder.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerHolder.java
index a0b9f7c34f9..b6595a89ba4 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerHolder.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerHolder.java
@@ -20,19 +20,30 @@
package org.apache.druid.msq.dart.controller;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.dart.worker.WorkerId;
import org.apache.druid.msq.exec.CaptureReportQueryListener;
import org.apache.druid.msq.exec.Controller;
import org.apache.druid.msq.exec.ControllerContext;
import org.apache.druid.msq.exec.QueryListener;
+import org.apache.druid.msq.indexing.error.CanceledFault;
import org.apache.druid.msq.indexing.error.CancellationReason;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.indexing.error.WorkerFailedFault;
+import org.apache.druid.msq.indexing.report.MSQStatusReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.query.BaseQuery;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.sql.http.StandardQueryState;
import org.joda.time.DateTime;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -40,45 +51,7 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class ControllerHolder
{
- public enum State
- {
- /**
- * Query has been accepted, but not yet {@link
Controller#run(QueryListener)}.
- */
- ACCEPTED(StandardQueryState.ACCEPTED),
-
- /**
- * Query has had {@link Controller#run(QueryListener)} called.
- */
- RUNNING(StandardQueryState.RUNNING),
-
- /**
- * Query has been canceled.
- */
- CANCELED(StandardQueryState.CANCELED),
-
- /**
- * Query has exited successfully.
- */
- SUCCESS(StandardQueryState.SUCCESS),
-
- /**
- * Query has failed.
- */
- FAILED(StandardQueryState.FAILED);
-
- private final String statusString;
-
- State(String statusString)
- {
- this.statusString = statusString;
- }
-
- public String getStatusString()
- {
- return statusString;
- }
- }
+ private static final Logger log = new Logger(ControllerHolder.class);
private final Controller controller;
private final String sqlQueryId;
@@ -188,21 +161,70 @@ public class ControllerHolder
}
/**
- * Calls {@link Controller#run(QueryListener)}, and returns true, if this
holder was previously in state
- * {@link State#ACCEPTED}. Otherwise returns false.
+ * Runs {@link Controller#run(QueryListener)} in the provided executor.
Registers the controller with the provided
+ * registry while it is running.
*
- * @return whether {@link Controller#run(QueryListener)} was called.
+ * @return future that resolves when the controller is done or canceled.
*/
- public boolean run(final QueryListener listener) throws Exception
- {
- if (state.compareAndSet(State.ACCEPTED, State.RUNNING)) {
- final CaptureReportQueryListener reportListener = new
CaptureReportQueryListener(listener);
- controller.run(reportListener);
- updateStateOnQueryComplete(reportListener.getReport());
- return true;
- } else {
- return false;
- }
+ public ListenableFuture<?> runAsync(
+ final QueryListener listener,
+ final DartControllerRegistry controllerRegistry,
+ final ControllerThreadPool threadPool
+ )
+ {
+    // Register controller before submitting anything to controllerExecutor,
so it shows up in
+ // "active controllers" lists.
+ controllerRegistry.register(this);
+
+ final ListenableFuture<?> future =
threadPool.getExecutorService().submit(() -> {
+ final String threadName = Thread.currentThread().getName();
+ Thread.currentThread().setName(makeThreadName());
+
+ try {
+ final CaptureReportQueryListener reportListener = new
CaptureReportQueryListener(listener);
+
+ try {
+ if (state.compareAndSet(State.ACCEPTED, State.RUNNING)) {
+ controller.run(reportListener);
+ updateStateOnQueryComplete(reportListener.getReport());
+ } else {
+ // Canceled before running.
+ reportListener.onQueryComplete(makeCanceledReport());
+ }
+ }
+ catch (Throwable e) {
+ log.warn(
+ e,
+ "Controller[%s] failed, queryId[%s], sqlQueryId[%s]",
+ controller.queryId(),
+ controller.getQueryContext().getString(BaseQuery.QUERY_ID),
+ sqlQueryId
+ );
+ }
+ finally {
+ // Build report and then call "deregister".
+ final MSQTaskReport taskReport;
+
+ if (reportListener.hasReport()) {
+ taskReport = new MSQTaskReport(controller.queryId(),
reportListener.getReport());
+ } else {
+ taskReport = null;
+ }
+
+ final TaskReport.ReportMap reportMap = new TaskReport.ReportMap();
+ reportMap.put(MSQTaskReport.REPORT_KEY, taskReport);
+ controllerRegistry.deregister(this, reportMap);
+ }
+ }
+ finally {
+ Thread.currentThread().setName(threadName);
+ }
+ });
+
+ // Must not cancel the above future, otherwise "deregister" may never get
called. If a controller is canceled
+ // before it runs, the runnable above stays in the queue until it gets a
thread, then it exits without running
+ // the controller.
+ return Futures.nonCancellationPropagating(future);
}
private void updateStateOnQueryComplete(final MSQTaskReportPayload report)
@@ -217,4 +239,66 @@ public class ControllerHolder
break;
}
}
+
+ /**
+ * Generate a name for the thread that {@link #runAsync} uses.
+ */
+ private String makeThreadName()
+ {
+ return StringUtils.format(
+ "%s[%s]-sqlQueryId[%s]",
+ Thread.currentThread().getName(),
+ controller.queryId(),
+ sqlQueryId
+ );
+ }
+
+ private MSQTaskReportPayload makeCanceledReport()
+ {
+ final MSQErrorReport errorReport =
+ MSQErrorReport.fromFault(controller.queryId(), null, null,
CanceledFault.userRequest());
+ final MSQStatusReport statusReport =
+ new MSQStatusReport(TaskState.FAILED, errorReport, null, null, 0,
Map.of(), 0, 0, null, null);
+ return new MSQTaskReportPayload(statusReport, null, null, null);
+ }
+
+ public enum State
+ {
+ /**
+ * Query has been accepted, but not yet {@link
Controller#run(QueryListener)}.
+ */
+ ACCEPTED(StandardQueryState.ACCEPTED),
+
+ /**
+ * Query has had {@link Controller#run(QueryListener)} called.
+ */
+ RUNNING(StandardQueryState.RUNNING),
+
+ /**
+ * Query has been canceled.
+ */
+ CANCELED(StandardQueryState.CANCELED),
+
+ /**
+ * Query has exited successfully.
+ */
+ SUCCESS(StandardQueryState.SUCCESS),
+
+ /**
+ * Query has failed.
+ */
+ FAILED(StandardQueryState.FAILED);
+
+ private final String statusString;
+
+ State(String statusString)
+ {
+ this.statusString = statusString;
+ }
+
+ public String getStatusString()
+ {
+ return statusString;
+ }
+ }
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerThreadPool.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerThreadPool.java
new file mode 100644
index 00000000000..8e369d6fa81
--- /dev/null
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerThreadPool.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.dart.controller;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.msq.dart.guice.DartControllerConfig;
+import org.apache.druid.msq.exec.Controller;
+
+/**
+ * Thread pool for running {@link Controller}. Number of threads is equal to
+ * {@link DartControllerConfig#getConcurrentQueries()}, which limits the
number of concurrent controllers.
+ */
+@ManageLifecycle
+public class ControllerThreadPool
+{
+ private final ListeningExecutorService executorService;
+
+ public ControllerThreadPool(final ListeningExecutorService executorService)
+ {
+ this.executorService = executorService;
+ }
+
+ public ListeningExecutorService getExecutorService()
+ {
+ return executorService;
+ }
+
+ @LifecycleStop
+ public void stop()
+ {
+ executorService.shutdown();
+ }
+}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
index 5270c9f63d8..d9e3ad30001 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
@@ -19,43 +19,35 @@
package org.apache.druid.msq.dart.controller.sql;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Iterators;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.Frame;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.io.LimitedOutputStream;
import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.Either;
-import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
-import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.guava.SequenceWrapper;
+import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.msq.dart.controller.ControllerHolder;
+import org.apache.druid.msq.dart.controller.ControllerThreadPool;
import org.apache.druid.msq.dart.controller.DartControllerContextFactory;
import org.apache.druid.msq.dart.controller.DartControllerRegistry;
import org.apache.druid.msq.dart.guice.DartControllerConfig;
-import org.apache.druid.msq.exec.Controller;
import org.apache.druid.msq.exec.ControllerContext;
import org.apache.druid.msq.exec.ControllerImpl;
import org.apache.druid.msq.exec.QueryKitSpecFactory;
-import org.apache.druid.msq.exec.QueryListener;
import org.apache.druid.msq.exec.ResultsContext;
+import org.apache.druid.msq.exec.SequenceQueryListener;
import org.apache.druid.msq.indexing.LegacyMSQSpec;
import org.apache.druid.msq.indexing.QueryDefMSQSpec;
import org.apache.druid.msq.indexing.TaskReportQueryListener;
import org.apache.druid.msq.indexing.destination.MSQDestination;
-import org.apache.druid.msq.indexing.error.CanceledFault;
import org.apache.druid.msq.indexing.error.CancellationReason;
-import org.apache.druid.msq.indexing.error.MSQErrorReport;
-import org.apache.druid.msq.indexing.report.MSQResultsReport;
-import org.apache.druid.msq.indexing.report.MSQStatusReport;
-import org.apache.druid.msq.indexing.report.MSQTaskReport;
-import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.msq.querykit.MultiQueryKit;
import org.apache.druid.msq.sql.MSQTaskQueryMaker;
+import org.apache.druid.msq.util.SqlStatementResourceHelper;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.segment.column.ColumnType;
@@ -68,38 +60,24 @@ import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.calcite.run.QueryMaker;
import org.apache.druid.sql.calcite.run.SqlResults;
-import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
-import java.time.Duration;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-import java.util.Optional;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* SQL {@link QueryMaker}. Executes queries in two ways, depending on whether
the user asked for a full report.
*
- * When including a full report, the controller runs in the SQL planning
thread (typically an HTTP thread) using
- * the method {@link #runWithReport(ControllerHolder)}. The entire response is
buffered in memory, up to
+ * When including a full report, the entire response is buffered in memory, up
to
* {@link DartControllerConfig#getMaxQueryReportSize()}.
*
- * When not including a full report, the controller runs in {@link
#controllerExecutor} and results are streamed
- * back to the user through {@link ResultIterator}. There is no limit to the
size of the returned results.
+ * When not including a full report, results are streamed back to the user
through a {@link SequenceQueryListener}.
+ * There is no limit to the size of the returned results.
*/
public class DartQueryMaker implements QueryMaker
{
- private static final Logger log = new Logger(DartQueryMaker.class);
-
final List<Entry<Integer, String>> fieldMapping;
private final DartControllerContextFactory controllerContextFactory;
private final PlannerContext plannerContext;
@@ -115,10 +93,9 @@ public class DartQueryMaker implements QueryMaker
private final DartControllerConfig controllerConfig;
/**
- * Executor for {@link #runWithIterator(ControllerHolder)}. Number of thread
is equal to
- * {@link DartControllerConfig#getConcurrentQueries()}, which limits the
number of concurrent controllers.
+ * Executor for running controllers.
*/
- private final ExecutorService controllerExecutor;
+ private final ControllerThreadPool controllerThreadPool;
private final ServerConfig serverConfig;
final QueryKitSpecFactory queryKitSpecFactory;
@@ -130,7 +107,7 @@ public class DartQueryMaker implements QueryMaker
PlannerContext plannerContext,
DartControllerRegistry controllerRegistry,
DartControllerConfig controllerConfig,
- ExecutorService controllerExecutor,
+ ControllerThreadPool controllerThreadPool,
QueryKitSpecFactory queryKitSpecFactory,
MultiQueryKit queryKit,
ServerConfig serverConfig
@@ -141,7 +118,7 @@ public class DartQueryMaker implements QueryMaker
this.plannerContext = plannerContext;
this.controllerRegistry = controllerRegistry;
this.controllerConfig = controllerConfig;
- this.controllerExecutor = controllerExecutor;
+ this.controllerThreadPool = controllerThreadPool;
this.queryKitSpecFactory = queryKitSpecFactory;
this.queryKit = queryKit;
this.serverConfig = serverConfig;
@@ -166,8 +143,11 @@ public class DartQueryMaker implements QueryMaker
return runLegacyMSQSpec(querySpec, druidQuery.getQuery().context(),
resultsContext);
}
- public static ResultsContext makeResultsContext(DruidQuery druidQuery,
List<Entry<Integer, String>> fieldMapping,
- PlannerContext plannerContext)
+ public static ResultsContext makeResultsContext(
+ DruidQuery druidQuery,
+ List<Entry<Integer, String>> fieldMapping,
+ PlannerContext plannerContext
+ )
{
final List<Pair<SqlTypeName, ColumnType>> types =
MSQTaskQueryMaker.getTypes(druidQuery, fieldMapping, plannerContext);
final ResultsContext resultsContext = new ResultsContext(
@@ -177,6 +157,32 @@ public class DartQueryMaker implements QueryMaker
return resultsContext;
}
+ /**
+ * Creates a controller using {@link LegacyMSQSpec} and calls {@link
#runController}.
+ */
+ public QueryResponse<Object[]> runLegacyMSQSpec(
+ LegacyMSQSpec querySpec,
+ QueryContext context,
+ ResultsContext resultsContext
+ )
+ {
+ final ControllerImpl controller = makeLegacyController(querySpec, context,
resultsContext);
+ return runController(controller, context.getFullReport(),
querySpec.getColumnMappings(), resultsContext);
+ }
+
+ /**
+ * Creates a controller using {@link QueryDefMSQSpec} and calls {@link
#runController}.
+ */
+ public QueryResponse<Object[]> runQueryDefMSQSpec(
+ QueryDefMSQSpec querySpec,
+ QueryContext context,
+ ResultsContext resultsContext
+ )
+ {
+ final ControllerImpl controller = makeQueryDefController(querySpec,
context, resultsContext);
+ return runController(controller, context.getFullReport(),
querySpec.getColumnMappings(), resultsContext);
+ }
+
private ControllerImpl makeLegacyController(LegacyMSQSpec querySpec,
QueryContext context, ResultsContext resultsContext)
{
final ControllerContext controllerContext =
controllerContextFactory.newContext(context);
@@ -201,46 +207,6 @@ public class DartQueryMaker implements QueryMaker
);
}
- public QueryResponse<Object[]> runLegacyMSQSpec(LegacyMSQSpec querySpec,
QueryContext context, ResultsContext resultsContext)
- {
- final ControllerImpl controller = makeLegacyController(querySpec, context,
resultsContext);
- return runController(controller, context.getFullReport());
- }
-
- public QueryResponse<Object[]> runQueryDefMSQSpec(QueryDefMSQSpec querySpec,
QueryContext context, ResultsContext resultsContext)
- {
- final ControllerImpl controller = makeQueryDefController(querySpec,
context, resultsContext);
- return runController(controller, context.getFullReport());
- }
-
- private QueryResponse<Object[]> runController(final ControllerImpl
controller, final boolean fullReport)
- {
- final ControllerHolder controllerHolder = new ControllerHolder(
- controller,
- plannerContext.getSqlQueryId(),
- plannerContext.getSql(),
- plannerContext.getAuthenticationResult(),
- DateTimes.nowUtc()
- );
-
- // Register controller before submitting anything to controllerExeuctor,
so it shows up in
- // "active controllers" lists.
- controllerRegistry.register(controllerHolder);
-
- try {
- // runWithReport, runWithoutReport are responsible for calling
controllerRegistry.deregister(controllerHolder)
- // when their work is done.
- final Sequence<Object[]> results =
- fullReport ? runWithReport(controllerHolder) :
runWithIterator(controllerHolder);
- return QueryResponse.withEmptyContext(results);
- }
- catch (Throwable e) {
- // Error while calling runWithReport or runWithoutReport. Deregister
controller immediately.
- controllerRegistry.deregister(controllerHolder, null);
- throw e;
- }
- }
-
/**
* Adds the timeout parameter to the query context, considering the default
and maximum values from
* {@link ServerConfig}.
@@ -254,353 +220,124 @@ public class DartQueryMaker implements QueryMaker
}
/**
- * Run a query and return the full report, buffered in memory up to
- * {@link DartControllerConfig#getMaxQueryReportSize()}.
+ * Runs a controller in {@link #controllerThreadPool} and returns a {@link
QueryResponse} object.
*
- * Arranges for {@link DartControllerRegistry#deregister} to be called upon
completion (either success or failure).
+ * @param controller controller to run
+ * @param fullReport if true, buffer the results into a report and
return it in a single row.
+ * if false, stream the results back
+ * @param columnMappings SQL column mappings
+ * @param resultsContext SQL results context
*/
- private Sequence<Object[]> runWithReport(final ControllerHolder
controllerHolder)
+ private QueryResponse<Object[]> runController(
+ final ControllerImpl controller,
+ final boolean fullReport,
+ final ColumnMappings columnMappings,
+ final ResultsContext resultsContext
+ )
{
- final Future<TaskReport.ReportMap> reportFuture;
-
- // Run in controllerExecutor. Control doesn't really *need* to be moved to
another thread, but we have to
- // use the controllerExecutor anyway, to ensure we respect the
concurrentQueries configuration.
- reportFuture = controllerExecutor.submit(() -> {
- TaskReport.ReportMap retVal = null;
- final String threadName = Thread.currentThread().getName();
-
- try {
- Thread.currentThread().setName(nameThread(plannerContext));
-
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- final TaskReportQueryListener queryListener = new
TaskReportQueryListener(
- () -> new LimitedOutputStream(
- baos,
- controllerConfig.getMaxQueryReportSize(),
- limit -> StringUtils.format(
- "maxQueryReportSize[%,d] exceeded. "
- + "Try limiting the result set for your query, or run it
with %s[false]",
- limit,
- QueryContexts.CTX_FULL_REPORT
- )
- ),
- plannerContext.getJsonMapper(),
- controllerHolder.getController().queryId(),
- Collections.emptyMap(),
- MSQDestination.UNLIMITED
- );
-
- if (controllerHolder.run(queryListener)) {
- retVal = plannerContext.getJsonMapper()
- .readValue(baos.toByteArray(),
TaskReport.ReportMap.class);
- } else {
- // Controller was canceled before it ran.
- throw MSQErrorReport
- .fromFault(
- controllerHolder.getController().queryId(),
- null,
- null,
- CanceledFault.userRequest()
- )
- .toDruidException();
- }
- }
- finally {
- controllerRegistry.deregister(controllerHolder, retVal);
- Thread.currentThread().setName(threadName);
- }
-
- return retVal;
- });
-
- // Return a sequence that reads one row (the report) from reportFuture.
- return new BaseSequence<>(
- new BaseSequence.IteratorMaker<>()
- {
- @Override
- public Iterator<Object[]> make()
- {
- try {
- return Iterators.singletonIterator(new
Object[]{reportFuture.get()});
- }
- catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- catch (ExecutionException e) {
- // Unwrap ExecutionExceptions, so errors such as DruidException
are serialized properly.
- Throwables.throwIfUnchecked(e.getCause());
- throw new RuntimeException(e.getCause());
- }
- }
-
- @Override
- public void cleanup(Iterator<Object[]> iterFromMake)
- {
- // Nothing to do.
- }
- }
+ final ControllerHolder controllerHolder = new ControllerHolder(
+ controller,
+ plannerContext.getSqlQueryId(),
+ plannerContext.getSql(),
+ plannerContext.getAuthenticationResult(),
+ DateTimes.nowUtc()
);
+
+    // runWithReport, runWithSequence are responsible for calling
controllerRegistry.deregister(controllerHolder)
+    // when their work is done.
+ final Sequence<Object[]> results =
+ fullReport
+ ? runWithReport(controllerHolder, columnMappings, resultsContext)
+ : runWithSequence(controllerHolder, columnMappings, resultsContext);
+ return QueryResponse.withEmptyContext(results);
}
/**
- * Run a query and return the results only, streamed back using {@link
ResultIteratorMaker}.
+ * Run a query and return the full report, buffered in memory up to
+ * {@link DartControllerConfig#getMaxQueryReportSize()}.
*
* Arranges for {@link DartControllerRegistry#deregister} to be called upon
completion (either success or failure).
*/
- private Sequence<Object[]> runWithIterator(final ControllerHolder
controllerHolder)
- {
- return new BaseSequence<>(new ResultIteratorMaker(controllerHolder));
- }
-
- /**
- * Generate a name for a thread in {@link #controllerExecutor}.
- */
- private String nameThread(final PlannerContext plannerContext)
+ private Sequence<Object[]> runWithReport(
+ final ControllerHolder controllerHolder,
+ final ColumnMappings columnMappings,
+ final ResultsContext resultsContext
+ )
{
- return StringUtils.format(
- "%s-sqlQueryId[%s]-queryId[%s]",
- Thread.currentThread().getName(),
- plannerContext.getSqlQueryId(),
- plannerContext.queryContext().get(QueryContexts.CTX_DART_QUERY_ID)
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final TaskReportQueryListener listener = new TaskReportQueryListener(
+ () -> new LimitedOutputStream(
+ baos,
+ controllerConfig.getMaxQueryReportSize(),
+ limit -> StringUtils.format(
+ "maxQueryReportSize[%,d] exceeded. "
+ + "Try limiting the result set for your query, or run it with
%s[false]",
+ limit,
+ QueryContexts.CTX_FULL_REPORT
+ )
+ ),
+ plannerContext.getJsonMapper(),
+ controllerHolder.getController().queryId(),
+ Collections.emptyMap(),
+ MSQDestination.UNLIMITED,
+ columnMappings,
+ resultsContext
);
- }
-
- /**
- * Helper for {@link #runWithIterator(ControllerHolder)}.
- */
- class ResultIteratorMaker implements BaseSequence.IteratorMaker<Object[],
ResultIterator>
- {
- private final ControllerHolder controllerHolder;
- private final ResultIterator resultIterator;
- private boolean made;
- public ResultIteratorMaker(ControllerHolder holder)
- {
- this.controllerHolder = holder;
- this.resultIterator = new
ResultIterator(controllerHolder.getController().getQueryContext().getTimeoutDuration());
- submitController();
- }
-
- /**
- * Submits the controller to the executor in the constructor, and remove
it from the registry when the
- * future resolves.
- */
- private void submitController()
- {
- controllerExecutor.submit(() -> {
- final Controller controller = controllerHolder.getController();
- final String threadName = Thread.currentThread().getName();
-
- try {
- Thread.currentThread().setName(nameThread(plannerContext));
-
- if (!controllerHolder.run(resultIterator)) {
- // Controller was canceled before it ran. Push a cancellation
error to the resultIterator, so the sequence
- // returned by "runWithoutReport" can resolve.
- resultIterator.pushError(
- MSQErrorReport.fromFault(
- controllerHolder.getController().queryId(),
- null,
- null,
- CanceledFault.userRequest()
- ).toDruidException()
- );
- }
- }
- catch (Exception e) {
- log.warn(
- e,
- "Controller failed for sqlQueryId[%s], controllerHost[%s]",
- plannerContext.getSqlQueryId(),
- controller.queryId()
- );
- }
- catch (Throwable e) {
- log.error(
- e,
- "Controller failed for sqlQueryId[%s], controllerHost[%s]",
- plannerContext.getSqlQueryId(),
- controller.queryId()
- );
- throw e;
- }
- finally {
- final MSQTaskReport taskReport;
-
- if (resultIterator.report != null) {
- taskReport = new MSQTaskReport(
- controllerHolder.getController().queryId(),
- resultIterator.report
- );
- } else {
- taskReport = null;
- }
+ try {
+ // Submit controller and wait for it to finish.
+ controllerHolder.runAsync(listener, controllerRegistry,
controllerThreadPool).get();
- final TaskReport.ReportMap reportMap = new TaskReport.ReportMap();
- reportMap.put(MSQTaskReport.REPORT_KEY, taskReport);
- controllerRegistry.deregister(controllerHolder, reportMap);
- Thread.currentThread().setName(threadName);
- }
- });
+ // Return a sequence with just one row (the report).
+ final TaskReport.ReportMap reportMap =
+ plannerContext.getJsonMapper().readValue(baos.toByteArray(),
TaskReport.ReportMap.class);
+ return Sequences.simple(List.<Object[]>of(new Object[]{reportMap}));
}
-
- @Override
- public ResultIterator make()
- {
- if (made) {
- throw new ISE("Cannot call make() more than once");
- }
-
- made = true;
- return resultIterator;
+ catch (InterruptedException e) {
+ controllerHolder.cancel(CancellationReason.UNKNOWN);
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
}
-
- @Override
- public void cleanup(final ResultIterator iterFromMake)
- {
- if (!iterFromMake.complete) {
- controllerHolder.cancel(CancellationReason.UNKNOWN);
- }
+ catch (Exception e) {
+ throw new RuntimeException(e);
}
}
/**
- * Helper for {@link ResultIteratorMaker}, which is in turn a helper for
{@link #runWithIterator(ControllerHolder)}.
+ * Run a query and return the results only, streamed back using {@link
SequenceQueryListener}.
+ *
+ * Arranges for {@link DartControllerRegistry#deregister} to be called upon
completion (either success or failure).
*/
- static class ResultIterator implements Iterator<Object[]>, QueryListener
+ private Sequence<Object[]> runWithSequence(
+ final ControllerHolder controllerHolder,
+ final ColumnMappings columnMappings,
+ final ResultsContext resultsContext
+ )
{
- /**
- * Number of rows to buffer from {@link #onResultRow(Object[])}.
- */
- private static final int BUFFER_SIZE = 128;
-
- /**
- * Empty optional signifies results are complete.
- */
- private final BlockingQueue<Either<Throwable, Object[]>> rowBuffer = new
ArrayBlockingQueue<>(BUFFER_SIZE);
-
- /**
- * Only accessed by {@link Iterator} methods, so no need to be thread-safe.
- */
- @Nullable
- private Either<Throwable, Object[]> current;
-
- private volatile boolean complete;
- private volatile MSQTaskReportPayload report;
-
- @Nullable
- private final Duration timeout;
-
- public ResultIterator(@Nullable Duration timeout)
- {
- this.timeout = timeout;
- }
-
- @Override
- public boolean hasNext()
- {
- return populateAndReturnCurrent().isPresent();
- }
-
- @Override
- public Object[] next()
- {
- final Object[] retVal =
populateAndReturnCurrent().orElseThrow(NoSuchElementException::new);
- current = null;
- return retVal;
- }
-
- private Optional<Object[]> populateAndReturnCurrent()
- {
- if (current == null) {
- try {
- if (timeout != null) {
- current = rowBuffer.poll(timeout.toMillis(),
TimeUnit.MILLISECONDS);
- if (current == null) {
- throw DruidException.defensive("Result reader timed out [%s]",
timeout);
+ final SequenceQueryListener listener = new SequenceQueryListener();
+ final ListenableFuture<?> runFuture =
+ controllerHolder.runAsync(listener, controllerRegistry,
controllerThreadPool);
+
+ return Sequences.wrap(
+ listener.getSequence().flatMap(
+ rac -> SqlStatementResourceHelper.getResultSequence(
+ rac.as(Frame.class),
+ listener.getFrameReader(),
+ columnMappings,
+ resultsContext,
+ plannerContext.getJsonMapper()
+ )
+ ),
+ new SequenceWrapper()
+ {
+ @Override
+ public void after(final boolean isDone, final Throwable thrown)
+ {
+ if (!isDone || thrown != null) {
+ runFuture.cancel(true); // Cancel on early stop or failure
}
- } else {
- current = rowBuffer.take();
}
}
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
- }
-
- if (current.isValue()) {
- return Optional.ofNullable(current.valueOrThrow());
- } else {
- // Don't use valueOrThrow to throw errors; here we *don't* want the
wrapping in RuntimeException
- // that Either.valueOrThrow does. We want the original DruidException
to be propagated to the user, if
- // there is one.
- final Throwable e = current.error();
- Throwables.throwIfUnchecked(e);
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public boolean readResults()
- {
- return !complete;
- }
-
- @Override
- public void onResultsStart(
- final List<MSQResultsReport.ColumnAndType> signature,
- @Nullable final List<SqlTypeName> sqlTypeNames
- )
- {
- // Nothing to do.
- }
-
- @Override
- public boolean onResultRow(Object[] row)
- {
- try {
- rowBuffer.put(Either.value(row));
- return !complete;
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void onResultsComplete()
- {
- // Nothing to do.
- }
-
- @Override
- public void onQueryComplete(MSQTaskReportPayload report)
- {
- try {
- this.report = report;
- this.complete = true;
-
- final MSQStatusReport statusReport = report.getStatus();
-
- if (statusReport.getStatus().isSuccess()) {
- rowBuffer.put(Either.value(null));
- } else {
- pushError(statusReport.getErrorReport().toDruidException());
- }
- }
- catch (InterruptedException e) {
- // Can't fix this by pushing an error, because the rowBuffer isn't
accepting new entries.
- // Give up, allow controllerHolder.run() to fail.
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
- }
-
- public void pushError(final Throwable e) throws InterruptedException
- {
- rowBuffer.put(Either.error(e));
- }
+ );
}
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
index 954c44d2305..2410ea8486d 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
@@ -32,10 +32,10 @@ import org.apache.druid.error.DruidException;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.dart.Dart;
import org.apache.druid.msq.dart.controller.ControllerHolder;
+import org.apache.druid.msq.dart.controller.ControllerThreadPool;
import org.apache.druid.msq.dart.controller.DartControllerContextFactory;
import org.apache.druid.msq.dart.controller.DartControllerRegistry;
import org.apache.druid.msq.dart.controller.QueryInfoAndReport;
@@ -72,7 +72,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
-import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
@LazySingleton
@@ -84,7 +83,7 @@ public class DartSqlEngine implements SqlEngine
private final DartControllerContextFactory controllerContextFactory;
private final DartControllerRegistry controllerRegistry;
private final DartControllerConfig controllerConfig;
- private final ExecutorService controllerExecutor;
+ private final ControllerThreadPool controllerThreadPool;
private final ServerConfig serverConfig;
private final QueryKitSpecFactory queryKitSpecFactory;
private final MultiQueryKit queryKit;
@@ -97,6 +96,7 @@ public class DartSqlEngine implements SqlEngine
DartControllerContextFactory controllerContextFactory,
DartControllerRegistry controllerRegistry,
DartControllerConfig controllerConfig,
+ @Dart ControllerThreadPool controllerThreadPool,
DartQueryKitSpecFactory queryKitSpecFactory,
MultiQueryKit queryKit,
ServerConfig serverConfig,
@@ -104,38 +104,11 @@ public class DartSqlEngine implements SqlEngine
SqlToolbox toolbox,
DartSqlClients sqlClients
)
- {
- this(
- controllerContextFactory,
- controllerRegistry,
- controllerConfig,
- Execs.multiThreaded(controllerConfig.getConcurrentQueries(),
"dart-controller-%s"),
- queryKitSpecFactory,
- queryKit,
- serverConfig,
- dartQueryConfig,
- toolbox,
- sqlClients
- );
- }
-
- public DartSqlEngine(
- DartControllerContextFactory controllerContextFactory,
- DartControllerRegistry controllerRegistry,
- DartControllerConfig controllerConfig,
- ExecutorService controllerExecutor,
- QueryKitSpecFactory queryKitSpecFactory,
- MultiQueryKit queryKit,
- ServerConfig serverConfig,
- DefaultQueryConfig dartQueryConfig,
- SqlToolbox toolbox,
- DartSqlClients sqlClients
- )
{
this.controllerContextFactory = controllerContextFactory;
this.controllerRegistry = controllerRegistry;
this.controllerConfig = controllerConfig;
- this.controllerExecutor = controllerExecutor;
+ this.controllerThreadPool = controllerThreadPool;
this.queryKitSpecFactory = queryKitSpecFactory;
this.queryKit = queryKit;
this.serverConfig = serverConfig;
@@ -224,7 +197,7 @@ public class DartSqlEngine implements SqlEngine
plannerContext,
controllerRegistry,
controllerConfig,
- controllerExecutor,
+ controllerThreadPool,
queryKitSpecFactory,
queryKit,
serverConfig
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java
index 3ee647e65ba..a4175ecad0f 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java
@@ -21,6 +21,7 @@ package org.apache.druid.msq.dart.guice;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Module;
@@ -34,9 +35,11 @@ import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.LoadScope;
import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.msq.dart.Dart;
import org.apache.druid.msq.dart.DartResourcePermissionMapper;
import org.apache.druid.msq.dart.controller.ControllerMessageListener;
+import org.apache.druid.msq.dart.controller.ControllerThreadPool;
import org.apache.druid.msq.dart.controller.DartControllerContextFactory;
import org.apache.druid.msq.dart.controller.DartControllerContextFactoryImpl;
import org.apache.druid.msq.dart.controller.DartControllerRegistry;
@@ -122,6 +125,21 @@ public class DartControllerModule implements DruidModule
}
}
+ @Provides
+ @Dart
+ @ManageLifecycle
+ public ControllerThreadPool makeControllerThreadPool(DartControllerConfig
dartControllerConfig)
+ {
+ return new ControllerThreadPool(
+ MoreExecutors.listeningDecorator(
+ Execs.multiThreaded(
+ dartControllerConfig.getConcurrentQueries(),
+ "dart-controller-%s"
+ )
+ )
+ );
+ }
+
@Override
public List<? extends com.fasterxml.jackson.databind.Module>
getJacksonModules()
{
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/CaptureReportQueryListener.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/CaptureReportQueryListener.java
index 597ffc82692..7632ca2ecf2 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/CaptureReportQueryListener.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/CaptureReportQueryListener.java
@@ -19,13 +19,12 @@
package org.apache.druid.msq.exec;
-import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.error.DruidException;
-import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
import javax.annotation.Nullable;
-import java.util.List;
/**
* A {@link QueryListener} wrapper that captures the report from {@link
#onQueryComplete(MSQTaskReportPayload)}.
@@ -42,6 +41,14 @@ public class CaptureReportQueryListener implements
QueryListener
this.delegate = delegate;
}
+ /**
+ * Whether this listener has captured a report. Will be true if the query
has completed, false otherwise.
+ */
+ public boolean hasReport()
+ {
+ return report != null;
+ }
+
/**
* Retrieves the report. Can only be called once the query is complete.
*/
@@ -61,18 +68,15 @@ public class CaptureReportQueryListener implements
QueryListener
}
@Override
- public void onResultsStart(
- final List<MSQResultsReport.ColumnAndType> signature,
- @Nullable final List<SqlTypeName> sqlTypeNames
- )
+ public void onResultsStart(final FrameReader frameReader)
{
- delegate.onResultsStart(signature, sqlTypeNames);
+ delegate.onResultsStart(frameReader);
}
@Override
- public boolean onResultRow(final Object[] row)
+ public boolean onResultBatch(RowsAndColumns rac)
{
- return delegate.onResultRow(row);
+ return delegate.onResultBatch(rac);
}
@Override
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 879f8177868..78d021dca6c 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -2839,9 +2839,6 @@ public class ControllerImpl implements Controller
final ControllerQueryResultsReader resultsReader = new
ControllerQueryResultsReader(
resultsChannel,
queryDef.getFinalStageDefinition().getFrameReader(),
- querySpec.getColumnMappings(),
- resultsContext,
- context.jsonMapper(),
queryListener
);
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerQueryResultsReader.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerQueryResultsReader.java
index 6ef4b743c21..e8766f049fd 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerQueryResultsReader.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerQueryResultsReader.java
@@ -19,25 +19,14 @@
package org.apache.druid.msq.exec;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableList;
import it.unimi.dsi.fastutil.ints.IntSet;
-import org.apache.druid.frame.Frame;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.WritableFrameChannel;
import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.frame.processor.FrameProcessors;
import org.apache.druid.frame.processor.ReturnOrAwait;
import org.apache.druid.frame.read.FrameReader;
-import org.apache.druid.java.util.common.guava.Yielder;
-import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.msq.indexing.report.MSQResultsReport;
-import org.apache.druid.msq.util.SqlStatementResourceHelper;
-import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.sql.calcite.planner.ColumnMapping;
-import org.apache.druid.sql.calcite.planner.ColumnMappings;
-import org.apache.druid.utils.CloseableUtils;
import java.io.IOException;
import java.util.Collections;
@@ -52,9 +41,6 @@ public class ControllerQueryResultsReader implements
FrameProcessor<Void>
private final ReadableFrameChannel in;
private final FrameReader frameReader;
- private final ColumnMappings columnMappings;
- private final ResultsContext resultsContext;
- private final ObjectMapper jsonMapper;
private final QueryListener queryListener;
private boolean wroteResultsStart;
@@ -62,17 +48,11 @@ public class ControllerQueryResultsReader implements
FrameProcessor<Void>
ControllerQueryResultsReader(
final ReadableFrameChannel in,
final FrameReader frameReader,
- final ColumnMappings columnMappings,
- final ResultsContext resultsContext,
- final ObjectMapper jsonMapper,
final QueryListener queryListener
)
{
this.in = in;
this.frameReader = frameReader;
- this.columnMappings = columnMappings;
- this.resultsContext = resultsContext;
- this.jsonMapper = jsonMapper;
this.queryListener = queryListener;
}
@@ -96,23 +76,7 @@ public class ControllerQueryResultsReader implements
FrameProcessor<Void>
}
if (!wroteResultsStart) {
- final RowSignature querySignature = frameReader.signature();
- final ImmutableList.Builder<MSQResultsReport.ColumnAndType>
mappedSignature = ImmutableList.builder();
-
- for (final ColumnMapping mapping : columnMappings.getMappings()) {
- mappedSignature.add(
- new MSQResultsReport.ColumnAndType(
- mapping.getOutputColumn(),
-
querySignature.getColumnType(mapping.getQueryColumn()).orElse(null)
- )
- );
- }
-
- queryListener.onResultsStart(
- mappedSignature.build(),
- resultsContext.getSqlTypeNames()
- );
-
+ queryListener.onResultsStart(frameReader);
wroteResultsStart = true;
}
@@ -121,31 +85,7 @@ public class ControllerQueryResultsReader implements
FrameProcessor<Void>
queryListener.onResultsComplete();
return ReturnOrAwait.returnObject(null);
} else {
- final Frame frame = in.readFrame();
- Yielder<Object[]> rowYielder = Yielders.each(
- SqlStatementResourceHelper.getResultSequence(
- frame,
- frameReader,
- columnMappings,
- resultsContext,
- jsonMapper
- )
- );
-
- try {
- while (!rowYielder.isDone()) {
- if (queryListener.onResultRow(rowYielder.get())) {
- rowYielder = rowYielder.next(null);
- } else {
- // Caller wanted to stop reading.
- return ReturnOrAwait.returnObject(null);
- }
- }
- }
- finally {
- CloseableUtils.closeAndSuppressExceptions(rowYielder, e -> log.warn(e,
"Failed to close frame yielder"));
- }
-
+ queryListener.onResultBatch(in.read());
return ReturnOrAwait.awaitAll(inputChannels().size());
}
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/QueryListener.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/QueryListener.java
index 997fe4c8682..1987c4cc666 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/QueryListener.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/QueryListener.java
@@ -19,12 +19,10 @@
package org.apache.druid.msq.exec;
-import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.msq.indexing.report.MSQResultsReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
-
-import javax.annotation.Nullable;
-import java.util.List;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
/**
* Object passed to {@link Controller#run(QueryListener)} to enable retrieval
of results, status, counters, etc.
@@ -39,27 +37,21 @@ public interface QueryListener
/**
* Called when results start coming in.
*
- * @param signature signature of results
- * @param sqlTypeNames SQL type names of results; same length as the
signature
+ * @param frameReader reader for frames that will be passed to {@link
#onResultBatch(RowsAndColumns)}
*/
- void onResultsStart(
- List<MSQResultsReport.ColumnAndType> signature,
- @Nullable List<SqlTypeName> sqlTypeNames
- );
+ void onResultsStart(FrameReader frameReader);
/**
- * Called for each result row. Follows a call to {@link
#onResultsStart(List, List)}.
- *
- * @param row result row
+ * Called for each result batch. Follows a call to {@link #onResultsStart}.
*
* @return whether the controller should keep reading results
*/
- boolean onResultRow(Object[] row);
+ boolean onResultBatch(RowsAndColumns rac);
/**
- * Called after the last result has been delivered by {@link
#onResultRow(Object[])}. Only called if results are
- * actually complete. If results are truncated due to {@link #readResults()}
or {@link #onResultRow(Object[])}
- * returning false, this method is not called.
+ * Called after the last result has been delivered by {@link
#onResultBatch(RowsAndColumns)}. Only called if results
+ * are actually complete. If results are truncated due to {@link
#readResults()} or
+ * {@link #onResultBatch(RowsAndColumns)} returning false, this method is
not called.
*/
void onResultsComplete();
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SequenceQueryListener.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SequenceQueryListener.java
new file mode 100644
index 00000000000..6837a9f6c4b
--- /dev/null
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SequenceQueryListener.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
+import org.apache.druid.frame.channel.FrameChannelSequence;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+
+import java.io.IOException;
+
+/**
+ * Listener that provides a {@link Sequence} of {@link RowsAndColumns}, via
{@link #getSequence()}.
+ */
+public class SequenceQueryListener implements QueryListener
+{
+ private static final int DEFAULT_CAPACITY = 2;
+
+ private final BlockingQueueFrameChannel channel;
+ private volatile FrameReader frameReader;
+
+ public SequenceQueryListener()
+ {
+ this(DEFAULT_CAPACITY);
+ }
+
+ public SequenceQueryListener(final int bufferSize)
+ {
+ this.channel = new BlockingQueueFrameChannel(bufferSize);
+ }
+
+ /**
+ * Returns a {@link Sequence} of {@link RowsAndColumns} read from the
channel. The sequence performs blocking
+ * reads; it should be consumed from a thread that is allowed to block.
+ */
+ public Sequence<RowsAndColumns> getSequence()
+ {
+ return new FrameChannelSequence(channel.readable());
+ }
+
+ /**
+ * Returns the {@link FrameReader} set by {@link
#onResultsStart(FrameReader)}.
+ */
+ public FrameReader getFrameReader()
+ {
+ return frameReader;
+ }
+
+ @Override
+ public boolean readResults()
+ {
+ return true;
+ }
+
+ @Override
+ public void onResultsStart(final FrameReader frameReader)
+ {
+ this.frameReader = frameReader;
+ }
+
+ @Override
+ public boolean onResultBatch(final RowsAndColumns rac)
+ {
+ try {
+ final WritableFrameChannel writable = channel.writable();
+
+ // Blocking write.
+ FutureUtils.getUnchecked(writable.writabilityFuture(), false);
+ writable.write(rac);
+ return true;
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void onResultsComplete()
+ {
+ // Nothing to do. We'll close the writable channel in onQueryComplete.
+ }
+
+ @Override
+ public void onQueryComplete(final MSQTaskReportPayload report)
+ {
+ try {
+ final WritableFrameChannel writable = channel.writable();
+
+ if (!writable.isClosed()) {
+ if (!report.getStatus().getStatus().isSuccess()) {
+
writable.fail(report.getStatus().getErrorReport().toDruidException());
+ }
+
+ writable.close();
+ }
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
index ab051344921..c43d855ad50 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
@@ -269,12 +269,15 @@ public class MSQControllerTask extends AbstractTask
implements ClientTaskQuery,
injector.getInstance(MSQTaskQueryKitSpecFactory.class)
);
+ final ResultsContext resultsContext = new
ResultsContext(getSqlTypeNames(), getSqlResultsContext());
final TaskReportQueryListener queryListener = new TaskReportQueryListener(
() ->
toolbox.getTaskReportFileWriter().openReportOutputStream(getId()),
toolbox.getJsonMapper(),
getId(),
getContext(),
- querySpec.getDestination().getRowsInTaskReport()
+ querySpec.getDestination().getRowsInTaskReport(),
+ querySpec.getColumnMappings(),
+ resultsContext
);
controller.run(queryListener);
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/TaskReportQueryListener.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/TaskReportQueryListener.java
index 4c21ca005ee..4cb9f957d39 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/TaskReportQueryListener.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/TaskReportQueryListener.java
@@ -22,21 +22,31 @@ package org.apache.druid.msq.indexing;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
-import org.apache.calcite.sql.type.SqlTypeName;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.indexer.report.TaskContextReport;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.msq.exec.OutputChannelMode;
import org.apache.druid.msq.exec.QueryListener;
+import org.apache.druid.msq.exec.ResultsContext;
import org.apache.druid.msq.indexing.destination.MSQDestination;
import org.apache.druid.msq.indexing.report.MSQResultsReport;
import org.apache.druid.msq.indexing.report.MSQStatusReport;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.msq.util.SqlStatementResourceHelper;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.planner.ColumnMapping;
+import org.apache.druid.sql.calcite.planner.ColumnMappings;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -67,18 +77,25 @@ public class TaskReportQueryListener implements
QueryListener
private final SerializerProvider serializers;
private final String taskId;
private final Map<String, Object> taskContext;
+ @Nullable
+ private final ColumnMappings columnMappings;
+ @Nullable
+ private final ResultsContext resultsContext;
private JsonGenerator jg;
private long numResults;
private MSQStatusReport statusReport;
private boolean resultsCurrentlyOpen;
+ private FrameReader frameReader; // Set after onResultsStart
public TaskReportQueryListener(
final OutputStreamSupplier reportSink,
final ObjectMapper jsonMapper,
final String taskId,
final Map<String, Object> taskContext,
- final long rowsInTaskReport
+ final long rowsInTaskReport,
+ @Nullable final ColumnMappings columnMappings,
+ @Nullable final ResultsContext resultsContext
)
{
this.reportSink = reportSink;
@@ -87,6 +104,36 @@ public class TaskReportQueryListener implements
QueryListener
this.taskId = taskId;
this.taskContext = taskContext;
this.rowsInTaskReport = rowsInTaskReport;
+ this.columnMappings = columnMappings;
+ this.resultsContext = resultsContext;
+ }
+
+ /**
+ * Maps {@link FrameReader#signature()} using {@link ColumnMappings}, then
returns the result in the
+ * form expected for {@link MSQResultsReport#getSignature()}.
+ */
+ public static List<MSQResultsReport.ColumnAndType> computeResultSignature(
+ final FrameReader frameReader,
+ @Nullable final ColumnMappings columnMappings
+ )
+ {
+ if (columnMappings == null) {
+ return computeResultSignature(frameReader,
ColumnMappings.identity(frameReader.signature()));
+ }
+
+ final RowSignature querySignature = frameReader.signature();
+ final ImmutableList.Builder<MSQResultsReport.ColumnAndType>
mappedSignature = ImmutableList.builder();
+
+ for (final ColumnMapping mapping : columnMappings.getMappings()) {
+ mappedSignature.add(
+ new MSQResultsReport.ColumnAndType(
+ mapping.getOutputColumn(),
+
querySignature.getColumnType(mapping.getQueryColumn()).orElse(null)
+ )
+ );
+ }
+
+ return mappedSignature.build();
}
@Override
@@ -96,16 +143,18 @@ public class TaskReportQueryListener implements
QueryListener
}
@Override
- public void onResultsStart(List<MSQResultsReport.ColumnAndType> signature,
@Nullable List<SqlTypeName> sqlTypeNames)
+ public void onResultsStart(final FrameReader frameReader)
{
+ this.frameReader = frameReader;
+
try {
openGenerator();
resultsCurrentlyOpen = true;
jg.writeObjectFieldStart(FIELD_RESULTS);
- writeObjectField(FIELD_RESULTS_SIGNATURE, signature);
- if (sqlTypeNames != null) {
- writeObjectField(FIELD_RESULTS_SQL_TYPE_NAMES, sqlTypeNames);
+ writeObjectField(FIELD_RESULTS_SIGNATURE,
computeResultSignature(frameReader, columnMappings));
+ if (resultsContext != null && resultsContext.getSqlTypeNames() != null) {
+ writeObjectField(FIELD_RESULTS_SQL_TYPE_NAMES,
resultsContext.getSqlTypeNames());
}
jg.writeArrayFieldStart(FIELD_RESULTS_RESULTS);
}
@@ -115,16 +164,40 @@ public class TaskReportQueryListener implements
QueryListener
}
@Override
- public boolean onResultRow(Object[] row)
+ public boolean onResultBatch(RowsAndColumns rac)
{
- try {
- JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, row);
- numResults++;
- return rowsInTaskReport == MSQDestination.UNLIMITED || numResults <
rowsInTaskReport;
+ final Frame frame = rac.as(Frame.class);
+ if (frame == null) {
+ throw DruidException.defensive(
+ "Expected Frame, got RAC[%s]. Can only handle Frames in task
reports.",
+ rac.getClass().getName()
+ );
}
- catch (IOException e) {
- throw new RuntimeException(e);
+
+ final Iterator<Object[]> resultIterator =
SqlStatementResourceHelper.getResultIterator(
+ frame,
+ frameReader,
+ columnMappings,
+ resultsContext,
+ jsonMapper
+ );
+
+ while (resultIterator.hasNext()) {
+ final Object[] row = resultIterator.next();
+ try {
+ JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, row);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ numResults++;
+
+ if (rowsInTaskReport != MSQDestination.UNLIMITED && numResults >=
rowsInTaskReport) {
+ return false;
+ }
}
+
+ return true;
}
@Override
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java
index f70dd81caee..f8662643c95 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java
@@ -304,7 +304,7 @@ public class SqlStatementResourceHelper
));
}
- public static Sequence<Object[]> getResultSequence(
+ public static Iterator<Object[]> getResultIterator(
final Frame resultsFrame,
final FrameReader resultFrameReader,
final ColumnMappings resultColumnMappings,
@@ -321,7 +321,7 @@ public class SqlStatementResourceHelper
.map(mapping ->
columnSelectorFactory.makeColumnValueSelector(mapping.getQueryColumn()))
.collect(Collectors.toList());
- final Iterable<Object[]> retVal = () -> new Iterator<>()
+ return new Iterator<>()
{
@Override
public boolean hasNext()
@@ -356,7 +356,19 @@ public class SqlStatementResourceHelper
return row;
}
};
- return Sequences.simple(retVal);
+ }
+
+ public static Sequence<Object[]> getResultSequence(
+ final Frame resultsFrame,
+ final FrameReader resultFrameReader,
+ final ColumnMappings resultColumnMappings,
+ final ResultsContext resultsContext,
+ final ObjectMapper jsonMapper
+ )
+ {
+ return Sequences.simple(
+ () -> getResultIterator(resultsFrame, resultFrameReader,
resultColumnMappings, resultsContext, jsonMapper)
+ );
}
@Nullable
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
index 4cf48396142..0b77461d178 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskLockType;
@@ -36,6 +37,7 @@ import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.msq.dart.controller.ControllerHolder;
+import org.apache.druid.msq.dart.controller.ControllerThreadPool;
import org.apache.druid.msq.dart.controller.DartControllerRegistry;
import org.apache.druid.msq.dart.controller.sql.DartQueryMaker;
import org.apache.druid.msq.dart.controller.sql.DartSqlClient;
@@ -149,7 +151,7 @@ public class DartSqlResourceTest extends MSQTestBase
private SqlResource sqlResource;
private DartControllerRegistry controllerRegistry;
- private ExecutorService controllerExecutor;
+ private ControllerThreadPool controllerThreadPool;
private AutoCloseable mockCloser;
private final StubServiceEmitter serviceEmitter = new StubServiceEmitter();
@@ -255,9 +257,13 @@ public class DartSqlResourceTest extends MSQTestBase
}
},
objectMapper.convertValue(ImmutableMap.of(),
DartControllerConfig.class),
- controllerExecutor = Execs.multiThreaded(
- MAX_CONTROLLERS,
- StringUtils.encodeForFormat(getClass().getSimpleName() +
"-controller-exec")
+ controllerThreadPool = new ControllerThreadPool(
+ MoreExecutors.listeningDecorator(
+ Execs.multiThreaded(
+ MAX_CONTROLLERS,
+ StringUtils.encodeForFormat(getClass().getSimpleName() +
"-controller-exec")
+ )
+ )
),
new DartQueryKitSpecFactory(new
TestTimelineServerView(Collections.emptyList())),
injector.getInstance(MultiQueryKit.class),
@@ -291,9 +297,9 @@ public class DartSqlResourceTest extends MSQTestBase
mockCloser.close();
// shutdown(), not shutdownNow(), to ensure controllers stop timely on
their own.
- controllerExecutor.shutdown();
+ controllerThreadPool.getExecutorService().shutdown();
- if (!controllerExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
+ if (!controllerThreadPool.getExecutorService().awaitTermination(1,
TimeUnit.MINUTES)) {
throw new IAE("controllerExecutor.awaitTermination() timed out");
}
@@ -744,7 +750,7 @@ public class DartSqlResourceTest extends MSQTestBase
.thenReturn(makeAuthenticationResult(REGULAR_USER_NAME));
// Block up the controllerExecutor so the controller runs long enough to
cancel it.
- final Future<?> sleepFuture = controllerExecutor.submit(() -> {
+ final Future<?> sleepFuture =
controllerThreadPool.getExecutorService().submit(() -> {
try {
Thread.sleep(3_600_000);
}
@@ -798,19 +804,38 @@ public class DartSqlResourceTest extends MSQTestBase
// Wait for the SQL POST to come back.
Assertions.assertNull(doPostFuture.get());
-
Assertions.assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
asyncResponse.getStatus());
- // Ensure MSQ fault (CanceledFault) is properly translated to a
DruidException and then properly serialized.
- final Map<String, Object> e = objectMapper.readValue(
- asyncResponse.baos.toByteArray(),
- JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
- );
- Assertions.assertEquals("Canceled", e.get("errorCode"));
- Assertions.assertEquals("CANCELED", e.get("category"));
- Assertions.assertEquals(
-
MSQFaultUtils.generateMessageWithErrorCode(CanceledFault.userRequest()),
- e.get("errorMessage")
- );
+ if (fullReport) {
+ // Buffered report path -- should get a cancellation error in the report.
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
asyncResponse.getStatus());
+
+ final List<List<TaskReport.ReportMap>> reportMaps =
objectMapper.readValue(
+ asyncResponse.baos.toByteArray(),
+ new TypeReference<>() {}
+ );
+
+ final MSQTaskReport report = (MSQTaskReport)
Iterables.getOnlyElement(Iterables.getOnlyElement(reportMaps)).get(MSQTaskReport.REPORT_KEY);
+ final MSQStatusReport statusReport = report.getPayload().getStatus();
+
+ Assertions.assertEquals(TaskState.FAILED, statusReport.getStatus());
+ Assertions.assertEquals(CanceledFault.userRequest(),
statusReport.getErrorReport().getFault());
+
+ } else {
+ // Streaming path -- should get a serialized DruidException.
+
Assertions.assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
asyncResponse.getStatus());
+
+ // Ensure MSQ fault (CanceledFault) is properly translated to a
DruidException and then properly serialized.
+ final Map<String, Object> e = objectMapper.readValue(
+ asyncResponse.baos.toByteArray(),
+ JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+ );
+ Assertions.assertEquals("Canceled", e.get("errorCode"));
+ Assertions.assertEquals("CANCELED", e.get("category"));
+ Assertions.assertEquals(
+
MSQFaultUtils.generateMessageWithErrorCode(CanceledFault.userRequest()),
+ e.get("errorMessage")
+ );
+ }
}
@Test
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/TaskReportQueryListenerTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/TaskReportQueryListenerTest.java
index e50da1c75ed..99e443e5144 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/TaskReportQueryListenerTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/TaskReportQueryListenerTest.java
@@ -26,22 +26,31 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.testutil.FrameSequenceBuilder;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.report.TaskContextReport;
import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.exec.Limits;
+import org.apache.druid.msq.exec.ResultsContext;
import org.apache.druid.msq.guice.MSQIndexingModule;
import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination;
import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
-import org.apache.druid.msq.indexing.report.MSQResultsReport;
import org.apache.druid.msq.indexing.report.MSQStagesReport;
import org.apache.druid.msq.indexing.report.MSQStatusReport;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.msq.indexing.report.MSQTaskReportTest;
+import org.apache.druid.segment.RowAdapter;
+import org.apache.druid.segment.RowBasedCursorFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.junit.Assert;
import org.junit.Test;
@@ -52,6 +61,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
+import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -59,30 +70,60 @@ public class TaskReportQueryListenerTest
{
private static final String TASK_ID = "mytask";
private static final Map<String, Object> TASK_CONTEXT =
ImmutableMap.of("foo", "bar");
- private static final List<MSQResultsReport.ColumnAndType> SIGNATURE =
ImmutableList.of(
- new MSQResultsReport.ColumnAndType("x", ColumnType.STRING)
- );
+ private static final RowSignature SIGNATURE = RowSignature.builder()
+ .add("x", ColumnType.STRING)
+ .build();
private static final List<SqlTypeName> SQL_TYPE_NAMES =
ImmutableList.of(SqlTypeName.VARCHAR);
private static final ObjectMapper JSON_MAPPER =
TestHelper.makeJsonMapper().registerModules(new
MSQIndexingModule().getJacksonModules());
private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ /**
+ * Creates a Frame containing the given rows using the test signature.
+ */
+ private Frame createFrame(final List<Map<String, Object>> rows)
+ {
+ final RowBasedCursorFactory<Map<String, Object>> cursorFactory = new RowBasedCursorFactory<>(
+ Sequences.simple(rows),
+ new MapRowAdapter(SIGNATURE),
+ SIGNATURE
+ );
+
+ return FrameSequenceBuilder.fromCursorFactory(cursorFactory)
+ .frameType(FrameType.latestRowBased())
+ .maxRowsPerFrame(Integer.MAX_VALUE)
+ .frames()
+ .toList()
+ .get(0);
+ }
+
@Test
public void test_taskReportDestination() throws IOException
{
+ final FrameReader frameReader = FrameReader.create(SIGNATURE);
+ final ResultsContext resultsContext = new ResultsContext(SQL_TYPE_NAMES, null);
+
final TaskReportQueryListener listener = new TaskReportQueryListener(
Suppliers.ofInstance(baos)::get,
JSON_MAPPER,
TASK_ID,
TASK_CONTEXT,
- TaskReportMSQDestination.instance().getRowsInTaskReport()
+ TaskReportMSQDestination.instance().getRowsInTaskReport(),
+ ColumnMappings.identity(SIGNATURE),
+ resultsContext
);
Assert.assertTrue(listener.readResults());
- listener.onResultsStart(SIGNATURE, SQL_TYPE_NAMES);
- Assert.assertTrue(listener.onResultRow(new Object[]{"foo"}));
- Assert.assertTrue(listener.onResultRow(new Object[]{"bar"}));
+ listener.onResultsStart(frameReader);
+
+ // Create a frame with two rows
+ final Frame frame = createFrame(ImmutableList.of(
+ ImmutableMap.of("x", "foo"),
+ ImmutableMap.of("x", "bar")
+ ));
+ Assert.assertTrue(listener.onResultBatch(frame.asRAC()));
+
listener.onResultsComplete();
listener.onQueryComplete(
new MSQTaskReportPayload(
@@ -139,20 +180,40 @@ public class TaskReportQueryListenerTest
@Test
public void test_durableDestination() throws IOException
{
+ final FrameReader frameReader = FrameReader.create(SIGNATURE);
+ final ResultsContext resultsContext = new ResultsContext(SQL_TYPE_NAMES, null);
+
final TaskReportQueryListener listener = new TaskReportQueryListener(
Suppliers.ofInstance(baos)::get,
JSON_MAPPER,
TASK_ID,
TASK_CONTEXT,
- DurableStorageMSQDestination.instance().getRowsInTaskReport()
+ DurableStorageMSQDestination.instance().getRowsInTaskReport(),
+ ColumnMappings.identity(SIGNATURE),
+ resultsContext
);
Assert.assertTrue(listener.readResults());
- listener.onResultsStart(SIGNATURE, SQL_TYPE_NAMES);
- for (int i = 0; i < Limits.MAX_SELECT_RESULT_ROWS - 1; i++) {
- Assert.assertTrue("row #" + i, listener.onResultRow(new Object[]{"foo"}));
+ listener.onResultsStart(frameReader);
+
+ // Create frames with rows up to MAX_SELECT_RESULT_ROWS
+ final int batchSize = 100;
+ int rowsAdded = 0;
+ boolean keepGoing = true;
+
+ while (keepGoing && rowsAdded < Limits.MAX_SELECT_RESULT_ROWS) {
+ final int rowsToAdd = (int) Math.min(batchSize, Limits.MAX_SELECT_RESULT_ROWS - rowsAdded);
+ final List<Map<String, Object>> rows = IntStream.range(0, rowsToAdd)
+ .mapToObj(i -> ImmutableMap.<String, Object>of("x", "foo"))
+ .collect(Collectors.toList());
+ final Frame frame = createFrame(rows);
+ keepGoing = listener.onResultBatch(frame.asRAC());
+ rowsAdded += rowsToAdd;
}
- Assert.assertFalse(listener.onResultRow(new Object[]{"foo"}));
+
+ // Should have stopped accepting results after MAX_SELECT_RESULT_ROWS
+ Assert.assertFalse(keepGoing);
+
listener.onQueryComplete(
new MSQTaskReportPayload(
new MSQStatusReport(
@@ -203,4 +264,29 @@ public class TaskReportQueryListenerTest
Assert.assertTrue(report.getPayload().getResults().isResultsTruncated());
Assert.assertEquals(TaskState.SUCCESS,
report.getPayload().getStatus().getStatus());
}
+
+ /**
+ * Simple RowAdapter for Map-based rows.
+ */
+ private static class MapRowAdapter implements RowAdapter<Map<String, Object>>
+ {
+ private final RowSignature signature;
+
+ MapRowAdapter(final RowSignature signature)
+ {
+ this.signature = signature;
+ }
+
+ @Override
+ public ToLongFunction<Map<String, Object>> timestampFunction()
+ {
+ return row -> 0L;
+ }
+
+ @Override
+ public Function<Map<String, Object>, Object> columnFunction(final String
columnName)
+ {
+ return row -> row.get(columnName);
+ }
+ }
}
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
index 8bfcc49f917..658139289db 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
@@ -31,6 +31,8 @@ import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.report.TaskReport;
@@ -45,6 +47,7 @@ import org.apache.druid.msq.exec.QueryListener;
import org.apache.druid.msq.exec.ResultsContext;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.indexing.MSQControllerTask;
+import org.apache.druid.msq.indexing.TaskReportQueryListener;
import org.apache.druid.msq.indexing.destination.MSQDestination;
import org.apache.druid.msq.indexing.error.CancellationReason;
import org.apache.druid.msq.indexing.report.MSQResultsReport;
@@ -52,12 +55,16 @@ import org.apache.druid.msq.indexing.report.MSQStatusReport;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.msq.sql.MSQTaskQueryKitSpecFactory;
+import org.apache.druid.msq.util.SqlStatementResourceHelper;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.rpc.indexing.NoopOverlordClient;
+import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -172,10 +179,14 @@ public class MSQTestOverlordServiceClient extends
NoopOverlordClient
testTaskDetails.addController(controller);
+ final ResultsContext resultsContext = new ResultsContext(cTask.getSqlTypeNames(), cTask.getSqlResultsContext());
queryListener =
new TestQueryListener(
cTask.getId(),
- cTask.getQuerySpec().getDestination()
+ cTask.getQuerySpec().getDestination(),
+ objectMapper,
+ cTask.getQuerySpec().getColumnMappings(),
+ resultsContext
);
try {
@@ -296,17 +307,30 @@ public class MSQTestOverlordServiceClient extends
NoopOverlordClient
{
private final String taskId;
private final MSQDestination destination;
+ private final ObjectMapper jsonMapper;
private final List<Object[]> results = new ArrayList<>();
+ private final ColumnMappings columnMappings;
+ private final ResultsContext resultsContext;
+ private FrameReader frameReader;
private List<MSQResultsReport.ColumnAndType> signature;
private List<SqlTypeName> sqlTypeNames;
private boolean resultsTruncated = true;
private TaskReport.ReportMap reportMap;
- public TestQueryListener(final String taskId, final MSQDestination
destination)
+ public TestQueryListener(
+ final String taskId,
+ final MSQDestination destination,
+ final ObjectMapper jsonMapper,
+ final ColumnMappings columnMappings,
+ final ResultsContext resultsContext
+ )
{
this.taskId = taskId;
this.destination = destination;
+ this.jsonMapper = jsonMapper;
+ this.columnMappings = columnMappings;
+ this.resultsContext = resultsContext;
}
@Override
@@ -316,18 +340,29 @@ public class MSQTestOverlordServiceClient extends
NoopOverlordClient
}
@Override
- public void onResultsStart(List<MSQResultsReport.ColumnAndType> signature,
@Nullable List<SqlTypeName> sqlTypeNames)
+ public void onResultsStart(final FrameReader frameReader)
{
- this.signature = signature;
- this.sqlTypeNames = sqlTypeNames;
+ this.frameReader = frameReader;
+ this.signature = TaskReportQueryListener.computeResultSignature(frameReader, columnMappings);
+ this.sqlTypeNames = resultsContext != null ? resultsContext.getSqlTypeNames() : null;
}
@Override
- public boolean onResultRow(Object[] row)
+ public boolean onResultBatch(RowsAndColumns rac)
{
if (destination.getRowsInTaskReport() == MSQDestination.UNLIMITED
|| results.size() < destination.getRowsInTaskReport()) {
- results.add(row);
+ final Iterator<Object[]> resultIterator = SqlStatementResourceHelper.getResultIterator(
+ rac.as(Frame.class),
+ frameReader,
+ columnMappings,
+ resultsContext,
+ jsonMapper
+ );
+ while (resultIterator.hasNext()) {
+ final Object[] row = resultIterator.next();
+ results.add(row);
+ }
return true;
} else {
return false;
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/NoopQueryListener.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/NoopQueryListener.java
index fe38819a451..39d88d78c8d 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/NoopQueryListener.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/NoopQueryListener.java
@@ -19,13 +19,10 @@
package org.apache.druid.msq.test;
-import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.msq.exec.QueryListener;
-import org.apache.druid.msq.indexing.report.MSQResultsReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
-
-import javax.annotation.Nullable;
-import java.util.List;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
public class NoopQueryListener implements QueryListener
{
@@ -36,13 +33,13 @@ public class NoopQueryListener implements QueryListener
}
@Override
- public void onResultsStart(List<MSQResultsReport.ColumnAndType> signature,
@Nullable List<SqlTypeName> sqlTypeNames)
+ public void onResultsStart(final FrameReader frameReader)
{
// Do nothing.
}
@Override
- public boolean onResultRow(Object[] row)
+ public boolean onResultBatch(RowsAndColumns rac)
{
return true;
}
diff --git
a/processing/src/main/java/org/apache/druid/frame/channel/BlockingQueueFrameChannel.java
b/processing/src/main/java/org/apache/druid/frame/channel/BlockingQueueFrameChannel.java
index 7a41a781147..e434b62c209 100644
---
a/processing/src/main/java/org/apache/druid/frame/channel/BlockingQueueFrameChannel.java
+++
b/processing/src/main/java/org/apache/druid/frame/channel/BlockingQueueFrameChannel.java
@@ -173,6 +173,8 @@ public class BlockingQueueFrameChannel
// If this happens, it's a bug, potentially due to incorrectly using
this class with multiple writers.
throw new ISE("Could not write error to channel");
}
+
+ notifyReader();
}
}
diff --git
a/processing/src/main/java/org/apache/druid/frame/channel/FrameChannelSequence.java
b/processing/src/main/java/org/apache/druid/frame/channel/FrameChannelSequence.java
index 83075b0e48e..44de5ee263b 100644
---
a/processing/src/main/java/org/apache/druid/frame/channel/FrameChannelSequence.java
+++
b/processing/src/main/java/org/apache/druid/frame/channel/FrameChannelSequence.java
@@ -20,19 +20,21 @@
package org.apache.druid.frame.channel;
import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.guava.BaseSequence;
+import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.utils.Throwables;
import java.io.Closeable;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
- * Adapter that converts a {@link ReadableFrameChannel} into a {@link
org.apache.druid.java.util.common.guava.Sequence}
- * of {@link RowsAndColumns}.
+ * Adapter that converts a {@link ReadableFrameChannel} into a {@link
Sequence} of {@link RowsAndColumns}.
*
- * This class does blocking reads on the channel, rather than nonblocking
reads. Therefore, it is preferable to use
- * {@link ReadableFrameChannel} directly whenever nonblocking reads are
desired.
+ * This class does blocking reads on the channel. Therefore, it is preferable
to use {@link ReadableFrameChannel}
+ * directly whenever nonblocking reads are desired.
*/
public class FrameChannelSequence extends BaseSequence<RowsAndColumns,
FrameChannelSequence.FrameChannelIterator>
{
@@ -81,7 +83,19 @@ public class FrameChannelSequence extends
BaseSequence<RowsAndColumns, FrameChan
throw new NoSuchElementException();
}
- return channel.read();
+ try {
+ return channel.read();
+ }
+ catch (Exception e) {
+ // Unwrap DruidException. Bit of a hack to have this here, but it's
necessary to properly bubble up
+ // DruidExceptions that might be wrapped by "channel.read()" due to
being thrown in another thread.
+ final DruidException druidException = Throwables.getCauseOfType(e,
DruidException.class);
+ if (druidException != null) {
+ throw druidException;
+ } else {
+ throw e;
+ }
+ }
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]