kgyrtkirk commented on code in PR #15908:
URL: https://github.com/apache/druid/pull/15908#discussion_r1499612423
##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java:
##########
@@ -113,6 +153,302 @@ public void validateWindow(SqlNode windowOrId,
SqlValidatorScope scope, @Nullabl
super.validateWindow(windowOrId, scope, call);
}
+ @Override
+ public void validateInsert(final SqlInsert insert)
+ {
+ final DruidSqlIngest ingestNode = (DruidSqlIngest) insert;
+ if (insert.isUpsert()) {
+ throw InvalidSqlInput.exception("UPSERT is not supported.");
+ }
+
+
+ // SQL-style INSERT INTO dst (a, b, c) is not (yet) supported.
+ final String operationName = insert.getOperator().getName();
+ if (insert.getTargetColumnList() != null) {
+ throw InvalidSqlInput.exception(
+ "Operation [%s] cannot be run with a target column list, given [%s
(%s)]",
+ operationName,
+ ingestNode.getTargetTable(), ingestNode.getTargetColumnList()
+ );
+ }
+
+ // The target namespace is both the target table ID and the row type for
that table.
+ final SqlValidatorNamespace targetNamespace = getNamespace(insert);
+ final IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace;
+ // The target is a new or existing datasource.
+ final DatasourceTable table = validateInsertTarget(targetNamespace,
insertNs, operationName);
+
+ // An existing datasource may have metadata.
+ final DatasourceFacade tableMetadata = table == null ? null :
table.effectiveMetadata().catalogMetadata();
+
+ // Validate segment granularity, which depends on nothing else.
+ if (!(ingestNode.getTargetTable() instanceof
ExternalDestinationSqlIdentifier)) {
+ Granularity effectiveGranularity =
getEffectiveGranularity(operationName, ingestNode, tableMetadata);
+ // Note: though this is the validator, we cheat a bit and write the
target
+ // granularity into the query context. Perhaps this step should be done
+ // during conversion, however, we've just worked out the granularity, so
we
+ // do it here instead.
+ try {
+ plannerContext.queryContextMap().put(
+ DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
+
plannerContext.getPlannerToolbox().jsonMapper().writeValueAsString(effectiveGranularity)
+ );
+ }
+ catch (JsonProcessingException e) {
+ throw InvalidSqlInput.exception(e, "Invalid partition granularity
[%s]", effectiveGranularity);
+ }
+ }
+
+ // The source must be a SELECT
+ final SqlNode source = insert.getSource();
+
+ // Validate the source statement.
+ // Because of the non-standard Druid semantics, we can't define the target
type: we don't know
+ // the target columns yet, and we can't infer types when they must come
from the SELECT.
+ // Normally, the target type is known, and is pushed into the SELECT. In
Druid, the SELECT
+ // usually defines the target types, unless the catalog says otherwise.
Since catalog entries
+ // are optional, we don't know the target type until we validate the
SELECT. (Also, we won't
+ // know names and we match by name.) Thus, we'd have to validate (to know
names and types)
+ // to get the target types, but we need the target types to validate.
Catch-22. So, we punt.
+ final SqlValidatorScope scope;
+ if (source instanceof SqlSelect) {
Review Comment:
note: it seems like most of this was copied over from
`SqlValidatorImpl#validateInsert`, then extended and refactored, with methods
extracted and comments added.
I think it might be challenging to maintain this in the long run; however,
`validateInsert` doesn't seem to change very often.
I think it would be useful to leave a comment about the origins of this
method in the apidoc of `validateInsert`.
##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java:
##########
@@ -113,6 +153,302 @@ public void validateWindow(SqlNode windowOrId,
SqlValidatorScope scope, @Nullabl
super.validateWindow(windowOrId, scope, call);
}
+ @Override
+ public void validateInsert(final SqlInsert insert)
+ {
+ final DruidSqlIngest ingestNode = (DruidSqlIngest) insert;
+ if (insert.isUpsert()) {
+ throw InvalidSqlInput.exception("UPSERT is not supported.");
+ }
+
+
+ // SQL-style INSERT INTO dst (a, b, c) is not (yet) supported.
+ final String operationName = insert.getOperator().getName();
+ if (insert.getTargetColumnList() != null) {
+ throw InvalidSqlInput.exception(
+ "Operation [%s] cannot be run with a target column list, given [%s
(%s)]",
+ operationName,
+ ingestNode.getTargetTable(), ingestNode.getTargetColumnList()
+ );
+ }
+
+ // The target namespace is both the target table ID and the row type for
that table.
+ final SqlValidatorNamespace targetNamespace = getNamespace(insert);
Review Comment:
could we copy `getNamespaceOrThrow` over from `SqlValidatorImpl`, or
use `requireNonNull` (just to avoid a possible issue if it ends up being null)?
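A minimal sketch of the `requireNonNull` variant, written against plain Java so it runs on its own. `NamespaceLookupSketch`, its `String` keys and values, and the message text are hypothetical stand-ins for illustration; they are not the Calcite or Druid types, and this is not the actual `SqlValidatorImpl` code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a validator that keeps a node -> namespace registry.
class NamespaceLookupSketch
{
  private final Map<String, String> namespaces = new HashMap<>();

  void register(String node, String namespace)
  {
    namespaces.put(node, namespace);
  }

  // Plain lookup: returns null when nothing was registered for the node.
  String getNamespace(String node)
  {
    return namespaces.get(node);
  }

  // Fail-fast lookup: throws a NullPointerException with a descriptive
  // message at the lookup site instead of returning null.
  String getNamespaceOrThrow(String node)
  {
    return Objects.requireNonNull(
        getNamespace(node),
        () -> "No namespace registered for node [" + node + "]"
    );
  }

  public static void main(String[] args)
  {
    NamespaceLookupSketch validator = new NamespaceLookupSketch();
    validator.register("insert", "druid.dst");
    System.out.println(validator.getNamespaceOrThrow("insert"));
    try {
      validator.getNamespaceOrThrow("missing");
    }
    catch (NullPointerException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

The point is only that the failure surfaces where the lookup happens, with a message, rather than later at the `(IdentifierNamespace)` cast or the first dereference.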
##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java:
##########
@@ -19,32 +19,72 @@
package org.apache.druid.sql.calcite.planner;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.prepare.BaseDruidSqlValidator;
import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
import org.apache.calcite.runtime.CalciteContextException;
import org.apache.calcite.runtime.CalciteException;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.SqlUtil;
import org.apache.calcite.sql.SqlWindow;
import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.IdentifierNamespace;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.calcite.sql.validate.SqlValidatorNamespace;
import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.sql.validate.SqlValidatorTable;
+import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Util;
+import org.apache.druid.catalog.model.facade.DatasourceFacade;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.QueryContexts;
+import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
+import org.apache.druid.sql.calcite.parser.ExternalDestinationSqlIdentifier;
import org.apache.druid.sql.calcite.run.EngineFeature;
+import org.apache.druid.sql.calcite.table.DatasourceTable;
import org.checkerframework.checker.nullness.qual.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
/**
* Druid extended SQL validator. (At present, it doesn't actually
* have any extensions yet, but it will soon.)
*/
class DruidSqlValidator extends BaseDruidSqlValidator
{
+ private static final Pattern UNNAMED_COLUMN_PATTERN =
Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE);
+
+ // Copied here from MSQE since that extension is not visible here.
+ public static final String CTX_ROWS_PER_SEGMENT = "msqRowsPerSegment";
+
+ public interface ValidatorContext
Review Comment:
note: unused interface
##########
sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java:
##########
@@ -1765,7 +1762,6 @@ public void testErrorWhenInputSourceInvalid()
+ "partitioned by DAY\n"
+ "clustered by channel";
HashMap<String, Object> context = new HashMap<>(DEFAULT_CONTEXT);
Review Comment:
I can't comment directly on the `testInsertWithInvalidColumnNameInIngest` testcase,
but the intention of the check is to ensure that something like:
```
INSERT INTO t SELECT __time, 1+1 FROM foo PARTITIONED BY ALL
```
is caught; could you change that test or add something like this as a testcase?
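For reference, a small standalone sketch of what the check keys on. It reuses the `UNNAMED_COLUMN_PATTERN` regex from the diff and assumes that Calcite assigns the unaliased `1+1` expression the generated name `EXPR$1`; the class and method names here are made up for the example.

```java
import java.util.regex.Pattern;

// Hypothetical helper class; only the regex is taken from the PR.
class UnnamedColumnCheck
{
  static final Pattern UNNAMED_COLUMN_PATTERN =
      Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE);

  // True when the column name looks like a generated, unnamed expression.
  static boolean isUnnamed(String columnName)
  {
    return UNNAMED_COLUMN_PATTERN.matcher(columnName).matches();
  }

  public static void main(String[] args)
  {
    // Assumed field names for: SELECT __time, 1+1 FROM foo
    System.out.println(isUnnamed("__time"));       // false
    System.out.println(isUnnamed("EXPR$1"));       // true
    // The match is case-insensitive and anchored at both ends.
    System.out.println(isUnnamed("expr$12"));      // true
    System.out.println(isUnnamed("EXPR$1_total")); // false
  }
}
```

Adding an alias such as `1+1 AS two` gives the column a real name, so it would no longer match.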
##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java:
##########
@@ -113,6 +153,302 @@ public void validateWindow(SqlNode windowOrId,
SqlValidatorScope scope, @Nullabl
super.validateWindow(windowOrId, scope, call);
}
+ @Override
+ public void validateInsert(final SqlInsert insert)
+ {
+ final DruidSqlIngest ingestNode = (DruidSqlIngest) insert;
+ if (insert.isUpsert()) {
+ throw InvalidSqlInput.exception("UPSERT is not supported.");
+ }
+
+
+ // SQL-style INSERT INTO dst (a, b, c) is not (yet) supported.
+ final String operationName = insert.getOperator().getName();
+ if (insert.getTargetColumnList() != null) {
+ throw InvalidSqlInput.exception(
+ "Operation [%s] cannot be run with a target column list, given [%s
(%s)]",
+ operationName,
+ ingestNode.getTargetTable(), ingestNode.getTargetColumnList()
+ );
+ }
+
+ // The target namespace is both the target table ID and the row type for
that table.
+ final SqlValidatorNamespace targetNamespace = getNamespace(insert);
+ final IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace;
+ // The target is a new or existing datasource.
+ final DatasourceTable table = validateInsertTarget(targetNamespace,
insertNs, operationName);
+
+ // An existing datasource may have metadata.
+ final DatasourceFacade tableMetadata = table == null ? null :
table.effectiveMetadata().catalogMetadata();
+
+ // Validate segment granularity, which depends on nothing else.
+ if (!(ingestNode.getTargetTable() instanceof
ExternalDestinationSqlIdentifier)) {
+ Granularity effectiveGranularity =
getEffectiveGranularity(operationName, ingestNode, tableMetadata);
+ // Note: though this is the validator, we cheat a bit and write the
target
+ // granularity into the query context. Perhaps this step should be done
+ // during conversion, however, we've just worked out the granularity, so
we
+ // do it here instead.
+ try {
+ plannerContext.queryContextMap().put(
+ DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
+
plannerContext.getPlannerToolbox().jsonMapper().writeValueAsString(effectiveGranularity)
+ );
+ }
+ catch (JsonProcessingException e) {
+ throw InvalidSqlInput.exception(e, "Invalid partition granularity
[%s]", effectiveGranularity);
+ }
+ }
+
+ // The source must be a SELECT
+ final SqlNode source = insert.getSource();
+
+ // Validate the source statement.
+ // Because of the non-standard Druid semantics, we can't define the target
type: we don't know
+ // the target columns yet, and we can't infer types when they must come
from the SELECT.
+ // Normally, the target type is known, and is pushed into the SELECT. In
Druid, the SELECT
+ // usually defines the target types, unless the catalog says otherwise.
Since catalog entries
+ // are optional, we don't know the target type until we validate the
SELECT. (Also, we won't
+ // know names and we match by name.) Thus, we'd have to validate (to know
names and types)
+ // to get the target types, but we need the target types to validate.
Catch-22. So, we punt.
+ final SqlValidatorScope scope;
+ if (source instanceof SqlSelect) {
+ final SqlSelect sqlSelect = (SqlSelect) source;
+ validateSelect(sqlSelect, unknownType);
+ scope = null;
+ } else {
+ scope = scopes.get(source);
+ validateQuery(source, scope, unknownType);
+ }
+
+ final SqlValidatorNamespace sourceNamespace = namespaces.get(source);
+ final RelRecordType sourceType = (RelRecordType)
sourceNamespace.getRowType();
+
+ // Determine the output (target) schema.
+ final RelDataType targetType = validateTargetType(scope, insertNs, insert,
sourceType, tableMetadata);
+
+ // Set the type for the INSERT/REPLACE node
+ setValidatedNodeType(insert, targetType);
+
+ // Segment size
+ if (tableMetadata != null &&
!plannerContext.queryContextMap().containsKey(CTX_ROWS_PER_SEGMENT)) {
+ final Integer targetSegmentRows = tableMetadata.targetSegmentRows();
+ if (targetSegmentRows != null) {
+ plannerContext.queryContextMap().put(CTX_ROWS_PER_SEGMENT,
targetSegmentRows);
+ }
+ }
+ }
+
+ /**
+ * Validate the target table. Druid {@code INSERT/REPLACE} can create a new
datasource,
+ * or insert into an existing one. If the target exists, it must be a
datasource. If it
+ * does not exist, the target must be in the datasource schema, normally
"druid".
+ */
+ private DatasourceTable validateInsertTarget(
+ final SqlValidatorNamespace targetNamespace,
+ final IdentifierNamespace insertNs,
+ final String operationName
+ )
+ {
+ // Get the target table ID
+ final SqlIdentifier destId = insertNs.getId();
+ if (destId.names.isEmpty()) {
+ // I don't think this can happen, but include a branch for it just in
case.
+ throw InvalidSqlInput.exception("Operation [%s] requires a target
table", operationName);
+ }
+
+ // Druid does not support 3+ part names.
+ final int n = destId.names.size();
+ if (n > 2) {
+ throw InvalidSqlInput.exception("Druid does not support 3+ part names:
[%s]", destId, operationName);
+ }
+ String tableName = destId.names.get(n - 1);
+
+ // If this is a 2-part name, the first part must be the datasource schema.
+ if (n == 2 &&
!plannerContext.getPlannerToolbox().druidSchemaName().equals(destId.names.get(0)))
{
+ throw InvalidSqlInput.exception(
+ "Table [%s] does not support operation [%s] because it is not a
Druid datasource",
+ destId,
+ operationName
+ );
+ }
+ try {
+ // Try to resolve the table. Will fail if this is an INSERT into a new
table.
+ validateNamespace(targetNamespace, unknownType);
+ SqlValidatorTable target = insertNs.resolve().getTable();
+ try {
+ return target.unwrap(DatasourceTable.class);
+ }
+ catch (Exception e) {
+ throw InvalidSqlInput.exception(
+ "Table [%s] does not support operation [%s] because it is not a
Druid datasource",
+ destId,
+ operationName
+ );
+ }
+ }
+ catch (CalciteContextException e) {
+ // Something failed. Let's make sure it was the table lookup.
+ // The check is kind of a hack, but its the best we can do given that
Calcite
+ // didn't expect this non-SQL use case.
+ if (e.getCause() instanceof SqlValidatorException && e.getMessage()
+ .contains(StringUtils.format("Object '%s' not found", tableName))) {
+ // The catalog implementation may be "strict": and require that the
target
+ // table already exists, rather than the default "lenient" mode that
can
+ // create a new table.
+ if
(plannerContext.getPlannerToolbox().catalogResolver().ingestRequiresExistingTable())
{
+ throw InvalidSqlInput.exception("Cannot %s into [%s] because it does
not exist", operationName, destId);
+ }
+ // New table. Validate the shape of the name.
+ IdUtils.validateId("table", tableName);
+ return null;
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Gets the effective PARTITIONED BY granularity. Resolves the granularity
from the granularity specified on the
+ * ingest node, and on the table metadata as stored in catalog, if any.
Mismatches between the 2 granularities are
+ * allowed if both are specified. The granularity specified on the ingest
node is taken to be the effective
+ * granulartiy if specified. If no granulartiy is specified on either the
ingestNode or in the table catalog entry
+ * for the table, an error is thrown.
+ *
+ * @param operationName The operation name
+ * @param ingestNode The ingest node.
+ * @param tableMetadata The table metadata as stored in the catalog, if any.
+ *
+ * @return The effective granularity
+ * @throws org.apache.druid.error.DruidException indicating invalud Sql if
both the ingest node and table metadata
+ * for the respective target table have no PARTITIONED BY granularity
defined.
+ */
+ private Granularity getEffectiveGranularity(
+ final String operationName,
+ final DruidSqlIngest ingestNode,
+ @Nullable final DatasourceFacade tableMetadata
+ )
+ {
+ Granularity effectiveGranularity = null;
+ final Granularity ingestionGranularity = ingestNode.getPartitionedBy() !=
null
+ ? ingestNode.getPartitionedBy().getGranularity()
+ : null;
+ if (ingestionGranularity != null) {
+
DruidSqlParserUtils.validateSupportedGranularityForPartitionedBy(ingestNode,
ingestionGranularity);
+ effectiveGranularity = ingestionGranularity;
+ } else {
+ final Granularity definedGranularity = tableMetadata == null
+ ? null
+ : tableMetadata.segmentGranularity();
+ if (definedGranularity != null) {
+ // Should already have been checked when creating the catalog entry
+ DruidSqlParserUtils.validateSupportedGranularityForPartitionedBy(null,
definedGranularity);
+ effectiveGranularity = definedGranularity;
+ }
+ }
+
+ if (effectiveGranularity == null) {
+ throw InvalidSqlInput.exception(
+ "Operation [%s] requires a PARTITIONED BY to be explicitly defined,
but none was found.",
+ operationName);
+ }
+
+ return effectiveGranularity;
+ }
+
+ /**
+ * Compute and validate the target type. In normal SQL, the engine would
insert
+ * a project operator after the SELECT before the write to cast columns from
the
+ * input type to the (compatible) defined output type. Druid doesn't work
that way.
+ * In MSQ, the output the just is the input type. If the user wants to
control the
+ * output type, then the user must manually insert any required CAST: Druid
is not
+ * in the business of changing the type to suit the catalog.
+ * <p>
+ * As a result, we first propagate column names and types using Druid rules:
the
+ * output is exactly what SELECT says it is. We then apply restrictions from
the
+ * catalog. If the table is strict, only column names from the catalog can be
+ * used.
+ */
+ private RelDataType validateTargetType(
+ SqlValidatorScope scope,
+ final IdentifierNamespace insertNs,
+ SqlInsert insert,
+ RelRecordType sourceType,
+ DatasourceFacade tableMetadata
+ )
+ {
+ final List<RelDataTypeField> sourceFields = sourceType.getFieldList();
+ for (final RelDataTypeField sourceField : sourceFields) {
+ // Check that there are no unnamed columns in the insert.
+ if (UNNAMED_COLUMN_PATTERN.matcher(sourceField.getName()).matches()) {
+ throw InvalidSqlInput.exception(
Review Comment:
could you replace these exceptions with ones that communicate the `SqlNode`
where it's interesting/valuable?
For a general error it doesn't matter, but this error is specific to a
selected column:
```
throw buildCalciteContextException(
    "Insertion requires columns to be named....",
    getSqlNodeFor(insert, sourceFields.indexOf(sourceField))
);
```
rough `getSqlNodeFor` method:
```
SqlNode getSqlNodeFor(SqlInsert insert, int idx) {
  SqlNode src = insert.getSource();
  if (src instanceof SqlSelect) {
    SqlSelect sqlSelect = (SqlSelect) src;
    SqlNodeList selectList = sqlSelect.getSelectList();
    if (idx < selectList.size()) {
      return selectList.get(idx);
    }
  }
  return src;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.