This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 58674d5 [SPARK-27181][SQL] Add public transform API
58674d5 is described below
commit 58674d54baedc048ae9bf237f268dce5f3d06d77
Author: Ryan Blue <[email protected]>
AuthorDate: Wed Apr 10 14:30:39 2019 +0800
[SPARK-27181][SQL] Add public transform API
## What changes were proposed in this pull request?
This adds a public Expression API that can be used to pass partition
transformations to data sources.
## How was this patch tested?
Existing tests validate that there are no regressions. Added transform cases to the DDL
parser suite and the v1 conversions suite.
Closes #24117 from rdblue/add-public-transform-api.
Authored-by: Ryan Blue <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../apache/spark/sql/catalyst/parser/SqlBase.g4 | 17 +-
.../sql/catalog/v2/expressions/Expression.java | 31 ++++
.../sql/catalog/v2/expressions/Expressions.java | 162 ++++++++++++++++
.../spark/sql/catalog/v2/expressions/Literal.java | 42 +++++
.../sql/catalog/v2/expressions/NamedReference.java | 33 ++++
.../sql/catalog/v2/expressions/Transform.java | 44 +++++
.../sql/catalog/v2/expressions/expressions.scala | 203 +++++++++++++++++++++
.../spark/sql/catalyst/parser/AstBuilder.scala | 101 +++++++++-
.../plans/logical/sql/CreateTableStatement.scala | 5 +-
.../spark/sql/catalyst/parser/DDLParserSuite.scala | 52 +++++-
.../datasources/DataSourceResolution.scala | 7 +-
.../execution/command/PlanResolutionSuite.scala | 20 ++
12 files changed, 705 insertions(+), 12 deletions(-)
diff --git
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 0f9387b..d261b56 100644
---
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -92,7 +92,7 @@ statement
| SHOW DATABASES (LIKE? pattern=STRING)?
#showDatabases
| createTableHeader ('(' colTypeList ')')? tableProvider
((OPTIONS options=tablePropertyList) |
- (PARTITIONED BY partitionColumnNames=identifierList) |
+ (PARTITIONED BY partitioning=transformList) |
bucketSpec |
locationSpec |
(COMMENT comment=STRING) |
@@ -587,6 +587,21 @@ namedExpressionSeq
: namedExpression (',' namedExpression)*
;
+transformList      // PARTITIONED BY clause: one or more comma-separated transforms in parentheses
+    : '(' transforms+=transform (',' transforms+=transform)* ')'
+    ;
+
+transform          // a bare column name, or a named function applied to one or more arguments
+    : qualifiedName                                                           #identityTransform
+    | transformName=identifier
+      '(' argument+=transformArgument (',' argument+=transformArgument)* ')'  #applyTransform
+    ;
+
+transformArgument  // a column name or a constant literal
+    : qualifiedName
+    | constant
+    ;
+
expression
: booleanExpression
;
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expression.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expression.java
new file mode 100644
index 0000000..1e2aca9
--- /dev/null
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expression.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2.expressions;
+
+import org.apache.spark.annotation.Experimental;
+
+/**
+ * Base interface of the public logical expression API.
+ */
+@Experimental
+public interface Expression {
+  /**
+   * Returns a human-readable, SQL-like string that represents this expression.
+   */
+  String describe();
+}
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expressions.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expressions.java
new file mode 100644
index 0000000..009e89b
--- /dev/null
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expressions.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2.expressions;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.types.DataType;
+import scala.collection.JavaConverters;
+
+import java.util.Arrays;
+
+/**
+ * Static helper methods to create logical transforms to pass into Spark.
+ */
+@Experimental
+public class Expressions {
+  private Expressions() {
+  }
+
+  /**
+   * Create a logical transform for applying a named transform.
+   * <p>
+   * This transform can represent applying any named transform.
+   *
+   * @param name the transform name
+   * @param args expression arguments to the transform
+   * @return a logical transform
+   */
+  public static Transform apply(String name, Expression... args) {
+    return LogicalExpressions.apply(name,
+        JavaConverters.asScalaBuffer(Arrays.asList(args)).toSeq());
+  }
+
+  /**
+   * Create a named reference expression for a column.
+   *
+   * @param name a column name
+   * @return a named reference for the column
+   */
+  public static NamedReference column(String name) {
+    return LogicalExpressions.reference(name);
+  }
+
+  /**
+   * Create a literal from a value.
+   * <p>
+   * The JVM type of the value held by a literal must be the type used by Spark's InternalRow API
+   * for the literal's {@link DataType SQL data type}.
+   *
+   * @param value a value
+   * @param <T> the JVM type of the value
+   * @return a literal expression for the value
+   */
+  public static <T> Literal<T> literal(T value) {
+    return LogicalExpressions.literal(value);
+  }
+
+  /**
+   * Create a bucket transform for one or more columns.
+   * <p>
+   * This transform represents a logical mapping from a value to a bucket id in [0, numBuckets)
+   * based on a hash of the value.
+   * <p>
+   * The name reported by transforms created with this method is "bucket".
+   *
+   * @param numBuckets the number of output buckets
+   * @param columns input columns for the bucket transform
+   * @return a logical bucket transform with name "bucket"
+   */
+  public static Transform bucket(int numBuckets, String... columns) {
+    return LogicalExpressions.bucket(numBuckets,
+        JavaConverters.asScalaBuffer(Arrays.asList(columns)).toSeq());
+  }
+
+  /**
+   * Create an identity transform for a column.
+   * <p>
+   * This transform represents a logical mapping from a value to itself.
+   * <p>
+   * The name reported by transforms created with this method is "identity".
+   *
+   * @param column an input column
+   * @return a logical identity transform with name "identity"
+   */
+  public static Transform identity(String column) {
+    return LogicalExpressions.identity(column);
+  }
+
+  /**
+   * Create a yearly transform for a timestamp or date column.
+   * <p>
+   * This transform represents a logical mapping from a timestamp or date to a year, such as 2018.
+   * <p>
+   * The name reported by transforms created with this method is "years".
+   *
+   * @param column an input timestamp or date column
+   * @return a logical yearly transform with name "years"
+   */
+  public static Transform years(String column) {
+    return LogicalExpressions.years(column);
+  }
+
+  /**
+   * Create a monthly transform for a timestamp or date column.
+   * <p>
+   * This transform represents a logical mapping from a timestamp or date to a month, such as
+   * 2018-05.
+   * <p>
+   * The name reported by transforms created with this method is "months".
+   *
+   * @param column an input timestamp or date column
+   * @return a logical monthly transform with name "months"
+   */
+  public static Transform months(String column) {
+    return LogicalExpressions.months(column);
+  }
+
+  /**
+   * Create a daily transform for a timestamp or date column.
+   * <p>
+   * This transform represents a logical mapping from a timestamp or date to a date, such as
+   * 2018-05-13.
+   * <p>
+   * The name reported by transforms created with this method is "days".
+   *
+   * @param column an input timestamp or date column
+   * @return a logical daily transform with name "days"
+   */
+  public static Transform days(String column) {
+    return LogicalExpressions.days(column);
+  }
+
+  /**
+   * Create an hourly transform for a timestamp column.
+   * <p>
+   * This transform represents a logical mapping from a timestamp to a date and hour, such as
+   * 2018-05-13, hour 19.
+   * <p>
+   * The name reported by transforms created with this method is "hours".
+   *
+   * @param column an input timestamp column
+   * @return a logical hourly transform with name "hours"
+   */
+  public static Transform hours(String column) {
+    return LogicalExpressions.hours(column);
+  }
+
+}
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Literal.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Literal.java
new file mode 100644
index 0000000..e41bcf9
--- /dev/null
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Literal.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2.expressions;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * A constant literal value in the public expression API.
+ * <p>
+ * The JVM type of the held value must match the type Spark's InternalRow API uses for the
+ * literal's {@link DataType SQL data type}.
+ *
+ * @param <T> the JVM type of the value held by this literal
+ */
+@Experimental
+public interface Literal<T> extends Expression {
+  /**
+   * Returns the SQL data type of this literal.
+   */
+  DataType dataType();
+
+  /**
+   * Returns the value held by this literal.
+   */
+  T value();
+}
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/NamedReference.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/NamedReference.java
new file mode 100644
index 0000000..c71ffbe
--- /dev/null
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/NamedReference.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2.expressions;
+
+import org.apache.spark.annotation.Experimental;
+
+/**
+ * A reference to a field or column in the public logical expression API.
+ */
+@Experimental
+public interface NamedReference extends Expression {
+  /**
+   * Returns the referenced field name, split into its name parts.
+   * <p>
+   * Each element of the returned array is the name of a single field.
+   */
+  String[] fieldNames();
+}
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Transform.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Transform.java
new file mode 100644
index 0000000..c85e0c4
--- /dev/null
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Transform.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2.expressions;
+
+import org.apache.spark.annotation.Experimental;
+
+/**
+ * A transform function in the public logical expression API.
+ * <p>
+ * For example, the transform date(ts) derives a date value from a timestamp column. Its name is
+ * "date" and its argument is a reference to the "ts" column.
+ */
+@Experimental
+public interface Transform extends Expression {
+  /**
+   * Returns the name of the transform function.
+   */
+  String name();
+
+  /**
+   * Returns the arguments passed to the transform function.
+   */
+  Expression[] arguments();
+
+  /**
+   * Returns every field reference that appears among the transform's arguments.
+   */
+  NamedReference[] references();
+}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/expressions/expressions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/expressions/expressions.scala
new file mode 100644
index 0000000..813d882
--- /dev/null
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/expressions/expressions.scala
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2.expressions
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DataType, IntegerType, StringType}
+
+/**
+ * Helper methods for working with the logical expressions API.
+ *
+ * Factory methods can be used when referencing the logical expression nodes is ambiguous because
+ * logical and internal expressions are used.
+ */
+private[sql] object LogicalExpressions {
+  // A generic parser that is only used to parse multi-part field names. Because of that, the SQL
+  // conf passed to it does not matter.
+  private lazy val parser = new CatalystSqlParser(SQLConf.get)
+
+  def fromPartitionColumns(columns: String*): Array[IdentityTransform] =
+    columns.map(column => IdentityTransform(reference(column))).toArray
+
+  def fromBucketSpec(spec: BucketSpec): BucketTransform = {
+    // a bucket transform has no notion of sort columns, so such specs cannot be converted
+    if (spec.sortColumnNames.nonEmpty) {
+      throw new AnalysisException(
+        s"Cannot convert bucketing with sort columns to a transform: $spec")
+    }
+    bucket(spec.numBuckets, spec.bucketColumnNames: _*)
+  }
+
+  implicit class TransformHelper(transforms: Seq[Transform]) {
+    /**
+     * Converts identity transforms back to partition column names; fails if any transform is
+     * not an identity, or if an identity transform references a nested field.
+     */
+    def asPartitionColumns: Seq[String] = {
+      val nonIdTransforms = transforms.filterNot(_.isInstanceOf[IdentityTransform])
+      if (nonIdTransforms.nonEmpty) {
+        throw new AnalysisException("Transforms cannot be converted to partition columns: " +
+            nonIdTransforms.map(_.describe).mkString(", "))
+      }
+
+      transforms.collect { case IdentityTransform(ref) => ref }.map { ref =>
+        ref.fieldNames match {
+          case parts if parts.size > 1 =>
+            throw new AnalysisException(s"Cannot partition by nested column: $ref")
+          case parts => parts(0)
+        }
+      }
+    }
+  }
+
+  def literal[T](value: T): LiteralValue[T] =
+    literal(value, catalyst.expressions.Literal(value).dataType)
+
+  def literal[T](value: T, dataType: DataType): LiteralValue[T] = LiteralValue(value, dataType)
+
+  def reference(name: String): NamedReference =
+    FieldReference(parser.parseMultipartIdentifier(name))
+
+  def apply(name: String, arguments: Expression*): Transform = ApplyTransform(name, arguments)
+
+  def bucket(numBuckets: Int, columns: String*): BucketTransform =
+    BucketTransform(literal(numBuckets, IntegerType), columns.map(reference))
+
+  def identity(column: String): IdentityTransform = IdentityTransform(reference(column))
+
+  def years(column: String): YearsTransform = YearsTransform(reference(column))
+
+  def months(column: String): MonthsTransform = MonthsTransform(reference(column))
+
+  def days(column: String): DaysTransform = DaysTransform(reference(column))
+
+  def hours(column: String): HoursTransform = HoursTransform(reference(column))
+}
+
+/**
+ * Base class for transforms whose only argument is a single column, described as name(column).
+ */
+private[sql] abstract class SingleColumnTransform(ref: NamedReference) extends Transform {
+  /** The column this transform is applied to. */
+  def reference: NamedReference = ref
+
+  override def references: Array[NamedReference] = Array(ref)
+
+  override def arguments: Array[Expression] = Array(ref)
+
+  override def describe: String = s"$name(${reference.describe})"
+
+  override def toString: String = describe
+}
+
+/** Logical "bucket" transform over `columns`, with the bucket count as its first argument. */
+private[sql] final case class BucketTransform(
+    numBuckets: Literal[Int],
+    columns: Seq[NamedReference]) extends Transform {
+
+  override val name: String = "bucket"
+
+  override def arguments: Array[Expression] = numBuckets +: columns.toArray
+
+  // only the column arguments are references; the bucket-count literal is skipped
+  override def references: Array[NamedReference] =
+    arguments.collect { case ref: NamedReference => ref }
+
+  // e.g. bucket(16, b)
+  override def describe: String = s"$name(${arguments.map(_.describe).mkString(", ")})"
+
+  override def toString: String = describe
+}
+
+/** A transform with an arbitrary `name`, applied to the given argument expressions. */
+private[sql] final case class ApplyTransform(
+    name: String,
+    args: Seq[Expression]) extends Transform {
+
+  override def arguments: Array[Expression] = args.toArray
+
+  // the arguments that are field references, in argument order
+  override def references: Array[NamedReference] =
+    arguments.collect { case ref: NamedReference => ref }
+
+  // e.g. foo(a, 'bar', 34)
+  override def describe: String = s"$name(${arguments.map(_.describe).mkString(", ")})"
+
+  override def toString: String = describe
+}
+
+private[sql] final case class IdentityTransform(ref: NamedReference)
+  extends SingleColumnTransform(ref) {
+  override val name: String = "identity"
+  override def describe: String = ref.describe // just the column, without identity(...)
+}
+
+private[sql] final case class YearsTransform(ref: NamedReference)
+  extends SingleColumnTransform(ref) {
+  override val name: String = "years" // described as years(column)
+}
+
+private[sql] final case class MonthsTransform(ref: NamedReference)
+  extends SingleColumnTransform(ref) {
+  override val name: String = "months" // described as months(column)
+}
+
+private[sql] final case class DaysTransform(ref: NamedReference)
+  extends SingleColumnTransform(ref) {
+  override val name: String = "days" // described as days(column)
+}
+
+private[sql] final case class HoursTransform(ref: NamedReference)
+  extends SingleColumnTransform(ref) {
+  override val name: String = "hours" // described as hours(column)
+}
+
+/** A literal `value` whose SQL type is `dataType`. */
+private[sql] final case class LiteralValue[T](value: T, dataType: DataType) extends Literal[T] {
+  // string values are wrapped in single quotes; all other values use their string form
+  override def describe: String = dataType match {
+    case _: StringType => s"'$value'"
+    case _ => s"$value"
+  }
+
+  override def toString: String = describe
+}
+
+private[sql] final case class FieldReference(parts: Seq[String]) extends NamedReference {
+  override def fieldNames: Array[String] = parts.toArray
+  override def describe: String = parts.map(quote).mkString(".")
+  override def toString: String = describe
+
+  // Back-quote any part that is not a plain identifier (e.g. it contains '.', '`' or a space, or
+  // is all digits) so the dotted description stays unambiguous; backticks are doubled inside.
+  private def quote(part: String): String = {
+    if (part.matches("[a-zA-Z0-9_]+") && !part.matches("\\d+")) {
+      part
+    } else {
+      s"`${part.replace("`", "``")}`"
+    }
+  }
+}
+
+private[sql] object FieldReference {
+  def apply(column: String): NamedReference = LogicalExpressions.reference(column)
+}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 111815b..6bb991c 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -28,6 +28,8 @@ import org.antlr.v4.runtime.tree.{ParseTree, RuleNode,
TerminalNode}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalog.v2
+import org.apache.spark.sql.catalog.v2.expressions.{ApplyTransform,
BucketTransform, DaysTransform, FieldReference, HoursTransform,
IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat}
@@ -2043,6 +2045,95 @@ class AstBuilder(conf: SQLConf) extends
SqlBaseBaseVisitor[AnyRef] with Logging
}
/**
+   * Parse a list of transforms.
+   *
+   * A bare (possibly qualified) column name becomes an identity transform. The names bucket,
+   * years, months, days and hours are validated and converted to their specific transform
+   * classes; any other name is kept as a generic [[ApplyTransform]] with its arguments as-is.
+   */
+  override def visitTransformList(ctx: TransformListContext): Seq[Transform] = withOrigin(ctx) {
+    // Returns arg as a column reference, or fails because the transform requires a column here.
+    def getFieldReference(
+        ctx: ApplyTransformContext,
+        arg: v2.expressions.Expression): FieldReference = {
+      lazy val name: String = ctx.identifier.getText
+      arg match {
+        case ref: FieldReference => ref
+        case nonRef =>
+          throw new ParseException(
+            s"Expected a column reference for transform $name: ${nonRef.describe}", ctx)
+      }
+    }
+
+    // Returns the only argument as a column reference; fails unless there is exactly one.
+    def getSingleFieldReference(
+        ctx: ApplyTransformContext,
+        arguments: Seq[v2.expressions.Expression]): FieldReference = {
+      lazy val name: String = ctx.identifier.getText
+      if (arguments.size > 1) {
+        throw new ParseException(s"Too many arguments for transform $name", ctx)
+      } else if (arguments.isEmpty) {
+        throw new ParseException(s"Not enough arguments for transform $name", ctx)
+      } else {
+        getFieldReference(ctx, arguments.head)
+      }
+    }
+
+    ctx.transforms.asScala.map {
+      case identityCtx: IdentityTransformContext =>
+        IdentityTransform(FieldReference(
+          identityCtx.qualifiedName.identifier.asScala.map(_.getText)))
+
+      case applyCtx: ApplyTransformContext =>
+        val arguments = applyCtx.argument.asScala.map(visitTransformArgument)
+
+        applyCtx.identifier.getText match {
+          case "bucket" =>
+            // The grammar guarantees at least one argument. The bucket count must be a positive
+            // integer literal; a long literal is accepted only if it fits in an Int instead of
+            // being silently truncated, and at least one bucketed column is required.
+            val numBuckets: Int = arguments.head match {
+              case LiteralValue(n: Short, ShortType) if n > 0 => n.toInt
+              case LiteralValue(n: Int, IntegerType) if n > 0 => n
+              case LiteralValue(n: Long, LongType) if n > 0 && n.isValidInt => n.toInt
+              case lit =>
+                throw new ParseException(s"Invalid number of buckets: ${lit.describe}", applyCtx)
+            }
+
+            val fields = arguments.tail.map(arg => getFieldReference(applyCtx, arg))
+            if (fields.isEmpty) {
+              throw new ParseException("Not enough arguments for transform bucket", applyCtx)
+            }
+
+            BucketTransform(LiteralValue(numBuckets, IntegerType), fields)
+
+          // the remaining built-in transforms each take exactly one column argument
+          case "years" => YearsTransform(getSingleFieldReference(applyCtx, arguments))
+          case "months" => MonthsTransform(getSingleFieldReference(applyCtx, arguments))
+          case "days" => DaysTransform(getSingleFieldReference(applyCtx, arguments))
+          case "hours" => HoursTransform(getSingleFieldReference(applyCtx, arguments))
+          case name => ApplyTransform(name, arguments)
+        }
+    }
+  }
+
+  /**
+   * Parse a transform argument: a (possibly qualified) column name or a constant literal.
+   */
+  override def visitTransformArgument(ctx: TransformArgumentContext): v2.expressions.Expression = {
+    withOrigin(ctx) {
+      (Option(ctx.qualifiedName), Option(ctx.constant)) match {
+        case (Some(nameCtx), _) => FieldReference(nameCtx.identifier.asScala.map(_.getText))
+        case (None, Some(constantCtx)) =>
+          val lit = typedVisit[Literal](constantCtx)
+          LiteralValue(lit.value, lit.dataType)
+        case _ =>
+          throw new ParseException(s"Invalid transform argument", ctx)
+      }
+    }
+  }
+
+ /**
* Create a table, returning a [[CreateTableStatement]] logical plan.
*
* Expected format:
@@ -2054,7 +2145,7 @@ class AstBuilder(conf: SQLConf) extends
SqlBaseBaseVisitor[AnyRef] with Logging
*
* create_table_clauses (order insensitive):
* [OPTIONS table_property_list]
- * [PARTITIONED BY (col_name, col_name, ...)]
+ * [PARTITIONED BY (col_name, transform(col_name), transform(constant,
col_name), ...)]
* [CLUSTERED BY (col_name, col_name, ...)
* [SORTED BY (col_name [ASC|DESC], ...)]
* INTO num_buckets BUCKETS
@@ -2078,8 +2169,8 @@ class AstBuilder(conf: SQLConf) extends
SqlBaseBaseVisitor[AnyRef] with Logging
checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
val schema = Option(ctx.colTypeList()).map(createSchema)
- val partitionCols: Seq[String] =
- Option(ctx.partitionColumnNames).map(visitIdentifierList).getOrElse(Nil)
+ val partitioning: Seq[Transform] =
+ Option(ctx.partitioning).map(visitTransformList).getOrElse(Nil)
val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
val properties =
Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
val options =
Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
@@ -2099,7 +2190,7 @@ class AstBuilder(conf: SQLConf) extends
SqlBaseBaseVisitor[AnyRef] with Logging
case Some(query) =>
CreateTableAsSelectStatement(
- table, query, partitionCols, bucketSpec, properties, provider,
options, location, comment,
+ table, query, partitioning, bucketSpec, properties, provider,
options, location, comment,
ifNotExists = ifNotExists)
case None if temp =>
@@ -2108,7 +2199,7 @@ class AstBuilder(conf: SQLConf) extends
SqlBaseBaseVisitor[AnyRef] with Logging
operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx)
case _ =>
- CreateTableStatement(table, schema.getOrElse(new StructType),
partitionCols, bucketSpec,
+ CreateTableStatement(table, schema.getOrElse(new StructType),
partitioning, bucketSpec,
properties, provider, options, location, comment, ifNotExists =
ifNotExists)
}
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTableStatement.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTableStatement.scala
index c734968..ed1b3e3 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTableStatement.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTableStatement.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.plans.logical.sql
+import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -31,7 +32,7 @@ import org.apache.spark.sql.types.StructType
case class CreateTableStatement(
table: TableIdentifier,
tableSchema: StructType,
- partitioning: Seq[String],
+ partitioning: Seq[Transform],
bucketSpec: Option[BucketSpec],
properties: Map[String, String],
provider: String,
@@ -51,7 +52,7 @@ case class CreateTableStatement(
case class CreateTableAsSelectStatement(
table: TableIdentifier,
asSelect: LogicalPlan,
- partitioning: Seq[String],
+ partitioning: Seq[Transform],
bucketSpec: Option[BucketSpec],
properties: Map[String, String],
provider: String,
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index dae8f58..98388a7 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -17,11 +17,13 @@
package org.apache.spark.sql.catalyst.parser
+import org.apache.spark.sql.catalog.v2.expressions.{ApplyTransform,
BucketTransform, DaysTransform, FieldReference, HoursTransform,
IdentityTransform, LiteralValue, MonthsTransform, YearsTransform}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.AnalysisTest
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import
org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement,
CreateTableStatement}
-import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType,
TimestampType}
+import org.apache.spark.unsafe.types.UTF8String
class DDLParserSuite extends AnalysisTest {
import CatalystSqlParser._
@@ -92,7 +94,7 @@ class DDLParserSuite extends AnalysisTest {
assert(create.tableSchema == new StructType()
.add("a", IntegerType, nullable = true, "test")
.add("b", StringType))
- assert(create.partitioning == Seq("a"))
+ assert(create.partitioning ==
Seq(IdentityTransform(FieldReference("a"))))
assert(create.bucketSpec.isEmpty)
assert(create.properties.isEmpty)
assert(create.provider == "parquet")
@@ -107,6 +109,52 @@ class DDLParserSuite extends AnalysisTest {
}
}
+  test("create table - partitioned by transforms") {
+    val sql =
+      """
+        |CREATE TABLE my_tab (a INT, b STRING, ts TIMESTAMP) USING parquet
+        |PARTITIONED BY (
+        | a,
+        | bucket(16, b),
+        | years(ts),
+        | months(ts),
+        | days(ts),
+        | hours(ts),
+        | foo(a, "bar", 34))
+      """.stripMargin
+
+    parsePlan(sql) match {
+      case create: CreateTableStatement =>
+        assert(create.table == TableIdentifier("my_tab"))
+        assert(create.tableSchema == new StructType()
+          .add("a", IntegerType)
+          .add("b", StringType)
+          .add("ts", TimestampType))
+        assert(create.partitioning == Seq(  // one Transform per PARTITIONED BY entry, in order
+          IdentityTransform(FieldReference("a")),  // bare column reference
+          BucketTransform(LiteralValue(16, IntegerType), Seq(FieldReference("b"))),
+          YearsTransform(FieldReference("ts")),
+          MonthsTransform(FieldReference("ts")),
+          DaysTransform(FieldReference("ts")),
+          HoursTransform(FieldReference("ts")),
+          ApplyTransform("foo", Seq(  // generic form: "foo" has no dedicated Transform class
+            FieldReference("a"),
+            LiteralValue(UTF8String.fromString("bar"), StringType),
+            LiteralValue(34, IntegerType)))))
+        assert(create.bucketSpec.isEmpty)
+        assert(create.properties.isEmpty)
+        assert(create.provider == "parquet")
+        assert(create.options.isEmpty)
+        assert(create.location.isEmpty)
+        assert(create.comment.isEmpty)
+        assert(!create.ifNotExists)
+
+      case other =>  // name the expected class itself; .getClass on a Class gives java.lang.Class
+        fail(s"Expected to parse ${classOf[CreateTableStatement].getName} from query, " +
+            s"got ${other.getClass.getName}: $sql")
+    }
+  }
+
test("create table - with bucket") {
val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " +
"CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS"
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
index 9fd44ea..f503ff0 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources
import java.util.Locale
import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.CastSupport
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable,
CatalogTableType, CatalogUtils}
@@ -31,6 +32,8 @@ import org.apache.spark.sql.sources.v2.TableProvider
import org.apache.spark.sql.types.StructType
case class DataSourceResolution(conf: SQLConf) extends Rule[LogicalPlan] with
CastSupport {
+ import
org.apache.spark.sql.catalog.v2.expressions.LogicalExpressions.TransformHelper
+
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case CreateTableStatement(
table, schema, partitionCols, bucketSpec, properties,
V1WriteProvider(provider), options,
@@ -75,7 +78,7 @@ case class DataSourceResolution(conf: SQLConf) extends
Rule[LogicalPlan] with Ca
private def buildCatalogTable(
table: TableIdentifier,
schema: StructType,
- partitionColumnNames: Seq[String],
+ partitioning: Seq[Transform],
bucketSpec: Option[BucketSpec],
properties: Map[String, String],
provider: String,
@@ -104,7 +107,7 @@ case class DataSourceResolution(conf: SQLConf) extends
Rule[LogicalPlan] with Ca
storage = storage.copy(locationUri = customLocation),
schema = schema,
provider = Some(provider),
- partitionColumnNames = partitionColumnNames,
+ partitionColumnNames = partitioning.asPartitionColumns,
bucketSpec = bucketSpec,
properties = properties,
comment = comment)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 89c5df0..7fae54b 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -65,6 +65,26 @@ class PlanResolutionSuite extends AnalysisTest {
}
}
+  test("create table - partitioned by transforms") {
+    // Each PARTITIONED BY clause below must fail conversion to v1 partition columns.
+    Seq(
+      "bucket(16, b)", "years(ts)", "months(ts)", "days(ts)", "hours(ts)", "foo(a, 'bar', 34)",
+      "bucket(32, b), days(ts)"
+    ).foreach { exprs =>
+      val query =
+        s"""
+           |CREATE TABLE my_tab(a INT, b STRING) USING parquet
+           |PARTITIONED BY ($exprs)
+         """.stripMargin
+      val expected = s"Transforms cannot be converted to partition columns: $exprs"
+
+      val error = intercept[AnalysisException] {
+        parseAndResolve(query)
+      }
+      assert(error.message.contains(expected))
+    }
+  }
+
test("create table - with bucket") {
val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " +
"CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]