This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 58674d5  [SPARK-27181][SQL] Add public transform API
58674d5 is described below

commit 58674d54baedc048ae9bf237f268dce5f3d06d77
Author: Ryan Blue <[email protected]>
AuthorDate: Wed Apr 10 14:30:39 2019 +0800

    [SPARK-27181][SQL] Add public transform API
    
    ## What changes were proposed in this pull request?
    
    This adds a public Expression API that can be used to pass partition transformations to data sources.
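    
    For illustration, the end-to-end flow looks roughly like this (a sketch that mirrors the
    DDLParserSuite cases added below; table and column names are arbitrary):
    
        import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
        import org.apache.spark.sql.catalyst.plans.logical.sql.CreateTableStatement
    
        val plan = CatalystSqlParser.parsePlan(
          """CREATE TABLE my_tab (a INT, ts TIMESTAMP) USING parquet
            |PARTITIONED BY (a, bucket(16, a), years(ts))""".stripMargin)
    
        // The statement carries transforms instead of raw column names, roughly:
        //   Seq(IdentityTransform(FieldReference("a")),
        //       BucketTransform(LiteralValue(16, IntegerType), Seq(FieldReference("a"))),
        //       YearsTransform(FieldReference("ts")))
        val create = plan.asInstanceOf[CreateTableStatement]
        create.partitioning.map(_.describe)   // Seq(a, bucket(16, a), years(ts))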
    
    ## How was this patch tested?
    
    Existing tests to validate no regressions. Added transform cases to DDL suite and v1 conversions suite.
    
    Closes #24117 from rdblue/add-public-transform-api.
    
    Authored-by: Ryan Blue <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../apache/spark/sql/catalyst/parser/SqlBase.g4    |  17 +-
 .../sql/catalog/v2/expressions/Expression.java     |  31 ++++
 .../sql/catalog/v2/expressions/Expressions.java    | 162 ++++++++++++++++
 .../spark/sql/catalog/v2/expressions/Literal.java  |  42 +++++
 .../sql/catalog/v2/expressions/NamedReference.java |  33 ++++
 .../sql/catalog/v2/expressions/Transform.java      |  44 +++++
 .../sql/catalog/v2/expressions/expressions.scala   | 203 +++++++++++++++++++++
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 101 +++++++++-
 .../plans/logical/sql/CreateTableStatement.scala   |   5 +-
 .../spark/sql/catalyst/parser/DDLParserSuite.scala |  52 +++++-
 .../datasources/DataSourceResolution.scala         |   7 +-
 .../execution/command/PlanResolutionSuite.scala    |  20 ++
 12 files changed, 705 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 0f9387b..d261b56 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -92,7 +92,7 @@ statement
     | SHOW DATABASES (LIKE? pattern=STRING)?                                    #showDatabases
     | createTableHeader ('(' colTypeList ')')? tableProvider
         ((OPTIONS options=tablePropertyList) |
-        (PARTITIONED BY partitionColumnNames=identifierList) |
+        (PARTITIONED BY partitioning=transformList) |
         bucketSpec |
         locationSpec |
         (COMMENT comment=STRING) |
@@ -587,6 +587,21 @@ namedExpressionSeq
     : namedExpression (',' namedExpression)*
     ;
 
+transformList
+    : '(' transforms+=transform (',' transforms+=transform)* ')'
+    ;
+
+transform
+    : qualifiedName                                                           #identityTransform
+    | transformName=identifier
+      '(' argument+=transformArgument (',' argument+=transformArgument)* ')'  #applyTransform
+    ;
+
+transformArgument
+    : qualifiedName
+    | constant
+    ;
+
 expression
     : booleanExpression
     ;
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expression.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expression.java
new file mode 100644
index 0000000..1e2aca9
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expression.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2.expressions;
+
+import org.apache.spark.annotation.Experimental;
+
+/**
+ * Base class of the public logical expression API.
+ */
+@Experimental
+public interface Expression {
+  /**
+   * Format the expression as a human readable SQL-like string.
+   */
+  String describe();
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expressions.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expressions.java
new file mode 100644
index 0000000..009e89b
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expressions.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2.expressions;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.types.DataType;
+import scala.collection.JavaConverters;
+
+import java.util.Arrays;
+
+/**
+ * Helper methods to create logical transforms to pass into Spark.
+ */
+@Experimental
+public class Expressions {
+  private Expressions() {
+  }
+
+  /**
+   * Create a logical transform for applying a named transform.
+   * <p>
+   * This transform can represent applying any named transform.
+   *
+   * @param name the transform name
+   * @param args expression arguments to the transform
+   * @return a logical transform
+   */
+  public Transform apply(String name, Expression... args) {
+    return LogicalExpressions.apply(name,
+        JavaConverters.asScalaBuffer(Arrays.asList(args)).toSeq());
+  }
+
+  /**
+   * Create a named reference expression for a column.
+   *
+   * @param name a column name
+   * @return a named reference for the column
+   */
+  public NamedReference column(String name) {
+    return LogicalExpressions.reference(name);
+  }
+
+  /**
+   * Create a literal from a value.
+   * <p>
+   * The JVM type of the value held by a literal must be the type used by Spark's InternalRow API
+   * for the literal's {@link DataType SQL data type}.
+   *
+   * @param value a value
+   * @param <T> the JVM type of the value
+   * @return a literal expression for the value
+   */
+  public <T> Literal<T> literal(T value) {
+    return LogicalExpressions.literal(value);
+  }
+
+  /**
+   * Create a bucket transform for one or more columns.
+   * <p>
+   * This transform represents a logical mapping from a value to a bucket id in [0, numBuckets)
+   * based on a hash of the value.
+   * <p>
+   * The name reported by transforms created with this method is "bucket".
+   *
+   * @param numBuckets the number of output buckets
+   * @param columns input columns for the bucket transform
+   * @return a logical bucket transform with name "bucket"
+   */
+  public Transform bucket(int numBuckets, String... columns) {
+    return LogicalExpressions.bucket(numBuckets,
+        JavaConverters.asScalaBuffer(Arrays.asList(columns)).toSeq());
+  }
+
+  /**
+   * Create an identity transform for a column.
+   * <p>
+   * This transform represents a logical mapping from a value to itself.
+   * <p>
+   * The name reported by transforms created with this method is "identity".
+   *
+   * @param column an input column
+   * @return a logical identity transform with name "identity"
+   */
+  public Transform identity(String column) {
+    return LogicalExpressions.identity(column);
+  }
+
+  /**
+   * Create a yearly transform for a timestamp or date column.
+   * <p>
+   * This transform represents a logical mapping from a timestamp or date to a year, such as 2018.
+   * <p>
+   * The name reported by transforms created with this method is "years".
+   *
+   * @param column an input timestamp or date column
+   * @return a logical yearly transform with name "years"
+   */
+  public Transform years(String column) {
+    return LogicalExpressions.years(column);
+  }
+
+  /**
+   * Create a monthly transform for a timestamp or date column.
+   * <p>
+   * This transform represents a logical mapping from a timestamp or date to a month, such as
+   * 2018-05.
+   * <p>
+   * The name reported by transforms created with this method is "months".
+   *
+   * @param column an input timestamp or date column
+   * @return a logical monthly transform with name "months"
+   */
+  public Transform months(String column) {
+    return LogicalExpressions.months(column);
+  }
+
+  /**
+   * Create a daily transform for a timestamp or date column.
+   * <p>
+   * This transform represents a logical mapping from a timestamp or date to a date, such as
+   * 2018-05-13.
+   * <p>
+   * The name reported by transforms created with this method is "days".
+   *
+   * @param column an input timestamp or date column
+   * @return a logical daily transform with name "days"
+   */
+  public Transform days(String column) {
+    return LogicalExpressions.days(column);
+  }
+
+  /**
+   * Create an hourly transform for a timestamp column.
+   * <p>
+   * This transform represents a logical mapping from a timestamp to a date and hour, such as
+   * 2018-05-13, hour 19.
+   * <p>
+   * The name reported by transforms created with this method is "hours".
+   *
+   * @param column an input timestamp column
+   * @return a logical hourly transform with name "hours"
+   */
+  public Transform hours(String column) {
+    return LogicalExpressions.hours(column);
+  }
+
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Literal.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Literal.java
new file mode 100644
index 0000000..e41bcf9
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Literal.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2.expressions;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * Represents a constant literal value in the public expression API.
+ * <p>
+ * The JVM type of the value held by a literal must be the type used by Spark's InternalRow API for
+ * the literal's {@link DataType SQL data type}.
+ *
+ * @param <T> the JVM type of a value held by the literal
+ */
+@Experimental
+public interface Literal<T> extends Expression {
+  /**
+   * Returns the literal value.
+   */
+  T value();
+
+  /**
+   * Returns the SQL data type of the literal.
+   */
+  DataType dataType();
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/NamedReference.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/NamedReference.java
new file mode 100644
index 0000000..c71ffbe
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/NamedReference.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2.expressions;
+
+import org.apache.spark.annotation.Experimental;
+
+/**
+ * Represents a field or column reference in the public logical expression API.
+ */
+@Experimental
+public interface NamedReference extends Expression {
+  /**
+   * Returns the referenced field name as an array of String parts.
+   * <p>
+   * Each string in the returned array represents a field name.
+   */
+  String[] fieldNames();
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Transform.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Transform.java
new file mode 100644
index 0000000..c85e0c4
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Transform.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2.expressions;
+
+import org.apache.spark.annotation.Experimental;
+
+/**
+ * Represents a transform function in the public logical expression API.
+ * <p>
+ * For example, the transform date(ts) is used to derive a date value from a timestamp column. The
+ * transform name is "date" and its argument is a reference to the "ts" column.
+ */
+@Experimental
+public interface Transform extends Expression {
+  /**
+   * Returns the transform function name.
+   */
+  String name();
+
+  /**
+   * Returns all field references in the transform arguments.
+   */
+  NamedReference[] references();
+
+  /**
+   * Returns the arguments passed to the transform function.
+   */
+  Expression[] arguments();
+}
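
A data source that receives these transforms can dispatch on name() and unpack
arguments(); a minimal consumer sketch (hypothetical code, not part of this patch):

    import org.apache.spark.sql.catalog.v2.expressions.{Literal, Transform}

    def summarize(transforms: Seq[Transform]): Seq[String] = transforms.map { t =>
      t.name() match {
        case "identity" =>
          // e.g. PARTITIONED BY (a): a single column reference
          s"identity(${t.references().head.fieldNames().mkString(".")})"
        case "bucket" =>
          // the first argument is the bucket-count literal, the rest are column references
          val numBuckets = t.arguments().head.asInstanceOf[Literal[_]].value()
          s"bucket($numBuckets, ${t.references().map(_.describe()).mkString(", ")})"
        case _ =>
          s"unsupported: ${t.describe()}"
      }
    }
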
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/expressions/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/expressions/expressions.scala
new file mode 100644
index 0000000..813d882
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/expressions/expressions.scala
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2.expressions
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DataType, IntegerType, StringType}
+
+/**
+ * Helper methods for working with the logical expressions API.
+ *
+ * Factory methods can be used when referencing the logical expression nodes is ambiguous because
+ * logical and internal expressions are used.
+ */
+private[sql] object LogicalExpressions {
+  // a generic parser that is only used for parsing multi-part field names.
+  // because this is only used for field names, the SQL conf passed in does not matter.
+  private lazy val parser = new CatalystSqlParser(SQLConf.get)
+
+  def fromPartitionColumns(columns: String*): Array[IdentityTransform] =
+    columns.map(identity).toArray
+
+  def fromBucketSpec(spec: BucketSpec): BucketTransform = {
+    if (spec.sortColumnNames.nonEmpty) {
+      throw new AnalysisException(
+        s"Cannot convert bucketing with sort columns to a transform: $spec")
+    }
+
+    bucket(spec.numBuckets, spec.bucketColumnNames: _*)
+  }
+
+  implicit class TransformHelper(transforms: Seq[Transform]) {
+    def asPartitionColumns: Seq[String] = {
+      val (idTransforms, nonIdTransforms) = transforms.partition(_.isInstanceOf[IdentityTransform])
+
+      if (nonIdTransforms.nonEmpty) {
+        throw new AnalysisException("Transforms cannot be converted to partition columns: " +
+            nonIdTransforms.map(_.describe).mkString(", "))
+      }
+
+      idTransforms.map(_.asInstanceOf[IdentityTransform]).map(_.reference).map { ref =>
+        val parts = ref.fieldNames
+        if (parts.size > 1) {
+          throw new AnalysisException(s"Cannot partition by nested column: $ref")
+        } else {
+          parts(0)
+        }
+      }
+    }
+  }
+
+  def literal[T](value: T): LiteralValue[T] = {
+    val internalLit = catalyst.expressions.Literal(value)
+    literal(value, internalLit.dataType)
+  }
+
+  def literal[T](value: T, dataType: DataType): LiteralValue[T] = LiteralValue(value, dataType)
+
+  def reference(name: String): NamedReference =
+    FieldReference(parser.parseMultipartIdentifier(name))
+
+  def apply(name: String, arguments: Expression*): Transform = ApplyTransform(name, arguments)
+
+  def bucket(numBuckets: Int, columns: String*): BucketTransform =
+    BucketTransform(literal(numBuckets, IntegerType), columns.map(reference))
+
+  def identity(column: String): IdentityTransform = IdentityTransform(reference(column))
+
+  def years(column: String): YearsTransform = YearsTransform(reference(column))
+
+  def months(column: String): MonthsTransform = MonthsTransform(reference(column))
+
+  def days(column: String): DaysTransform = DaysTransform(reference(column))
+
+  def hours(column: String): HoursTransform = HoursTransform(reference(column))
+}
+
+/**
+ * Base class for simple transforms of a single column.
+ */
+private[sql] abstract class SingleColumnTransform(ref: NamedReference) extends Transform {
+
+  def reference: NamedReference = ref
+
+  override def references: Array[NamedReference] = Array(ref)
+
+  override def arguments: Array[Expression] = Array(ref)
+
+  override def describe: String = name + "(" + reference.describe + ")"
+
+  override def toString: String = describe
+}
+
+private[sql] final case class BucketTransform(
+    numBuckets: Literal[Int],
+    columns: Seq[NamedReference]) extends Transform {
+
+  override val name: String = "bucket"
+
+  override def references: Array[NamedReference] = {
+    arguments
+        .filter(_.isInstanceOf[NamedReference])
+        .map(_.asInstanceOf[NamedReference])
+  }
+
+  override def arguments: Array[Expression] = numBuckets +: columns.toArray
+
+  override def describe: String = s"bucket(${arguments.map(_.describe).mkString(", ")})"
+
+  override def toString: String = describe
+}
+
+private[sql] final case class ApplyTransform(
+    name: String,
+    args: Seq[Expression]) extends Transform {
+
+  override def arguments: Array[Expression] = args.toArray
+
+  override def references: Array[NamedReference] = {
+    arguments
+        .filter(_.isInstanceOf[NamedReference])
+        .map(_.asInstanceOf[NamedReference])
+  }
+
+  override def describe: String = s"$name(${arguments.map(_.describe).mkString(", ")})"
+
+  override def toString: String = describe
+}
+
+private[sql] final case class IdentityTransform(
+    ref: NamedReference) extends SingleColumnTransform(ref) {
+  override val name: String = "identity"
+  override def describe: String = ref.describe
+}
+
+private[sql] final case class YearsTransform(
+    ref: NamedReference) extends SingleColumnTransform(ref) {
+  override val name: String = "years"
+}
+
+private[sql] final case class MonthsTransform(
+    ref: NamedReference) extends SingleColumnTransform(ref) {
+  override val name: String = "months"
+}
+
+private[sql] final case class DaysTransform(
+    ref: NamedReference) extends SingleColumnTransform(ref) {
+  override val name: String = "days"
+}
+
+private[sql] final case class HoursTransform(
+    ref: NamedReference) extends SingleColumnTransform(ref) {
+  override val name: String = "hours"
+}
+
+private[sql] final case class LiteralValue[T](value: T, dataType: DataType) extends Literal[T] {
+  override def describe: String = {
+    if (dataType.isInstanceOf[StringType]) {
+      s"'$value'"
+    } else {
+      s"$value"
+    }
+  }
+  override def toString: String = describe
+}
+
+private[sql] final case class FieldReference(parts: Seq[String]) extends NamedReference {
+  override def fieldNames: Array[String] = parts.toArray
+  override def describe: String = parts.map(quote).mkString(".")
+  override def toString: String = describe
+
+  private def quote(part: String): String = {
+    if (part.contains(".") || part.contains("`")) {
+      s"`${part.replace("`", "``")}`"
+    } else {
+      part
+    }
+  }
+}
+
+private[sql] object FieldReference {
+  def apply(column: String): NamedReference = {
+    LogicalExpressions.reference(column)
+  }
+}
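
For reference, the human-readable form produced by describe for a few of these
transforms (a sketch using the private[sql] helpers above; values are arbitrary):

    import org.apache.spark.sql.catalog.v2.expressions.LogicalExpressions

    LogicalExpressions.identity("a").describe       // "a"
    LogicalExpressions.years("ts").describe         // "years(ts)"
    LogicalExpressions.bucket(16, "b").describe     // "bucket(16, b)"
    LogicalExpressions.apply("foo", LogicalExpressions.literal("bar")).describe  // "foo('bar')"
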
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 111815b..6bb991c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -28,6 +28,8 @@ import org.antlr.v4.runtime.tree.{ParseTree, RuleNode, TerminalNode}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalog.v2
+import org.apache.spark.sql.catalog.v2.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat}
@@ -2043,6 +2045,95 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
   }
 
   /**
+   * Parse a list of transforms.
+   */
+  override def visitTransformList(ctx: TransformListContext): Seq[Transform] = withOrigin(ctx) {
+    def getFieldReference(
+        ctx: ApplyTransformContext,
+        arg: v2.expressions.Expression): FieldReference = {
+      lazy val name: String = ctx.identifier.getText
+      arg match {
+        case ref: FieldReference =>
+          ref
+        case nonRef =>
+          throw new ParseException(
+            s"Expected a column reference for transform $name: ${nonRef.describe}", ctx)
+      }
+    }
+
+    def getSingleFieldReference(
+        ctx: ApplyTransformContext,
+        arguments: Seq[v2.expressions.Expression]): FieldReference = {
+      lazy val name: String = ctx.identifier.getText
+      if (arguments.size > 1) {
+        throw new ParseException(s"Too many arguments for transform $name", ctx)
+      } else if (arguments.isEmpty) {
+        throw new ParseException(s"Not enough arguments for transform $name", ctx)
+      } else {
+        getFieldReference(ctx, arguments.head)
+      }
+    }
+
+    ctx.transforms.asScala.map {
+      case identityCtx: IdentityTransformContext =>
+        IdentityTransform(FieldReference(
+          identityCtx.qualifiedName.identifier.asScala.map(_.getText)))
+
+      case applyCtx: ApplyTransformContext =>
+        val arguments = applyCtx.argument.asScala.map(visitTransformArgument)
+
+        applyCtx.identifier.getText match {
+          case "bucket" =>
+            val numBuckets: Int = arguments.head match {
+              case LiteralValue(shortValue, ShortType) =>
+                shortValue.asInstanceOf[Short].toInt
+              case LiteralValue(intValue, IntegerType) =>
+                intValue.asInstanceOf[Int]
+              case LiteralValue(longValue, LongType) =>
+                longValue.asInstanceOf[Long].toInt
+              case lit =>
+                throw new ParseException(s"Invalid number of buckets: ${lit.describe}", applyCtx)
+            }
+
+            val fields = arguments.tail.map(arg => getFieldReference(applyCtx, arg))
+
+            BucketTransform(LiteralValue(numBuckets, IntegerType), fields)
+
+          case "years" =>
+            YearsTransform(getSingleFieldReference(applyCtx, arguments))
+
+          case "months" =>
+            MonthsTransform(getSingleFieldReference(applyCtx, arguments))
+
+          case "days" =>
+            DaysTransform(getSingleFieldReference(applyCtx, arguments))
+
+          case "hours" =>
+            HoursTransform(getSingleFieldReference(applyCtx, arguments))
+
+          case name =>
+            ApplyTransform(name, arguments)
+        }
+    }
+  }
+
+  /**
+   * Parse an argument to a transform. An argument may be a field reference (qualified name) or
+   * a value literal.
+   */
+  override def visitTransformArgument(ctx: TransformArgumentContext): v2.expressions.Expression = {
+    withOrigin(ctx) {
+      val reference = Option(ctx.qualifiedName)
+          .map(nameCtx => FieldReference(nameCtx.identifier.asScala.map(_.getText)))
+      val literal = Option(ctx.constant)
+          .map(typedVisit[Literal])
+          .map(lit => LiteralValue(lit.value, lit.dataType))
+      reference.orElse(literal)
+          .getOrElse(throw new ParseException(s"Invalid transform argument", ctx))
+    }
+  }
+
+  /**
    * Create a table, returning a [[CreateTableStatement]] logical plan.
    *
    * Expected format:
@@ -2054,7 +2145,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
    *
    *   create_table_clauses (order insensitive):
    *     [OPTIONS table_property_list]
-   *     [PARTITIONED BY (col_name, col_name, ...)]
+   *     [PARTITIONED BY (col_name, transform(col_name), transform(constant, col_name), ...)]
    *     [CLUSTERED BY (col_name, col_name, ...)
    *       [SORTED BY (col_name [ASC|DESC], ...)]
    *       INTO num_buckets BUCKETS
@@ -2078,8 +2169,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
 
     val schema = Option(ctx.colTypeList()).map(createSchema)
-    val partitionCols: Seq[String] =
-      Option(ctx.partitionColumnNames).map(visitIdentifierList).getOrElse(Nil)
+    val partitioning: Seq[Transform] =
+      Option(ctx.partitioning).map(visitTransformList).getOrElse(Nil)
     val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
     val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
@@ -2099,7 +2190,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
 
       case Some(query) =>
         CreateTableAsSelectStatement(
-          table, query, partitionCols, bucketSpec, properties, provider, options, location, comment,
+          table, query, partitioning, bucketSpec, properties, provider, options, location, comment,
           ifNotExists = ifNotExists)
 
       case None if temp =>
@@ -2108,7 +2199,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
         operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx)
 
       case _ =>
-        CreateTableStatement(table, schema.getOrElse(new StructType), partitionCols, bucketSpec,
+        CreateTableStatement(table, schema.getOrElse(new StructType), partitioning, bucketSpec,
           properties, provider, options, location, comment, ifNotExists = ifNotExists)
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTableStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTableStatement.scala
index c734968..ed1b3e3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTableStatement.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTableStatement.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans.logical.sql
 
+import org.apache.spark.sql.catalog.v2.expressions.Transform
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -31,7 +32,7 @@ import org.apache.spark.sql.types.StructType
 case class CreateTableStatement(
     table: TableIdentifier,
     tableSchema: StructType,
-    partitioning: Seq[String],
+    partitioning: Seq[Transform],
     bucketSpec: Option[BucketSpec],
     properties: Map[String, String],
     provider: String,
@@ -51,7 +52,7 @@ case class CreateTableStatement(
 case class CreateTableAsSelectStatement(
     table: TableIdentifier,
     asSelect: LogicalPlan,
-    partitioning: Seq[String],
+    partitioning: Seq[Transform],
     bucketSpec: Option[BucketSpec],
     properties: Map[String, String],
     provider: String,
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index dae8f58..98388a7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql.catalyst.parser
 
+import org.apache.spark.sql.catalog.v2.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.AnalysisTest
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement}
-import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType, TimestampType}
+import org.apache.spark.unsafe.types.UTF8String
 
 class DDLParserSuite extends AnalysisTest {
   import CatalystSqlParser._
@@ -92,7 +94,7 @@ class DDLParserSuite extends AnalysisTest {
         assert(create.tableSchema == new StructType()
             .add("a", IntegerType, nullable = true, "test")
             .add("b", StringType))
-        assert(create.partitioning == Seq("a"))
+        assert(create.partitioning == Seq(IdentityTransform(FieldReference("a"))))
         assert(create.bucketSpec.isEmpty)
         assert(create.properties.isEmpty)
         assert(create.provider == "parquet")
@@ -107,6 +109,52 @@ class DDLParserSuite extends AnalysisTest {
     }
   }
 
+  test("create table - partitioned by transforms") {
+    val sql =
+      """
+        |CREATE TABLE my_tab (a INT, b STRING, ts TIMESTAMP) USING parquet
+        |PARTITIONED BY (
+        |    a,
+        |    bucket(16, b),
+        |    years(ts),
+        |    months(ts),
+        |    days(ts),
+        |    hours(ts),
+        |    foo(a, "bar", 34))
+      """.stripMargin
+
+    parsePlan(sql) match {
+      case create: CreateTableStatement =>
+        assert(create.table == TableIdentifier("my_tab"))
+        assert(create.tableSchema == new StructType()
+            .add("a", IntegerType)
+            .add("b", StringType)
+            .add("ts", TimestampType))
+        assert(create.partitioning == Seq(
+            IdentityTransform(FieldReference("a")),
+            BucketTransform(LiteralValue(16, IntegerType), Seq(FieldReference("b"))),
+            YearsTransform(FieldReference("ts")),
+            MonthsTransform(FieldReference("ts")),
+            DaysTransform(FieldReference("ts")),
+            HoursTransform(FieldReference("ts")),
+            ApplyTransform("foo", Seq(
+                FieldReference("a"),
+                LiteralValue(UTF8String.fromString("bar"), StringType),
+                LiteralValue(34, IntegerType)))))
+        assert(create.bucketSpec.isEmpty)
+        assert(create.properties.isEmpty)
+        assert(create.provider == "parquet")
+        assert(create.options.isEmpty)
+        assert(create.location.isEmpty)
+        assert(create.comment.isEmpty)
+        assert(!create.ifNotExists)
+
+      case other =>
+        fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," +
+            s"got ${other.getClass.getName}: $sql")
+    }
+  }
+
   test("create table - with bucket") {
     val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " +
         "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
index 9fd44ea..f503ff0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import java.util.Locale
 
 import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalog.v2.expressions.Transform
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.CastSupport
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils}
@@ -31,6 +32,8 @@ import org.apache.spark.sql.sources.v2.TableProvider
 import org.apache.spark.sql.types.StructType
 
 case class DataSourceResolution(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport  {
+  import org.apache.spark.sql.catalog.v2.expressions.LogicalExpressions.TransformHelper
+
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case CreateTableStatement(
         table, schema, partitionCols, bucketSpec, properties, V1WriteProvider(provider), options,
@@ -75,7 +78,7 @@ case class DataSourceResolution(conf: SQLConf) extends Rule[LogicalPlan] with Ca
   private def buildCatalogTable(
       table: TableIdentifier,
       schema: StructType,
-      partitionColumnNames: Seq[String],
+      partitioning: Seq[Transform],
       bucketSpec: Option[BucketSpec],
       properties: Map[String, String],
       provider: String,
@@ -104,7 +107,7 @@ case class DataSourceResolution(conf: SQLConf) extends Rule[LogicalPlan] with Ca
       storage = storage.copy(locationUri = customLocation),
       schema = schema,
       provider = Some(provider),
-      partitionColumnNames = partitionColumnNames,
+      partitionColumnNames = partitioning.asPartitionColumns,
       bucketSpec = bucketSpec,
       properties = properties,
       comment = comment)
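
The imported TransformHelper keeps the v1 path unchanged: only identity transforms
convert back to partition column names, and anything else fails analysis. Roughly
(a sketch using the helpers defined in expressions.scala above):

    import org.apache.spark.sql.catalog.v2.expressions.LogicalExpressions
    import org.apache.spark.sql.catalog.v2.expressions.LogicalExpressions.TransformHelper

    Seq(LogicalExpressions.identity("a"), LogicalExpressions.identity("b")).asPartitionColumns
    // Seq("a", "b")

    Seq(LogicalExpressions.bucket(16, "b")).asPartitionColumns
    // AnalysisException: Transforms cannot be converted to partition columns: bucket(16, b)
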
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 89c5df0..7fae54b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -65,6 +65,26 @@ class PlanResolutionSuite extends AnalysisTest {
     }
   }
 
+  test("create table - partitioned by transforms") {
+    val transforms = Seq(
+        "bucket(16, b)", "years(ts)", "months(ts)", "days(ts)", "hours(ts)", "foo(a, 'bar', 34)",
+        "bucket(32, b), days(ts)")
+    transforms.foreach { transform =>
+      val query =
+        s"""
+           |CREATE TABLE my_tab(a INT, b STRING) USING parquet
+           |PARTITIONED BY ($transform)
+           """.stripMargin
+
+      val ae = intercept[AnalysisException] {
+        parseAndResolve(query)
+      }
+
+      assert(ae.message
+          .contains(s"Transforms cannot be converted to partition columns: $transform"))
+    }
+  }
+
   test("create table - with bucket") {
     val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " +
         "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
