This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 39b3487aa9 Add replace statement to sql parser (#12386)
39b3487aa9 is described below
commit 39b3487aa9829d1d5b19733ef76319d8133dcb38
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Fri May 13 10:56:40 2022 +0530
Add replace statement to sql parser (#12386)
Relevant Issue: #11929
- Add a custom REPLACE statement to the Druid SQL parser.
- Edit DruidPlanner to convert the relevant fields to the query context.
- Refactor code shared with the INSERT statement so it can be reused for REPLACE where possible.
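For context, a statement the new grammar is intended to accept looks like the
following sketch (the datasource, table, and timestamp values are illustrative
only, not taken from this commit):

    // Hypothetical example of the new REPLACE syntax. REPLACE overwrites either
    // everything (OVERWRITE ALL) or the time chunks matched by an OVERWRITE WHERE
    // clause on __time; PARTITIONED BY remains mandatory, as for INSERT.
    final String replaceSql =
        "REPLACE INTO dst "
        + "OVERWRITE WHERE __time >= TIMESTAMP '2022-01-01 00:00:00' "
        + "AND __time < TIMESTAMP '2022-02-01 00:00:00' "
        + "SELECT * FROM foo "
        + "PARTITIONED BY MONTH";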
---
sql/src/main/codegen/config.fmpp | 6 +
.../codegen/includes/{insert.ftl => common.ftl} | 59 +-
sql/src/main/codegen/includes/explain.ftl | 2 +
sql/src/main/codegen/includes/insert.ftl | 55 --
sql/src/main/codegen/includes/replace.ftl | 78 +++
.../druid/sql/calcite/parser/DruidSqlInsert.java | 2 +
.../sql/calcite/parser/DruidSqlParserUtils.java | 212 +++++++
.../{DruidSqlInsert.java => DruidSqlReplace.java} | 64 +-
.../druid/sql/calcite/planner/DruidPlanner.java | 195 +++---
.../druid/sql/calcite/CalciteIngestionDmlTest.java | 339 +++++++++++
.../druid/sql/calcite/CalciteInsertDmlTest.java | 372 ++----------
.../druid/sql/calcite/CalciteReplaceDmlTest.java | 656 +++++++++++++++++++++
.../sql/calcite/parser/DruidSqlUnparseTest.java | 97 +++
13 files changed, 1598 insertions(+), 539 deletions(-)
diff --git a/sql/src/main/codegen/config.fmpp b/sql/src/main/codegen/config.fmpp
index 817bd4e79e..31fe812bbe 100644
--- a/sql/src/main/codegen/config.fmpp
+++ b/sql/src/main/codegen/config.fmpp
@@ -51,6 +51,7 @@ data: {
# List of additional classes and packages to import.
# Example. "org.apache.calcite.sql.*", "java.util.List".
imports: [
+ "java.util.List"
"org.apache.calcite.sql.SqlNode"
"org.apache.calcite.sql.SqlInsert"
"org.apache.druid.java.util.common.granularity.Granularity"
@@ -63,6 +64,7 @@ data: {
# keyword add it to 'nonReservedKeywords' section.
keywords: [
"CLUSTERED"
+ "OVERWRITE"
"PARTITIONED"
]
@@ -218,6 +220,7 @@ data: {
"OTHERS"
"OUTPUT"
"OVERRIDING"
+ "OVERWRITE"
"PAD"
"PARAMETER_MODE"
"PARAMETER_NAME"
@@ -384,6 +387,7 @@ data: {
statementParserMethods: [
"DruidSqlInsertEof()"
"DruidSqlExplain()"
+ "DruidSqlReplaceEof()"
]
# List of methods for parsing custom literals.
@@ -433,8 +437,10 @@ data: {
# given as part of "statementParserMethods", "literalParserMethods" or
# "dataTypeParserMethods".
implementationFiles: [
+ "common.ftl"
"insert.ftl"
"explain.ftl"
+ "replace.ftl"
]
includePosixOperators: false
diff --git a/sql/src/main/codegen/includes/insert.ftl b/sql/src/main/codegen/includes/common.ftl
similarity index 52%
copy from sql/src/main/codegen/includes/insert.ftl
copy to sql/src/main/codegen/includes/common.ftl
index ced692759f..136492482c 100644
--- a/sql/src/main/codegen/includes/insert.ftl
+++ b/sql/src/main/codegen/includes/common.ftl
@@ -18,64 +18,11 @@
*/
// Using fully qualified name for Pair class, since Calcite also has a class with the same name used in the Parser.jj
-SqlNode DruidSqlInsertEof() :
-{
- SqlNode insertNode;
- org.apache.druid.java.util.common.Pair<Granularity, String> partitionedBy = new org.apache.druid.java.util.common.Pair(null, null);
- SqlNodeList clusteredBy = null;
-}
-{
- insertNode = SqlInsert()
- // PARTITIONED BY is necessary, but is kept optional in the grammar. It is asserted that it is not missing in the
- // DruidSqlInsert constructor so that we can return a custom error message.
- [
- <PARTITIONED> <BY>
- partitionedBy = PartitionGranularity()
- ]
- [
- <CLUSTERED> <BY>
- clusteredBy = ClusterItems()
- ]
- // EOF is also present in SqlStmtEof but EOF is a special case and a single EOF can be consumed multiple times.
- // The reason for adding EOF here is to ensure that we create a DruidSqlInsert node after the syntax has been
- // validated and throw SQL syntax errors before performing validations in the DruidSqlInsert which can overshadow the
- // actual error message.
- <EOF>
- {
- if (!(insertNode instanceof SqlInsert)) {
- // This shouldn't be encountered, but done as a defensive practice. SqlInsert() always returns a node of type
- // SqlInsert
- return insertNode;
- }
- SqlInsert sqlInsert = (SqlInsert) insertNode;
- return new DruidSqlInsert(sqlInsert, partitionedBy.lhs, partitionedBy.rhs, clusteredBy);
- }
-}
-
-SqlNodeList ClusterItems() :
-{
- List<SqlNode> list;
- final Span s;
- SqlNode e;
-}
-{
- e = OrderItem() {
- s = span();
- list = startList(e);
- }
- (
- LOOKAHEAD(2) <COMMA> e = OrderItem() { list.add(e); }
- )*
- {
- return new SqlNodeList(list, s.addAll(list).pos());
- }
-}
-
org.apache.druid.java.util.common.Pair<Granularity, String> PartitionGranularity() :
{
- SqlNode e = null;
- Granularity granularity = null;
- String unparseString = null;
+ SqlNode e;
+ Granularity granularity;
+ String unparseString;
}
{
(
diff --git a/sql/src/main/codegen/includes/explain.ftl b/sql/src/main/codegen/includes/explain.ftl
index b24927e500..b6f3d926a1 100644
--- a/sql/src/main/codegen/includes/explain.ftl
+++ b/sql/src/main/codegen/includes/explain.ftl
@@ -62,6 +62,8 @@ SqlNode DruidQueryOrSqlQueryOrDml() :
{
(
stmt = DruidSqlInsertEof()
+ |
+ stmt = DruidSqlReplaceEof()
|
stmt = SqlQueryOrDml()
)
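Since DruidSqlReplaceEof() is now one of the alternatives above, EXPLAIN covers
REPLACE as well. A sketch (hypothetical names, assuming Druid's usual EXPLAIN
PLAN FOR form):

    // EXPLAIN now also reaches the REPLACE production (hypothetical names):
    final String explainReplace =
        "EXPLAIN PLAN FOR REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME";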
diff --git a/sql/src/main/codegen/includes/insert.ftl b/sql/src/main/codegen/includes/insert.ftl
index ced692759f..6f7c1148f5 100644
--- a/sql/src/main/codegen/includes/insert.ftl
+++ b/sql/src/main/codegen/includes/insert.ftl
@@ -70,58 +70,3 @@ SqlNodeList ClusterItems() :
return new SqlNodeList(list, s.addAll(list).pos());
}
}
-
-org.apache.druid.java.util.common.Pair<Granularity, String> PartitionGranularity() :
-{
- SqlNode e = null;
- Granularity granularity = null;
- String unparseString = null;
-}
-{
- (
- <HOUR>
- {
- granularity = Granularities.HOUR;
- unparseString = "HOUR";
- }
- |
- <DAY>
- {
- granularity = Granularities.DAY;
- unparseString = "DAY";
- }
- |
- <MONTH>
- {
- granularity = Granularities.MONTH;
- unparseString = "MONTH";
- }
- |
- <YEAR>
- {
- granularity = Granularities.YEAR;
- unparseString = "YEAR";
- }
- |
- <ALL>
- {
- granularity = Granularities.ALL;
- unparseString = "ALL";
- }
- [
- <TIME>
- {
- unparseString += " TIME";
- }
- ]
- |
- e = Expression(ExprContext.ACCEPT_SUB_QUERY)
- {
- granularity = DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(e);
- unparseString = e.toString();
- }
- )
- {
- return new org.apache.druid.java.util.common.Pair(granularity, unparseString);
- }
-}
diff --git a/sql/src/main/codegen/includes/replace.ftl b/sql/src/main/codegen/includes/replace.ftl
new file mode 100644
index 0000000000..741c59c0ad
--- /dev/null
+++ b/sql/src/main/codegen/includes/replace.ftl
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Taken from the syntax of the SqlInsert statement in the Calcite parser, edited for REPLACE syntax
+SqlNode DruidSqlReplaceEof() :
+{
+ SqlNode table;
+ SqlNode source;
+ SqlNodeList columnList = null;
+ final Span s;
+ SqlInsert sqlInsert;
+ // Using fully qualified name for Pair class, since Calcite also has a class with the same name used in the Parser.jj
+ org.apache.druid.java.util.common.Pair<Granularity, String> partitionedBy = new org.apache.druid.java.util.common.Pair(null, null);
+ final Pair<SqlNodeList, SqlNodeList> p;
+ final SqlNode replaceTimeQuery;
+}
+{
+ <REPLACE> { s = span(); }
+ <INTO>
+ table = CompoundIdentifier()
+ [
+ p = ParenthesizedCompoundIdentifierList() {
+ if (p.left.size() > 0) {
+ columnList = p.left;
+ }
+ }
+ ]
+ <OVERWRITE>
+ replaceTimeQuery = ReplaceTimeQuery()
+ source = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
+ // PARTITIONED BY is necessary, but is kept optional in the grammar. It is asserted that it is not missing in the
+ // DruidSqlReplace constructor so that we can return a custom error message.
+ [
+ <PARTITIONED> <BY>
+ partitionedBy = PartitionGranularity()
+ ]
+ // EOF is also present in SqlStmtEof but EOF is a special case and a single EOF can be consumed multiple times.
+ // The reason for adding EOF here is to ensure that we create a DruidSqlReplace node after the syntax has been
+ // validated and throw SQL syntax errors before performing validations in the DruidSqlReplace which can overshadow the
+ // actual error message.
+ <EOF>
+ {
+ sqlInsert = new SqlInsert(s.end(source), SqlNodeList.EMPTY, table, source, columnList);
+ return new DruidSqlReplace(sqlInsert, partitionedBy.lhs, partitionedBy.rhs, replaceTimeQuery);
+ }
+}
+
+SqlNode ReplaceTimeQuery() :
+{
+ SqlNode replaceQuery;
+}
+{
+ (
+ <ALL> { replaceQuery = SqlLiteral.createCharString("ALL", getPos()); }
+ |
+ // We parse all types of conditions and throw an exception later if one is not supported, to keep the parsing simple
+ replaceQuery = WhereOpt()
+ )
+ {
+ return replaceQuery;
+ }
+}
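For illustration, a few statements against this production (hypothetical names;
the last one should fail with the custom PARTITIONED BY error raised in the
DruidSqlReplace constructor):

    // Accepted: overwrite every time chunk of the target datasource.
    final String replaceAll =
        "REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY DAY";

    // Accepted: overwrite only the day matched by the WHERE clause on __time.
    final String replaceWhere =
        "REPLACE INTO dst "
        + "OVERWRITE WHERE __time >= TIMESTAMP '2022-01-01 00:00:00' "
        + "AND __time < TIMESTAMP '2022-01-02 00:00:00' "
        + "SELECT * FROM foo PARTITIONED BY DAY";

    // Rejected in the DruidSqlReplace constructor: PARTITIONED BY is missing.
    final String missingPartitionedBy =
        "REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo";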
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java
index c941da3e59..dbaace1d93 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java
@@ -43,6 +43,8 @@ public class DruidSqlInsert extends SqlInsert
public static final SqlOperator OPERATOR = SqlInsert.OPERATOR;
private final Granularity partitionedBy;
+
+ // Used in the unparse function to generate the original query since we convert the string to an enum
private final String partitionedByStringForUnparse;
@Nullable
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java
index d81ce2ca5b..2ef587e3b0 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java
@@ -20,27 +20,48 @@
package org.apache.druid.sql.calcite.parser;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlIntervalQualifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlTimestampLiteral;
+import org.apache.calcite.tools.ValidationException;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.filter.AndDimFilter;
+import org.apache.druid.query.filter.BoundDimFilter;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.filter.NotDimFilter;
+import org.apache.druid.query.filter.OrDimFilter;
+import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.sql.calcite.expression.TimeUnits;
import org.apache.druid.sql.calcite.expression.builtin.TimeFloorOperatorConversion;
+import org.apache.druid.sql.calcite.filtration.Filtration;
+import org.apache.druid.sql.calcite.filtration.MoveTimeFiltersToIntervals;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Interval;
import org.joda.time.Period;
+import org.joda.time.base.AbstractInterval;
+import java.sql.Timestamp;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
public class DruidSqlParserUtils
{
private static final Logger log = new Logger(DruidSqlParserUtils.class);
+ public static final String ALL = "all";
/**
* Delegates to {@code convertSqlNodeToGranularity} and converts the exceptions to {@link ParseException}
@@ -167,4 +188,195 @@ public class DruidSqlParserUtils
TimeFloorOperatorConversion.SQL_FUNCTION_NAME
));
}
+
+ /**
+ * This method validates and converts a {@link SqlNode} representing a query into an optimized list of intervals to
+ * be used in creating an ingestion spec. If the sqlNode is an SqlLiteral of {@link #ALL}, it returns a singleton list
+ * of "ALL". Otherwise, it converts and optimizes the query using {@link MoveTimeFiltersToIntervals} into a list of
+ * intervals which contain all valid values of time as per the query.
+ *
+ * The following validations are performed:
+ * 1. Only the __time column and timestamp literals are present in the query
+ * 2. The interval after optimization is not empty
+ * 3. The operands in the expression are supported
+ * 4. The intervals after adjusting for the timezone are aligned with the granularity parameter
+ *
+ * @param replaceTimeQuery Sql node representing the query
+ * @param granularity granularity of the query for validation
+ * @param dateTimeZone timezone
+ * @return List of string representations of intervals
+ * @throws ValidationException if the SqlNode cannot be converted to a list of intervals
+ */
+ public static List<String> validateQueryAndConvertToIntervals(
+ SqlNode replaceTimeQuery,
+ Granularity granularity,
+ DateTimeZone dateTimeZone
+ ) throws ValidationException
+ {
+ if (replaceTimeQuery instanceof SqlLiteral && ALL.equalsIgnoreCase(((SqlLiteral) replaceTimeQuery).toValue())) {
+ return ImmutableList.of(ALL);
+ }
+
+ DimFilter dimFilter = convertQueryToDimFilter(replaceTimeQuery, dateTimeZone);
+
+ Filtration filtration = Filtration.create(dimFilter);
+ filtration = MoveTimeFiltersToIntervals.instance().apply(filtration);
+ List<Interval> intervals = filtration.getIntervals();
+
+ if (filtration.getDimFilter() != null) {
+ throw new ValidationException("Only " + ColumnHolder.TIME_COLUMN_NAME +
" column is supported in OVERWRITE WHERE clause");
+ }
+
+ if (intervals.isEmpty()) {
+ throw new ValidationException("Intervals for replace are empty");
+ }
+
+ for (Interval interval : intervals) {
+ DateTime intervalStart = interval.getStart();
+ DateTime intervalEnd = interval.getEnd();
+ if (!granularity.bucketStart(intervalStart).equals(intervalStart) || !granularity.bucketStart(intervalEnd).equals(intervalEnd)) {
+ throw new ValidationException("OVERWRITE WHERE clause contains an interval " + interval +
+                               " which is not aligned with PARTITIONED BY granularity " + granularity);
+ }
+ }
+ return intervals
+ .stream()
+ .map(AbstractInterval::toString)
+ .collect(Collectors.toList());
+ }
+
+
+ /**
+ * This method is used to convert a {@link SqlNode} representing a query into a {@link DimFilter} for the same query.
+ * It takes the timezone as a separate parameter, as SQL timestamps don't contain that information. The supported
+ * operators in the SQL query are AND, OR, NOT, >, <, >=, <= and BETWEEN.
+ *
+ * @param replaceTimeQuery Sql node representing the query
+ * @param dateTimeZone timezone
+ * @return DimFilter for the query
+ * @throws ValidationException if the SqlNode cannot be converted to a DimFilter
+ */
+ public static DimFilter convertQueryToDimFilter(SqlNode replaceTimeQuery, DateTimeZone dateTimeZone)
+     throws ValidationException
+ {
+ if (!(replaceTimeQuery instanceof SqlBasicCall)) {
+ log.error("Expected SqlBasicCall during parsing, but found " + replaceTimeQuery.getClass().getName());
+ throw new ValidationException("Invalid OVERWRITE WHERE clause");
+ }
+ String columnName;
+ SqlBasicCall sqlBasicCall = (SqlBasicCall) replaceTimeQuery;
+ List<SqlNode> operandList = sqlBasicCall.getOperandList();
+ switch (sqlBasicCall.getOperator().getKind()) {
+ case AND:
+ List<DimFilter> dimFilters = new ArrayList<>();
+ for (SqlNode sqlNode : sqlBasicCall.getOperandList()) {
+ dimFilters.add(convertQueryToDimFilter(sqlNode, dateTimeZone));
+ }
+ return new AndDimFilter(dimFilters);
+ case OR:
+ dimFilters = new ArrayList<>();
+ for (SqlNode sqlNode : sqlBasicCall.getOperandList()) {
+ dimFilters.add(convertQueryToDimFilter(sqlNode, dateTimeZone));
+ }
+ return new OrDimFilter(dimFilters);
+ case NOT:
+ return new NotDimFilter(convertQueryToDimFilter(sqlBasicCall.getOperandList().get(0), dateTimeZone));
+ case GREATER_THAN_OR_EQUAL:
+ columnName = parseColumnName(operandList.get(0));
+ return new BoundDimFilter(
+ columnName,
+ parseTimeStampWithTimeZone(operandList.get(1), dateTimeZone),
+ null,
+ false,
+ null,
+ null,
+ null,
+ StringComparators.NUMERIC
+ );
+ case LESS_THAN_OR_EQUAL:
+ columnName = parseColumnName(operandList.get(0));
+ return new BoundDimFilter(
+ columnName,
+ null,
+ parseTimeStampWithTimeZone(operandList.get(1), dateTimeZone),
+ null,
+ false,
+ null,
+ null,
+ StringComparators.NUMERIC
+ );
+ case GREATER_THAN:
+ columnName = parseColumnName(operandList.get(0));
+ return new BoundDimFilter(
+ columnName,
+ parseTimeStampWithTimeZone(operandList.get(1), dateTimeZone),
+ null,
+ true,
+ null,
+ null,
+ null,
+ StringComparators.NUMERIC
+ );
+ case LESS_THAN:
+ columnName = parseColumnName(operandList.get(0));
+ return new BoundDimFilter(
+ columnName,
+ null,
+ parseTimeStampWithTimeZone(operandList.get(1), dateTimeZone),
+ null,
+ true,
+ null,
+ null,
+ StringComparators.NUMERIC
+ );
+ case BETWEEN:
+ columnName = parseColumnName(operandList.get(0));
+ return new BoundDimFilter(
+ columnName,
+ parseTimeStampWithTimeZone(operandList.get(1), dateTimeZone),
+ parseTimeStampWithTimeZone(operandList.get(2), dateTimeZone),
+ false,
+ false,
+ null,
+ null,
+ StringComparators.NUMERIC
+ );
+ default:
+ throw new ValidationException("Unsupported operation in OVERWRITE
WHERE clause: " + sqlBasicCall.getOperator().getName());
+ }
+ }
+
+ /**
+ * Converts a {@link SqlNode} identifier into a string representation
+ *
+ * @param sqlNode the sql node
+ * @return string representing the column name
+ * @throws ValidationException if the sql node is not an SqlIdentifier
+ */
+ public static String parseColumnName(SqlNode sqlNode) throws ValidationException
+ {
+ if (!(sqlNode instanceof SqlIdentifier)) {
+ throw new ValidationException("Expressions must be of the form __time <operator> TIMESTAMP");
+ }
+ return ((SqlIdentifier) sqlNode).getSimple();
+ }
+
+ /**
+ * Converts a {@link SqlNode} into a timestamp, taking into account the timezone
+ *
+ * @param sqlNode the sql node
+ * @param timeZone timezone
+ * @return the timestamp in milliseconds from the epoch, as a string
+ * @throws ValidationException if the sql node is not a SqlTimestampLiteral
+ */
+ public static String parseTimeStampWithTimeZone(SqlNode sqlNode, DateTimeZone timeZone) throws ValidationException
+ {
+ if (!(sqlNode instanceof SqlTimestampLiteral)) {
+ throw new ValidationException("Expressions must be of the form __time <operator> TIMESTAMP");
+ }
+
+ Timestamp sqlTimestamp = Timestamp.valueOf(((SqlTimestampLiteral) sqlNode).toFormattedString());
+ ZonedDateTime zonedTimestamp = sqlTimestamp.toLocalDateTime().atZone(timeZone.toTimeZone().toZoneId());
+ return String.valueOf(zonedTimestamp.toInstant().toEpochMilli());
+ }
}
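To make the conversion above concrete, a sketch of how
validateQueryAndConvertToIntervals behaves (replaceTimeQuery stands for the
SqlNode produced by ReplaceTimeQuery() in the grammar; the values shown are
illustrative assumptions, not output from this commit):

    // For: OVERWRITE WHERE __time >= TIMESTAMP '2022-01-01 00:00:00'
    //                  AND __time <  TIMESTAMP '2022-02-01 00:00:00'
    // with PARTITIONED BY MONTH and a UTC session timezone, this should yield a
    // single month-aligned interval such as
    // "2022-01-01T00:00:00.000Z/2022-02-01T00:00:00.000Z"; a misaligned bound or
    // a non-__time column raises ValidationException instead.
    final List<String> intervals = DruidSqlParserUtils.validateQueryAndConvertToIntervals(
        replaceTimeQuery,     // SqlNode for the OVERWRITE WHERE condition
        Granularities.MONTH,  // from the PARTITIONED BY clause
        DateTimeZone.UTC      // the planner passes plannerContext.getTimeZone()
    );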
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java
similarity index 63%
copy from sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java
copy to sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java
index c941da3e59..a56fd69251 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java
@@ -21,9 +21,12 @@ package org.apache.druid.sql.calcite.parser;
import com.google.common.base.Preconditions;
import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.druid.java.util.common.granularity.Granularity;
@@ -31,22 +34,22 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
/**
- * Extends the 'insert' call to hold custom parameters specific to Druid i.e. PARTITIONED BY and CLUSTERED BY
+ * Extends the 'replace' call to hold custom parameters specific to Druid i.e. PARTITIONED BY and the PARTITION SPECS
 * This class extends the {@link SqlInsert} so that this SqlNode can be used in
 * {@link org.apache.calcite.sql2rel.SqlToRelConverter} for getting converted into RelNode, and further processing
*/
-public class DruidSqlInsert extends SqlInsert
+public class DruidSqlReplace extends SqlInsert
{
- public static final String SQL_INSERT_SEGMENT_GRANULARITY = "sqlInsertSegmentGranularity";
+ public static final String SQL_REPLACE_TIME_CHUNKS = "sqlReplaceTimeChunks";
- // This allows reusing super.unparse
- public static final SqlOperator OPERATOR = SqlInsert.OPERATOR;
+ public static final SqlOperator OPERATOR = new SqlSpecialOperator("REPLACE", SqlKind.OTHER);
private final Granularity partitionedBy;
+
+ // Used in the unparse function to generate the original query since we convert the string to an enum
private final String partitionedByStringForUnparse;
- @Nullable
- private final SqlNodeList clusteredBy;
+ private final SqlNode replaceTimeQuery;
/**
* While partitionedBy and partitionedByStringForUnparse can be null as arguments to the constructor, this is
@@ -54,11 +57,11 @@ public class DruidSqlInsert extends SqlInsert
* errors when the PARTITIONED BY custom clause is not present, and keeps its error separate from JavaCC/Calcite's
* custom errors which can be cryptic when someone accidentally forgets to explicitly specify the PARTITIONED BY clause
*/
- public DruidSqlInsert(
+ public DruidSqlReplace(
@Nonnull SqlInsert insertNode,
@Nullable Granularity partitionedBy,
@Nullable String partitionedByStringForUnparse,
- @Nullable SqlNodeList clusteredBy
+ @Nonnull SqlNode replaceTimeQuery
) throws ParseException
{
super(
@@ -69,20 +72,18 @@ public class DruidSqlInsert extends SqlInsert
insertNode.getTargetColumnList()
);
if (partitionedBy == null) {
- throw new ParseException("INSERT statements must specify PARTITIONED BY
clause explicitly");
+ throw new ParseException("REPLACE statements must specify PARTITIONED BY
clause explictly");
}
this.partitionedBy = partitionedBy;
- Preconditions.checkNotNull(partitionedByStringForUnparse);
- this.partitionedByStringForUnparse = partitionedByStringForUnparse;
+ this.partitionedByStringForUnparse = Preconditions.checkNotNull(partitionedByStringForUnparse);
- this.clusteredBy = clusteredBy;
+ this.replaceTimeQuery = replaceTimeQuery;
}
- @Nullable
- public SqlNodeList getClusteredBy()
+ public SqlNode getReplaceTimeQuery()
{
- return clusteredBy;
+ return replaceTimeQuery;
}
public Granularity getPartitionedBy()
@@ -100,16 +101,29 @@ public class DruidSqlInsert extends SqlInsert
@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec)
{
- super.unparse(writer, leftPrec, rightPrec);
+ writer.startList(SqlWriter.FrameTypeEnum.SELECT);
+ writer.sep("REPLACE INTO");
+ final int opLeft = getOperator().getLeftPrec();
+ final int opRight = getOperator().getRightPrec();
+ getTargetTable().unparse(writer, opLeft, opRight);
+
+ if (getTargetColumnList() != null) {
+ getTargetColumnList().unparse(writer, opLeft, opRight);
+ }
+ writer.newlineAndIndent();
+
+ writer.keyword("OVERWRITE");
+ if (replaceTimeQuery instanceof SqlLiteral) {
+ writer.keyword("ALL");
+ } else {
+ replaceTimeQuery.unparse(writer, leftPrec, rightPrec);
+ }
+ writer.newlineAndIndent();
+
+ getSource().unparse(writer, 0, 0);
+ writer.newlineAndIndent();
+
writer.keyword("PARTITIONED BY");
writer.keyword(partitionedByStringForUnparse);
- if (getClusteredBy() != null) {
- writer.keyword("CLUSTERED BY");
- SqlWriter.Frame frame = writer.startList("", "");
- for (SqlNode clusterByOpts : getClusteredBy().getList()) {
- clusterByOpts.unparse(writer, leftPrec, rightPrec);
- }
- writer.endList(frame);
- }
}
}
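For reference, a sketch of what the unparse above emits for the OVERWRITE ALL
form (an assumption about the output; exact quoting and whitespace depend on
the SqlWriter configuration):

    // Approximate DruidSqlReplace#unparse output:
    //   REPLACE INTO "dst"
    //   OVERWRITE ALL
    //   SELECT ...
    //   PARTITIONED BY ALL TIME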
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
index b9ae4a353b..c552356efd 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
@@ -55,6 +55,7 @@ import org.apache.calcite.sql.SqlExplain;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOrderBy;
@@ -83,6 +84,8 @@ import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
+import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
import org.apache.druid.sql.calcite.rel.DruidConvention;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.calcite.rel.DruidRel;
@@ -90,6 +93,7 @@ import org.apache.druid.sql.calcite.rel.DruidUnionRel;
import org.apache.druid.sql.calcite.run.QueryMaker;
import org.apache.druid.sql.calcite.run.QueryMakerFactory;
import org.apache.druid.utils.Throwables;
+import org.joda.time.DateTimeZone;
import javax.annotation.Nullable;
import java.io.Closeable;
@@ -134,7 +138,7 @@ public class DruidPlanner implements Closeable
public ValidationResult validate(boolean authorizeContextParams) throws SqlParseException, ValidationException
{
resetPlanner();
- final ParsedNodes parsed = ParsedNodes.create(planner.parse(plannerContext.getSql()));
+ final ParsedNodes parsed = ParsedNodes.create(planner.parse(plannerContext.getSql()), plannerContext.getTimeZone());
final SqlValidator validator = getValidator();
final SqlNode validatedQueryNode;
@@ -150,8 +154,8 @@ public class DruidPlanner implements Closeable
final Set<ResourceAction> resourceActions = new
HashSet<>(resourceCollectorShuttle.getResourceActions());
- if (parsed.getInsertNode() != null) {
- final String targetDataSource = validateAndGetDataSourceForInsert(parsed.getInsertNode());
+ if (parsed.getInsertOrReplace() != null) {
+ final String targetDataSource = validateAndGetDataSourceForIngest(parsed.getInsertOrReplace());
resourceActions.add(new ResourceAction(new Resource(targetDataSource, ResourceType.DATASOURCE), Action.WRITE));
}
if (authorizeContextParams) {
@@ -175,7 +179,7 @@ public class DruidPlanner implements Closeable
{
resetPlanner();
- final ParsedNodes parsed = ParsedNodes.create(planner.parse(plannerContext.getSql()));
+ final ParsedNodes parsed = ParsedNodes.create(planner.parse(plannerContext.getSql()), plannerContext.getTimeZone());
final SqlNode validatedQueryNode = planner.validate(parsed.getQueryNode());
final RelRoot rootQueryRel = planner.rel(validatedQueryNode);
@@ -187,7 +191,7 @@ public class DruidPlanner implements Closeable
if (parsed.getExplainNode() != null) {
returnedRowType = getExplainStructType(typeFactory);
} else {
- returnedRowType = buildQueryMaker(rootQueryRel, parsed.getInsertNode()).getResultType();
+ returnedRowType = buildQueryMaker(rootQueryRel, parsed.getInsertOrReplace()).getResultType();
}
return new PrepareResult(returnedRowType, parameterTypes);
@@ -206,7 +210,7 @@ public class DruidPlanner implements Closeable
{
resetPlanner();
- final ParsedNodes parsed = ParsedNodes.create(planner.parse(plannerContext.getSql()));
+ final ParsedNodes parsed = ParsedNodes.create(planner.parse(plannerContext.getSql()), plannerContext.getTimeZone());
try {
if (parsed.getIngestionGranularity() != null) {
@@ -220,6 +224,13 @@ public class DruidPlanner implements Closeable
throw new ValidationException("Unable to serialize partition
granularity.");
}
+ if (parsed.getReplaceIntervals() != null) {
+ plannerContext.getQueryContext().addSystemParam(
+ DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS,
+ String.join(",", parsed.getReplaceIntervals())
+ );
+ }
+
// the planner's type factory is not available until after parsing
this.rexBuilder = new RexBuilder(planner.getTypeFactory());
final SqlNode parameterizedQueryNode = rewriteDynamicParameters(parsed.getQueryNode());
@@ -227,7 +238,7 @@ public class DruidPlanner implements Closeable
final RelRoot rootQueryRel = planner.rel(validatedQueryNode);
try {
- return planWithDruidConvention(rootQueryRel, parsed.getExplainNode(), parsed.getInsertNode());
+ return planWithDruidConvention(rootQueryRel, parsed.getExplainNode(), parsed.getInsertOrReplace());
}
catch (Exception e) {
Throwable cannotPlanException = Throwables.getCauseOfType(e, RelOptPlanner.CannotPlanException.class);
@@ -236,9 +247,9 @@ public class DruidPlanner implements Closeable
throw e;
}
- // If there isn't any INSERT clause, then we should try again with BINDABLE convention. And return without
+ // If there isn't any ingestion clause, then we should try again with BINDABLE convention. And return without
// any error, if it is plannable by the bindable convention
- if (parsed.getInsertNode() == null) {
+ if (parsed.getInsertOrReplace() == null) {
// Try again with BINDABLE convention. Used for querying Values and metadata tables.
try {
return planWithBindableConvention(rootQueryRel, parsed.getExplainNode());
@@ -295,12 +306,11 @@ public class DruidPlanner implements Closeable
private PlannerResult planWithDruidConvention(
final RelRoot root,
@Nullable final SqlExplain explain,
- @Nullable final SqlInsert insert
+ @Nullable final SqlInsert insertOrReplace
) throws ValidationException, RelConversionException
{
final RelRoot possiblyLimitedRoot = possiblyWrapRootWithOuterLimitFromContext(root);
-
- final QueryMaker queryMaker = buildQueryMaker(root, insert);
+ final QueryMaker queryMaker = buildQueryMaker(root, insertOrReplace);
plannerContext.setQueryMaker(queryMaker);
RelNode parameterized = rewriteRelDynamicParameters(possiblyLimitedRoot.rel);
@@ -625,7 +635,7 @@ public class DruidPlanner implements Closeable
/**
* Uses {@link SqlParameterizerShuttle} to rewrite {@link SqlNode} to swap out any
- * {@link org.apache.calcite.sql.SqlDynamicParam} early for their {@link org.apache.calcite.sql.SqlLiteral}
+ * {@link org.apache.calcite.sql.SqlDynamicParam} early for their {@link SqlLiteral}
* replacement
*/
private SqlNode rewriteDynamicParameters(SqlNode parsed)
@@ -650,11 +660,11 @@ public class DruidPlanner implements Closeable
private QueryMaker buildQueryMaker(
final RelRoot rootQueryRel,
- @Nullable final SqlInsert insert
+ @Nullable final SqlInsert insertOrReplace
) throws ValidationException
{
- if (insert != null) {
- final String targetDataSource = validateAndGetDataSourceForInsert(insert);
+ if (insertOrReplace != null) {
+ final String targetDataSource = validateAndGetDataSourceForIngest(insertOrReplace);
validateColumnsForIngestion(rootQueryRel);
return queryMakerFactory.buildForInsert(targetDataSource, rootQueryRel, plannerContext);
} else {
@@ -674,17 +684,17 @@ public class DruidPlanner implements Closeable
}
/**
- * Extract target datasource from a {@link SqlInsert}, and also validate that the INSERT is of a form we support.
- * Expects the INSERT target to be either an unqualified name, or a name qualified by the default schema.
+ * Extract target datasource from a {@link SqlInsert}, and also validate that the ingestion is of a form we support.
+ * Expects the target datasource to be either an unqualified name, or a name qualified by the default schema.
*/
- private String validateAndGetDataSourceForInsert(final SqlInsert insert) throws ValidationException
+ private String validateAndGetDataSourceForIngest(final SqlInsert insert) throws ValidationException
{
if (insert.isUpsert()) {
throw new ValidationException("UPSERT is not supported.");
}
if (insert.getTargetColumnList() != null) {
- throw new ValidationException("INSERT with target column list is not
supported.");
+ throw new ValidationException("Ingestion with target column list is not
supported.");
}
final SqlIdentifier tableIdentifier = (SqlIdentifier) insert.getTargetTable();
@@ -692,7 +702,7 @@ public class DruidPlanner implements Closeable
if (tableIdentifier.names.isEmpty()) {
// I don't think this can happen, but include a branch for it just in case.
- throw new ValidationException("INSERT requires target table.");
+ throw new ValidationException("Ingestion requires target table.");
} else if (tableIdentifier.names.size() == 1) {
// Unqualified name.
dataSource = Iterables.getOnlyElement(tableIdentifier.names);
@@ -705,13 +715,13 @@ public class DruidPlanner implements Closeable
dataSource = tableIdentifier.names.get(1);
} else {
throw new ValidationException(
- StringUtils.format("Cannot INSERT into [%s] because it is not a
Druid datasource.", tableIdentifier)
+ StringUtils.format("Cannot ingest into [%s] because it is not a
Druid datasource.", tableIdentifier)
);
}
}
try {
- IdUtils.validateId("INSERT dataSource", dataSource);
+ IdUtils.validateId("Ingestion dataSource", dataSource);
}
catch (IllegalArgumentException e) {
throw new ValidationException(e.getMessage());
@@ -777,84 +787,129 @@ public class DruidPlanner implements Closeable
private final SqlExplain explain;
@Nullable
- private final DruidSqlInsert insert;
+ private final SqlInsert insertOrReplace;
private final SqlNode query;
@Nullable
private final Granularity ingestionGranularity;
+ @Nullable
+ private final List<String> replaceIntervals;
+
private ParsedNodes(
@Nullable SqlExplain explain,
- @Nullable DruidSqlInsert insert,
+ @Nullable SqlInsert insertOrReplace,
SqlNode query,
- @Nullable Granularity ingestionGranularity
+ @Nullable Granularity ingestionGranularity,
+ @Nullable List<String> replaceIntervals
)
{
this.explain = explain;
- this.insert = insert;
+ this.insertOrReplace = insertOrReplace;
this.query = query;
this.ingestionGranularity = ingestionGranularity;
+ this.replaceIntervals = replaceIntervals;
}
- static ParsedNodes create(final SqlNode node) throws ValidationException
+ static ParsedNodes create(final SqlNode node, DateTimeZone dateTimeZone) throws ValidationException
{
- SqlExplain explain = null;
- DruidSqlInsert druidSqlInsert = null;
SqlNode query = node;
- Granularity ingestionGranularity = null;
-
+ SqlExplain explain = null;
if (query.getKind() == SqlKind.EXPLAIN) {
explain = (SqlExplain) query;
query = explain.getExplicandum();
}
if (query.getKind() == SqlKind.INSERT) {
- druidSqlInsert = (DruidSqlInsert) query;
- query = druidSqlInsert.getSource();
+ if (query instanceof DruidSqlInsert) {
+ return handleInsert(explain, (DruidSqlInsert) query);
+ } else if (query instanceof DruidSqlReplace) {
+ return handleReplace(explain, (DruidSqlReplace) query, dateTimeZone);
+ }
+ }
+
+ if (!query.isA(SqlKind.QUERY)) {
+ throw new ValidationException(StringUtils.format("Cannot execute [%s].", query.getKind()));
+ }
+
+ return new ParsedNodes(explain, null, query, null, null);
+ }
+
+ static ParsedNodes handleInsert(SqlExplain explain, DruidSqlInsert druidSqlInsert) throws ValidationException
+ {
+ SqlNode query = druidSqlInsert.getSource();
+
+ // Check if ORDER BY clause is not provided to the underlying query
+ if (query instanceof SqlOrderBy) {
+ SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
+ SqlNodeList orderByList = sqlOrderBy.orderList;
+ if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) {
+ throw new ValidationException("Cannot have ORDER BY on an INSERT
query, use CLUSTERED BY instead.");
+ }
+ }
+
+ Granularity ingestionGranularity = druidSqlInsert.getPartitionedBy();
+
+ if (druidSqlInsert.getClusteredBy() != null) {
+ // If we have a CLUSTERED BY clause, extract the information in that CLUSTERED BY and create a new
+ // SqlOrderBy node
+ SqlNode offset = null;
+ SqlNode fetch = null;
- // Check if ORDER BY clause is not provided to the underlying query
if (query instanceof SqlOrderBy) {
SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
- SqlNodeList orderByList = sqlOrderBy.orderList;
- if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) {
- throw new ValidationException("Cannot have ORDER BY on an INSERT query, use CLUSTERED BY instead.");
- }
+ // This represents the underlying query free of OFFSET, FETCH and ORDER BY clauses.
+ // For a sqlOrderBy.query like "SELECT dim1, sum(dim2) FROM foo OFFSET 10 FETCH 30 ORDER BY dim1 GROUP BY dim1",
+ // this would represent the "SELECT dim1, sum(dim2) FROM foo GROUP BY dim1" part.
+ query = sqlOrderBy.query;
+ offset = sqlOrderBy.offset;
+ fetch = sqlOrderBy.fetch;
}
+ // Creates a new SqlOrderBy query, which may have our CLUSTERED BY overwritten
+ query = new SqlOrderBy(
+ query.getParserPosition(),
+ query,
+ druidSqlInsert.getClusteredBy(),
+ offset,
+ fetch
+ );
+ }
- ingestionGranularity = druidSqlInsert.getPartitionedBy();
-
- if (druidSqlInsert.getClusteredBy() != null) {
- // If we have a CLUSTERED BY clause, extract the information in that CLUSTERED BY and create a new SqlOrderBy
- // node
- SqlNode offset = null;
- SqlNode fetch = null;
-
- if (query instanceof SqlOrderBy) {
- SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
- // This represents the underlying query free of OFFSET, FETCH and ORDER BY clauses
- // For a sqlOrderBy.query like "SELECT dim1, sum(dim2) FROM foo OFFSET 10 FETCH 30 ORDER BY dim1 GROUP BY dim1
- // this would represent the "SELECT dim1, sum(dim2) from foo GROUP BY dim1
- query = sqlOrderBy.query;
- offset = sqlOrderBy.offset;
- fetch = sqlOrderBy.fetch;
- }
- // Creates a new SqlOrderBy query, which may have our CLUSTERED BY overwritten
- query = new SqlOrderBy(
- query.getParserPosition(),
- query,
- druidSqlInsert.getClusteredBy(),
- offset,
- fetch
- );
+ if (!query.isA(SqlKind.QUERY)) {
+ throw new ValidationException(StringUtils.format("Cannot execute [%s].", query.getKind()));
+ }
+
+ return new ParsedNodes(explain, druidSqlInsert, query, ingestionGranularity, null);
+ }
+
+ static ParsedNodes handleReplace(SqlExplain explain, DruidSqlReplace druidSqlReplace, DateTimeZone dateTimeZone)
+ throws ValidationException
+ {
+ SqlNode query = druidSqlReplace.getSource();
+
+ // Check if ORDER BY clause is not provided to the underlying query
+ if (query instanceof SqlOrderBy) {
+ SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
+ SqlNodeList orderByList = sqlOrderBy.orderList;
+ if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) {
+ throw new ValidationException("Cannot have ORDER BY on a REPLACE
query.");
}
}
+ SqlNode replaceTimeQuery = druidSqlReplace.getReplaceTimeQuery();
+ if (replaceTimeQuery == null) {
+ throw new ValidationException("Missing time chunk information in
DELETE WHERE clause for replace.");
+ }
+
+ Granularity ingestionGranularity = druidSqlReplace.getPartitionedBy();
+ List<String> replaceIntervals = DruidSqlParserUtils.validateQueryAndConvertToIntervals(replaceTimeQuery, ingestionGranularity, dateTimeZone);
+
if (!query.isA(SqlKind.QUERY)) {
throw new ValidationException(StringUtils.format("Cannot execute [%s].", query.getKind()));
}
- return new ParsedNodes(explain, druidSqlInsert, query, ingestionGranularity);
+ return new ParsedNodes(explain, druidSqlReplace, query, ingestionGranularity, replaceIntervals);
}
@Nullable
@@ -864,9 +919,15 @@ public class DruidPlanner implements Closeable
}
@Nullable
- public DruidSqlInsert getInsertNode()
+ public SqlInsert getInsertOrReplace()
+ {
+ return insertOrReplace;
+ }
+
+ @Nullable
+ public List<String> getReplaceIntervals()
{
- return insert;
+ return replaceIntervals;
}
public SqlNode getQueryNode()
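The net effect of handleReplace() plus the plan() change above, sketched (the
key names are the constants from this diff; the serialized values are
assumptions about the encoding, not output from this commit):

    // After plan(), the system-added query context parameters look roughly like:
    //   sqlInsertSegmentGranularity -> the serialized PARTITIONED BY granularity
    //   sqlReplaceTimeChunks        -> "2022-01-01T00:00:00.000Z/2022-02-01T00:00:00.000Z"
    //                                  (comma-joined intervals, or "all" for OVERWRITE ALL)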
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteIngestionDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteIngestionDmlTest.java
new file mode 100644
index 0000000000..7040e7fc53
--- /dev/null
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteIngestionDmlTest.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.InlineInputSource;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.AuthConfig;
+import org.apache.druid.server.security.AuthenticationResult;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
+import org.apache.druid.sql.SqlLifecycle;
+import org.apache.druid.sql.SqlLifecycleFactory;
+import org.apache.druid.sql.calcite.external.ExternalDataSource;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.planner.Calcites;
+import org.apache.druid.sql.calcite.planner.PlannerConfig;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.util.CalciteTests;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.Matcher;
+import org.hamcrest.MatcherAssert;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class CalciteIngestionDmlTest extends BaseCalciteQueryTest
+{
+ protected static final Map<String, Object> DEFAULT_CONTEXT =
+ ImmutableMap.<String, Object>builder()
+ .put(PlannerContext.CTX_SQL_QUERY_ID, DUMMY_SQL_ID)
+ .build();
+
+ protected static final RowSignature FOO_TABLE_SIGNATURE =
+ RowSignature.builder()
+ .addTimeColumn()
+ .add("cnt", ColumnType.LONG)
+ .add("dim1", ColumnType.STRING)
+ .add("dim2", ColumnType.STRING)
+ .add("dim3", ColumnType.STRING)
+ .add("m1", ColumnType.FLOAT)
+ .add("m2", ColumnType.DOUBLE)
+ .add("unique_dim1", HyperUniquesAggregatorFactory.TYPE)
+ .build();
+
+ protected final ExternalDataSource externalDataSource = new ExternalDataSource(
+ new InlineInputSource("a,b,1\nc,d,2\n"),
+ new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, false, 0),
+ RowSignature.builder()
+ .add("x", ColumnType.STRING)
+ .add("y", ColumnType.STRING)
+ .add("z", ColumnType.LONG)
+ .build()
+ );
+
+ protected boolean didTest = false;
+
+ @After
+ @Override
+ public void tearDown() throws Exception
+ {
+ super.tearDown();
+
+ // Catch situations where tests forgot to call "verify" on their tester.
+ if (!didTest) {
+ throw new ISE("Test was not run; did you call verify() on a tester?");
+ }
+ }
+
+ protected String externSql(final ExternalDataSource externalDataSource)
+ {
+ try {
+ return StringUtils.format(
+ "TABLE(extern(%s, %s, %s))",
+ Calcites.escapeStringLiteral(queryJsonMapper.writeValueAsString(externalDataSource.getInputSource())),
+ Calcites.escapeStringLiteral(queryJsonMapper.writeValueAsString(externalDataSource.getInputFormat())),
+ Calcites.escapeStringLiteral(queryJsonMapper.writeValueAsString(externalDataSource.getSignature()))
+ );
+ }
+ catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected Map<String, Object> queryContextWithGranularity(Granularity granularity)
+ {
+ String granularityString = null;
+ try {
+ granularityString = queryJsonMapper.writeValueAsString(granularity);
+ }
+ catch (JsonProcessingException e) {
+ Assert.fail(e.getMessage());
+ }
+ return ImmutableMap.of(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, granularityString);
+ }
+
+ protected IngestionDmlTester testIngestionQuery()
+ {
+ return new IngestionDmlTester();
+ }
+
+ public class IngestionDmlTester
+ {
+ private String sql;
+ private PlannerConfig plannerConfig = new PlannerConfig();
+ private Map<String, Object> queryContext = DEFAULT_CONTEXT;
+ private AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT;
+ private String expectedTargetDataSource;
+ private RowSignature expectedTargetSignature;
+ private List<ResourceAction> expectedResources;
+ private Query expectedQuery;
+ private Matcher<Throwable> validationErrorMatcher;
+
+ private IngestionDmlTester()
+ {
+ // Nothing to do.
+ }
+
+ public IngestionDmlTester sql(final String sql)
+ {
+ this.sql = sql;
+ return this;
+ }
+
+ protected IngestionDmlTester sql(final String sqlPattern, final Object arg, final Object... otherArgs)
+ {
+ final Object[] args = new Object[otherArgs.length + 1];
+ args[0] = arg;
+ System.arraycopy(otherArgs, 0, args, 1, otherArgs.length);
+ this.sql = StringUtils.format(sqlPattern, args);
+ return this;
+ }
+
+ public IngestionDmlTester context(final Map<String, Object> context)
+ {
+ this.queryContext = context;
+ return this;
+ }
+
+ public IngestionDmlTester authentication(final AuthenticationResult authenticationResult)
+ {
+ this.authenticationResult = authenticationResult;
+ return this;
+ }
+
+ public IngestionDmlTester expectTarget(
+ final String expectedTargetDataSource,
+ final RowSignature expectedTargetSignature
+ )
+ {
+ this.expectedTargetDataSource = Preconditions.checkNotNull(expectedTargetDataSource, "expectedTargetDataSource");
+ this.expectedTargetSignature = Preconditions.checkNotNull(expectedTargetSignature, "expectedTargetSignature");
+ return this;
+ }
+
+ public IngestionDmlTester expectResources(final ResourceAction... expectedResources)
+ {
+ this.expectedResources = Arrays.asList(expectedResources);
+ return this;
+ }
+
+ @SuppressWarnings("rawtypes")
+ public IngestionDmlTester expectQuery(final Query expectedQuery)
+ {
+ this.expectedQuery = expectedQuery;
+ return this;
+ }
+
+ public IngestionDmlTester expectValidationError(Matcher<Throwable> validationErrorMatcher)
+ {
+ this.validationErrorMatcher = validationErrorMatcher;
+ return this;
+ }
+
+ public IngestionDmlTester expectValidationError(Class<? extends Throwable> clazz)
+ {
+ return expectValidationError(CoreMatchers.instanceOf(clazz));
+ }
+
+ public IngestionDmlTester expectValidationError(Class<? extends Throwable> clazz, String message)
+ {
+ return expectValidationError(
+ CoreMatchers.allOf(
+ CoreMatchers.instanceOf(clazz),
+ ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(message))
+ )
+ );
+ }
+
+ public void verify()
+ {
+ if (didTest) {
+ // It's good form to only do one test per method.
+ // This also helps us ensure that "verify" actually does get called.
+ throw new ISE("Use one @Test method per tester");
+ }
+
+ didTest = true;
+
+ if (sql == null) {
+ throw new ISE("Test must have SQL statement");
+ }
+
+ try {
+ log.info("SQL: %s", sql);
+ queryLogHook.clearRecordedQueries();
+
+ if (validationErrorMatcher != null) {
+ verifyValidationError();
+ } else {
+ verifySuccess();
+ }
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void verifyValidationError()
+ {
+ if (expectedTargetDataSource != null) {
+ throw new ISE("Test must not have expectedTargetDataSource");
+ }
+
+ if (expectedResources != null) {
+ throw new ISE("Test must not have expectedResources");
+ }
+
+ if (expectedQuery != null) {
+ throw new ISE("Test must not have expectedQuery");
+ }
+
+ final SqlLifecycleFactory sqlLifecycleFactory = getSqlLifecycleFactory(
+ plannerConfig,
+ new AuthConfig(),
+ createOperatorTable(),
+ createMacroTable(),
+ CalciteTests.TEST_AUTHORIZER_MAPPER,
+ queryJsonMapper
+ );
+
+ final SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
+ sqlLifecycle.initialize(sql, new QueryContext(queryContext));
+
+ final Throwable e = Assert.assertThrows(
+ Throwable.class,
+ () -> {
+ sqlLifecycle.validateAndAuthorize(authenticationResult);
+ sqlLifecycle.plan();
+ }
+ );
+
+ MatcherAssert.assertThat(e, validationErrorMatcher);
+ Assert.assertTrue(queryLogHook.getRecordedQueries().isEmpty());
+ }
+
+ private void verifySuccess() throws Exception
+ {
+ if (expectedTargetDataSource == null) {
+ throw new ISE("Test must have expectedTargetDataSource");
+ }
+
+ if (expectedResources == null) {
+ throw new ISE("Test must have expectedResources");
+ }
+
+ final List<Query> expectedQueries =
+ expectedQuery == null
+ ? Collections.emptyList()
+ : Collections.singletonList(recursivelyOverrideContext(expectedQuery, queryContext));
+
+ Assert.assertEquals(
+ ImmutableSet.copyOf(expectedResources),
+ analyzeResources(plannerConfig, new AuthConfig(), sql, queryContext, authenticationResult)
+ );
+
+ final List<Object[]> results =
+ getResults(plannerConfig, queryContext, Collections.emptyList(), sql, authenticationResult);
+
+ verifyResults(
+ sql,
+ expectedQueries,
+ Collections.singletonList(new Object[]{expectedTargetDataSource, expectedTargetSignature}),
+ results
+ );
+ }
+ }
+
+ protected static ResourceAction viewRead(final String viewName)
+ {
+ return new ResourceAction(new Resource(viewName, ResourceType.VIEW), Action.READ);
+ }
+
+ protected static ResourceAction dataSourceRead(final String dataSource)
+ {
+ return new ResourceAction(new Resource(dataSource, ResourceType.DATASOURCE), Action.READ);
+ }
+
+ protected static ResourceAction dataSourceWrite(final String dataSource)
+ {
+ return new ResourceAction(new Resource(dataSource, ResourceType.DATASOURCE), Action.WRITE);
+ }
+}
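A hypothetical REPLACE test in the style this harness enables (dst, foo and
FOO_TABLE_SIGNATURE are the fixtures defined above; the expectQuery step is
elided):

    // Sketch only; mirrors the INSERT tests that follow.
    testIngestionQuery()
        .sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME")
        .expectTarget("dst", FOO_TABLE_SIGNATURE)
        .expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
        .verify();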
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
index caa1ef8486..87c389a722 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
@@ -20,110 +20,44 @@
package org.apache.druid.sql.calcite;
import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import org.apache.druid.data.input.impl.CsvInputFormat;
-import org.apache.druid.data.input.impl.InlineInputSource;
-import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
-import org.apache.druid.query.Query;
-import org.apache.druid.query.QueryContext;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
-import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.AuthConfig;
-import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.ForbiddenException;
-import org.apache.druid.server.security.Resource;
-import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
-import org.apache.druid.sql.SqlLifecycle;
-import org.apache.druid.sql.SqlLifecycleFactory;
import org.apache.druid.sql.SqlPlanningException;
-import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.external.ExternalOperatorConversion;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
-import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
-import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.hamcrest.CoreMatchers;
-import org.hamcrest.Matcher;
-import org.hamcrest.MatcherAssert;
-import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
import java.util.Map;
-public class CalciteInsertDmlTest extends BaseCalciteQueryTest
+public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
{
- private static final Map<String, Object> DEFAULT_CONTEXT =
- ImmutableMap.<String, Object>builder()
- .put(PlannerContext.CTX_SQL_QUERY_ID, DUMMY_SQL_ID)
- .build();
-
- private static final RowSignature FOO_TABLE_SIGNATURE =
- RowSignature.builder()
- .addTimeColumn()
- .add("cnt", ColumnType.LONG)
- .add("dim1", ColumnType.STRING)
- .add("dim2", ColumnType.STRING)
- .add("dim3", ColumnType.STRING)
- .add("m1", ColumnType.FLOAT)
- .add("m2", ColumnType.DOUBLE)
- .add("unique_dim1", HyperUniquesAggregatorFactory.TYPE)
- .build();
-
- private final ExternalDataSource externalDataSource = new ExternalDataSource(
- new InlineInputSource("a,b,1\nc,d,2\n"),
- new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, false, 0),
- RowSignature.builder()
- .add("x", ColumnType.STRING)
- .add("y", ColumnType.STRING)
- .add("z", ColumnType.LONG)
- .build()
- );
-
private static final Map<String, Object> PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT = ImmutableMap.of(
DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
"{\"type\":\"all\"}"
);
- private boolean didTest = false;
-
- @After
- @Override
- public void tearDown() throws Exception
- {
- super.tearDown();
-
- // Catch situations where tests forgot to call "verify" on their tester.
- if (!didTest) {
- throw new ISE("Test was not run; did you call verify() on a tester?");
- }
- }
-
@Test
public void testInsertFromTable()
{
- testInsertQuery()
+ testIngestionQuery()
.sql("INSERT INTO dst SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectTarget("dst", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
@@ -141,7 +75,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertFromView()
{
- testInsertQuery()
+ testIngestionQuery()
.sql("INSERT INTO dst SELECT * FROM view.aview PARTITIONED BY ALL
TIME")
.expectTarget("dst", RowSignature.builder().add("dim1_firstchar",
ColumnType.STRING).build())
.expectResources(viewRead("aview"), dataSourceWrite("dst"))
@@ -161,7 +95,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertIntoExistingTable()
{
- testInsertQuery()
+ testIngestionQuery()
.sql("INSERT INTO foo SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectTarget("foo", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceRead("foo"), dataSourceWrite("foo"))
@@ -179,7 +113,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertIntoQualifiedTable()
{
- testInsertQuery()
+ testIngestionQuery()
.sql("INSERT INTO druid.dst SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectTarget("dst", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
@@ -197,25 +131,25 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertIntoInvalidDataSourceName()
{
- testInsertQuery()
+ testIngestionQuery()
.sql("INSERT INTO \"in/valid\" SELECT dim1, dim2 FROM foo PARTITIONED
BY ALL TIME")
- .expectValidationError(SqlPlanningException.class, "INSERT dataSource
cannot contain the '/' character.")
+ .expectValidationError(SqlPlanningException.class, "Ingestion
dataSource cannot contain the '/' character.")
.verify();
}
@Test
public void testInsertUsingColumnList()
{
- testInsertQuery()
+ testIngestionQuery()
.sql("INSERT INTO dst (foo, bar) SELECT dim1, dim2 FROM foo
PARTITIONED BY ALL TIME")
- .expectValidationError(SqlPlanningException.class, "INSERT with target
column list is not supported.")
+ .expectValidationError(SqlPlanningException.class, "Ingestion with
target column list is not supported.")
.verify();
}
@Test
public void testUpsert()
{
- testInsertQuery()
+ testIngestionQuery()
.sql("UPSERT INTO dst SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(SqlPlanningException.class, "UPSERT is not
supported.")
.verify();
@@ -224,11 +158,11 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertIntoSystemTable()
{
- testInsertQuery()
+ testIngestionQuery()
.sql("INSERT INTO INFORMATION_SCHEMA.COLUMNS SELECT * FROM foo
PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
- "Cannot INSERT into [INFORMATION_SCHEMA.COLUMNS] because it is not
a Druid datasource."
+ "Cannot ingest into [INFORMATION_SCHEMA.COLUMNS] because it is not
a Druid datasource."
)
.verify();
}
@@ -236,11 +170,11 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertIntoView()
{
- testInsertQuery()
+ testIngestionQuery()
.sql("INSERT INTO view.aview SELECT * FROM foo PARTITIONED BY ALL
TIME")
.expectValidationError(
SqlPlanningException.class,
- "Cannot INSERT into [view.aview] because it is not a Druid
datasource."
+ "Cannot ingest into [view.aview] because it is not a Druid
datasource."
)
.verify();
}
@@ -248,7 +182,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertFromUnauthorizedDataSource()
{
- testInsertQuery()
+ testIngestionQuery()
.sql("INSERT INTO dst SELECT * FROM \"%s\" PARTITIONED BY ALL TIME",
CalciteTests.FORBIDDEN_DATASOURCE)
.expectValidationError(ForbiddenException.class)
.verify();
@@ -257,7 +191,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertIntoUnauthorizedDataSource()
{
- testInsertQuery()
+ testIngestionQuery()
.sql("INSERT INTO \"%s\" SELECT * FROM foo PARTITIONED BY ALL TIME",
CalciteTests.FORBIDDEN_DATASOURCE)
.expectValidationError(ForbiddenException.class)
.verify();
@@ -266,11 +200,11 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertIntoNonexistentSchema()
{
- testInsertQuery()
+ testIngestionQuery()
.sql("INSERT INTO nonexistent.dst SELECT * FROM foo PARTITIONED BY ALL
TIME")
.expectValidationError(
SqlPlanningException.class,
- "Cannot INSERT into [nonexistent.dst] because it is not a Druid
datasource."
+ "Cannot ingest into [nonexistent.dst] because it is not a Druid
datasource."
)
.verify();
}
@@ -278,7 +212,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertFromExternal()
{
- testInsertQuery()
+ testIngestionQuery()
.sql("INSERT INTO dst SELECT * FROM %s PARTITIONED BY ALL TIME",
externSql(externalDataSource))
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("dst", externalDataSource.getSignature())
@@ -304,7 +238,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
.add("dim1",
ColumnType.STRING)
.build();
- testInsertQuery()
+ testIngestionQuery()
.sql(
"INSERT INTO druid.dst SELECT __time, FLOOR(m1) as floor_m1, dim1
FROM foo PARTITIONED BY TIME_FLOOR(__time, 'PT1H')")
.expectTarget("dst", targetRowSignature)
@@ -353,7 +287,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
Assert.fail(e.getMessage());
}
- testInsertQuery()
+ testIngestionQuery()
.sql(StringUtils.format(
"INSERT INTO druid.dst SELECT __time, dim1 FROM foo PARTITIONED
BY %s",
partitionedByArgument
@@ -384,7 +318,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
.add("dim1",
ColumnType.STRING)
.add("ceil_m2",
ColumnType.DOUBLE)
.build();
- testInsertQuery()
+ testIngestionQuery()
.sql(
"INSERT INTO druid.dst "
+ "SELECT __time, FLOOR(m1) as floor_m1, dim1, CEIL(m2) as ceil_m2
FROM foo "
@@ -424,7 +358,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
.add("dim1",
ColumnType.STRING)
.build();
- testInsertQuery()
+ testIngestionQuery()
.sql(
"INSERT INTO druid.dst SELECT __time, FLOOR(m1) as floor_m1, dim1
FROM foo PARTITIONED BY DAY CLUSTERED BY 2, dim1")
.expectTarget("dst", targetRowSignature)
@@ -456,7 +390,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
.add("dim1",
ColumnType.STRING)
.build();
- testInsertQuery()
+ testIngestionQuery()
.sql(
"INSERT INTO druid.dst SELECT __time, FLOOR(m1) as floor_m1, dim1
FROM foo LIMIT 10 OFFSET 20 PARTITIONED BY DAY")
.expectTarget("dst", targetRowSignature)
@@ -583,7 +517,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
+ queryJsonMapper.writeValueAsString(expectedQuery)
+ "], signature=[{x:STRING, y:STRING, z:LONG}])\n";
- // Use testQuery for EXPLAIN (not testInsertQuery).
+ // Use testQuery for EXPLAIN (not testIngestionQuery).
testQuery(
new PlannerConfig(),
StringUtils.format(
@@ -600,14 +534,14 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
)
);
- // Not using testInsertQuery, so must set didTest manually to satisfy the
check in tearDown.
+ // Not using testIngestionQuery, so must set didTest manually to satisfy
the check in tearDown.
didTest = true;
}
@Test
public void testExplainInsertFromExternalUnauthorized()
{
- // Use testQuery for EXPLAIN (not testInsertQuery).
+ // Use testQuery for EXPLAIN (not testIngestionQuery).
Assert.assertThrows(
ForbiddenException.class,
() ->
@@ -621,14 +555,14 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
)
);
- // Not using testInsertQuery, so must set didTest manually to satisfy the
check in tearDown.
+ // Not using testIngestionQuery, so must set didTest manually to satisfy
the check in tearDown.
didTest = true;
}
@Test
public void testInsertFromExternalUnauthorized()
{
- testInsertQuery()
+ testIngestionQuery()
.sql("INSERT INTO dst SELECT * FROM %s PARTITIONED BY ALL TIME",
externSql(externalDataSource))
.expectValidationError(ForbiddenException.class)
.verify();
@@ -639,7 +573,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
{
// INSERT with a particular column ordering.
- testInsertQuery()
+ testIngestionQuery()
.sql(
"INSERT INTO dst SELECT x || y AS xy, z FROM %s PARTITIONED BY ALL
TIME CLUSTERED BY 1, 2",
externSql(externalDataSource)
@@ -670,7 +604,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
{
// INSERT with rollup.
- testInsertQuery()
+ testIngestionQuery()
.sql(
"INSERT INTO dst SELECT x, SUM(z) AS sum_z, COUNT(*) AS cnt FROM
%s GROUP BY 1 PARTITIONED BY ALL TIME",
externSql(externalDataSource)
@@ -706,7 +640,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
{
// INSERT with rollup into a single row (no GROUP BY exprs).
- testInsertQuery()
+ testIngestionQuery()
.sql(
"INSERT INTO dst SELECT COUNT(*) AS cnt FROM %s PARTITIONED BY ALL
TIME",
externSql(externalDataSource)
@@ -734,7 +668,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertWithInvalidSelectStatement()
{
- testInsertQuery()
+ testIngestionQuery()
.sql("INSERT INTO t SELECT channel, added as count FROM foo
PARTITIONED BY ALL") // count is a keyword
.expectValidationError(
CoreMatchers.allOf(
@@ -748,7 +682,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertWithUnnamedColumnInSelectStatement()
{
- testInsertQuery()
+ testIngestionQuery()
.sql("INSERT INTO t SELECT dim1, dim2 || '-lol' FROM foo PARTITIONED
BY ALL")
.expectValidationError(
SqlPlanningException.class,
@@ -763,7 +697,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertWithInvalidColumnNameInIngest()
{
- testInsertQuery()
+ testIngestionQuery()
.sql("INSERT INTO t SELECT __time, dim1 AS EXPR$0 FROM foo PARTITIONED
BY ALL")
.expectValidationError(
SqlPlanningException.class,
@@ -778,7 +712,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertWithUnnamedColumnInNestedSelectStatement()
{
- testInsertQuery()
+ testIngestionQuery()
.sql("INSERT INTO test "
+ "SELECT __time, * FROM "
+ "(SELECT __time, LOWER(dim1) FROM foo) PARTITIONED BY ALL TIME")
@@ -791,238 +725,4 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
)
.verify();
}
-
- private String externSql(final ExternalDataSource externalDataSource)
- {
- try {
- return StringUtils.format(
- "TABLE(extern(%s, %s, %s))",
-
Calcites.escapeStringLiteral(queryJsonMapper.writeValueAsString(externalDataSource.getInputSource())),
-
Calcites.escapeStringLiteral(queryJsonMapper.writeValueAsString(externalDataSource.getInputFormat())),
-
Calcites.escapeStringLiteral(queryJsonMapper.writeValueAsString(externalDataSource.getSignature()))
- );
- }
- catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-
- private Map<String, Object> queryContextWithGranularity(Granularity
granularity)
- {
- String granularityString = null;
- try {
- granularityString = queryJsonMapper.writeValueAsString(granularity);
- }
- catch (JsonProcessingException e) {
- Assert.fail(e.getMessage());
- }
- return ImmutableMap.of(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
granularityString);
- }
-
- private InsertDmlTester testInsertQuery()
- {
- return new InsertDmlTester();
- }
-
- public class InsertDmlTester
- {
- private String sql;
- private PlannerConfig plannerConfig = new PlannerConfig();
- private Map<String, Object> queryContext = DEFAULT_CONTEXT;
- private AuthenticationResult authenticationResult =
CalciteTests.REGULAR_USER_AUTH_RESULT;
- private String expectedTargetDataSource;
- private RowSignature expectedTargetSignature;
- private List<ResourceAction> expectedResources;
- private Query expectedQuery;
- private Matcher<Throwable> validationErrorMatcher;
-
- private InsertDmlTester()
- {
- // Nothing to do.
- }
-
- public InsertDmlTester sql(final String sql)
- {
- this.sql = sql;
- return this;
- }
-
- private InsertDmlTester sql(final String sqlPattern, final Object arg,
final Object... otherArgs)
- {
- final Object[] args = new Object[otherArgs.length + 1];
- args[0] = arg;
- System.arraycopy(otherArgs, 0, args, 1, otherArgs.length);
- this.sql = StringUtils.format(sqlPattern, args);
- return this;
- }
-
- public InsertDmlTester context(final Map<String, Object> context)
- {
- this.queryContext = context;
- return this;
- }
-
- public InsertDmlTester authentication(final AuthenticationResult
authenticationResult)
- {
- this.authenticationResult = authenticationResult;
- return this;
- }
-
- public InsertDmlTester expectTarget(
- final String expectedTargetDataSource,
- final RowSignature expectedTargetSignature
- )
- {
- this.expectedTargetDataSource =
Preconditions.checkNotNull(expectedTargetDataSource,
"expectedTargetDataSource");
- this.expectedTargetSignature =
Preconditions.checkNotNull(expectedTargetSignature, "expectedTargetSignature");
- return this;
- }
-
- public InsertDmlTester expectResources(final ResourceAction...
expectedResources)
- {
- this.expectedResources = Arrays.asList(expectedResources);
- return this;
- }
-
- @SuppressWarnings("rawtypes")
- public InsertDmlTester expectQuery(final Query expectedQuery)
- {
- this.expectedQuery = expectedQuery;
- return this;
- }
-
- public InsertDmlTester expectValidationError(Matcher<Throwable>
validationErrorMatcher)
- {
- this.validationErrorMatcher = validationErrorMatcher;
- return this;
- }
-
- public InsertDmlTester expectValidationError(Class<? extends Throwable>
clazz)
- {
- return expectValidationError(CoreMatchers.instanceOf(clazz));
- }
-
- public InsertDmlTester expectValidationError(Class<? extends Throwable>
clazz, String message)
- {
- return expectValidationError(
- CoreMatchers.allOf(
- CoreMatchers.instanceOf(clazz),
- ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(message))
- )
- );
- }
-
- public void verify()
- {
- if (didTest) {
- // It's good form to only do one test per method.
- // This also helps us ensure that "verify" actually does get called.
- throw new ISE("Use one @Test method per tester");
- }
-
- didTest = true;
-
- if (sql == null) {
- throw new ISE("Test must have SQL statement");
- }
-
- try {
- log.info("SQL: %s", sql);
- queryLogHook.clearRecordedQueries();
-
- if (validationErrorMatcher != null) {
- verifyValidationError();
- } else {
- verifySuccess();
- }
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- private void verifyValidationError()
- {
- if (expectedTargetDataSource != null) {
- throw new ISE("Test must not have expectedTargetDataSource");
- }
-
- if (expectedResources != null) {
- throw new ISE("Test must not have expectedResources");
- }
-
- if (expectedQuery != null) {
- throw new ISE("Test must not have expectedQuery");
- }
-
- final SqlLifecycleFactory sqlLifecycleFactory = getSqlLifecycleFactory(
- plannerConfig,
- new AuthConfig(),
- createOperatorTable(),
- createMacroTable(),
- CalciteTests.TEST_AUTHORIZER_MAPPER,
- queryJsonMapper
- );
-
- final SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
- sqlLifecycle.initialize(sql, new QueryContext(queryContext));
-
- final Throwable e = Assert.assertThrows(
- Throwable.class,
- () -> {
- sqlLifecycle.validateAndAuthorize(authenticationResult);
- sqlLifecycle.plan();
- }
- );
-
- MatcherAssert.assertThat(e, validationErrorMatcher);
- Assert.assertTrue(queryLogHook.getRecordedQueries().isEmpty());
- }
-
- private void verifySuccess() throws Exception
- {
- if (expectedTargetDataSource == null) {
- throw new ISE("Test must have expectedTargetDataSource");
- }
-
- if (expectedResources == null) {
- throw new ISE("Test must have expectedResources");
- }
-
- final List<Query> expectedQueries =
- expectedQuery == null
- ? Collections.emptyList()
- :
Collections.singletonList(recursivelyOverrideContext(expectedQuery,
queryContext));
-
- Assert.assertEquals(
- ImmutableSet.copyOf(expectedResources),
- analyzeResources(plannerConfig, sql, authenticationResult)
- );
-
- final List<Object[]> results =
- getResults(plannerConfig, queryContext, Collections.emptyList(),
sql, authenticationResult);
-
- verifyResults(
- sql,
- expectedQueries,
- Collections.singletonList(new Object[]{expectedTargetDataSource,
expectedTargetSignature}),
- results
- );
- }
- }
-
- private static ResourceAction viewRead(final String viewName)
- {
- return new ResourceAction(new Resource(viewName, ResourceType.VIEW),
Action.READ);
- }
-
- private static ResourceAction dataSourceRead(final String dataSource)
- {
- return new ResourceAction(new Resource(dataSource,
ResourceType.DATASOURCE), Action.READ);
- }
-
- private static ResourceAction dataSourceWrite(final String dataSource)
- {
- return new ResourceAction(new Resource(dataSource,
ResourceType.DATASOURCE), Action.WRITE);
- }
}
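
For reviewers skimming the hunks above: the only mechanical change in CalciteInsertDmlTest is that the fluent tester (formerly the inner InsertDmlTester, created via testInsertQuery()) now lives in the shared base class CalciteIngestionDmlTest and is created via testIngestionQuery(), so the REPLACE tests below can reuse it. A minimal sketch of the call pattern, using only names that appear in this diff:

    // Sketch only: testIngestionQuery() and the expect*/verify() methods are the
    // shared tester inherited from CalciteIngestionDmlTest.
    testIngestionQuery()
        .sql("INSERT INTO dst SELECT * FROM foo PARTITIONED BY ALL TIME")
        .expectTarget("dst", FOO_TABLE_SIGNATURE)                        // target datasource and row signature
        .expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))  // expected authorization resources
        .verify();                                                       // plans the SQL and asserts all expectations
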
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java
new file mode 100644
index 0000000000..04354d126f
--- /dev/null
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java
@@ -0,0 +1,656 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.server.security.ForbiddenException;
+import org.apache.druid.sql.SqlPlanningException;
+import org.apache.druid.sql.calcite.external.ExternalOperatorConversion;
+import org.apache.druid.sql.calcite.filtration.Filtration;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
+import org.apache.druid.sql.calcite.planner.PlannerConfig;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.util.CalciteTests;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
+{
+ private static final Map<String, Object> REPLACE_ALL_TIME_CHUNKS =
ImmutableMap.of(
+ DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
+ "{\"type\":\"all\"}",
+ DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS,
+ "all"
+ );
+
+ protected Map<String, Object> addReplaceTimeChunkToQueryContext(Map<String,
Object> context, String replaceTimeChunks)
+ {
+ return ImmutableMap.<String, Object>builder()
+ .putAll(context)
+ .put(DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS,
replaceTimeChunks)
+ .build();
+ }
+
+ @Test
+ public void testReplaceFromTableWithReplaceAll()
+ {
+ testIngestionQuery()
+ .sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY
ALL TIME")
+ .expectTarget("dst", FOO_TABLE_SIGNATURE)
+ .expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource("foo")
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2",
"unique_dim1")
+ .context(REPLACE_ALL_TIME_CHUNKS)
+ .build()
+ )
+ .verify();
+ }
+
+ @Test
+ public void testReplaceFromTableWithDeleteWhereClause()
+ {
+ testIngestionQuery()
+ .sql("REPLACE INTO dst OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01
00:00:00' AND __time < TIMESTAMP '2000-01-02 00:00:00' "
+ + "SELECT * FROM foo PARTITIONED BY DAY")
+ .expectTarget("dst", FOO_TABLE_SIGNATURE)
+ .expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource("foo")
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2",
"unique_dim1")
+ .context(
+ addReplaceTimeChunkToQueryContext(
+ queryContextWithGranularity(Granularities.DAY),
+ "2000-01-01T00:00:00.000Z/2000-01-02T00:00:00.000Z"
+ )
+ )
+ .build()
+ )
+ .verify();
+ }
+
+ @Test
+ public void testReplaceFromTableWithTimeZoneInQueryContext()
+ {
+ HashMap<String, Object> context = new HashMap<>(DEFAULT_CONTEXT);
+ context.put(PlannerContext.CTX_SQL_TIME_ZONE, "+05:30");
+ testIngestionQuery()
+ .context(context)
+ .sql("REPLACE INTO dst OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01
05:30:00' AND __time < TIMESTAMP '2000-01-02 05:30:00' "
+ + "SELECT * FROM foo PARTITIONED BY DAY")
+ .expectTarget("dst", FOO_TABLE_SIGNATURE)
+ .expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource("foo")
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2",
"unique_dim1")
+ .context(
+ addReplaceTimeChunkToQueryContext(
+ queryContextWithGranularity(Granularities.DAY),
+ "2000-01-01T00:00:00.000Z/2000-01-02T00:00:00.000Z"
+ )
+ )
+ .build()
+ )
+ .verify();
+ }
+
+ @Test
+ public void testReplaceFromTableWithIntervalLargerThanOneGranularity()
+ {
+ testIngestionQuery()
+ .sql("REPLACE INTO dst OVERWRITE WHERE "
+ + "__time >= TIMESTAMP '2000-01-01' AND __time < TIMESTAMP
'2000-05-01' "
+ + "SELECT * FROM foo PARTITIONED BY MONTH")
+ .expectTarget("dst", FOO_TABLE_SIGNATURE)
+ .expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource("foo")
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2",
"unique_dim1")
+ .context(
+ addReplaceTimeChunkToQueryContext(
+ queryContextWithGranularity(Granularities.MONTH),
+ "2000-01-01T00:00:00.000Z/2000-05-01T00:00:00.000Z"
+ )
+ )
+ .build()
+ )
+ .verify();
+ }
+
+ @Test
+ public void testReplaceFromTableWithComplexDeleteWhereClause()
+ {
+ testIngestionQuery()
+ .sql("REPLACE INTO dst OVERWRITE WHERE "
+ + "__time >= TIMESTAMP '2000-01-01' AND __time < TIMESTAMP
'2000-02-01' "
+ + "OR __time >= TIMESTAMP '2000-03-01' AND __time < TIMESTAMP
'2000-04-01' "
+ + "SELECT * FROM foo PARTITIONED BY MONTH")
+ .expectTarget("dst", FOO_TABLE_SIGNATURE)
+ .expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource("foo")
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2",
"unique_dim1")
+ .context(
+ addReplaceTimeChunkToQueryContext(
+ queryContextWithGranularity(Granularities.MONTH),
+
"2000-01-01T00:00:00.000Z/2000-02-01T00:00:00.000Z,2000-03-01T00:00:00.000Z/2000-04-01T00:00:00.000Z"
+ )
+ )
+ .build()
+ )
+ .verify();
+ }
+
+ @Test
+ public void testReplaceFromTableWithBetweenClause()
+ {
+ testIngestionQuery()
+ .sql("REPLACE INTO dst OVERWRITE WHERE "
+ + "__time BETWEEN TIMESTAMP '2000-01-01' AND TIMESTAMP
'2000-01-31 23:59:59.999' "
+ + "SELECT * FROM foo PARTITIONED BY MONTH")
+ .expectTarget("dst", FOO_TABLE_SIGNATURE)
+ .expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource("foo")
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2",
"unique_dim1")
+ .context(
+ addReplaceTimeChunkToQueryContext(
+ queryContextWithGranularity(Granularities.MONTH),
+ "2000-01-01T00:00:00.000Z/2000-02-01T00:00:00.000Z"
+ )
+ )
+ .build()
+ )
+ .verify();
+ }
+
+ @Test
+ public void testReplaceForUnsupportedDeleteWhereClause()
+ {
+ testIngestionQuery()
+ .sql("REPLACE INTO dst OVERWRITE WHERE __time LIKE '20__-02-01' SELECT
* FROM foo PARTITIONED BY MONTH")
+ .expectValidationError(
+ SqlPlanningException.class,
+ "Unsupported operation in OVERWRITE WHERE clause: LIKE"
+ )
+ .verify();
+ }
+
+ @Test
+ public void testReplaceForInvalidDeleteWhereClause()
+ {
+ testIngestionQuery()
+ .sql("REPLACE INTO dst OVERWRITE WHERE TRUE SELECT * FROM foo
PARTITIONED BY MONTH")
+ .expectValidationError(
+ SqlPlanningException.class,
+ "Invalid OVERWRITE WHERE clause"
+ )
+ .verify();
+ }
+
+ @Test
+ public void testReplaceForDeleteWhereClauseOnUnsupportedColumns()
+ {
+ testIngestionQuery()
+ .sql("REPLACE INTO dst OVERWRITE WHERE dim1 > TIMESTAMP '2000-01-05
00:00:00' SELECT * FROM foo PARTITIONED BY ALL TIME")
+ .expectValidationError(
+ SqlPlanningException.class,
+ "Only __time column is supported in OVERWRITE WHERE clause"
+ )
+ .verify();
+ }
+
+
+ public void testReplaceWithOrderBy()
+ {
+ testIngestionQuery()
+ .sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo ORDER BY dim1
PARTITIONED BY ALL TIME")
+ .expectValidationError(SqlPlanningException.class, "Cannot have ORDER
BY on a REPLACE query.")
+ .verify();
+ }
+
+ @Test
+ public void testReplaceForMisalignedPartitionInterval()
+ {
+ testIngestionQuery()
+ .sql("REPLACE INTO dst OVERWRITE WHERE __time >= TIMESTAMP '2000-01-05
00:00:00' AND __time <= TIMESTAMP '2000-01-06 00:00:00' SELECT * FROM foo
PARTITIONED BY MONTH")
+ .expectValidationError(
+ SqlPlanningException.class,
+ "OVERWRITE WHERE clause contains an interval
[2000-01-05T00:00:00.000Z/2000-01-06T00:00:00.001Z] which is not aligned with
PARTITIONED BY granularity {type=period, period=P1M, timeZone=UTC, origin=null}"
+ )
+ .verify();
+ }
+
+ @Test
+ public void testReplaceForInvalidPartition()
+ {
+ testIngestionQuery()
+ .sql("REPLACE INTO dst OVERWRITE WHERE __time >= TIMESTAMP '2000-01-05
00:00:00' AND __time <= TIMESTAMP '2000-02-05 00:00:00' SELECT * FROM foo
PARTITIONED BY ALL TIME")
+ .expectValidationError(
+ SqlPlanningException.class,
+ "OVERWRITE WHERE clause contains an interval
[2000-01-05T00:00:00.000Z/2000-02-05T00:00:00.001Z] which is not aligned with
PARTITIONED BY granularity AllGranularity"
+ )
+ .verify();
+ }
+
+ @Test
+ public void testReplaceFromTableWithEmptyInterval()
+ {
+ testIngestionQuery()
+ .sql("REPLACE INTO dst OVERWRITE WHERE "
+ + "__time < TIMESTAMP '2000-01-01' AND __time > TIMESTAMP
'2000-01-01' "
+ + "SELECT * FROM foo PARTITIONED BY MONTH")
+ .expectValidationError(
+ SqlPlanningException.class,
+ "Intervals for replace are empty"
+ )
+ .verify();
+ }
+
+ @Test
+  public void testReplaceWithInvalidInterval()
+ {
+ testIngestionQuery()
+ .sql("REPLACE INTO dst OVERWRITE WHERE __time >= TIMESTAMP
'2000-01-INVALID0:00' AND __time <= TIMESTAMP '2000-02-05 00:00:00' SELECT *
FROM foo PARTITIONED BY ALL TIME")
+ .expectValidationError(SqlPlanningException.class)
+ .verify();
+ }
+
+ @Test
+  public void testReplaceWithoutOverwriteClause()
+ {
+ testIngestionQuery()
+ .sql("REPLACE INTO dst SELECT * FROM foo PARTITIONED BY ALL TIME")
+ .expectValidationError(SqlPlanningException.class)
+ .verify();
+ }
+
+ @Test
+ public void testReplaceFromView()
+ {
+ testIngestionQuery()
+ .sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM view.aview
PARTITIONED BY ALL TIME")
+ .expectTarget("dst", RowSignature.builder().add("dim1_firstchar",
ColumnType.STRING).build())
+ .expectResources(viewRead("aview"), dataSourceWrite("dst"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource("foo")
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .virtualColumns(expressionVirtualColumn("v0",
"substring(\"dim1\", 0, 1)", ColumnType.STRING))
+ .filters(selector("dim2", "a", null))
+ .columns("v0")
+ .context(REPLACE_ALL_TIME_CHUNKS)
+ .build()
+ )
+ .verify();
+ }
+
+ @Test
+ public void testReplaceIntoQualifiedTable()
+ {
+ testIngestionQuery()
+ .sql("REPLACE INTO druid.dst OVERWRITE ALL SELECT * FROM foo
PARTITIONED BY ALL TIME")
+ .expectTarget("dst", FOO_TABLE_SIGNATURE)
+ .expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource("foo")
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2",
"unique_dim1")
+ .context(REPLACE_ALL_TIME_CHUNKS)
+ .build()
+ )
+ .verify();
+ }
+
+ @Test
+ public void testReplaceIntoInvalidDataSourceName()
+ {
+ testIngestionQuery()
+ .sql("REPLACE INTO \"in/valid\" OVERWRITE ALL SELECT dim1, dim2 FROM
foo PARTITIONED BY ALL TIME")
+ .expectValidationError(SqlPlanningException.class, "Ingestion
dataSource cannot contain the '/' character.")
+ .verify();
+ }
+
+ @Test
+ public void testReplaceUsingColumnList()
+ {
+ testIngestionQuery()
+ .sql("REPLACE INTO dst (foo, bar) OVERWRITE ALL SELECT dim1, dim2 FROM
foo PARTITIONED BY ALL TIME")
+ .expectValidationError(SqlPlanningException.class, "Ingestion with
target column list is not supported.")
+ .verify();
+ }
+
+ @Test
+ public void testReplaceIntoSystemTable()
+ {
+ testIngestionQuery()
+ .sql("REPLACE INTO INFORMATION_SCHEMA.COLUMNS OVERWRITE ALL SELECT *
FROM foo PARTITIONED BY ALL TIME")
+ .expectValidationError(
+ SqlPlanningException.class,
+ "Cannot ingest into [INFORMATION_SCHEMA.COLUMNS] because it is not
a Druid datasource."
+ )
+ .verify();
+ }
+
+ @Test
+ public void testReplaceIntoView()
+ {
+ testIngestionQuery()
+ .sql("REPLACE INTO view.aview OVERWRITE ALL SELECT * FROM foo
PARTITIONED BY ALL TIME")
+ .expectValidationError(
+ SqlPlanningException.class,
+ "Cannot ingest into [view.aview] because it is not a Druid
datasource."
+ )
+ .verify();
+ }
+
+ @Test
+ public void testReplaceFromUnauthorizedDataSource()
+ {
+ testIngestionQuery()
+ .sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM \"%s\" PARTITIONED
BY ALL TIME", CalciteTests.FORBIDDEN_DATASOURCE)
+ .expectValidationError(ForbiddenException.class)
+ .verify();
+ }
+
+ @Test
+ public void testReplaceIntoUnauthorizedDataSource()
+ {
+ testIngestionQuery()
+ .sql("REPLACE INTO \"%s\" OVERWRITE ALL SELECT * FROM foo PARTITIONED
BY ALL TIME", CalciteTests.FORBIDDEN_DATASOURCE)
+ .expectValidationError(ForbiddenException.class)
+ .verify();
+ }
+
+ @Test
+ public void testReplaceIntoNonexistentSchema()
+ {
+ testIngestionQuery()
+ .sql("REPLACE INTO nonexistent.dst OVERWRITE ALL SELECT * FROM foo
PARTITIONED BY ALL TIME")
+ .expectValidationError(
+ SqlPlanningException.class,
+ "Cannot ingest into [nonexistent.dst] because it is not a Druid
datasource."
+ )
+ .verify();
+ }
+
+ @Test
+ public void testReplaceFromExternal()
+ {
+ testIngestionQuery()
+ .sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM %s PARTITIONED BY
ALL TIME", externSql(externalDataSource))
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectTarget("dst", externalDataSource.getSignature())
+ .expectResources(dataSourceWrite("dst"),
ExternalOperatorConversion.EXTERNAL_RESOURCE_ACTION)
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource(externalDataSource)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .columns("x", "y", "z")
+ .context(REPLACE_ALL_TIME_CHUNKS)
+ .build()
+ )
+ .verify();
+ }
+
+ @Test
+ public void testReplaceWithPartitionedByAndLimitOffset()
+ {
+ RowSignature targetRowSignature = RowSignature.builder()
+ .add("__time",
ColumnType.LONG)
+ .add("floor_m1",
ColumnType.FLOAT)
+ .add("dim1",
ColumnType.STRING)
+ .build();
+
+ testIngestionQuery()
+ .sql(
+ "REPLACE INTO druid.dst OVERWRITE ALL SELECT __time, FLOOR(m1) as
floor_m1, dim1 FROM foo LIMIT 10 OFFSET 20 PARTITIONED BY DAY")
+ .expectTarget("dst", targetRowSignature)
+ .expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource("foo")
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .columns("__time", "dim1", "v0")
+ .virtualColumns(expressionVirtualColumn("v0", "floor(\"m1\")",
ColumnType.FLOAT))
+ .limit(10)
+ .offset(20)
+ .context(
+ addReplaceTimeChunkToQueryContext(
+ queryContextWithGranularity(Granularities.DAY),
+ "all"
+ )
+ )
+ .build()
+ )
+ .verify();
+ }
+
+ @Test
+ public void testReplaceWithPartitionedByContainingInvalidGranularity()
throws Exception
+ {
+    // Throws a ValidationException, which is converted to a SqlPlanningException before being thrown to the end user.
+ try {
+ testQuery(
+ "REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY
'invalid_granularity'",
+ ImmutableList.of(),
+ ImmutableList.of()
+ );
+ Assert.fail("Exception should be thrown");
+ }
+ catch (SqlPlanningException e) {
+ assertEquals(
+ "Encountered 'invalid_granularity' after PARTITIONED BY. Expected
HOUR, DAY, MONTH, YEAR, ALL TIME, FLOOR function or TIME_FLOOR function",
+ e.getMessage()
+ );
+ }
+ didTest = true;
+ }
+
+ @Test
+ public void testExplainReplaceFromExternal() throws Exception
+ {
+ // Skip vectorization since otherwise the "context" will change for each
subtest.
+ skipVectorize();
+
+ final ScanQuery expectedQuery = newScanQueryBuilder()
+ .dataSource(externalDataSource)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .columns("x", "y", "z")
+ .context(
+ queryJsonMapper.readValue(
+
"{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"{\\\"type\\\":\\\"all\\\"}\",\"sqlQueryId\":\"dummy\",\"sqlReplaceTimeChunks\":\"all\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"}",
+ JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+ )
+ )
+ .build();
+
+ final String expectedExplanation =
+ "DruidQueryRel(query=["
+ + queryJsonMapper.writeValueAsString(expectedQuery)
+ + "], signature=[{x:STRING, y:STRING, z:LONG}])\n";
+
+ // Use testQuery for EXPLAIN (not testIngestionQuery).
+ testQuery(
+ new PlannerConfig(),
+ StringUtils.format(
+ "EXPLAIN PLAN FOR REPLACE INTO dst OVERWRITE ALL SELECT * FROM %s
PARTITIONED BY ALL TIME",
+ externSql(externalDataSource)
+ ),
+ CalciteTests.SUPER_USER_AUTH_RESULT,
+ ImmutableList.of(),
+ ImmutableList.of(
+ new Object[]{
+ expectedExplanation,
+
"[{\"name\":\"EXTERNAL\",\"type\":\"EXTERNAL\"},{\"name\":\"dst\",\"type\":\"DATASOURCE\"}]"
+ }
+ )
+ );
+
+ // Not using testIngestionQuery, so must set didTest manually to satisfy
the check in tearDown.
+ didTest = true;
+ }
+
+ @Test
+ public void testExplainReplaceFromExternalUnauthorized()
+ {
+ // Use testQuery for EXPLAIN (not testIngestionQuery).
+ Assert.assertThrows(
+ ForbiddenException.class,
+ () ->
+ testQuery(
+ StringUtils.format(
+ "EXPLAIN PLAN FOR REPLACE INTO dst OVERWRITE ALL SELECT *
FROM %s PARTITIONED BY ALL TIME",
+ externSql(externalDataSource)
+ ),
+ ImmutableList.of(),
+ ImmutableList.of()
+ )
+ );
+
+ // Not using testIngestionQuery, so must set didTest manually to satisfy
the check in tearDown.
+ didTest = true;
+ }
+
+ @Test
+ public void testReplaceFromExternalUnauthorized()
+ {
+ testIngestionQuery()
+ .sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM %s PARTITIONED BY
ALL TIME", externSql(externalDataSource))
+ .expectValidationError(ForbiddenException.class)
+ .verify();
+ }
+
+ @Test
+ public void testReplaceFromExternalProjectSort()
+ {
+ testIngestionQuery()
+ .sql(
+ "REPLACE INTO dst OVERWRITE ALL SELECT x || y AS xy, z FROM %s
PARTITIONED BY ALL TIME",
+ externSql(externalDataSource)
+ )
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectTarget("dst", RowSignature.builder().add("xy",
ColumnType.STRING).add("z", ColumnType.LONG).build())
+ .expectResources(dataSourceWrite("dst"),
ExternalOperatorConversion.EXTERNAL_RESOURCE_ACTION)
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource(externalDataSource)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .virtualColumns(expressionVirtualColumn("v0",
"concat(\"x\",\"y\")", ColumnType.STRING))
+ .columns("v0", "z")
+ .context(REPLACE_ALL_TIME_CHUNKS)
+ .build()
+ )
+ .verify();
+ }
+
+ @Test
+ public void testReplaceFromExternalAggregate()
+ {
+ testIngestionQuery()
+ .sql(
+ "REPLACE INTO dst OVERWRITE ALL SELECT x, SUM(z) AS sum_z,
COUNT(*) AS cnt FROM %s GROUP BY 1 PARTITIONED BY ALL TIME",
+ externSql(externalDataSource)
+ )
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectTarget(
+ "dst",
+ RowSignature.builder()
+ .add("x", ColumnType.STRING)
+ .add("sum_z", ColumnType.LONG)
+ .add("cnt", ColumnType.LONG)
+ .build()
+ )
+ .expectResources(dataSourceWrite("dst"),
ExternalOperatorConversion.EXTERNAL_RESOURCE_ACTION)
+ .expectQuery(
+ GroupByQuery.builder()
+ .setDataSource(externalDataSource)
+ .setInterval(querySegmentSpec(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setDimensions(dimensions(new
DefaultDimensionSpec("x", "d0")))
+ .setAggregatorSpecs(
+ new LongSumAggregatorFactory("a0", "z"),
+ new CountAggregatorFactory("a1")
+ )
+ .setContext(REPLACE_ALL_TIME_CHUNKS)
+ .build()
+ )
+ .verify();
+ }
+
+ @Test
+ public void testReplaceFromExternalAggregateAll()
+ {
+ testIngestionQuery()
+ .sql(
+ "REPLACE INTO dst OVERWRITE ALL SELECT COUNT(*) AS cnt FROM %s
PARTITIONED BY ALL TIME",
+ externSql(externalDataSource)
+ )
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectTarget(
+ "dst",
+ RowSignature.builder()
+ .add("cnt", ColumnType.LONG)
+ .build()
+ )
+ .expectResources(dataSourceWrite("dst"),
ExternalOperatorConversion.EXTERNAL_RESOURCE_ACTION)
+ .expectQuery(
+ GroupByQuery.builder()
+ .setDataSource(externalDataSource)
+ .setInterval(querySegmentSpec(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setAggregatorSpecs(new CountAggregatorFactory("a0"))
+ .setContext(REPLACE_ALL_TIME_CHUNKS)
+ .build()
+ )
+ .verify();
+ }
+}
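
For reference, the tests above pin down how the new REPLACE clauses reach the engine: like INSERT, REPLACE records the PARTITIONED BY granularity under the sqlInsertSegmentGranularity context key, and it additionally records the OVERWRITE target under sqlReplaceTimeChunks, either the literal "all" (for OVERWRITE ALL) or a comma-separated list of ISO-8601 intervals derived from the OVERWRITE WHERE clause. A sketch of the expected context, built from the constants used in this file:

    // Context produced for "REPLACE INTO dst OVERWRITE ALL ... PARTITIONED BY ALL TIME",
    // matching REPLACE_ALL_TIME_CHUNKS above.
    Map<String, Object> replaceAllContext = ImmutableMap.of(
        DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, "{\"type\":\"all\"}",
        DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS, "all"
    );

    // For "OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01 00:00:00' AND
    // __time < TIMESTAMP '2000-01-02 00:00:00'" with PARTITIONED BY DAY, the tests
    // expect sqlReplaceTimeChunks = "2000-01-01T00:00:00.000Z/2000-01-02T00:00:00.000Z".
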
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/parser/DruidSqlUnparseTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/parser/DruidSqlUnparseTest.java
new file mode 100644
index 0000000000..96a992f0dd
--- /dev/null
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/parser/DruidSqlUnparseTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.parser;
+
+import org.apache.calcite.avatica.util.Casing;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.dialect.CalciteSqlDialect;
+import org.apache.calcite.sql.pretty.SqlPrettyWriter;
+import org.junit.Test;
+
+import java.io.StringReader;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for implementations of {@link org.apache.calcite.sql.SqlNode#unparse(SqlWriter, int, int)}
+ * in custom Druid SqlNode classes, such as {@link DruidSqlInsert} and {@link DruidSqlReplace}.
+ */
+public class DruidSqlUnparseTest
+{
+ @Test
+ public void testUnparseInsert() throws ParseException
+ {
+ String sqlQuery = "INSERT INTO dst SELECT * FROM foo PARTITIONED BY ALL
TIME";
+ String prettySqlQuery = "INSERT INTO \"dst\"\n"
+ + "(SELECT *\n"
+ + " FROM \"foo\") PARTITIONED BY ALL TIME";
+
+ DruidSqlParserImpl druidSqlParser = createTestParser(sqlQuery);
+    DruidSqlInsert druidSqlInsert = (DruidSqlInsert) druidSqlParser.DruidSqlInsertEof();
+
+    druidSqlInsert.unparse(sqlWriter, 0, 0);
+ assertEquals(prettySqlQuery, sqlWriter.toSqlString().getSql());
+ }
+
+ @Test
+ public void testUnparseReplaceAll() throws ParseException
+ {
+ String sqlQuery = "REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo
PARTITIONED BY ALL TIME";
+ String prettySqlQuery = "REPLACE INTO \"dst\"\n"
+ + "OVERWRITE ALL\n"
+ + "(SELECT *\n"
+ + " FROM \"foo\")\n"
+ + "PARTITIONED BY ALL TIME";
+
+ DruidSqlParserImpl druidSqlParser = createTestParser(sqlQuery);
+ DruidSqlReplace druidSqlReplace = (DruidSqlReplace)
druidSqlParser.DruidSqlReplaceEof();
+
+ druidSqlReplace.unparse(sqlWriter, 0, 0);
+ assertEquals(prettySqlQuery, sqlWriter.toSqlString().getSql());
+ }
+
+ @Test
+ public void testUnparseReplaceWhere() throws ParseException
+ {
+ String sqlQuery = "REPLACE INTO dst OVERWRITE WHERE __time >= TIMESTAMP
'2000-01-01 00:00:00' AND __time < TIMESTAMP '2000-01-02 00:00:00' SELECT *
FROM foo PARTITIONED BY DAY";
+ String prettySqlQuery = "REPLACE INTO \"dst\"\n"
+ + "OVERWRITE \"__time\" >= TIMESTAMP '2000-01-01
00:00:00' AND \"__time\" < TIMESTAMP '2000-01-02 00:00:00'\n"
+ + "(SELECT *\n"
+ + " FROM \"foo\")\n"
+ + "PARTITIONED BY DAY";
+ DruidSqlParserImpl druidSqlParser = createTestParser(sqlQuery);
+ DruidSqlReplace druidSqlReplace = (DruidSqlReplace)
druidSqlParser.DruidSqlReplaceEof();
+
+ druidSqlReplace.unparse(sqlWriter, 0, 0);
+ assertEquals(prettySqlQuery, sqlWriter.toSqlString().getSql());
+ }
+
+ private final SqlWriter sqlWriter = new
SqlPrettyWriter(CalciteSqlDialect.DEFAULT);
+
+ private static DruidSqlParserImpl createTestParser(String parseString)
+ {
+ DruidSqlParserImplFactory druidSqlParserImplFactory = new
DruidSqlParserImplFactory();
+ DruidSqlParserImpl druidSqlParser = (DruidSqlParserImpl)
druidSqlParserImplFactory.getParser(new StringReader(parseString));
+ druidSqlParser.setUnquotedCasing(Casing.TO_LOWER);
+ druidSqlParser.setQuotedCasing(Casing.TO_LOWER);
+ druidSqlParser.setIdentifierMaxLength(20);
+ return druidSqlParser;
+ }
+}
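
The unparse tests double as a usage example for the generated parser, since DruidSqlReplaceEof() is the statement entry point these classes go through. A minimal round-trip sketch built from the helpers above (it assumes the same parser setup as createTestParser()):

    // Parse a REPLACE statement and print it back as SQL; the output matches the
    // expected string in testUnparseReplaceAll(). DruidSqlReplaceEof() throws
    // ParseException on malformed input.
    SqlWriter writer = new SqlPrettyWriter(CalciteSqlDialect.DEFAULT);
    DruidSqlParserImpl parser = createTestParser(
        "REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME"
    );
    DruidSqlReplace replace = (DruidSqlReplace) parser.DruidSqlReplaceEof();
    replace.unparse(writer, 0, 0);  // leftPrec/rightPrec are 0 at the top level
    System.out.println(writer.toSqlString().getSql());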