zachjsh commented on code in PR #15908:
URL: https://github.com/apache/druid/pull/15908#discussion_r1492917050
##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java:
##########
@@ -113,6 +158,324 @@ public void validateWindow(SqlNode windowOrId,
SqlValidatorScope scope, @Nullabl
super.validateWindow(windowOrId, scope, call);
}
+ /**
+  * Validates a Druid {@code INSERT} or {@code REPLACE} statement.
+  * <p>
+  * The steps, in order: reject {@code UPSERT} and explicit target column lists, resolve the
+  * target table, validate segment granularity (skipped for external destinations), validate
+  * the source query with an unknown target row type, derive the target row type and record
+  * it as the validated type of the statement, and finally copy the catalog's target segment
+  * row count into the query context when the context does not already define
+  * {@code CTX_ROWS_PER_SEGMENT}.
+  *
+  * @param insert the statement to validate; cast to {@link DruidSqlIngest} on entry
+  */
+ @Override
+ public void validateInsert(final SqlInsert insert)
+ {
+   final DruidSqlIngest ingestNode = (DruidSqlIngest) insert;
+   if (insert.isUpsert()) {
+     throw InvalidSqlInput.exception("UPSERT is not supported.");
+   }
+
+   // SQL-style INSERT INTO dst (a, b, c) is not (yet) supported.
+   final String operationName = insert.getOperator().getName();
+   if (insert.getTargetColumnList() != null) {
+     throw InvalidSqlInput.exception(
+         "Operation [%s] cannot be run with a target column list, given [%s (%s)]",
+         operationName,
+         ingestNode.getTargetTable(), ingestNode.getTargetColumnList()
+     );
+   }
+
+   // The target namespace is both the target table ID and the row type for that table.
+   final SqlValidatorNamespace targetNamespace = getNamespace(insert);
+   final IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace;
+   // The target is a new or existing datasource. A null table means "new datasource".
+   final DatasourceTable table = validateInsertTarget(targetNamespace, insertNs, operationName);
+
+   // An existing datasource may have metadata.
+   final DatasourceFacade tableMetadata = table == null ? null : table.effectiveMetadata().catalogMetadata();
+
+   // Validate segment granularity, which depends on nothing else.
+   // Skipped when the target is an external destination rather than a datasource.
+   if (!(ingestNode.getTargetTable() instanceof ExternalDestinationSqlIdentifier)) {
+     validateSegmentGranularity(operationName, ingestNode, tableMetadata);
+   }
+
+   // The source is usually a SELECT, but any other query node is handled below as well.
+   final SqlNode source = insert.getSource();
+
+   // Validate the source statement.
+   // Because of the non-standard Druid semantics, we can't define the target type: we don't know
+   // the target columns yet, and we can't infer types when they must come from the SELECT.
+   // Normally, the target type is known, and is pushed into the SELECT. In Druid, the SELECT
+   // usually defines the target types, unless the catalog says otherwise. Since catalog entries
+   // are optional, we don't know the target type until we validate the SELECT. (Also, we won't
+   // know names and we match by name.) Thus, we'd have to validate (to know names and types)
+   // to get the target types, but we need the target types to validate. Catch-22. So, we punt.
+   final SqlValidatorScope scope;
+   if (source instanceof SqlSelect) {
+     final SqlSelect sqlSelect = (SqlSelect) source;
+     validateSelect(sqlSelect, unknownType);
+     // No scope is looked up for a plain SELECT; null is handed to validateTargetType().
+     scope = null;
+   } else {
+     scope = scopes.get(source);
+     validateQuery(source, scope, unknownType);
+   }
+
+   final SqlValidatorNamespace sourceNamespace = namespaces.get(source);
+   final RelRecordType sourceType = (RelRecordType) sourceNamespace.getRowType();
+
+   // Determine the output (target) schema.
+   final RelDataType targetType = validateTargetType(scope, insertNs, insert, sourceType, tableMetadata);
+
+   // Set the type for the INSERT/REPLACE node
+   setValidatedNodeType(insert, targetType);
+
+   // Segment size: an explicit query-context value wins over the catalog setting.
+   if (tableMetadata != null && !validatorContext.queryContextMap().containsKey(CTX_ROWS_PER_SEGMENT)) {
+     final Integer targetSegmentRows = tableMetadata.targetSegmentRows();
+     if (targetSegmentRows != null) {
+       validatorContext.queryContextMap().put(CTX_ROWS_PER_SEGMENT, targetSegmentRows);
+     }
+   }
+ }
+
+ /**
+  * Validate the target table. Druid {@code INSERT/REPLACE} can create a new datasource,
+  * or insert into an existing one. If the target exists, it must be a datasource. If it
+  * does not exist, the target must be in the datasource schema, normally "druid".
+  *
+  * @param targetNamespace namespace of the statement's target, validated here to resolve the table
+  * @param insertNs        the same namespace viewed as an {@link IdentifierNamespace}; supplies
+  *                        the target identifier and the resolved table
+  * @param operationName   operator name of the statement, used only in error messages
+  * @return the existing datasource table, or {@code null} when the target does not exist yet
+  *         and the catalog does not require an existing table
+  * @throws DruidException (via {@code InvalidSqlInput}) if the name is empty, has more than
+  *         two parts, names a schema other than the Druid schema, resolves to something that
+  *         is not a datasource, or is missing while the catalog requires an existing table
+  */
+ private DatasourceTable validateInsertTarget(
+     final SqlValidatorNamespace targetNamespace,
+     final IdentifierNamespace insertNs,
+     final String operationName
+ )
+ {
+   // Get the target table ID
+   final SqlIdentifier destId = insertNs.getId();
+   if (destId.names.isEmpty()) {
+     // I don't think this can happen, but include a branch for it just in case.
+     throw InvalidSqlInput.exception("%s requires a target table.", operationName);
+   }
+
+   // Druid does not support 3+ part names.
+   final int n = destId.names.size();
+   if (n > 2) {
+     // The format string has a single placeholder, so only destId is passed.
+     throw InvalidSqlInput.exception("Druid does not support 3+ part names: [%s]", destId);
+   }
+   final String tableName = destId.names.get(n - 1);
+
+   // If this is a 2-part name, the first part must be the datasource schema.
+   if (n == 2 && !validatorContext.druidSchemaName().equals(destId.names.get(0))) {
+     throw InvalidSqlInput.exception(
+         "Table [%s] does not support operation [%s] because it is not a Druid datasource",
+         destId,
+         operationName
+     );
+   }
+   try {
+     // Try to resolve the table. Will fail if this is an INSERT into a new table.
+     validateNamespace(targetNamespace, unknownType);
+     final SqlValidatorTable target = insertNs.resolve().getTable();
+
+     // unwrap() may signal "not a datasource" either by throwing or by returning null.
+     // Both cases must be rejected: a null returned from here would otherwise be
+     // interpreted by the caller as "target does not exist, create a new datasource".
+     DatasourceTable datasource;
+     try {
+       datasource = target.unwrap(DatasourceTable.class);
+     }
+     catch (Exception e) {
+       datasource = null;
+     }
+     if (datasource == null) {
+       throw InvalidSqlInput.exception(
+           "Table [%s] does not support operation [%s] because it is not a Druid datasource",
+           destId,
+           operationName
+       );
+     }
+     return datasource;
+   }
+   catch (CalciteContextException e) {
+     // Something failed. Let's make sure it was the table lookup.
+     // The check is kind of a hack, but it's the best we can do given that Calcite
+     // didn't expect this non-SQL use case.
+     if (e.getCause() instanceof SqlValidatorException && e.getMessage()
+         .contains(StringUtils.format("Object '%s' not found", tableName))) {
+       // The catalog implementation may be "strict": and require that the target
+       // table already exists, rather than the default "lenient" mode that can
+       // create a new table.
+       if (validatorContext.catalog().ingestRequiresExistingTable()) {
+         throw InvalidSqlInput.exception("Cannot %s into [%s] because it does not exist", operationName, destId);
+       }
+       // New table. Validate the shape of the name.
+       IdUtils.validateId("table", tableName);
+       return null;
+     }
+     throw e;
+   }
+ }
+
+ private void validateSegmentGranularity(
Review Comment:
Good point! Fixed. Let me know if it's better now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]