This is an automated email from the ASF dual-hosted git repository.
lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c1c7dff2ad Using DruidExceptions in MSQ (changes related to the Broker) (#14534)
c1c7dff2ad is described below
commit c1c7dff2ad082a110aaab3c2c9dc1678773f1a92
Author: Laksh Singla <[email protected]>
AuthorDate: Thu Jul 13 19:08:49 2023 +0000
Using DruidExceptions in MSQ (changes related to the Broker) (#14534)
The MSQ engine now returns correct error codes for invalid user inputs in the
query context. It also uses DruidExceptions, with improved error messages, for
MSQ-related errors raised on the Broker.
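
For context, the error-reporting pattern this commit standardizes on is sketched
below: user-facing validation failures are raised via InvalidInput.exception(...),
which builds a DruidException carrying the USER persona and INVALID_INPUT category,
instead of generic IAE/ISE errors. This is a minimal illustration based only on the
APIs visible in this diff; the MaxNumTasksCheck class and its method are hypothetical.

    import org.apache.druid.error.InvalidInput;

    public class MaxNumTasksCheck
    {
      // Hypothetical helper mirroring the maxNumTasks validation in MSQTaskQueryMaker.
      public static void validateMaxNumTasks(int maxNumTasks)
      {
        if (maxNumTasks < 2) {
          // Builds a DruidException targeted at the USER persona with the
          // INVALID_INPUT category, as asserted by the tests in this diff.
          throw InvalidInput.exception(
              "MSQ context maxNumTasks [%,d] cannot be less than 2, since at least "
              + "1 controller and 1 worker is necessary",
              maxNumTasks
          );
        }
      }
    }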
---
.../apache/druid/msq/sql/MSQTaskQueryMaker.java | 53 +++++++-----
.../org/apache/druid/msq/sql/MSQTaskSqlEngine.java | 77 +++++++++--------
.../msq/sql/resources/SqlStatementResource.java | 98 ++++++++++------------
.../druid/msq/util/MultiStageQueryContext.java | 49 ++++-------
.../org/apache/druid/msq/exec/MSQInsertTest.java | 25 +++---
.../msq/sql/SqlMSQStatementResourcePostTest.java | 59 +++++++------
.../druid/msq/sql/SqlStatementResourceTest.java | 2 +-
.../msq/test/CalciteSelectJoinQueryMSQTest.java | 2 -
.../druid/msq/util/MultiStageQueryContextTest.java | 14 ----
.../java/org/apache/druid/error/Forbidden.java | 68 +++++++++++++++
.../apache/druid/error/QueryExceptionCompat.java | 3 +-
.../java/org/apache/druid/query/QueryContext.java | 10 ---
.../java/org/apache/druid/error/ForbiddenTest.java | 59 +++++++++++++
.../external/ExternalOperatorConversion.java | 84 +++++++++++++------
.../druid/sql/calcite/run/NativeSqlEngine.java | 3 +-
.../apache/druid/sql/calcite/run/SqlEngines.java | 21 +++++
.../druid/sql/calcite/view/ViewSqlEngine.java | 4 +-
.../druid/sql/calcite/CalciteInsertDmlTest.java | 93 +++++++++++++++++++-
.../druid/sql/calcite/IngestionTestSqlEngine.java | 4 +-
19 files changed, 481 insertions(+), 247 deletions(-)
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index a5dc579419..772af8524e 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -26,8 +26,8 @@ import org.apache.calcite.runtime.Hook;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Pair;
import org.apache.druid.common.guava.FutureUtils;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
@@ -48,7 +48,6 @@ import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.rpc.indexing.OverlordClient;
-import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.server.QueryResponse;
@@ -76,10 +75,6 @@ import java.util.stream.Collectors;
public class MSQTaskQueryMaker implements QueryMaker
{
-
- private static final String DESTINATION_DATASOURCE = "dataSource";
- private static final String DESTINATION_REPORT = "taskReport";
-
public static final String USER_KEY = "__user";
private static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.ALL;
@@ -128,9 +123,6 @@ public class MSQTaskQueryMaker implements QueryMaker
MSQMode.populateDefaultQueryContext(msqMode, nativeQueryContext);
}
- final String ctxDestination =
- DimensionHandlerUtils.convertObjectToString(MultiStageQueryContext.getDestination(sqlQueryContext));
-
Object segmentGranularity;
try {
segmentGranularity = Optional.ofNullable(plannerContext.queryContext()
@@ -138,15 +130,24 @@ public class MSQTaskQueryMaker implements QueryMaker
.orElse(jsonMapper.writeValueAsString(DEFAULT_SEGMENT_GRANULARITY));
}
catch (JsonProcessingException e) {
- throw new IAE("Unable to deserialize the insert granularity. Please
retry the query with a valid "
- + "segment graularity");
+ // This would only be thrown if we are unable to serialize the
DEFAULT_SEGMENT_GRANULARITY, which we don't expect
+ // to happen
+ throw DruidException.defensive()
+ .build(
+ e,
+ "Unable to deserialize the
DEFAULT_SEGMENT_GRANULARITY in MSQTaskQueryMaker. "
+ + "This shouldn't have happened since the
DEFAULT_SEGMENT_GRANULARITY object is guaranteed to be "
+ + "serializable. Please raise an issue in case
you are seeing this message while executing a query."
+ );
}
final int maxNumTasks = MultiStageQueryContext.getMaxNumTasks(sqlQueryContext);
if (maxNumTasks < 2) {
- throw new IAE(MultiStageQueryContext.CTX_MAX_NUM_TASKS
- + " cannot be less than 2 since at least 1 controller and 1 worker is necessary.");
+ throw InvalidInput.exception(
+ "MSQ context maxNumTasks [%,d] cannot be less than 2, since at least 1 controller and 1 worker is necessary",
+ maxNumTasks
+ );
}
// This parameter is used internally for the number of worker tasks only, so we subtract 1
@@ -202,16 +203,19 @@ public class MSQTaskQueryMaker implements QueryMaker
final MSQDestination destination;
if (targetDataSource != null) {
- if (ctxDestination != null && !DESTINATION_DATASOURCE.equals(ctxDestination)) {
- throw new IAE("Cannot INSERT with destination [%s]", ctxDestination);
- }
-
Granularity segmentGranularityObject;
try {
segmentGranularityObject = jsonMapper.readValue((String) segmentGranularity, Granularity.class);
}
catch (Exception e) {
- throw new ISE("Unable to convert %s to a segment granularity", segmentGranularity);
+ throw DruidException.defensive()
+ .build(
+ e,
+ "Unable to deserialize the provided segmentGranularity [%s]. "
+ + "This is populated internally by Druid and therefore should not occur. "
+ + "Please contact the developers if you are seeing this error message.",
+ segmentGranularity
+ );
}
final List<String> segmentSortOrder = MultiStageQueryContext.getSortOrder(sqlQueryContext);
@@ -228,16 +232,19 @@ public class MSQTaskQueryMaker implements QueryMaker
replaceTimeChunks
);
} else {
- if (ctxDestination != null && !DESTINATION_REPORT.equals(ctxDestination)) {
- throw new IAE("Cannot SELECT with destination [%s]", ctxDestination);
- }
final MSQSelectDestination msqSelectDestination = MultiStageQueryContext.getSelectDestination(sqlQueryContext);
if (msqSelectDestination.equals(MSQSelectDestination.TASK_REPORT)) {
destination = TaskReportMSQDestination.instance();
} else if (msqSelectDestination.equals(MSQSelectDestination.DURABLE_STORAGE)) {
destination = DurableStorageMSQDestination.instance();
} else {
- throw new IAE("Cannot SELECT with destination [%s]",
msqSelectDestination.name());
+ throw InvalidInput.exception(
+ "Unsupported select destination [%s] provided in the query
context. MSQ can currently write the select results to "
+ + "[%s] and [%s]",
+ msqSelectDestination.name(),
+ MSQSelectDestination.TASK_REPORT.toString(),
+ MSQSelectDestination.DURABLE_STORAGE.toString()
+ );
}
}
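
A note on the DruidException.defensive() shorthand introduced above: it is meant for
internal invariants that should never fail, as opposed to bad user input. A minimal
sketch of the pattern, assuming only the builder calls visible in this hunk (the
serializeOrFail helper is hypothetical):

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.druid.error.DruidException;

    public class DefensiveGuard
    {
      // Serializing a known-good constant should never fail, so a failure here
      // indicates a Druid bug rather than bad user input.
      public static String serializeOrFail(ObjectMapper mapper, Object knownGoodValue)
      {
        try {
          return mapper.writeValueAsString(knownGoodValue);
        }
        catch (JsonProcessingException e) {
          throw DruidException.defensive()
                              .build(e, "Unexpected failure serializing [%s]", knownGoodValue);
        }
      }
    }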
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
index c5fe182ea9..37b8692cb4 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
@@ -30,15 +30,13 @@ import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.tools.ValidationException;
import org.apache.calcite.util.Pair;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.error.InvalidSqlInput;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.msq.querykit.QueryKitUtils;
-import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
@@ -60,7 +58,6 @@ public class MSQTaskSqlEngine implements SqlEngine
public static final Set<String> SYSTEM_CONTEXT_PARAMETERS =
ImmutableSet.<String>builder()
.addAll(NativeSqlEngine.SYSTEM_CONTEXT_PARAMETERS)
- .add(MultiStageQueryContext.CTX_DESTINATION)
.add(QueryKitUtils.CTX_TIME_COLUMN_NAME)
.build();
@@ -125,7 +122,7 @@ public class MSQTaskSqlEngine implements SqlEngine
case SCAN_NEEDS_SIGNATURE:
return true;
default:
- throw new IAE("Unrecognized feature: %s", feature);
+ throw SqlEngines.generateUnrecognizedFeatureException(MSQTaskSqlEngine.class.getSimpleName(), feature);
}
}
@@ -133,9 +130,9 @@ public class MSQTaskSqlEngine implements SqlEngine
public QueryMaker buildQueryMakerForSelect(
final RelRoot relRoot,
final PlannerContext plannerContext
- ) throws ValidationException
+ )
{
- validateSelect(relRoot.fields, plannerContext);
+ validateSelect(plannerContext);
return new MSQTaskQueryMaker(
null,
@@ -156,7 +153,7 @@ public class MSQTaskSqlEngine implements SqlEngine
final String targetDataSource,
final RelRoot relRoot,
final PlannerContext plannerContext
- ) throws ValidationException
+ )
{
validateInsert(relRoot.rel, relRoot.fields, plannerContext);
@@ -169,15 +166,23 @@ public class MSQTaskSqlEngine implements SqlEngine
);
}
- private static void validateSelect(
- final List<Pair<Integer, String>> fieldMappings,
- final PlannerContext plannerContext
- ) throws ValidationException
+ /**
+ * Checks if the SELECT contains {@link DruidSqlInsert#SQL_INSERT_SEGMENT_GRANULARITY} in the context. This is a
+ * defensive check because {@link org.apache.druid.sql.calcite.planner.DruidPlanner} should have called the
+ * {@link #validateContext}
+ */
+ private static void validateSelect(final PlannerContext plannerContext)
{
if (plannerContext.queryContext().containsKey(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY)) {
- throw new ValidationException(
- StringUtils.format("Cannot use \"%s\" without INSERT",
DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY)
- );
+ throw DruidException
+ .forPersona(DruidException.Persona.DEVELOPER)
+ .ofCategory(DruidException.Category.DEFENSIVE)
+ .build(
+ "The SELECT query's context contains invalid parameter [%s]
which is supposed to be populated "
+ + "by Druid for INSERT queries. If the user is seeing this
exception, that means there's a bug in Druid "
+ + "that is populating the query context with the segment's
granularity.",
+ DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY
+ );
}
}
@@ -185,7 +190,7 @@ public class MSQTaskSqlEngine implements SqlEngine
final RelNode rootRel,
final List<Pair<Integer, String>> fieldMappings,
final PlannerContext plannerContext
- ) throws ValidationException
+ )
{
validateNoDuplicateAliases(fieldMappings);
@@ -199,12 +204,10 @@ public class MSQTaskSqlEngine implements SqlEngine
// Validate the __time field has the proper type.
final SqlTypeName timeType = rootRel.getRowType().getFieldList().get(field.left).getType().getSqlTypeName();
if (timeType != SqlTypeName.TIMESTAMP) {
- throw new ValidationException(
- StringUtils.format(
- "Field \"%s\" must be of type TIMESTAMP (was %s)",
- ColumnHolder.TIME_COLUMN_NAME,
- timeType
- )
+ throw InvalidSqlInput.exception(
+ "Field [%s] was the wrong type [%s], expected TIMESTAMP",
+ ColumnHolder.TIME_COLUMN_NAME,
+ timeType
);
}
}
@@ -220,13 +223,18 @@ public class MSQTaskSqlEngine implements SqlEngine
);
}
catch (Exception e) {
- throw new ValidationException(
- StringUtils.format(
- "Invalid segmentGranularity: %s",
- plannerContext.queryContext().get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY)
- ),
- e
- );
+ // This is a defensive check as the DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY in the query context is
+ // populated by Druid. If the user entered an incorrect granularity, that should have been flagged before reaching
+ // here
+ throw DruidException.forPersona(DruidException.Persona.DEVELOPER)
+ .ofCategory(DruidException.Category.DEFENSIVE)
+ .build(
+ e,
+ "[%s] is not a valid value for [%s]",
+ plannerContext.queryContext().get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY),
+ DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY
+ );
+
}
final boolean hasSegmentGranularity = !Granularities.ALL.equals(segmentGranularity);
@@ -237,11 +245,10 @@ public class MSQTaskSqlEngine implements SqlEngine
validateLimitAndOffset(rootRel, !hasSegmentGranularity);
if (hasSegmentGranularity && timeFieldIndex < 0) {
- throw new ValidationException(
- StringUtils.format(
- "INSERT queries with segment granularity other than \"all\" must
have a \"%s\" field.",
- ColumnHolder.TIME_COLUMN_NAME
- )
+ throw InvalidInput.exception(
+ "The granularity [%s] specified in the PARTITIONED BY clause of the
INSERT query is different from ALL. "
+ + "Therefore, the query must specify a time column (named __time).",
+ segmentGranularity
);
}
}
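
For completeness, InvalidSqlInput.exception(...) used in the __time check above is
the SQL-flavored counterpart of InvalidInput. A rough standalone sketch of that type
check, assuming the message format the MSQInsertTest matchers expect (the
requireTimestamp helper is illustrative, not code from the commit):

    import org.apache.calcite.sql.type.SqlTypeName;
    import org.apache.druid.error.InvalidSqlInput;

    public class TimeColumnCheck
    {
      // Standalone version of the __time type validation performed in validateInsert.
      public static void requireTimestamp(String columnName, SqlTypeName actualType)
      {
        if (actualType != SqlTypeName.TIMESTAMP) {
          throw InvalidSqlInput.exception(
              "Field [%s] was the wrong type [%s], expected TIMESTAMP",
              columnName,
              actualType
          );
        }
      }
    }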
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java
index 2be55a2f84..7b92872ee6 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java
@@ -30,6 +30,8 @@ import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.ErrorResponse;
+import org.apache.druid.error.Forbidden;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.error.QueryExceptionCompat;
import org.apache.druid.frame.channel.FrameChannelSequence;
import org.apache.druid.guice.annotations.MSQ;
@@ -198,19 +200,15 @@ public class SqlStatementResource
}
catch (ForbiddenException e) {
log.debug("Got forbidden request for reason [%s]", e.getErrorMessage());
- return buildNonOkResponse(
- DruidException.forPersona(DruidException.Persona.USER)
- .ofCategory(DruidException.Category.FORBIDDEN)
- .build(Access.DEFAULT_ERROR_MESSAGE)
- );
+ return buildNonOkResponse(Forbidden.exception());
}
// Calcite throws java.lang.AssertionError at various points in planning/validation.
catch (AssertionError | Exception e) {
stmt.reporter().failed(e);
if (isDebug) {
- log.warn(e, "Failed to handle query: %s", sqlQueryId);
+ log.warn(e, "Failed to handle query [%s]", sqlQueryId);
} else {
- log.noStackTrace().warn(e, "Failed to handle query: %s", sqlQueryId);
+ log.noStackTrace().warn(e, "Failed to handle query [%s]", sqlQueryId);
}
return buildNonOkResponse(
DruidException.forPersona(DruidException.Persona.DEVELOPER)
@@ -260,17 +258,13 @@ public class SqlStatementResource
}
catch (ForbiddenException e) {
log.debug("Got forbidden request for reason [%s]", e.getErrorMessage());
- return buildNonOkResponse(
- DruidException.forPersona(DruidException.Persona.USER)
- .ofCategory(DruidException.Category.FORBIDDEN)
- .build(Access.DEFAULT_ERROR_MESSAGE)
- );
+ return buildNonOkResponse(Forbidden.exception());
}
catch (Exception e) {
- log.warn(e, "Failed to handle query: %s", queryId);
+ log.warn(e, "Failed to handle query [%s]", queryId);
return buildNonOkResponse(DruidException.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.UNCATEGORIZED)
- .build(e, "Failed to handle query: [%s]", queryId));
+ .build(e, "Failed to handle query [%s]", queryId));
}
}
@@ -345,17 +339,13 @@ public class SqlStatementResource
}
catch (ForbiddenException e) {
log.debug("Got forbidden request for reason [%s]", e.getErrorMessage());
- return buildNonOkResponse(
- DruidException.forPersona(DruidException.Persona.USER)
- .ofCategory(DruidException.Category.FORBIDDEN)
- .build(Access.DEFAULT_ERROR_MESSAGE)
- );
+ return buildNonOkResponse(Forbidden.exception());
}
catch (Exception e) {
- log.warn(e, "Failed to handle query: %s", queryId);
+ log.warn(e, "Failed to handle query [%s]", queryId);
return buildNonOkResponse(DruidException.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.UNCATEGORIZED)
- .build(e, "Failed to handle query: [%s]", queryId));
+ .build(e, "Failed to handle query [%s]", queryId));
}
}
@@ -412,17 +402,13 @@ public class SqlStatementResource
}
catch (ForbiddenException e) {
log.debug("Got forbidden request for reason [%s]", e.getErrorMessage());
- return buildNonOkResponse(
- DruidException.forPersona(DruidException.Persona.USER)
- .ofCategory(DruidException.Category.FORBIDDEN)
- .build(Access.DEFAULT_ERROR_MESSAGE)
- );
+ return buildNonOkResponse(Forbidden.exception());
}
catch (Exception e) {
- log.warn(e, "Failed to handle query: %s", queryId);
+ log.warn(e, "Failed to handle query [%s]", queryId);
return buildNonOkResponse(DruidException.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.UNCATEGORIZED)
- .build(e, "Failed to handle query: [%s]", queryId));
+ .build(e, "Failed to handle query [%s]", queryId));
}
}
@@ -679,9 +665,9 @@ public class SqlStatementResource
if (msqControllerTask.getQuerySpec().getDestination() instanceof TaskReportMSQDestination) {
// Results from task report are only present as one page.
if (page != null && page > 0) {
- throw DruidException.forPersona(DruidException.Persona.USER)
- .ofCategory(DruidException.Category.INVALID_INPUT)
- .build("Page number is out of range of the
results.");
+ throw InvalidInput.exception(
+ "Page number [%d] is out of the range of results", page
+ );
}
MSQTaskReportPayload msqTaskReportPayload = jsonMapper.convertValue(SqlStatementResourceHelper.getPayload(
@@ -769,9 +755,7 @@ public class SqlStatementResource
return pageInfo;
}
}
- throw DruidException.forPersona(DruidException.Persona.USER)
- .ofCategory(DruidException.Category.INVALID_INPUT)
- .build("Invalid page id [%d] passed.", pageId);
+ throw InvalidInput.exception("Invalid page id [%d] passed.", pageId);
}
private void resultPusher(
@@ -832,7 +816,7 @@ public class SqlStatementResource
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.INVALID_INPUT)
.build(
- "Query[%s] failed. Hit status api for more
details.",
+ "Query[%s] failed. Check the status api for more
details.",
queryId
);
} else {
@@ -842,22 +826,25 @@ public class SqlStatementResource
private void contextChecks(QueryContext queryContext)
{
- ExecutionMode executionMode = queryContext.getEnum(
- QueryContexts.CTX_EXECUTION_MODE,
- ExecutionMode.class,
- null
- );
+ ExecutionMode executionMode = queryContext.getEnum(QueryContexts.CTX_EXECUTION_MODE, ExecutionMode.class, null);
+
+ if (executionMode == null) {
+ throw InvalidInput.exception(
+ "Execution mode is not provided to the SQL statement API. "
+ + "Please set [%s] to [%s] in the query context",
+ QueryContexts.CTX_EXECUTION_MODE,
+ ExecutionMode.ASYNC
+ );
+ }
+
if (ExecutionMode.ASYNC != executionMode) {
- throw DruidException.forPersona(DruidException.Persona.USER)
- .ofCategory(DruidException.Category.INVALID_INPUT)
- .build(
- StringUtils.format(
- "The statement sql api only supports sync
mode[%s]. Please set context parameter [%s=%s] in the context payload",
- ExecutionMode.ASYNC,
- QueryContexts.CTX_EXECUTION_MODE,
- ExecutionMode.ASYNC
- )
- );
+ throw InvalidInput.exception(
+ "The SQL statement API currently does not support the provided
execution mode [%s]. "
+ + "Please set [%s] to [%s] in the query context",
+ executionMode,
+ QueryContexts.CTX_EXECUTION_MODE,
+ ExecutionMode.ASYNC
+ );
}
MSQSelectDestination selectDestination = MultiStageQueryContext.getSelectDestination(queryContext);
@@ -873,11 +860,12 @@ public class SqlStatementResource
.ofCategory(DruidException.Category.INVALID_INPUT)
.build(
StringUtils.format(
- "The statement sql api cannot read from
select destination [%s=%s] since its not configured. "
- + "Its recommended to configure durable
storage as it allows the user to fetch big results. "
- + "Please contact your cluster admin to
configure durable storage.",
-
MultiStageQueryContext.CTX_SELECT_DESTINATION,
- MSQSelectDestination.DURABLE_STORAGE.name()
+ "The SQL Statement API cannot read from the
select destination [%s] provided "
+ + "in the query context [%s] since it is not
configured. It is recommended to configure the durable storage "
+ + "as it allows the user to fetch large
result sets. Please contact your cluster admin to "
+ + "configure durable storage.",
+ MSQSelectDestination.DURABLE_STORAGE.name(),
+ MultiStageQueryContext.CTX_SELECT_DESTINATION
)
);
}
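
As the contextChecks hunk above shows, the statement API now rejects requests that
do not set executionMode to ASYNC in the query context. A hedged sketch of how a
caller builds such a context (the helper class is illustrative; the "executionMode"
key is the value of QueryContexts.CTX_EXECUTION_MODE referenced in the diff):

    import com.google.common.collect.ImmutableMap;
    import java.util.Map;

    public class StatementApiContext
    {
      // The SQL statement API requires executionMode=ASYNC in the query context.
      public static Map<String, Object> asyncContext()
      {
        return ImmutableMap.of("executionMode", "ASYNC");
      }
    }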
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index ca364be4c7..b3f012d2c6 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -39,7 +39,6 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Locale;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -103,9 +102,6 @@ public class MultiStageQueryContext
public static final String CTX_CLUSTER_STATISTICS_MERGE_MODE = "clusterStatisticsMergeMode";
public static final String DEFAULT_CLUSTER_STATISTICS_MERGE_MODE = ClusterStatisticsMergeMode.SEQUENTIAL.toString();
- public static final String CTX_DESTINATION = "destination";
- private static final String DEFAULT_DESTINATION = null;
-
public static final String CTX_ROWS_PER_SEGMENT = "rowsPerSegment";
static final int DEFAULT_ROWS_PER_SEGMENT = 3000000;
@@ -160,13 +156,10 @@ public class MultiStageQueryContext
public static ClusterStatisticsMergeMode getClusterStatisticsMergeMode(QueryContext queryContext)
{
- return ClusterStatisticsMergeMode.valueOf(
- String.valueOf(
- queryContext.getString(
- CTX_CLUSTER_STATISTICS_MERGE_MODE,
- DEFAULT_CLUSTER_STATISTICS_MERGE_MODE
- )
- )
+ return QueryContexts.getAsEnum(
+ CTX_CLUSTER_STATISTICS_MERGE_MODE,
+ queryContext.getString(CTX_CLUSTER_STATISTICS_MERGE_MODE, DEFAULT_CLUSTER_STATISTICS_MERGE_MODE),
+ ClusterStatisticsMergeMode.class
);
}
@@ -180,12 +173,11 @@ public class MultiStageQueryContext
public static WorkerAssignmentStrategy getAssignmentStrategy(final QueryContext queryContext)
{
- String assignmentStrategyString = queryContext.getString(
+ return QueryContexts.getAsEnum(
CTX_TASK_ASSIGNMENT_STRATEGY,
- DEFAULT_TASK_ASSIGNMENT_STRATEGY
+ queryContext.getString(CTX_TASK_ASSIGNMENT_STRATEGY, DEFAULT_TASK_ASSIGNMENT_STRATEGY),
+ WorkerAssignmentStrategy.class
);
-
- return WorkerAssignmentStrategy.fromString(assignmentStrategyString);
}
public static int getMaxNumTasks(final QueryContext queryContext)
@@ -196,14 +188,6 @@ public class MultiStageQueryContext
);
}
- public static Object getDestination(final QueryContext queryContext)
- {
- return queryContext.get(
- CTX_DESTINATION,
- DEFAULT_DESTINATION
- );
- }
-
public static int getRowsPerSegment(final QueryContext queryContext)
{
return queryContext.getInt(
@@ -214,22 +198,21 @@ public class MultiStageQueryContext
public static MSQSelectDestination getSelectDestination(final QueryContext queryContext)
{
- return MSQSelectDestination.valueOf(
- queryContext.getString(
- CTX_SELECT_DESTINATION,
- DEFAULT_SELECT_DESTINATION
- ).toUpperCase(Locale.ENGLISH)
+ return QueryContexts.getAsEnum(
+ CTX_SELECT_DESTINATION,
+ queryContext.getString(CTX_SELECT_DESTINATION, DEFAULT_SELECT_DESTINATION),
+ MSQSelectDestination.class
);
}
@Nullable
public static MSQSelectDestination getSelectDestinationOrNull(final QueryContext queryContext)
{
- String selectDestination = queryContext.getString(CTX_SELECT_DESTINATION);
- if (selectDestination == null) {
- return null;
- }
- return MSQSelectDestination.valueOf(selectDestination.toUpperCase(Locale.ENGLISH));
+ return QueryContexts.getAsEnum(
+ CTX_SELECT_DESTINATION,
+ queryContext.getString(CTX_SELECT_DESTINATION),
+ MSQSelectDestination.class
+ );
}
public static int getRowsInMemory(final QueryContext queryContext)
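
The refactor above funnels all enum parsing through QueryContexts.getAsEnum, so an
invalid context value fails with an error naming the offending key instead of a bare
IllegalArgumentException from Enum.valueOf. A rough usage sketch with a stand-in
enum (a local enum substitutes for MSQSelectDestination to stay self-contained):

    import org.apache.druid.query.QueryContexts;

    public class EnumContextParsing
    {
      // Stand-in for MSQSelectDestination.
      enum Destination { TASK_REPORT, DURABLE_STORAGE }

      // QueryContexts.getAsEnum(key, value, clazz) is the signature used in the diff.
      public static Destination parse(String rawValue)
      {
        return QueryContexts.getAsEnum("selectDestination", rawValue, Destination.class);
      }
    }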
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
index 63170e91d4..dd0d2ab611 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
@@ -26,6 +26,7 @@ import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.hll.HyperLogLogCollector;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
@@ -1024,11 +1025,13 @@ public class MSQInsertTest extends MSQTestBase
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
- .setExpectedValidationErrorMatcher(CoreMatchers.allOf(
- CoreMatchers.instanceOf(DruidException.class),
- ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
- "Field \"__time\" must be of type TIMESTAMP"))
- ))
+ .setExpectedValidationErrorMatcher(
+ new DruidExceptionMatcher(
+ DruidException.Persona.USER,
+ DruidException.Category.INVALID_INPUT,
+ "invalidInput"
+ ).expectMessageIs("Field [__time] was the wrong type [VARCHAR], expected TIMESTAMP")
+ )
.verifyPlanningErrors();
}
@@ -1106,11 +1109,13 @@ public class MSQInsertTest extends MSQTestBase
"insert into foo1 select __time, dim1 , count(*) as
cnt from foo where dim1 is not null group by 1, 2 PARTITIONED by day clustered
by dim1")
.setQueryContext(localContext)
.setExpectedExecutionErrorMatcher(
- ThrowableMessageMatcher.hasMessage(
- CoreMatchers.startsWith(
- MultiStageQueryContext.CTX_MAX_NUM_TASKS
- + " cannot be less than 2 since at least 1
controller and 1 worker is necessary."
- )
+ new DruidExceptionMatcher(
+ DruidException.Persona.USER,
+ DruidException.Category.INVALID_INPUT,
+ "invalidInput"
+ ).expectMessageIs(
+ "MSQ context maxNumTasks [1] cannot be less than 2, since at least 1 controller "
+ + "and 1 worker is necessary"
)
)
.verifyExecutionError();
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlMSQStatementResourcePostTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlMSQStatementResourcePostTest.java
index e5133ee3d2..70ac5386ba 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlMSQStatementResourcePostTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlMSQStatementResourcePostTest.java
@@ -138,24 +138,36 @@ public class SqlMSQStatementResourcePostTest extends MSQTestBase
@Test
public void nonSupportedModes()
{
- for (ImmutableMap<?, ?> context : ImmutableList.of(ImmutableMap.of(
- QueryContexts.CTX_EXECUTION_MODE,
- ExecutionMode.SYNC.name()
- ), ImmutableMap.of())) {
- SqlStatementResourceTest.assertExceptionMessage(
- resource.doPost(new SqlQuery(
- "select * from foo",
- null,
- false,
- false,
- false,
- (Map<String, Object>) context,
- null
- ), SqlStatementResourceTest.makeOkRequest()),
- "The statement sql api only supports sync mode[ASYNC]. Please set
context parameter [executionMode=ASYNC] in the context payload",
- Response.Status.BAD_REQUEST
- );
- }
+
+ SqlStatementResourceTest.assertExceptionMessage(
+ resource.doPost(new SqlQuery(
+ "select * from foo",
+ null,
+ false,
+ false,
+ false,
+ ImmutableMap.of(),
+ null
+ ), SqlStatementResourceTest.makeOkRequest()),
+ "Execution mode is not provided to the SQL statement API. "
+ + "Please set [executionMode] to [ASYNC] in the query context",
+ Response.Status.BAD_REQUEST
+ );
+
+ SqlStatementResourceTest.assertExceptionMessage(
+ resource.doPost(new SqlQuery(
+ "select * from foo",
+ null,
+ false,
+ false,
+ false,
+ ImmutableMap.of(QueryContexts.CTX_EXECUTION_MODE, ExecutionMode.SYNC.name()),
+ null
+ ), SqlStatementResourceTest.makeOkRequest()),
+ "The SQL statement API currently does not support the provided
execution mode [SYNC]. "
+ + "Please set [executionMode] to [ASYNC] in the query context",
+ Response.Status.BAD_REQUEST
+ );
}
@@ -260,13 +272,10 @@ public class SqlMSQStatementResourcePostTest extends MSQTestBase
NilStorageConnector.getInstance()
);
- String errorMessage = StringUtils.format(
- "The statement sql api cannot read from select destination [%s=%s] since its not configured. "
- + "Its recommended to configure durable storage as it allows the user to fetch big results. "
- + "Please contact your cluster admin to configure durable storage.",
- MultiStageQueryContext.CTX_SELECT_DESTINATION,
- MSQSelectDestination.DURABLE_STORAGE.name()
- );
+ String errorMessage = "The SQL Statement API cannot read from the select destination [DURABLE_STORAGE] provided in "
+ + "the query context [selectDestination] since it is not configured. It is recommended to "
+ + "configure the durable storage as it allows the user to fetch large result sets. "
+ + "Please contact your cluster admin to configure durable storage.";
Map<String, Object> context = defaultAsyncContext();
context.put(MultiStageQueryContext.CTX_SELECT_DESTINATION, MSQSelectDestination.DURABLE_STORAGE.name());
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlStatementResourceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlStatementResourceTest.java
index 51b64b7eed..049b57c340 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlStatementResourceTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlStatementResourceTest.java
@@ -783,7 +783,7 @@ public class SqlStatementResourceTest extends MSQTestBase
assertExceptionMessage(
resource.doGetResults(queryID, 0L, makeOkRequest()),
StringUtils.format(
- "Query[%s] failed. Hit status api for more details.",
+ "Query[%s] failed. Check the status api for more details.",
queryID
),
Response.Status.BAD_REQUEST
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectJoinQueryMSQTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectJoinQueryMSQTest.java
index df40b00a31..ab7b1ed7d7 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectJoinQueryMSQTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectJoinQueryMSQTest.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Injector;
import com.google.inject.Module;
import org.apache.calcite.rel.RelRoot;
-import org.apache.calcite.tools.ValidationException;
import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.sql.MSQTaskSqlEngine;
@@ -142,7 +141,6 @@ public class CalciteSelectJoinQueryMSQTest
@Override
public QueryMaker buildQueryMakerForSelect(RelRoot relRoot, PlannerContext plannerContext)
- throws ValidationException
{
plannerContext.queryContextMap().put(PlannerContext.CTX_SQL_JOIN_ALGORITHM, joinAlgorithm.toString());
return super.buildQueryMakerForSelect(relRoot, plannerContext);
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
index 16aad3d7b9..6d93892674 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
@@ -39,7 +39,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
-import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_DESTINATION;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_FAULT_TOLERANCE;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS;
@@ -142,19 +141,6 @@ public class MultiStageQueryContextTest
Assert.assertEquals(101, MultiStageQueryContext.getMaxNumTasks(QueryContext.of(propertyMap)));
}
- @Test
- public void getDestination_noParameterSetReturnsDefaultValue()
- {
- Assert.assertNull(MultiStageQueryContext.getDestination(QueryContext.empty()));
- }
-
- @Test
- public void getDestination_parameterSetReturnsCorrectValue()
- {
- Map<String, Object> propertyMap = ImmutableMap.of(CTX_DESTINATION, "dataSource");
- Assert.assertEquals("dataSource", MultiStageQueryContext.getDestination(QueryContext.of(propertyMap)));
- }
-
@Test
public void getRowsPerSegment_noParameterSetReturnsDefaultValue()
{
diff --git a/processing/src/main/java/org/apache/druid/error/Forbidden.java b/processing/src/main/java/org/apache/druid/error/Forbidden.java
new file mode 100644
index 0000000000..13470d241c
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/error/Forbidden.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.error;
+
+public class Forbidden extends DruidException.Failure
+{
+
+ public static DruidException exception()
+ {
+ return exception("Unauthorized");
+ }
+
+ public static DruidException exception(String msg, Object... args)
+ {
+ return exception(null, msg, args);
+ }
+
+ public static DruidException exception(Throwable t, String msg, Object... args)
+ {
+ return DruidException.fromFailure(new Forbidden(t, msg, args));
+ }
+
+ private final Throwable t;
+ private final String msg;
+ private final Object[] args;
+
+ private Forbidden(
+ Throwable t,
+ String msg,
+ Object... args
+ )
+ {
+ super("forbidden");
+ this.t = t;
+ this.msg = msg;
+ this.args = args;
+ }
+
+ @Override
+ public DruidException makeException(DruidException.DruidExceptionBuilder bob)
+ {
+ bob = bob.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.FORBIDDEN);
+
+ if (t == null) {
+ return bob.build(msg, args);
+ } else {
+ return bob.build(t, msg, args);
+ }
+ }
+}
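
Usage-wise, the new Forbidden failure lets callers replace the multi-line
forPersona/ofCategory chain with a one-liner; the serialized response carries the
fields asserted in ForbiddenTest further down in this diff. A small sketch (the
guard method is hypothetical):

    import org.apache.druid.error.Forbidden;

    public class AuthGuard
    {
      // Raises a USER/FORBIDDEN DruidException with errorCode "forbidden";
      // with no arguments the message defaults to "Unauthorized".
      public static void requireAuthorized(boolean authorized)
      {
        if (!authorized) {
          throw Forbidden.exception();
        }
      }
    }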
diff --git a/processing/src/main/java/org/apache/druid/error/QueryExceptionCompat.java b/processing/src/main/java/org/apache/druid/error/QueryExceptionCompat.java
index 12e4905efa..9894208e9f 100644
--- a/processing/src/main/java/org/apache/druid/error/QueryExceptionCompat.java
+++ b/processing/src/main/java/org/apache/druid/error/QueryExceptionCompat.java
@@ -66,12 +66,11 @@ public class QueryExceptionCompat extends DruidException.Failure
return DruidException.Category.RUNTIME_FAILURE;
case CANCELED:
return DruidException.Category.CANCELED;
- case UNKNOWN:
- return DruidException.Category.UNCATEGORIZED;
case UNSUPPORTED:
return DruidException.Category.UNSUPPORTED;
case TIMEOUT:
return DruidException.Category.TIMEOUT;
+ case UNKNOWN:
default:
return DruidException.Category.UNCATEGORIZED;
}
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContext.java b/processing/src/main/java/org/apache/druid/query/QueryContext.java
index 147ebbec18..403cef1fa4 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContext.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContext.java
@@ -106,16 +106,6 @@ public class QueryContext
return context.get(key);
}
- /**
- * Return a value as a generic {@code Object}, returning the default value if the
- * context value is not set.
- */
- public Object get(String key, Object defaultValue)
- {
- final Object val = get(key);
- return val == null ? defaultValue : val;
- }
-
/**
* Return a value as an {@code String}, returning {@link null} if the
* context value is not set.
diff --git a/processing/src/test/java/org/apache/druid/error/ForbiddenTest.java b/processing/src/test/java/org/apache/druid/error/ForbiddenTest.java
new file mode 100644
index 0000000000..90faeabe42
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/error/ForbiddenTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.error;
+
+import org.apache.druid.matchers.DruidMatchers;
+import org.hamcrest.MatcherAssert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class ForbiddenTest
+{
+
+ @Test
+ public void testAsErrorResponse()
+ {
+ ErrorResponse errorResponse = new ErrorResponse(Forbidden.exception());
+ final Map<String, Object> asMap = errorResponse.getAsMap();
+
+ MatcherAssert.assertThat(
+ asMap,
+ DruidMatchers.mapMatcher(
+ "error", "druidException",
+ "errorCode", "forbidden",
+ "persona", "USER",
+ "category", "FORBIDDEN",
+ "errorMessage", "Unauthorized"
+ )
+ );
+
+ ErrorResponse recomposed = ErrorResponse.fromMap(asMap);
+
+ MatcherAssert.assertThat(
+ recomposed.getUnderlyingException(),
+ new DruidExceptionMatcher(
+ DruidException.Persona.USER,
+ DruidException.Category.FORBIDDEN,
+ "forbidden"
+ ).expectMessageContains("Unauthorized")
+ );
+ }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java
index 26fdd514e8..945e452941 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java
@@ -30,6 +30,7 @@ import org.apache.druid.catalog.model.table.ExternalTableSpec;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.column.RowSignature;
@@ -90,44 +91,73 @@ public class ExternalOperatorConversion extends DruidExternTableMacroConversion
final ObjectMapper jsonMapper
)
{
- try {
- final String sigValue = CatalogUtils.getString(args, SIGNATURE_PARAM);
- if (sigValue == null && columns == null) {
- throw new IAE(
- "EXTERN requires either a %s value or an EXTEND clause",
- SIGNATURE_PARAM
- );
+ final String sigValue = CatalogUtils.getString(args, SIGNATURE_PARAM);
+ if (sigValue == null && columns == null) {
+ throw InvalidInput.exception(
+ "EXTERN requires either a [%s] value or an EXTEND clause",
+ SIGNATURE_PARAM
+ );
+ }
+ if (sigValue != null && columns != null) {
+ throw InvalidInput.exception(
+ "EXTERN requires either a [%s] value or an EXTEND clause, but not
both",
+ SIGNATURE_PARAM
+ );
+ }
+ final RowSignature rowSignature;
+ if (columns != null) {
+ try {
+ rowSignature = Columns.convertSignature(columns);
}
- if (sigValue != null && columns != null) {
- throw new IAE(
- "EXTERN requires either a %s value or an EXTEND clause, but not
both",
- SIGNATURE_PARAM
- );
+ catch (IAE e) {
+ throw badArgumentException(e, "columns");
}
- final RowSignature rowSignature;
- if (columns != null) {
- rowSignature = Columns.convertSignature(columns);
- } else {
+ } else {
+ try {
rowSignature = jsonMapper.readValue(sigValue, RowSignature.class);
}
+ catch (JsonProcessingException e) {
+ throw badArgumentException(e, "rowSignature");
+ }
+ }
- String inputSrcStr = CatalogUtils.getString(args, INPUT_SOURCE_PARAM);
- InputSource inputSource = jsonMapper.readValue(inputSrcStr, InputSource.class);
- return new ExternalTableSpec(
- inputSource,
- jsonMapper.readValue(CatalogUtils.getString(args, INPUT_FORMAT_PARAM), InputFormat.class),
- rowSignature,
- inputSource::getTypes
- );
+ String inputSrcStr = CatalogUtils.getString(args, INPUT_SOURCE_PARAM);
+ InputSource inputSource;
+ try {
+ inputSource = jsonMapper.readValue(inputSrcStr, InputSource.class);
}
catch (JsonProcessingException e) {
- throw DruidException.forPersona(DruidException.Persona.USER)
- .ofCategory(DruidException.Category.INVALID_INPUT)
- .build(e, e.getMessage());
+ throw badArgumentException(e, "inputSource");
}
+ InputFormat inputFormat;
+ try {
+ inputFormat = jsonMapper.readValue(CatalogUtils.getString(args, INPUT_FORMAT_PARAM), InputFormat.class);
+ }
+ catch (JsonProcessingException e) {
+ throw badArgumentException(e, "inputFormat");
+ }
+ return new ExternalTableSpec(
+ inputSource,
+ inputFormat,
+ rowSignature,
+ inputSource::getTypes
+ );
}
}
+ private static DruidException badArgumentException(
+ Throwable cause,
+ String fieldName
+ )
+ {
+ return InvalidInput.exception(
+ cause,
+ "Invalid value for the field [%s]. Reason: [%s]",
+ fieldName,
+ cause.getMessage()
+ );
+ }
+
@Inject
public ExternalOperatorConversion(@Json final ObjectMapper jsonMapper)
{
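
The badArgumentException helper above gives every EXTERN argument the same failure
shape: the field name plus Jackson's reason. A hedged sketch of that wrapping
pattern in isolation (the parseField helper is illustrative, not from the commit):

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.druid.error.InvalidInput;

    public class FieldParse
    {
      // Surfaces the field name alongside the parse failure, as badArgumentException does.
      public static <T> T parseField(ObjectMapper mapper, String json, String fieldName, Class<T> clazz)
      {
        try {
          return mapper.readValue(json, clazz);
        }
        catch (JsonProcessingException e) {
          throw InvalidInput.exception(e, "Invalid value for the field [%s]. Reason: [%s]", fieldName, e.getMessage());
        }
      }
    }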
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java
index f5d9056246..d7fc7d043b 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java
@@ -27,7 +27,6 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.guice.LazySingleton;
-import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
import org.apache.druid.server.QueryLifecycleFactory;
@@ -116,7 +115,7 @@ public class NativeSqlEngine implements SqlEngine
case SCAN_NEEDS_SIGNATURE:
return false;
default:
- throw new IAE("Unrecognized feature: %s", feature);
+ throw SqlEngines.generateUnrecognizedFeatureException(NativeSqlEngine.class.getSimpleName(), feature);
}
}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngines.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngines.java
index cc7bef80f7..e8375feb3a 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngines.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngines.java
@@ -20,6 +20,7 @@
package org.apache.druid.sql.calcite.run;
import org.apache.calcite.tools.ValidationException;
+import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import java.util.Map;
@@ -46,4 +47,24 @@ public class SqlEngines
}
}
}
+
+ /**
+ * This is a helper function that provides a developer-friendly exception when an engine cannot recognize the feature.
+ */
+ public static DruidException generateUnrecognizedFeatureException(
+ final String engineName,
+ final EngineFeature unrecognizedFeature
+ )
+ {
+ return DruidException
+ .forPersona(DruidException.Persona.DEVELOPER)
+ .ofCategory(DruidException.Category.DEFENSIVE)
+ .build(
+ "Engine [%s] is unable to recognize the feature [%s] for
availability. This might happen when a "
+ + "newer feature is added without updating all the implementations
of SqlEngine(s) to either allow or disallow "
+ + "its availability. Please raise an issue if you encounter this
exception while using Druid.",
+ engineName,
+ unrecognizedFeature
+ );
+ }
}
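
Each engine's featureAvailable switch now delegates its default branch to this
helper, as the NativeSqlEngine, ViewSqlEngine, and IngestionTestSqlEngine hunks
show. A condensed sketch of the pattern (TinyEngine and its feature handling are
hypothetical; CAN_SELECT is assumed to be one of the existing EngineFeature
constants):

    import org.apache.druid.sql.calcite.run.EngineFeature;
    import org.apache.druid.sql.calcite.run.SqlEngines;

    public class TinyEngine
    {
      // Unknown features fall through to the shared DEVELOPER/DEFENSIVE exception.
      public boolean featureAvailable(EngineFeature feature)
      {
        switch (feature) {
          case CAN_SELECT:
            return true;
          default:
            throw SqlEngines.generateUnrecognizedFeatureException(TinyEngine.class.getSimpleName(), feature);
        }
      }
    }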
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java b/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java
index 740cac15ee..47dae5c4d9 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java
@@ -22,11 +22,11 @@ package org.apache.druid.sql.calcite.view;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.druid.java.util.common.IAE;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.run.QueryMaker;
import org.apache.druid.sql.calcite.run.SqlEngine;
+import org.apache.druid.sql.calcite.run.SqlEngines;
import java.util.Map;
@@ -78,7 +78,7 @@ public class ViewSqlEngine implements SqlEngine
return false;
default:
- throw new IAE("Unrecognized feature: %s", feature);
+ throw SqlEngines.generateUnrecognizedFeatureException(ViewSqlEngine.class.getSimpleName(), feature);
}
}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
index 1a19f9ddf7..433eb98ace 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
@@ -1635,11 +1635,96 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
new DruidExceptionMatcher(
DruidException.Persona.USER,
DruidException.Category.INVALID_INPUT,
- "general"
+ "invalidInput"
+ ).expectMessageContains(
+ "Cannot construct instance of
`org.apache.druid.segment.column.ColumnSignature`, problem: Column name must be
provided and non-empty"
+ )
+ )
+ .verify();
+ }
+
+ @Test
+ public void testErrorWhenBothRowSignatureAndExtendsProvidedToExtern()
+ {
+ final String sqlString = "insert into dst \n"
+ + "select time_parse(\"time\") as __time, * \n"
+ + "from table( \n"
+ + "extern(\n"
+ + "'{\"type\": \"s3\", \"uris\":
[\\\"s3://imply-eng-datasets/qa/IngestionTest/wikipedia/files/wikiticker-2015-09-12-sampled.mini.json.gz\\\"]}',\n"
+ + "'{\"type\": \"json\"}',\n"
+ + "'[{\"name\": \"time\", \"type\": \"string\"},
{\"name\": \"channel\", \"type\": \"string\"}]'\n"
+ + ")\n"
+ + ") EXTEND (\"time\" VARCHAR, \"channel\"
VARCHAR)\n"
+ + "partitioned by DAY\n"
+ + "clustered by channel";
+ HashMap<String, Object> context = new HashMap<>(DEFAULT_CONTEXT);
+ context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100);
+ testIngestionQuery().context(context).sql(sqlString)
+ .expectValidationError(
+ new DruidExceptionMatcher(
+ DruidException.Persona.USER,
+ DruidException.Category.INVALID_INPUT,
+ "invalidInput"
+ ).expectMessageContains(
+ "EXTERN requires either a [signature] value or
an EXTEND clause, but not both"
+ )
+ )
+ .verify();
+ }
+
+ @Test
+ public void testErrorWhenNoneOfRowSignatureAndExtendsProvidedToExtern()
+ {
+ final String sqlString = "insert into dst \n"
+ + "select time_parse(\"time\") as __time, * \n"
+ + "from table( \n"
+ + "extern(\n"
+ + "'{\"type\": \"s3\", \"uris\":
[\\\"s3://imply-eng-datasets/qa/IngestionTest/wikipedia/files/wikiticker-2015-09-12-sampled.mini.json.gz\\\"]}',\n"
+ + "'{\"type\": \"json\"}'\n"
+ + ")\n"
+ + ")\n"
+ + "partitioned by DAY\n"
+ + "clustered by channel";
+ HashMap<String, Object> context = new HashMap<>(DEFAULT_CONTEXT);
+ context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100);
+ testIngestionQuery().context(context).sql(sqlString)
+ .expectValidationError(
+ new DruidExceptionMatcher(
+ DruidException.Persona.USER,
+ DruidException.Category.INVALID_INPUT,
+ "invalidInput"
+ ).expectMessageContains(
+ "EXTERN requires either a [signature] value or
an EXTEND clause"
+ )
+ )
+ .verify();
+ }
+
+ @Test
+ public void testErrorWhenInputSourceInvalid()
+ {
+ final String sqlString = "insert into dst \n"
+ + "select time_parse(\"time\") as __time, * \n"
+ + "from table( \n"
+ + "extern(\n"
+ + "'{\"type\": \"local\"}',\n"
+ + "'{\"type\": \"json\"}',\n"
+ + "'[{\"name\": \"time\", \"type\": \"string\"},
{\"name\": \"channel\", \"type\": \"string\"}]'\n"
+ + ")\n"
+ + ")\n"
+ + "partitioned by DAY\n"
+ + "clustered by channel";
+ HashMap<String, Object> context = new HashMap<>(DEFAULT_CONTEXT);
+ context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100);
+ testIngestionQuery().context(context).sql(sqlString)
+ .expectValidationError(
+ new DruidExceptionMatcher(
+ DruidException.Persona.USER,
+ DruidException.Category.INVALID_INPUT,
+ "invalidInput"
+ ).expectMessageContains(
+ "Invalid value for the field [inputSource].
Reason:"
)
- .expectMessageContains(
- "Cannot construct instance of
`org.apache.druid.segment.column.ColumnSignature`, problem: Column name must be
provided and non-empty"
- )
)
.verify();
}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java b/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java
index c1773ef62a..272fddbd8a 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java
@@ -24,12 +24,12 @@ import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.run.QueryMaker;
import org.apache.druid.sql.calcite.run.SqlEngine;
+import org.apache.druid.sql.calcite.run.SqlEngines;
import org.apache.druid.sql.calcite.table.RowSignatures;
import java.util.Map;
@@ -91,7 +91,7 @@ public class IngestionTestSqlEngine implements SqlEngine
case ALLOW_BROADCAST_RIGHTY_JOIN:
return true;
default:
- throw new IAE("Unrecognized feature: %s", feature);
+ throw SqlEngines.generateUnrecognizedFeatureException(IngestionTestSqlEngine.class.getSimpleName(), feature);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]