zachjsh commented on code in PR #15711:
URL: https://github.com/apache/druid/pull/15711#discussion_r1473616138


##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java:
##########
@@ -19,38 +19,762 @@
 
 package org.apache.druid.sql.calcite.planner;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.prepare.BaseDruidSqlValidator;
 import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
 import org.apache.calcite.runtime.CalciteContextException;
 import org.apache.calcite.runtime.CalciteException;
+import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlNumericLiteral;
 import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.BasicSqlType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.IdentifierNamespace;
+import org.apache.calcite.sql.validate.OrderByScope;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+import org.apache.calcite.sql.validate.SqlValidatorNamespace;
 import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.sql.validate.SqlValidatorTable;
+import org.apache.calcite.sql.validate.ValidatorShim;
+import org.apache.calcite.util.Pair;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.facade.DatasourceFacade;
+import org.apache.druid.catalog.model.table.ClusterKeySpec;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.error.InvalidSqlInput;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
 import org.apache.druid.sql.calcite.run.EngineFeature;
+import org.apache.druid.sql.calcite.table.DatasourceTable;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Pattern;
 
 /**
  * Druid extended SQL validator. (At present, it doesn't actually
  * have any extensions yet, but it will soon.)
  */
 class DruidSqlValidator extends BaseDruidSqlValidator
 {
+  private static final Pattern UNNAMED_COLUMN_PATTERN = 
Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE);
+
+  // Copied here from MSQE since that extension is not visible here.
+  public static final String CTX_ROWS_PER_SEGMENT = "msqRowsPerSegment";
+
+  public interface ValidatorContext
+  {
+    Map<String, Object> queryContextMap();
+    CatalogResolver catalog();
+    String druidSchemaName();
+    ObjectMapper jsonMapper();
+  }
+
+  private final ValidatorContext validatorContext;
   private final PlannerContext plannerContext;
 
   protected DruidSqlValidator(
-      SqlOperatorTable opTab,
-      CalciteCatalogReader catalogReader,
-      JavaTypeFactory typeFactory,
-      Config validatorConfig,
-      PlannerContext plannerContext
+      final SqlOperatorTable opTab,
+      final CalciteCatalogReader catalogReader,
+      final JavaTypeFactory typeFactory,
+      final Config validatorConfig,
+      final PlannerContext plannerContext,
+      final ValidatorContext validatorContext
   )
   {
     super(opTab, catalogReader, typeFactory, validatorConfig);
     this.plannerContext = plannerContext;
+    this.validatorContext = validatorContext;
+  }
+
+  /**
+   * Druid-specific validation for an INSERT statement. In Druid, the columns 
are
+   * matched by name. A datasource, by default, allows the insertion of 
arbitrary columns,
+   * but the catalog may enforce a strict schema (all columns must exist). 
Destination
+   * types are set by the catalog, where available, else by the query.
+   * <p>
+   * The Druid {@code INSERT} statement is non-standard in a variety of ways:
+   * <ul>
+   * <li>Allows the target table to not yet exist. Instead, {@code INSERT}
+   * creates it.</li>
+   * <li>Does not allow specifying the list of columns:
+   * {@code INSERT INTO dst (a, b, c) ...}</li>
+   * <li>When given without target columns (the only form allowed), columns are
+   * not matched by schema position as in standard SQL, but rather by 
name.</li>
+   * <li>There is no requirement that the target columns already exist. In 
fact,
+   * even if the target column exists, any existing type is ignored if not 
specified
+   * in the catalog.</li>
+   * <li>The source can only be a {@code SELECT} statement, not {@code 
VALUES}.</li>
+   * <li>Types thus propagate upwards from the {@code SELECT} to the the target
+   * table. Standard SQL says that types propagate down from the target table 
to the
+   * source.</li>
+   * <li>The __time column is special in multiple ways.</li>
+   * <li>Includes the {@code CLUSTERED BY} and {@code PARTITIONED BY} 
clauses.</li>
+   * </ul>
+   * The result is that the validation for the Druid {@code INSERT} is wildly 
customized
+   * relative to standard SQL.
+   */
+  // TODO: Ensure the source and target are not the same
+  @Override
+  public void validateInsert(final SqlInsert insert)
+  {
+    final DruidSqlIngest ingestNode = (DruidSqlIngest) insert;
+    if (insert.isUpsert()) {
+      throw new IAE("UPSERT is not supported.");
+    }
+
+    // SQL-style INSERT INTO dst (a, b, c) is not (yet) supported.
+    final String operationName = insert.getOperator().getName();
+    if (insert.getTargetColumnList() != null) {
+      throw new IAE("%s with a target column list is not supported.", 
operationName);
+    }
+
+    // The target namespace is both the target table ID and the row type for 
that table.
+    final SqlValidatorNamespace targetNamespace = getNamespace(insert);
+    final IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace;
+
+    // The target is a new or existing datasource.
+    final DatasourceTable table = validateInsertTarget(targetNamespace, 
insertNs, operationName);
+    final SqlValidatorTable target = insertNs.resolve().getTable();
+
+    // An existing datasource may have metadata.
+    final DatasourceFacade tableMetadata = table == null ? null : 
table.effectiveMetadata().catalogMetadata();
+
+    // Validate segment granularity, which depends on nothing else.
+    validateSegmentGranularity(operationName, ingestNode, tableMetadata);
+
+    // The source must be a SELECT
+    final SqlNode source = insert.getSource();
+    ensureNoOrderBy(source, operationName);
+
+    // Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY clause
+    final SqlNodeList catalogClustering = 
convertCatalogClustering(tableMetadata);
+    rewriteClusteringToOrderBy(source, ingestNode, catalogClustering);
+
+    // Validate the source statement. Validates the ORDER BY pushed down in 
the above step.
+    // Because of the non-standard Druid semantics, we can't define the target 
type: we don't know
+    // the target columns yet, and we can't infer types when they must come 
from the SELECT.
+    // Normally, the target type is known, and is pushed into the SELECT. In 
Druid, the SELECT
+    // usually defines the target types, unless the catalog says otherwise. 
Since catalog entries
+    // are optional, we don't know the target type until we validate the 
SELECT. (Also, we won't
+    // know names and we match by name.) Thus, we'd have to validate (to know 
names and types)
+    // to get the target types, but we need the target types to validate. 
Catch-22. So, we punt.
+    final SqlValidatorScope scope;
+    if (source instanceof SqlSelect) {
+      final SqlSelect sqlSelect = (SqlSelect) source;
+      validateSelect(sqlSelect, unknownType);
+      scope = null;
+    } else {
+      scope = scopes.get(source);
+      validateQuery(source, scope, unknownType);
+    }
+
+    final SqlValidatorNamespace sourceNamespace = namespaces.get(source);
+    final RelRecordType sourceType = (RelRecordType) 
sourceNamespace.getRowType();
+
+    // Validate the __time column
+    int timeColumnIndex = 
sourceType.getFieldNames().indexOf(Columns.TIME_COLUMN);
+    if (timeColumnIndex != -1) {
+      validateTimeColumn(sourceType, timeColumnIndex);
+    }
+
+    // Validate clustering against the SELECT row type. Clustering has 
additional
+    // constraints beyond what was validated for the pushed-down ORDER BY.
+    // Though we pushed down clustering above, only now can we validate it 
after
+    // we've determined the SELECT row type.
+    validateClustering(sourceType, ingestNode, catalogClustering);
+
+    // Determine the output (target) schema.
+    final RelDataType targetType = validateTargetType(scope, target, insert, 
sourceType, tableMetadata);
+
+    // Set the type for the INSERT/REPLACE node
+    setValidatedNodeType(insert, targetType);
+
+    // Segment size
+    if (tableMetadata != null && 
!validatorContext.queryContextMap().containsKey(CTX_ROWS_PER_SEGMENT)) {
+      final Integer targetSegmentRows = tableMetadata.targetSegmentRows();
+      if (targetSegmentRows != null) {
+        validatorContext.queryContextMap().put(CTX_ROWS_PER_SEGMENT, 
targetSegmentRows);
+      }
+    }
+  }
+
+  /**
+   * Validate the target table. Druid {@code INSERT/REPLACE} can create a new 
datasource,
+   * or insert into an existing one. If the target exists, it must be a 
datasource. If it
+   * does not exist, the target must be in the datasource schema, normally 
"druid".
+   */
+  private DatasourceTable validateInsertTarget(
+      final SqlValidatorNamespace targetNamespace,
+      final IdentifierNamespace insertNs,
+      final String operationName
+  )
+  {
+    // Get the target table ID
+    final SqlIdentifier destId = insertNs.getId();
+    if (destId.names.isEmpty()) {
+      // I don't think this can happen, but include a branch for it just in 
case.
+      throw new IAE("%s requires a target table.", operationName);
+    }
+
+    // Druid does not support 3+ part names.
+    final int n = destId.names.size();
+    if (n > 2) {
+      throw new IAE("Druid does not support 3+ part names: [%s]", destId);
+    }
+    String tableName = destId.names.get(n - 1);
+
+    // If this is a 2-part name, the first part must be the datasource schema.
+    if (n == 2 && 
!validatorContext.druidSchemaName().equals(destId.names.get(0))) {
+      throw new IAE("Cannot %s into [%s] because the table is not in the 
'druid' schema",
+          operationName,
+          destId
+      );
+    }
+    try {
+      // Try to resolve the table. Will fail if this is an INSERT into a new 
table.

Review Comment:
   This causes MSQ ingestion to fail on non-catalog tables.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to