This is an automated email from the ASF dual-hosted git repository.

lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c1c7dff2ad Using DruidExceptions in MSQ (changes related to the 
Broker) (#14534)
c1c7dff2ad is described below

commit c1c7dff2ad082a110aaab3c2c9dc1678773f1a92
Author: Laksh Singla <[email protected]>
AuthorDate: Thu Jul 13 19:08:49 2023 +0000

    Using DruidExceptions in MSQ (changes related to the Broker) (#14534)
    
    MSQ engine returns correct error codes for invalid user inputs in the query 
context. Also, using DruidExceptions for MSQ related errors happening in the 
Broker with improved error messages.
---
 .../apache/druid/msq/sql/MSQTaskQueryMaker.java    | 53 +++++++-----
 .../org/apache/druid/msq/sql/MSQTaskSqlEngine.java | 77 +++++++++--------
 .../msq/sql/resources/SqlStatementResource.java    | 98 ++++++++++------------
 .../druid/msq/util/MultiStageQueryContext.java     | 49 ++++-------
 .../org/apache/druid/msq/exec/MSQInsertTest.java   | 25 +++---
 .../msq/sql/SqlMSQStatementResourcePostTest.java   | 59 +++++++------
 .../druid/msq/sql/SqlStatementResourceTest.java    |  2 +-
 .../msq/test/CalciteSelectJoinQueryMSQTest.java    |  2 -
 .../druid/msq/util/MultiStageQueryContextTest.java | 14 ----
 .../java/org/apache/druid/error/Forbidden.java     | 68 +++++++++++++++
 .../apache/druid/error/QueryExceptionCompat.java   |  3 +-
 .../java/org/apache/druid/query/QueryContext.java  | 10 ---
 .../java/org/apache/druid/error/ForbiddenTest.java | 59 +++++++++++++
 .../external/ExternalOperatorConversion.java       | 84 +++++++++++++------
 .../druid/sql/calcite/run/NativeSqlEngine.java     |  3 +-
 .../apache/druid/sql/calcite/run/SqlEngines.java   | 21 +++++
 .../druid/sql/calcite/view/ViewSqlEngine.java      |  4 +-
 .../druid/sql/calcite/CalciteInsertDmlTest.java    | 93 +++++++++++++++++++-
 .../druid/sql/calcite/IngestionTestSqlEngine.java  |  4 +-
 19 files changed, 481 insertions(+), 247 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index a5dc579419..772af8524e 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -26,8 +26,8 @@ import org.apache.calcite.runtime.Hook;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.Pair;
 import org.apache.druid.common.guava.FutureUtils;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
@@ -48,7 +48,6 @@ import org.apache.druid.query.QueryContext;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.rpc.indexing.OverlordClient;
-import org.apache.druid.segment.DimensionHandlerUtils;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.server.QueryResponse;
@@ -76,10 +75,6 @@ import java.util.stream.Collectors;
 
 public class MSQTaskQueryMaker implements QueryMaker
 {
-
-  private static final String DESTINATION_DATASOURCE = "dataSource";
-  private static final String DESTINATION_REPORT = "taskReport";
-
   public static final String USER_KEY = "__user";
 
   private static final Granularity DEFAULT_SEGMENT_GRANULARITY = 
Granularities.ALL;
@@ -128,9 +123,6 @@ public class MSQTaskQueryMaker implements QueryMaker
       MSQMode.populateDefaultQueryContext(msqMode, nativeQueryContext);
     }
 
-    final String ctxDestination =
-        
DimensionHandlerUtils.convertObjectToString(MultiStageQueryContext.getDestination(sqlQueryContext));
-
     Object segmentGranularity;
     try {
       segmentGranularity = Optional.ofNullable(plannerContext.queryContext()
@@ -138,15 +130,24 @@ public class MSQTaskQueryMaker implements QueryMaker
                                    
.orElse(jsonMapper.writeValueAsString(DEFAULT_SEGMENT_GRANULARITY));
     }
     catch (JsonProcessingException e) {
-      throw new IAE("Unable to deserialize the insert granularity. Please 
retry the query with a valid "
-                    + "segment graularity");
+      // This would only be thrown if we are unable to serialize the 
DEFAULT_SEGMENT_GRANULARITY, which we don't expect
+      // to happen
+      throw DruidException.defensive()
+                          .build(
+                              e,
+                              "Unable to deserialize the 
DEFAULT_SEGMENT_GRANULARITY in MSQTaskQueryMaker. "
+                              + "This shouldn't have happened since the 
DEFAULT_SEGMENT_GRANULARITY object is guaranteed to be "
+                              + "serializable. Please raise an issue in case 
you are seeing this message while executing a query."
+                          );
     }
 
     final int maxNumTasks = 
MultiStageQueryContext.getMaxNumTasks(sqlQueryContext);
 
     if (maxNumTasks < 2) {
-      throw new IAE(MultiStageQueryContext.CTX_MAX_NUM_TASKS
-                    + " cannot be less than 2 since at least 1 controller and 
1 worker is necessary.");
+      throw InvalidInput.exception(
+          "MSQ context maxNumTasks [%,d] cannot be less than 2, since at least 
1 controller and 1 worker is necessary",
+          maxNumTasks
+      );
     }
 
     // This parameter is used internally for the number of worker tasks only, 
so we subtract 1
@@ -202,16 +203,19 @@ public class MSQTaskQueryMaker implements QueryMaker
     final MSQDestination destination;
 
     if (targetDataSource != null) {
-      if (ctxDestination != null && 
!DESTINATION_DATASOURCE.equals(ctxDestination)) {
-        throw new IAE("Cannot INSERT with destination [%s]", ctxDestination);
-      }
-
       Granularity segmentGranularityObject;
       try {
         segmentGranularityObject = jsonMapper.readValue((String) 
segmentGranularity, Granularity.class);
       }
       catch (Exception e) {
-        throw new ISE("Unable to convert %s to a segment granularity", 
segmentGranularity);
+        throw DruidException.defensive()
+                            .build(
+                                e,
+                                "Unable to deserialize the provided 
segmentGranularity [%s]. "
+                                + "This is populated internally by Druid and 
therefore should not occur. "
+                                + "Please contact the developers if you are 
seeing this error message.",
+                                segmentGranularity
+                            );
       }
 
       final List<String> segmentSortOrder = 
MultiStageQueryContext.getSortOrder(sqlQueryContext);
@@ -228,16 +232,19 @@ public class MSQTaskQueryMaker implements QueryMaker
           replaceTimeChunks
       );
     } else {
-      if (ctxDestination != null && 
!DESTINATION_REPORT.equals(ctxDestination)) {
-        throw new IAE("Cannot SELECT with destination [%s]", ctxDestination);
-      }
       final MSQSelectDestination msqSelectDestination = 
MultiStageQueryContext.getSelectDestination(sqlQueryContext);
       if (msqSelectDestination.equals(MSQSelectDestination.TASK_REPORT)) {
         destination = TaskReportMSQDestination.instance();
       } else if 
(msqSelectDestination.equals(MSQSelectDestination.DURABLE_STORAGE)) {
         destination = DurableStorageMSQDestination.instance();
       } else {
-        throw new IAE("Cannot SELECT with destination [%s]", 
msqSelectDestination.name());
+        throw InvalidInput.exception(
+            "Unsupported select destination [%s] provided in the query 
context. MSQ can currently write the select results to "
+            + "[%s] and [%s]",
+            msqSelectDestination.name(),
+            MSQSelectDestination.TASK_REPORT.toString(),
+            MSQSelectDestination.DURABLE_STORAGE.toString()
+        );
       }
     }
 
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
index c5fe182ea9..37b8692cb4 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
@@ -30,15 +30,13 @@ import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.tools.ValidationException;
 import org.apache.calcite.util.Pair;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.error.InvalidSqlInput;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.msq.querykit.QueryKitUtils;
-import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
@@ -60,7 +58,6 @@ public class MSQTaskSqlEngine implements SqlEngine
   public static final Set<String> SYSTEM_CONTEXT_PARAMETERS =
       ImmutableSet.<String>builder()
                   .addAll(NativeSqlEngine.SYSTEM_CONTEXT_PARAMETERS)
-                  .add(MultiStageQueryContext.CTX_DESTINATION)
                   .add(QueryKitUtils.CTX_TIME_COLUMN_NAME)
                   .build();
 
@@ -125,7 +122,7 @@ public class MSQTaskSqlEngine implements SqlEngine
       case SCAN_NEEDS_SIGNATURE:
         return true;
       default:
-        throw new IAE("Unrecognized feature: %s", feature);
+        throw 
SqlEngines.generateUnrecognizedFeatureException(MSQTaskSqlEngine.class.getSimpleName(),
 feature);
     }
   }
 
@@ -133,9 +130,9 @@ public class MSQTaskSqlEngine implements SqlEngine
   public QueryMaker buildQueryMakerForSelect(
       final RelRoot relRoot,
       final PlannerContext plannerContext
-  ) throws ValidationException
+  )
   {
-    validateSelect(relRoot.fields, plannerContext);
+    validateSelect(plannerContext);
 
     return new MSQTaskQueryMaker(
         null,
@@ -156,7 +153,7 @@ public class MSQTaskSqlEngine implements SqlEngine
       final String targetDataSource,
       final RelRoot relRoot,
       final PlannerContext plannerContext
-  ) throws ValidationException
+  )
   {
     validateInsert(relRoot.rel, relRoot.fields, plannerContext);
 
@@ -169,15 +166,23 @@ public class MSQTaskSqlEngine implements SqlEngine
     );
   }
 
-  private static void validateSelect(
-      final List<Pair<Integer, String>> fieldMappings,
-      final PlannerContext plannerContext
-  ) throws ValidationException
+  /**
+   * Checks if the SELECT contains {@link 
DruidSqlInsert#SQL_INSERT_SEGMENT_GRANULARITY} in the context. This is a
+   * defensive cheeck because {@link 
org.apache.druid.sql.calcite.planner.DruidPlanner} should have called the
+   * {@link #validateContext}
+   */
+  private static void validateSelect(final PlannerContext plannerContext)
   {
     if 
(plannerContext.queryContext().containsKey(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY))
 {
-      throw new ValidationException(
-          StringUtils.format("Cannot use \"%s\" without INSERT", 
DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY)
-      );
+      throw DruidException
+          .forPersona(DruidException.Persona.DEVELOPER)
+          .ofCategory(DruidException.Category.DEFENSIVE)
+          .build(
+              "The SELECT query's context contains invalid parameter [%s] 
which is supposed to be populated "
+              + "by Druid for INSERT queries. If the user is seeing this 
exception, that means there's a bug in Druid "
+              + "that is populating the query context with the segment's 
granularity.",
+              DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY
+          );
     }
   }
 
@@ -185,7 +190,7 @@ public class MSQTaskSqlEngine implements SqlEngine
       final RelNode rootRel,
       final List<Pair<Integer, String>> fieldMappings,
       final PlannerContext plannerContext
-  ) throws ValidationException
+  )
   {
     validateNoDuplicateAliases(fieldMappings);
 
@@ -199,12 +204,10 @@ public class MSQTaskSqlEngine implements SqlEngine
         // Validate the __time field has the proper type.
         final SqlTypeName timeType = 
rootRel.getRowType().getFieldList().get(field.left).getType().getSqlTypeName();
         if (timeType != SqlTypeName.TIMESTAMP) {
-          throw new ValidationException(
-              StringUtils.format(
-                  "Field \"%s\" must be of type TIMESTAMP (was %s)",
-                  ColumnHolder.TIME_COLUMN_NAME,
-                  timeType
-              )
+          throw InvalidSqlInput.exception(
+              "Field [%s] was the wrong type [%s], expected TIMESTAMP",
+              ColumnHolder.TIME_COLUMN_NAME,
+              timeType
           );
         }
       }
@@ -220,13 +223,18 @@ public class MSQTaskSqlEngine implements SqlEngine
       );
     }
     catch (Exception e) {
-      throw new ValidationException(
-          StringUtils.format(
-              "Invalid segmentGranularity: %s",
-              
plannerContext.queryContext().get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY)
-          ),
-          e
-      );
+      // This is a defensive check as the 
DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY in the query context is
+      // populated by Druid. If the user entered an incorrect granularity, 
that should have been flagged before reaching
+      // here
+      throw DruidException.forPersona(DruidException.Persona.DEVELOPER)
+                          .ofCategory(DruidException.Category.DEFENSIVE)
+                          .build(
+                              e,
+                              "[%s] is not a valid value for [%s]",
+                              
plannerContext.queryContext().get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY),
+                              DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY
+                          );
+
     }
 
     final boolean hasSegmentGranularity = 
!Granularities.ALL.equals(segmentGranularity);
@@ -237,11 +245,10 @@ public class MSQTaskSqlEngine implements SqlEngine
     validateLimitAndOffset(rootRel, !hasSegmentGranularity);
 
     if (hasSegmentGranularity && timeFieldIndex < 0) {
-      throw new ValidationException(
-          StringUtils.format(
-              "INSERT queries with segment granularity other than \"all\" must 
have a \"%s\" field.",
-              ColumnHolder.TIME_COLUMN_NAME
-          )
+      throw InvalidInput.exception(
+          "The granularity [%s] specified in the PARTITIONED BY clause of the 
INSERT query is different from ALL. "
+          + "Therefore, the query must specify a time column (named __time).",
+          segmentGranularity
       );
     }
   }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java
index 2be55a2f84..7b92872ee6 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java
@@ -30,6 +30,8 @@ import org.apache.druid.client.indexing.TaskStatusResponse;
 import org.apache.druid.common.guava.FutureUtils;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.error.ErrorResponse;
+import org.apache.druid.error.Forbidden;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.error.QueryExceptionCompat;
 import org.apache.druid.frame.channel.FrameChannelSequence;
 import org.apache.druid.guice.annotations.MSQ;
@@ -198,19 +200,15 @@ public class SqlStatementResource
     }
     catch (ForbiddenException e) {
       log.debug("Got forbidden request for reason [%s]", e.getErrorMessage());
-      return buildNonOkResponse(
-          DruidException.forPersona(DruidException.Persona.USER)
-                        .ofCategory(DruidException.Category.FORBIDDEN)
-                        .build(Access.DEFAULT_ERROR_MESSAGE)
-      );
+      return buildNonOkResponse(Forbidden.exception());
     }
     // Calcite throws java.lang.AssertionError at various points in 
planning/validation.
     catch (AssertionError | Exception e) {
       stmt.reporter().failed(e);
       if (isDebug) {
-        log.warn(e, "Failed to handle query: %s", sqlQueryId);
+        log.warn(e, "Failed to handle query [%s]", sqlQueryId);
       } else {
-        log.noStackTrace().warn(e, "Failed to handle query: %s", sqlQueryId);
+        log.noStackTrace().warn(e, "Failed to handle query [%s]", sqlQueryId);
       }
       return buildNonOkResponse(
           DruidException.forPersona(DruidException.Persona.DEVELOPER)
@@ -260,17 +258,13 @@ public class SqlStatementResource
     }
     catch (ForbiddenException e) {
       log.debug("Got forbidden request for reason [%s]", e.getErrorMessage());
-      return buildNonOkResponse(
-          DruidException.forPersona(DruidException.Persona.USER)
-                        .ofCategory(DruidException.Category.FORBIDDEN)
-                        .build(Access.DEFAULT_ERROR_MESSAGE)
-      );
+      return buildNonOkResponse(Forbidden.exception());
     }
     catch (Exception e) {
-      log.warn(e, "Failed to handle query: %s", queryId);
+      log.warn(e, "Failed to handle query [%s]", queryId);
       return 
buildNonOkResponse(DruidException.forPersona(DruidException.Persona.DEVELOPER)
                                               
.ofCategory(DruidException.Category.UNCATEGORIZED)
-                                              .build(e, "Failed to handle 
query: [%s]", queryId));
+                                              .build(e, "Failed to handle 
query [%s]", queryId));
     }
   }
 
@@ -345,17 +339,13 @@ public class SqlStatementResource
     }
     catch (ForbiddenException e) {
       log.debug("Got forbidden request for reason [%s]", e.getErrorMessage());
-      return buildNonOkResponse(
-          DruidException.forPersona(DruidException.Persona.USER)
-                        .ofCategory(DruidException.Category.FORBIDDEN)
-                        .build(Access.DEFAULT_ERROR_MESSAGE)
-      );
+      return buildNonOkResponse(Forbidden.exception());
     }
     catch (Exception e) {
-      log.warn(e, "Failed to handle query: %s", queryId);
+      log.warn(e, "Failed to handle query [%s]", queryId);
       return 
buildNonOkResponse(DruidException.forPersona(DruidException.Persona.DEVELOPER)
                                               
.ofCategory(DruidException.Category.UNCATEGORIZED)
-                                              .build(e, "Failed to handle 
query: [%s]", queryId));
+                                              .build(e, "Failed to handle 
query [%s]", queryId));
     }
   }
 
@@ -412,17 +402,13 @@ public class SqlStatementResource
     }
     catch (ForbiddenException e) {
       log.debug("Got forbidden request for reason [%s]", e.getErrorMessage());
-      return buildNonOkResponse(
-          DruidException.forPersona(DruidException.Persona.USER)
-                        .ofCategory(DruidException.Category.FORBIDDEN)
-                        .build(Access.DEFAULT_ERROR_MESSAGE)
-      );
+      return buildNonOkResponse(Forbidden.exception());
     }
     catch (Exception e) {
-      log.warn(e, "Failed to handle query: %s", queryId);
+      log.warn(e, "Failed to handle query [%s]", queryId);
       return 
buildNonOkResponse(DruidException.forPersona(DruidException.Persona.DEVELOPER)
                                               
.ofCategory(DruidException.Category.UNCATEGORIZED)
-                                              .build(e, "Failed to handle 
query: [%s]", queryId));
+                                              .build(e, "Failed to handle 
query [%s]", queryId));
     }
   }
 
@@ -679,9 +665,9 @@ public class SqlStatementResource
     if (msqControllerTask.getQuerySpec().getDestination() instanceof 
TaskReportMSQDestination) {
       // Results from task report are only present as one page.
       if (page != null && page > 0) {
-        throw DruidException.forPersona(DruidException.Persona.USER)
-                            .ofCategory(DruidException.Category.INVALID_INPUT)
-                            .build("Page number is out of range of the 
results.");
+        throw InvalidInput.exception(
+            "Page number [%d] is out of the range of results", page
+        );
       }
 
       MSQTaskReportPayload msqTaskReportPayload = 
jsonMapper.convertValue(SqlStatementResourceHelper.getPayload(
@@ -769,9 +755,7 @@ public class SqlStatementResource
         return pageInfo;
       }
     }
-    throw DruidException.forPersona(DruidException.Persona.USER)
-                        .ofCategory(DruidException.Category.INVALID_INPUT)
-                        .build("Invalid page id [%d] passed.", pageId);
+    throw InvalidInput.exception("Invalid page id [%d] passed.", pageId);
   }
 
   private void resultPusher(
@@ -832,7 +816,7 @@ public class SqlStatementResource
       throw DruidException.forPersona(DruidException.Persona.USER)
                           .ofCategory(DruidException.Category.INVALID_INPUT)
                           .build(
-                              "Query[%s] failed. Hit status api for more 
details.",
+                              "Query[%s] failed. Check the status api for more 
details.",
                               queryId
                           );
     } else {
@@ -842,22 +826,25 @@ public class SqlStatementResource
 
   private void contextChecks(QueryContext queryContext)
   {
-    ExecutionMode executionMode = queryContext.getEnum(
-        QueryContexts.CTX_EXECUTION_MODE,
-        ExecutionMode.class,
-        null
-    );
+    ExecutionMode executionMode = 
queryContext.getEnum(QueryContexts.CTX_EXECUTION_MODE, ExecutionMode.class, 
null);
+
+    if (executionMode == null) {
+      throw InvalidInput.exception(
+          "Execution mode is not provided to the SQL statement API. "
+          + "Please set [%s] to [%s] in the query context",
+          QueryContexts.CTX_EXECUTION_MODE,
+          ExecutionMode.ASYNC
+      );
+    }
+
     if (ExecutionMode.ASYNC != executionMode) {
-      throw DruidException.forPersona(DruidException.Persona.USER)
-                          .ofCategory(DruidException.Category.INVALID_INPUT)
-                          .build(
-                              StringUtils.format(
-                                  "The statement sql api only supports sync 
mode[%s]. Please set context parameter [%s=%s] in the context payload",
-                                  ExecutionMode.ASYNC,
-                                  QueryContexts.CTX_EXECUTION_MODE,
-                                  ExecutionMode.ASYNC
-                              )
-                          );
+      throw InvalidInput.exception(
+          "The SQL statement API currently does not support the provided 
execution mode [%s]. "
+          + "Please set [%s] to [%s] in the query context",
+          executionMode,
+          QueryContexts.CTX_EXECUTION_MODE,
+          ExecutionMode.ASYNC
+      );
     }
 
     MSQSelectDestination selectDestination = 
MultiStageQueryContext.getSelectDestination(queryContext);
@@ -873,11 +860,12 @@ public class SqlStatementResource
                           .ofCategory(DruidException.Category.INVALID_INPUT)
                           .build(
                               StringUtils.format(
-                                  "The statement sql api cannot read from 
select destination [%s=%s] since its not configured. "
-                                  + "Its recommended to configure durable 
storage as it allows the user to fetch big results. "
-                                  + "Please contact your cluster admin to 
configure durable storage.",
-                                  
MultiStageQueryContext.CTX_SELECT_DESTINATION,
-                                  MSQSelectDestination.DURABLE_STORAGE.name()
+                                  "The SQL Statement API cannot read from the 
select destination [%s] provided "
+                                  + "in the query context [%s] since it is not 
configured. It is recommended to configure the durable storage "
+                                  + "as it allows the user to fetch large 
result sets. Please contact your cluster admin to "
+                                  + "configure durable storage.",
+                                  MSQSelectDestination.DURABLE_STORAGE.name(),
+                                  MultiStageQueryContext.CTX_SELECT_DESTINATION
                               )
                           );
     }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index ca364be4c7..b3f012d2c6 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -39,7 +39,6 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Locale;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -103,9 +102,6 @@ public class MultiStageQueryContext
   public static final String CTX_CLUSTER_STATISTICS_MERGE_MODE = 
"clusterStatisticsMergeMode";
   public static final String DEFAULT_CLUSTER_STATISTICS_MERGE_MODE = 
ClusterStatisticsMergeMode.SEQUENTIAL.toString();
 
-  public static final String CTX_DESTINATION = "destination";
-  private static final String DEFAULT_DESTINATION = null;
-
   public static final String CTX_ROWS_PER_SEGMENT = "rowsPerSegment";
   static final int DEFAULT_ROWS_PER_SEGMENT = 3000000;
 
@@ -160,13 +156,10 @@ public class MultiStageQueryContext
 
   public static ClusterStatisticsMergeMode 
getClusterStatisticsMergeMode(QueryContext queryContext)
   {
-    return ClusterStatisticsMergeMode.valueOf(
-        String.valueOf(
-            queryContext.getString(
-                CTX_CLUSTER_STATISTICS_MERGE_MODE,
-                DEFAULT_CLUSTER_STATISTICS_MERGE_MODE
-            )
-        )
+    return QueryContexts.getAsEnum(
+        CTX_CLUSTER_STATISTICS_MERGE_MODE,
+        queryContext.getString(CTX_CLUSTER_STATISTICS_MERGE_MODE, 
DEFAULT_CLUSTER_STATISTICS_MERGE_MODE),
+        ClusterStatisticsMergeMode.class
     );
   }
 
@@ -180,12 +173,11 @@ public class MultiStageQueryContext
 
   public static WorkerAssignmentStrategy getAssignmentStrategy(final 
QueryContext queryContext)
   {
-    String assignmentStrategyString = queryContext.getString(
+    return QueryContexts.getAsEnum(
         CTX_TASK_ASSIGNMENT_STRATEGY,
-        DEFAULT_TASK_ASSIGNMENT_STRATEGY
+        queryContext.getString(CTX_TASK_ASSIGNMENT_STRATEGY, 
DEFAULT_TASK_ASSIGNMENT_STRATEGY),
+        WorkerAssignmentStrategy.class
     );
-
-    return WorkerAssignmentStrategy.fromString(assignmentStrategyString);
   }
 
   public static int getMaxNumTasks(final QueryContext queryContext)
@@ -196,14 +188,6 @@ public class MultiStageQueryContext
     );
   }
 
-  public static Object getDestination(final QueryContext queryContext)
-  {
-    return queryContext.get(
-        CTX_DESTINATION,
-        DEFAULT_DESTINATION
-    );
-  }
-
   public static int getRowsPerSegment(final QueryContext queryContext)
   {
     return queryContext.getInt(
@@ -214,22 +198,21 @@ public class MultiStageQueryContext
 
   public static MSQSelectDestination getSelectDestination(final QueryContext 
queryContext)
   {
-    return MSQSelectDestination.valueOf(
-        queryContext.getString(
-            CTX_SELECT_DESTINATION,
-            DEFAULT_SELECT_DESTINATION
-        ).toUpperCase(Locale.ENGLISH)
+    return QueryContexts.getAsEnum(
+        CTX_SELECT_DESTINATION,
+        queryContext.getString(CTX_SELECT_DESTINATION, 
DEFAULT_SELECT_DESTINATION),
+        MSQSelectDestination.class
     );
   }
 
   @Nullable
   public static MSQSelectDestination getSelectDestinationOrNull(final 
QueryContext queryContext)
   {
-    String selectDestination = queryContext.getString(CTX_SELECT_DESTINATION);
-    if (selectDestination == null) {
-      return null;
-    }
-    return 
MSQSelectDestination.valueOf(selectDestination.toUpperCase(Locale.ENGLISH));
+    return QueryContexts.getAsEnum(
+        CTX_SELECT_DESTINATION,
+        queryContext.getString(CTX_SELECT_DESTINATION),
+        MSQSelectDestination.class
+    );
   }
 
   public static int getRowsInMemory(final QueryContext queryContext)
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
index 63170e91d4..dd0d2ab611 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
@@ -26,6 +26,7 @@ import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
 import org.apache.druid.hll.HyperLogLogCollector;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
@@ -1024,11 +1025,13 @@ public class MSQInsertTest extends MSQTestBase
         .setExpectedDataSource("foo1")
         .setExpectedRowSignature(rowSignature)
         .setQueryContext(context)
-        .setExpectedValidationErrorMatcher(CoreMatchers.allOf(
-            CoreMatchers.instanceOf(DruidException.class),
-            ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
-                "Field \"__time\" must be of type TIMESTAMP"))
-        ))
+        .setExpectedValidationErrorMatcher(
+            new DruidExceptionMatcher(
+                DruidException.Persona.USER,
+                DruidException.Category.INVALID_INPUT,
+                "invalidInput"
+            ).expectMessageIs("Field [__time] was the wrong type [VARCHAR], 
expected TIMESTAMP")
+        )
         .verifyPlanningErrors();
   }
 
@@ -1106,11 +1109,13 @@ public class MSQInsertTest extends MSQTestBase
                          "insert into foo1 select  __time, dim1 , count(*) as 
cnt from foo where dim1 is not null group by 1, 2 PARTITIONED by day clustered 
by dim1")
                      .setQueryContext(localContext)
                      .setExpectedExecutionErrorMatcher(
-                         ThrowableMessageMatcher.hasMessage(
-                             CoreMatchers.startsWith(
-                                 MultiStageQueryContext.CTX_MAX_NUM_TASKS
-                                 + " cannot be less than 2 since at least 1 
controller and 1 worker is necessary."
-                             )
+                         new DruidExceptionMatcher(
+                             DruidException.Persona.USER,
+                             DruidException.Category.INVALID_INPUT,
+                             "invalidInput"
+                         ).expectMessageIs(
+                             "MSQ context maxNumTasks [1] cannot be less than 
2, since at least 1 controller "
+                             + "and 1 worker is necessary"
                          )
                      )
                      .verifyExecutionError();
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlMSQStatementResourcePostTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlMSQStatementResourcePostTest.java
index e5133ee3d2..70ac5386ba 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlMSQStatementResourcePostTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlMSQStatementResourcePostTest.java
@@ -138,24 +138,36 @@ public class SqlMSQStatementResourcePostTest extends 
MSQTestBase
   @Test
   public void nonSupportedModes()
   {
-    for (ImmutableMap<?, ?> context : ImmutableList.of(ImmutableMap.of(
-        QueryContexts.CTX_EXECUTION_MODE,
-        ExecutionMode.SYNC.name()
-    ), ImmutableMap.of())) {
-      SqlStatementResourceTest.assertExceptionMessage(
-          resource.doPost(new SqlQuery(
-              "select * from foo",
-              null,
-              false,
-              false,
-              false,
-              (Map<String, Object>) context,
-              null
-          ), SqlStatementResourceTest.makeOkRequest()),
-          "The statement sql api only supports sync mode[ASYNC]. Please set 
context parameter [executionMode=ASYNC] in the context payload",
-          Response.Status.BAD_REQUEST
-      );
-    }
+
+    SqlStatementResourceTest.assertExceptionMessage(
+        resource.doPost(new SqlQuery(
+            "select * from foo",
+            null,
+            false,
+            false,
+            false,
+            ImmutableMap.of(),
+            null
+        ), SqlStatementResourceTest.makeOkRequest()),
+        "Execution mode is not provided to the SQL statement API. "
+        + "Please set [executionMode] to [ASYNC] in the query context",
+        Response.Status.BAD_REQUEST
+    );
+
+    SqlStatementResourceTest.assertExceptionMessage(
+        resource.doPost(new SqlQuery(
+            "select * from foo",
+            null,
+            false,
+            false,
+            false,
+            ImmutableMap.of(QueryContexts.CTX_EXECUTION_MODE, 
ExecutionMode.SYNC.name()),
+            null
+        ), SqlStatementResourceTest.makeOkRequest()),
+        "The SQL statement API currently does not support the provided 
execution mode [SYNC]. "
+        + "Please set [executionMode] to [ASYNC] in the query context",
+        Response.Status.BAD_REQUEST
+    );
   }
 
 
@@ -260,13 +272,10 @@ public class SqlMSQStatementResourcePostTest extends 
MSQTestBase
         NilStorageConnector.getInstance()
     );
 
-    String errorMessage = StringUtils.format(
-        "The statement sql api cannot read from select destination [%s=%s] 
since its not configured. "
-        + "Its recommended to configure durable storage as it allows the user 
to fetch big results. "
-        + "Please contact your cluster admin to configure durable storage.",
-        MultiStageQueryContext.CTX_SELECT_DESTINATION,
-        MSQSelectDestination.DURABLE_STORAGE.name()
-    );
+    String errorMessage = "The SQL Statement API cannot read from the select 
destination [DURABLE_STORAGE] provided in "
+                          + "the query context [selectDestination] since it is 
not configured. It is recommended to "
+                          + "configure the durable storage as it allows the 
user to fetch large result sets. "
+                          + "Please contact your cluster admin to configure 
durable storage.";
     Map<String, Object> context = defaultAsyncContext();
     context.put(MultiStageQueryContext.CTX_SELECT_DESTINATION, 
MSQSelectDestination.DURABLE_STORAGE.name());
 
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlStatementResourceTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlStatementResourceTest.java
index 51b64b7eed..049b57c340 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlStatementResourceTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlStatementResourceTest.java
@@ -783,7 +783,7 @@ public class SqlStatementResourceTest extends MSQTestBase
       assertExceptionMessage(
           resource.doGetResults(queryID, 0L, makeOkRequest()),
           StringUtils.format(
-              "Query[%s] failed. Hit status api for more details.",
+              "Query[%s] failed. Check the status api for more details.",
               queryID
           ),
           Response.Status.BAD_REQUEST
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectJoinQueryMSQTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectJoinQueryMSQTest.java
index df40b00a31..ab7b1ed7d7 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectJoinQueryMSQTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectJoinQueryMSQTest.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Injector;
 import com.google.inject.Module;
 import org.apache.calcite.rel.RelRoot;
-import org.apache.calcite.tools.ValidationException;
 import org.apache.druid.guice.DruidInjectorBuilder;
 import org.apache.druid.msq.exec.WorkerMemoryParameters;
 import org.apache.druid.msq.sql.MSQTaskSqlEngine;
@@ -142,7 +141,6 @@ public class CalciteSelectJoinQueryMSQTest
 
         @Override
         public QueryMaker buildQueryMakerForSelect(RelRoot relRoot, 
PlannerContext plannerContext)
-            throws ValidationException
         {
           
plannerContext.queryContextMap().put(PlannerContext.CTX_SQL_JOIN_ALGORITHM, 
joinAlgorithm.toString());
           return super.buildQueryMakerForSelect(relRoot, plannerContext);
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
index 16aad3d7b9..6d93892674 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
@@ -39,7 +39,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_DESTINATION;
 import static 
org.apache.druid.msq.util.MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE;
 import static 
org.apache.druid.msq.util.MultiStageQueryContext.CTX_FAULT_TOLERANCE;
 import static 
org.apache.druid.msq.util.MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS;
@@ -142,19 +141,6 @@ public class MultiStageQueryContextTest
     Assert.assertEquals(101, 
MultiStageQueryContext.getMaxNumTasks(QueryContext.of(propertyMap)));
   }
 
-  @Test
-  public void getDestination_noParameterSetReturnsDefaultValue()
-  {
-    
Assert.assertNull(MultiStageQueryContext.getDestination(QueryContext.empty()));
-  }
-
-  @Test
-  public void getDestination_parameterSetReturnsCorrectValue()
-  {
-    Map<String, Object> propertyMap = ImmutableMap.of(CTX_DESTINATION, 
"dataSource");
-    Assert.assertEquals("dataSource", 
MultiStageQueryContext.getDestination(QueryContext.of(propertyMap)));
-  }
-
   @Test
   public void getRowsPerSegment_noParameterSetReturnsDefaultValue()
   {
diff --git a/processing/src/main/java/org/apache/druid/error/Forbidden.java 
b/processing/src/main/java/org/apache/druid/error/Forbidden.java
new file mode 100644
index 0000000000..13470d241c
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/error/Forbidden.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.error;
+
+public class Forbidden extends DruidException.Failure
+{
+
+  public static DruidException exception()
+  {
+    return exception("Unauthorized");
+  }
+
+  public static DruidException exception(String msg, Object... args)
+  {
+    return exception(null, msg, args);
+  }
+
+  public static DruidException exception(Throwable t, String msg, Object... 
args)
+  {
+    return DruidException.fromFailure(new Forbidden(t, msg, args));
+  }
+
+  private final Throwable t;
+  private final String msg;
+  private final Object[] args;
+
+  private Forbidden(
+      Throwable t,
+      String msg,
+      Object... args
+  )
+  {
+    super("forbidden");
+    this.t = t;
+    this.msg = msg;
+    this.args = args;
+  }
+
+  @Override
+  public DruidException makeException(DruidException.DruidExceptionBuilder bob)
+  {
+    bob = bob.forPersona(DruidException.Persona.USER)
+             .ofCategory(DruidException.Category.FORBIDDEN);
+
+    if (t == null) {
+      return bob.build(msg, args);
+    } else {
+      return bob.build(t, msg, args);
+    }
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/error/QueryExceptionCompat.java 
b/processing/src/main/java/org/apache/druid/error/QueryExceptionCompat.java
index 12e4905efa..9894208e9f 100644
--- a/processing/src/main/java/org/apache/druid/error/QueryExceptionCompat.java
+++ b/processing/src/main/java/org/apache/druid/error/QueryExceptionCompat.java
@@ -66,12 +66,11 @@ public class QueryExceptionCompat extends 
DruidException.Failure
         return DruidException.Category.RUNTIME_FAILURE;
       case CANCELED:
         return DruidException.Category.CANCELED;
-      case UNKNOWN:
-        return DruidException.Category.UNCATEGORIZED;
       case UNSUPPORTED:
         return DruidException.Category.UNSUPPORTED;
       case TIMEOUT:
         return DruidException.Category.TIMEOUT;
+      case UNKNOWN:
       default:
         return DruidException.Category.UNCATEGORIZED;
     }
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContext.java 
b/processing/src/main/java/org/apache/druid/query/QueryContext.java
index 147ebbec18..403cef1fa4 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContext.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContext.java
@@ -106,16 +106,6 @@ public class QueryContext
     return context.get(key);
   }
 
-  /**
-   * Return a value as a generic {@code Object}, returning the default value 
if the
-   * context value is not set.
-   */
-  public Object get(String key, Object defaultValue)
-  {
-    final Object val = get(key);
-    return val == null ? defaultValue : val;
-  }
-
   /**
    * Return a value as an {@code String}, returning {@link null} if the
    * context value is not set.
diff --git a/processing/src/test/java/org/apache/druid/error/ForbiddenTest.java 
b/processing/src/test/java/org/apache/druid/error/ForbiddenTest.java
new file mode 100644
index 0000000000..90faeabe42
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/error/ForbiddenTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.error;
+
+import org.apache.druid.matchers.DruidMatchers;
+import org.hamcrest.MatcherAssert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class ForbiddenTest
+{
+
+  @Test
+  public void testAsErrorResponse()
+  {
+    ErrorResponse errorResponse = new ErrorResponse(Forbidden.exception());
+    final Map<String, Object> asMap = errorResponse.getAsMap();
+
+    MatcherAssert.assertThat(
+        asMap,
+        DruidMatchers.mapMatcher(
+            "error", "druidException",
+            "errorCode", "forbidden",
+            "persona", "USER",
+            "category", "FORBIDDEN",
+            "errorMessage", "Unauthorized"
+        )
+    );
+
+    ErrorResponse recomposed = ErrorResponse.fromMap(asMap);
+
+    MatcherAssert.assertThat(
+        recomposed.getUnderlyingException(),
+        new DruidExceptionMatcher(
+            DruidException.Persona.USER,
+            DruidException.Category.FORBIDDEN,
+            "forbidden"
+        ).expectMessageContains("Unauthorized")
+    );
+  }
+}
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java
index 26fdd514e8..945e452941 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java
@@ -30,6 +30,7 @@ import org.apache.druid.catalog.model.table.ExternalTableSpec;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputSource;
 import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.segment.column.RowSignature;
@@ -90,44 +91,73 @@ public class ExternalOperatorConversion extends 
DruidExternTableMacroConversion
         final ObjectMapper jsonMapper
     )
     {
-      try {
-        final String sigValue = CatalogUtils.getString(args, SIGNATURE_PARAM);
-        if (sigValue == null && columns == null) {
-          throw new IAE(
-              "EXTERN requires either a %s value or an EXTEND clause",
-              SIGNATURE_PARAM
-          );
+      final String sigValue = CatalogUtils.getString(args, SIGNATURE_PARAM);
+      if (sigValue == null && columns == null) {
+        throw InvalidInput.exception(
+            "EXTERN requires either a [%s] value or an EXTEND clause",
+            SIGNATURE_PARAM
+        );
+      }
+      if (sigValue != null && columns != null) {
+        throw InvalidInput.exception(
+            "EXTERN requires either a [%s] value or an EXTEND clause, but not 
both",
+            SIGNATURE_PARAM
+        );
+      }
+      final RowSignature rowSignature;
+      if (columns != null) {
+        try {
+          rowSignature = Columns.convertSignature(columns);
         }
-        if (sigValue != null && columns != null) {
-          throw new IAE(
-              "EXTERN requires either a %s value or an EXTEND clause, but not 
both",
-              SIGNATURE_PARAM
-          );
+        catch (IAE e) {
+          throw badArgumentException(e, "columns");
         }
-        final RowSignature rowSignature;
-        if (columns != null) {
-          rowSignature = Columns.convertSignature(columns);
-        } else {
+      } else {
+        try {
           rowSignature = jsonMapper.readValue(sigValue, RowSignature.class);
         }
+        catch (JsonProcessingException e) {
+          throw badArgumentException(e, "rowSignature");
+        }
+      }
 
-        String inputSrcStr = CatalogUtils.getString(args, INPUT_SOURCE_PARAM);
-        InputSource inputSource = jsonMapper.readValue(inputSrcStr, 
InputSource.class);
-        return new ExternalTableSpec(
-            inputSource,
-            jsonMapper.readValue(CatalogUtils.getString(args, 
INPUT_FORMAT_PARAM), InputFormat.class),
-            rowSignature,
-            inputSource::getTypes
-        );
+      String inputSrcStr = CatalogUtils.getString(args, INPUT_SOURCE_PARAM);
+      InputSource inputSource;
+      try {
+        inputSource = jsonMapper.readValue(inputSrcStr, InputSource.class);
       }
       catch (JsonProcessingException e) {
-        throw DruidException.forPersona(DruidException.Persona.USER)
-                            .ofCategory(DruidException.Category.INVALID_INPUT)
-                            .build(e, e.getMessage());
+        throw badArgumentException(e, "inputSource");
       }
+      InputFormat inputFormat;
+      try {
+        inputFormat = jsonMapper.readValue(CatalogUtils.getString(args, 
INPUT_FORMAT_PARAM), InputFormat.class);
+      }
+      catch (JsonProcessingException e) {
+        throw badArgumentException(e, "inputFormat");
+      }
+      return new ExternalTableSpec(
+          inputSource,
+          inputFormat,
+          rowSignature,
+          inputSource::getTypes
+      );
     }
   }
 
+  private static DruidException badArgumentException(
+      Throwable cause,
+      String fieldName
+  )
+  {
+    return InvalidInput.exception(
+        cause,
+        "Invalid value for the field [%s]. Reason: [%s]",
+        fieldName,
+        cause.getMessage()
+    );
+  }
+
   @Inject
   public ExternalOperatorConversion(@Json final ObjectMapper jsonMapper)
   {
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java
index f5d9056246..d7fc7d043b 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java
@@ -27,7 +27,6 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.druid.error.InvalidSqlInput;
 import org.apache.druid.guice.LazySingleton;
-import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
 import org.apache.druid.server.QueryLifecycleFactory;
@@ -116,7 +115,7 @@ public class NativeSqlEngine implements SqlEngine
       case SCAN_NEEDS_SIGNATURE:
         return false;
       default:
-        throw new IAE("Unrecognized feature: %s", feature);
+        throw 
SqlEngines.generateUnrecognizedFeatureException(NativeSqlEngine.class.getSimpleName(),
 feature);
     }
   }
 
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngines.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngines.java
index cc7bef80f7..e8375feb3a 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngines.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngines.java
@@ -20,6 +20,7 @@
 package org.apache.druid.sql.calcite.run;
 
 import org.apache.calcite.tools.ValidationException;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.error.InvalidInput;
 
 import java.util.Map;
@@ -46,4 +47,24 @@ public class SqlEngines
       }
     }
   }
+
+  /**
+   * This is a helper function that provides a developer-friendly exception 
when an engine cannot recognize the feature.
+   */
+  public static DruidException generateUnrecognizedFeatureException(
+      final String engineName,
+      final EngineFeature unrecognizedFeature
+  )
+  {
+    return DruidException
+        .forPersona(DruidException.Persona.DEVELOPER)
+        .ofCategory(DruidException.Category.DEFENSIVE)
+        .build(
+            "Engine [%s] is unable to recognize the feature [%s] for 
availability. This might happen when a "
+            + "newer feature is added without updating all the implementations 
of SqlEngine(s) to either allow or disallow "
+            + "its availability. Please raise an issue if you encounter this 
exception while using Druid.",
+            engineName,
+            unrecognizedFeature
+        );
+  }
 }
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java
index 740cac15ee..47dae5c4d9 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java
@@ -22,11 +22,11 @@ package org.apache.druid.sql.calcite.view;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.apache.druid.sql.calcite.run.EngineFeature;
 import org.apache.druid.sql.calcite.run.QueryMaker;
 import org.apache.druid.sql.calcite.run.SqlEngine;
+import org.apache.druid.sql.calcite.run.SqlEngines;
 
 import java.util.Map;
 
@@ -78,7 +78,7 @@ public class ViewSqlEngine implements SqlEngine
         return false;
 
       default:
-        throw new IAE("Unrecognized feature: %s", feature);
+        throw 
SqlEngines.generateUnrecognizedFeatureException(ViewSqlEngine.class.getSimpleName(),
 feature);
     }
   }
 
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
index 1a19f9ddf7..433eb98ace 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
@@ -1635,11 +1635,96 @@ public class CalciteInsertDmlTest extends 
CalciteIngestionDmlTest
                             new DruidExceptionMatcher(
                                 DruidException.Persona.USER,
                                 DruidException.Category.INVALID_INPUT,
-                                "general"
+                                "invalidInput"
+                            ).expectMessageContains(
+                                "Cannot construct instance of 
`org.apache.druid.segment.column.ColumnSignature`, problem: Column name must be 
provided and non-empty"
+                            )
+                        )
+                        .verify();
+  }
+
+  @Test
+  public void testErrorWhenBothRowSignatureAndExtendsProvidedToExtern()
+  {
+    final String sqlString = "insert into dst \n"
+                             + "select time_parse(\"time\") as __time, * \n"
+                             + "from table( \n"
+                             + "extern(\n"
+                             + "'{\"type\": \"s3\", \"uris\": 
[\\\"s3://imply-eng-datasets/qa/IngestionTest/wikipedia/files/wikiticker-2015-09-12-sampled.mini.json.gz\\\"]}',\n"
+                             + "'{\"type\": \"json\"}',\n"
+                             + "'[{\"name\": \"time\", \"type\": \"string\"}, 
{\"name\": \"channel\", \"type\": \"string\"}]'\n"
+                             + ")\n"
+                             + ") EXTEND (\"time\" VARCHAR, \"channel\" 
VARCHAR)\n"
+                             + "partitioned by DAY\n"
+                             + "clustered by channel";
+    HashMap<String, Object> context = new HashMap<>(DEFAULT_CONTEXT);
+    context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100);
+    testIngestionQuery().context(context).sql(sqlString)
+                        .expectValidationError(
+                            new DruidExceptionMatcher(
+                                DruidException.Persona.USER,
+                                DruidException.Category.INVALID_INPUT,
+                                "invalidInput"
+                            ).expectMessageContains(
+                                "EXTERN requires either a [signature] value or 
an EXTEND clause, but not both"
+                            )
+                        )
+                        .verify();
+  }
+
+  @Test
+  public void testErrorWhenNoneOfRowSignatureAndExtendsProvidedToExtern()
+  {
+    final String sqlString = "insert into dst \n"
+                             + "select time_parse(\"time\") as __time, * \n"
+                             + "from table( \n"
+                             + "extern(\n"
+                             + "'{\"type\": \"s3\", \"uris\": 
[\\\"s3://imply-eng-datasets/qa/IngestionTest/wikipedia/files/wikiticker-2015-09-12-sampled.mini.json.gz\\\"]}',\n"
+                             + "'{\"type\": \"json\"}'\n"
+                             + ")\n"
+                             + ")\n"
+                             + "partitioned by DAY\n"
+                             + "clustered by channel";
+    HashMap<String, Object> context = new HashMap<>(DEFAULT_CONTEXT);
+    context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100);
+    testIngestionQuery().context(context).sql(sqlString)
+                        .expectValidationError(
+                            new DruidExceptionMatcher(
+                                DruidException.Persona.USER,
+                                DruidException.Category.INVALID_INPUT,
+                                "invalidInput"
+                            ).expectMessageContains(
+                                "EXTERN requires either a [signature] value or 
an EXTEND clause"
+                            )
+                        )
+                        .verify();
+  }
+
+  @Test
+  public void testErrorWhenInputSourceInvalid()
+  {
+    final String sqlString = "insert into dst \n"
+                             + "select time_parse(\"time\") as __time, * \n"
+                             + "from table( \n"
+                             + "extern(\n"
+                             + "'{\"type\": \"local\"}',\n"
+                             + "'{\"type\": \"json\"}',\n"
+                             + "'[{\"name\": \"time\", \"type\": \"string\"}, 
{\"name\": \"channel\", \"type\": \"string\"}]'\n"
+                             + ")\n"
+                             + ")\n"
+                             + "partitioned by DAY\n"
+                             + "clustered by channel";
+    HashMap<String, Object> context = new HashMap<>(DEFAULT_CONTEXT);
+    context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100);
+    testIngestionQuery().context(context).sql(sqlString)
+                        .expectValidationError(
+                            new DruidExceptionMatcher(
+                                DruidException.Persona.USER,
+                                DruidException.Category.INVALID_INPUT,
+                                "invalidInput"
+                            ).expectMessageContains(
+                                "Invalid value for the field [inputSource]. 
Reason:"
                             )
-                                .expectMessageContains(
-                                    "Cannot construct instance of 
`org.apache.druid.segment.column.ColumnSignature`, problem: Column name must be 
provided and non-empty"
-                                )
                         )
                         .verify();
   }
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java
index c1773ef62a..272fddbd8a 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java
@@ -24,12 +24,12 @@ import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.apache.druid.sql.calcite.run.EngineFeature;
 import org.apache.druid.sql.calcite.run.QueryMaker;
 import org.apache.druid.sql.calcite.run.SqlEngine;
+import org.apache.druid.sql.calcite.run.SqlEngines;
 import org.apache.druid.sql.calcite.table.RowSignatures;
 
 import java.util.Map;
@@ -91,7 +91,7 @@ public class IngestionTestSqlEngine implements SqlEngine
       case ALLOW_BROADCAST_RIGHTY_JOIN:
         return true;
       default:
-        throw new IAE("Unrecognized feature: %s", feature);
+        throw 
SqlEngines.generateUnrecognizedFeatureException(IngestionTestSqlEngine.class.getSimpleName(),
 feature);
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to