zachjsh commented on code in PR #15908:
URL: https://github.com/apache/druid/pull/15908#discussion_r1492859586
##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java:
##########
@@ -113,6 +158,324 @@ public void validateWindow(SqlNode windowOrId,
SqlValidatorScope scope, @Nullabl
super.validateWindow(windowOrId, scope, call);
}
+ @Override
+ public void validateInsert(final SqlInsert insert) // Validates a Druid INSERT/REPLACE node and records its validated row type.
+ {
+ final DruidSqlIngest ingestNode = (DruidSqlIngest) insert; // Cast precedes every check below; the node must be a DruidSqlIngest.
+ if (insert.isUpsert()) {
+ throw InvalidSqlInput.exception("UPSERT is not supported.");
+ }
+
+
+ // SQL-style INSERT INTO dst (a, b, c) is not (yet) supported.
+ final String operationName = insert.getOperator().getName();
+ if (insert.getTargetColumnList() != null) {
+ throw InvalidSqlInput.exception(
+ "Operation [%s] cannot be run with a target column list, given [%s
(%s)]",
+ operationName,
+ ingestNode.getTargetTable(), ingestNode.getTargetColumnList()
+ );
+ }
+
+ // The target namespace is both the target table ID and the row type for
that table.
+ final SqlValidatorNamespace targetNamespace = getNamespace(insert);
+ final IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace;
+ // The target is a new or existing datasource.
+ final DatasourceTable table = validateInsertTarget(targetNamespace,
insertNs, operationName);
+
+ // An existing datasource may have metadata.
+ final DatasourceFacade tableMetadata = table == null ? null :
table.effectiveMetadata().catalogMetadata();
+
+ // Validate segment granularity, which depends on nothing else. Skipped when the target is an external destination.
+ if (!(ingestNode.getTargetTable() instanceof
ExternalDestinationSqlIdentifier)) {
+ validateSegmentGranularity(operationName, ingestNode, tableMetadata);
+ }
+
+ // The source is normally a SELECT; any other query node is validated through validateQuery below.
+ final SqlNode source = insert.getSource();
+
+ // Validate the source statement.
+ // Because of the non-standard Druid semantics, we can't define the target
type: we don't know
+ // the target columns yet, and we can't infer types when they must come
from the SELECT.
+ // Normally, the target type is known, and is pushed into the SELECT. In
Druid, the SELECT
+ // usually defines the target types, unless the catalog says otherwise.
Since catalog entries
+ // are optional, we don't know the target type until we validate the
SELECT. (Also, we won't
+ // know names and we match by name.) Thus, we'd have to validate (to know
names and types)
+ // to get the target types, but we need the target types to validate.
Catch-22. So, we punt.
+ final SqlValidatorScope scope;
+ if (source instanceof SqlSelect) {
+ final SqlSelect sqlSelect = (SqlSelect) source;
+ validateSelect(sqlSelect, unknownType);
+ scope = null; // No scope is fetched on the SELECT path, so validateTargetType receives null.
+ } else {
+ scope = scopes.get(source);
+ validateQuery(source, scope, unknownType);
+ }
+
+ final SqlValidatorNamespace sourceNamespace = namespaces.get(source);
+ final RelRecordType sourceType = (RelRecordType)
sourceNamespace.getRowType();
+
+ // Determine the output (target) schema.
+ final RelDataType targetType = validateTargetType(scope, insertNs, insert,
sourceType, tableMetadata);
+
+ // Set the type for the INSERT/REPLACE node
+ setValidatedNodeType(insert, targetType);
+
+ // Segment size: copy the catalog value into the query context unless the query already set one.
+ if (tableMetadata != null &&
!validatorContext.queryContextMap().containsKey(CTX_ROWS_PER_SEGMENT)) {
+ final Integer targetSegmentRows = tableMetadata.targetSegmentRows();
+ if (targetSegmentRows != null) {
+ validatorContext.queryContextMap().put(CTX_ROWS_PER_SEGMENT,
targetSegmentRows);
+ }
+ }
+ }
+
+ /**
+ * Validate the target table. Druid {@code INSERT/REPLACE} can create a new datasource,
+ * or insert into an existing one. If the target exists, it must be a datasource. If it
+ * does not exist, the target must be in the datasource schema, normally "druid".
+ * <p>
+ * Names with more than two parts are rejected, as is a two-part name whose first part
+ * differs from {@code validatorContext.druidSchemaName()}. A missing table is detected
+ * by matching the text of the Calcite lookup failure; any other Calcite failure is rethrown.
+ *
+ * @param targetNamespace namespace passed to {@code validateNamespace} to attempt table resolution
+ * @param insertNs identifier namespace that supplies the target ID and the resolved table
+ * @param operationName operator name, used only in error messages
+ * @return the resolved {@code DatasourceTable}, or {@code null} when the table is not found
+ * and the catalog does not require an existing table
+ */
+ private DatasourceTable validateInsertTarget(
+ final SqlValidatorNamespace targetNamespace,
+ final IdentifierNamespace insertNs,
+ final String operationName
+ )
+ {
+ // Get the target table ID
+ final SqlIdentifier destId = insertNs.getId();
+ if (destId.names.isEmpty()) {
+ // I don't think this can happen, but include a branch for it just in
case.
+ throw InvalidSqlInput.exception("%s requires a target table.",
operationName);
+ }
+
+ // Druid does not support 3+ part names. NOTE(review): the message below has one placeholder but is given two arguments.
+ final int n = destId.names.size();
+ if (n > 2) {
+ throw InvalidSqlInput.exception("Druid does not support 3+ part names:
[%s]", destId, operationName);
+ }
+ String tableName = destId.names.get(n - 1);
+
+ // If this is a 2-part name, the first part must be the datasource schema.
+ if (n == 2 &&
!validatorContext.druidSchemaName().equals(destId.names.get(0))) {
+ throw InvalidSqlInput.exception(
+ "Table [%s] does not support operation [%s] because it is not a
Druid datasource",
+ destId,
+ operationName
+ );
+ }
+ try {
+ // Try to resolve the table. Will fail if this is an INSERT into a new
table.
+ validateNamespace(targetNamespace, unknownType);
+ SqlValidatorTable target = insertNs.resolve().getTable();
+ try {
+ return target.unwrap(DatasourceTable.class); // NOTE(review): a null result here would be returned as-is and read by the caller as a new table; confirm unwrap throws for other table kinds.
+ }
+ catch (Exception e) {
+ throw InvalidSqlInput.exception(
+ "Table [%s] does not support operation [%s] because it is not a
Druid datasource",
+ destId,
+ operationName
+ );
+ }
+ }
+ catch (CalciteContextException e) {
+ // Something failed. Let's make sure it was the table lookup.
+ // The check is kind of a hack, but it is the best we can do given that
Calcite
+ // didn't expect this non-SQL use case.
+ if (e.getCause() instanceof SqlValidatorException && e.getMessage()
+ .contains(StringUtils.format("Object '%s' not found", tableName))) {
+ // The catalog implementation may be "strict": and require that the
target
+ // table already exists, rather than the default "lenient" mode that
can
+ // create a new table.
+ if (validatorContext.catalog().ingestRequiresExistingTable()) {
+ throw InvalidSqlInput.exception("Cannot %s into [%s] because it does
not exist", operationName, destId);
+ }
+ // New table. Validate the shape of the name.
+ IdUtils.validateId("table", tableName);
+ return null;
+ }
+ throw e;
+ }
+ }
+
+ private void validateSegmentGranularity( // Picks the effective segment granularity from catalog and query, then stores it in the query context.
+ final String operationName,
+ final DruidSqlIngest ingestNode,
+ final DatasourceFacade tableMetadata
+ )
+ {
+ final Granularity definedGranularity = tableMetadata == null ? null :
tableMetadata.segmentGranularity();
+ if (definedGranularity != null) {
+ // Should already have been checked when creating the catalog entry; null is passed as the node argument here.
+ DruidSqlParserUtils.validateSupportedGranularityForPartitionedBy(null,
definedGranularity);
+ }
+ final Granularity ingestionGranularity = // NOTE(review): assumes getPartitionedBy() is non-null; confirm against the parser.
ingestNode.getPartitionedBy().getGranularity();
+ if (ingestionGranularity != null) {
+
DruidSqlParserUtils.validateSupportedGranularityForPartitionedBy(ingestNode,
ingestionGranularity);
+ }
+ final Granularity finalGranularity; // Assigned exactly once on every non-throwing path below.
+ if (definedGranularity == null) {
+ // The catalog has no granularity: apply the query value
+ if (ingestionGranularity == null) {
+ // Neither has a value: error
+ throw InvalidSqlInput.exception(
+ "Operation [%s] requires a PARTITIONED BY to be explicitly
defined, but none was found.",
+ operationName
+ );
+ } else {
+ finalGranularity = ingestionGranularity;
+ }
+ } else {
+ // The catalog has a granularity
+ if (ingestionGranularity == null) {
+ // The query has no granularity: just apply the catalog granularity.
+ finalGranularity = definedGranularity;
+ } else if (definedGranularity.equals(ingestionGranularity)) {
+ // Both have a setting and they are the same. We assume this would
+ // likely occur only when moving to the catalog, and old queries still
+ // contain the PARTITIONED BY clause.
+ finalGranularity = definedGranularity;
+ } else {
+ // Both have a setting but they are different. Since the user declared
+ // the grain, using a different one is an error. If the user wants to
+ // vary the grain across different (re)ingestions, then, at present,
don't
+ // declare the grain in the catalog.
+ // TODO: allow mismatch
+ throw InvalidSqlInput.exception(
+ "PARTITIONED BY mismatch. Catalog: [%s], query: [%s]",
+ granularityToSqlString(definedGranularity),
+ granularityToSqlString(ingestionGranularity)
+ );
+ }
+ }
+
+ // Note: though this is the validator, we cheat a bit and write the target
+ // granularity into the query context. Perhaps this step should be done
+ // during conversion, however, we've just worked out the granularity, so we
+ // do it here instead.
+ try {
+ validatorContext.queryContextMap().put(
+ DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
+ validatorContext.jsonMapper().writeValueAsString(finalGranularity)
+ );
+ }
+ catch (JsonProcessingException e) {
+ throw InvalidSqlInput.exception(e, "Invalid partition granularity [%s]",
finalGranularity);
+ }
+ }
+
+ private String granularityToSqlString(final Granularity gran) // Renders a granularity as text for the PARTITIONED BY mismatch message.
+ {
+ if (gran == null) {
+ return "NULL";
+ }
+ // The validation path will only ever see the ALL granularity or
+ // a period granularity. Neither the parser nor catalog can
+ // create a Duration granularity.
+ if (Granularities.ALL == gran) { // Reference comparison against the ALL constant, not equals().
+ return "ALL TIME";
+ }
+ return ((PeriodGranularity) gran).getPeriod().toString(); // Unchecked cast; relies on the assumption stated in the comment above.
+ }
+
+
+ /**
+ * Compute and validate the target type. In normal SQL, the engine would
insert
+ * a project operator after the SELECT before the write to cast columns from
the
+ * input type to the (compatible) defined output type. Druid doesn't work
that way.
+ * In MSQ, the output type is just the input type. If the user wants to
control the
+ * output type, then the user must manually insert any required CAST: Druid
is not
+ * in the business of changing the type to suit the catalog.
+ * <p>
+ * As a result, we first propagate column names and types using Druid rules:
the
+ * output is exactly what SELECT says it is. We then apply restrictions from
the
+ * catalog. If the table is strict, only column names from the catalog can be
+ * used.
+ */
+ private RelDataType validateTargetType(
+ SqlValidatorScope scope,
+ final IdentifierNamespace insertNs,
+ SqlInsert insert,
+ RelRecordType sourceType,
+ DatasourceFacade tableMetadata
+ )
+ {
+ final List<RelDataTypeField> sourceFields = sourceType.getFieldList();
+ for (final RelDataTypeField sourceField : sourceFields) {
+ // Check that there are no unnamed columns in the insert.
+ if (UNNAMED_COLUMN_PATTERN.matcher(sourceField.getName()).matches()) {
Review Comment:
Yes! There are tests in CalciteInsertDmlTest, for example,
org.apache.druid.sql.calcite.CalciteInsertDmlTest#testInsertWithUnnamedColumnInSelectStatement
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]