This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 87d7ab6  [SPARK-32608][SQL][3.0] Script Transform ROW FORMAT DELIMIT 
value should format value
87d7ab6 is described below

commit 87d7ab6c6e96db2bd019d743d9459d6f00240829
Author: angerszhu <[email protected]>
AuthorDate: Thu Aug 20 13:43:15 2020 +0000

    [SPARK-32608][SQL][3.0] Script Transform ROW FORMAT DELIMIT value should 
format value
    
    ### What changes were proposed in this pull request?
    For SQL
    ```
    SELECT TRANSFORM(a, b, c)
      ROW FORMAT DELIMITED
      FIELDS TERMINATED BY ','
      LINES TERMINATED BY '\n'
      NULL DEFINED AS 'null'
      USING 'cat' AS (a, b, c)
      ROW FORMAT DELIMITED
      FIELDS TERMINATED BY ','
      LINES TERMINATED BY '\n'
      NULL DEFINED AS 'NULL'
    FROM testData
    ```
    The correct
    
    TOK_TABLEROWFORMATFIELD should be `, `nut actually ` ','`
    
    TOK_TABLEROWFORMATLINES should be `\n`  but actually` '\n'`
    
    ### Why are the changes needed?
    Fix string value format
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added UT
    
    Closes #29487 from AngersZhuuuu/SPARK-32608-3.0.
    
    Authored-by: angerszhu <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/catalyst/parser/ParserUtils.scala    |  5 ++
 .../spark/sql/execution/SparkSqlParser.scala       |  7 --
 .../spark/sql/execution/SparkSqlParserSuite.scala  | 45 ++++++++++++-
 .../hive/execution/ScriptTransformationSuite.scala | 77 ++++++++++++++++++++--
 4 files changed, 118 insertions(+), 16 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
index a377969..f2dab94 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
@@ -83,6 +83,11 @@ object ParserUtils {
     node.getText.slice(1, node.getText.size - 1)
   }
 
+  /** Collect the entries if any. */
+  def entry(key: String, value: Token): Seq[(String, String)] = {
+    Option(value).toSeq.map(x => key -> string(x))
+  }
+
   /** Get the origin (line and position) of the token. */
   def position(token: Token): Origin = {
     val opt = Option(token)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index aa139cb..44069f3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -587,10 +587,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder(conf) {
    */
   override def visitRowFormatDelimited(
       ctx: RowFormatDelimitedContext): CatalogStorageFormat = withOrigin(ctx) {
-    // Collect the entries if any.
-    def entry(key: String, value: Token): Seq[(String, String)] = {
-      Option(value).toSeq.map(x => key -> string(x))
-    }
     // TODO we need proper support for the NULL format.
     val entries =
       entry("field.delim", ctx.fieldsTerminatedBy) ++
@@ -689,9 +685,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder(conf) {
         // expects a seq of pairs in which the old parsers' token names are 
used as keys.
         // Transforming the result of visitRowFormatDelimited would be quite a 
bit messier than
         // retrieving the key value pairs ourselves.
-        def entry(key: String, value: Token): Seq[(String, String)] = {
-          Option(value).map(t => key -> t.getText).toSeq
-        }
         val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++
           entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) 
++
           entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
index 06574a9..6a58c8f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
@@ -21,8 +21,9 @@ import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAlias, 
UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, 
CatalogStorageFormat, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Concat, SortOrder}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
RepartitionByExpression, Sort}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions.{Ascending, 
AttributeReference, Concat, SortOrder}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.{CreateTable, 
RefreshResource}
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
@@ -251,4 +252,44 @@ class SparkSqlParserSuite extends AnalysisTest {
     assertEqual("ADD FILE /path with space/abc.txt", AddFileCommand("/path 
with space/abc.txt"))
     assertEqual("ADD JAR /path with space/abc.jar", AddJarCommand("/path with 
space/abc.jar"))
   }
+
+  test("SPARK-32608: script transform with row format delimit") {
+    assertEqual(
+      """
+        |SELECT TRANSFORM(a, b, c)
+        |  ROW FORMAT DELIMITED
+        |  FIELDS TERMINATED BY ','
+        |  COLLECTION ITEMS TERMINATED BY '#'
+        |  MAP KEYS TERMINATED BY '@'
+        |  LINES TERMINATED BY '\n'
+        |  NULL DEFINED AS 'null'
+        |  USING 'cat' AS (a, b, c)
+        |  ROW FORMAT DELIMITED
+        |  FIELDS TERMINATED BY ','
+        |  COLLECTION ITEMS TERMINATED BY '#'
+        |  MAP KEYS TERMINATED BY '@'
+        |  LINES TERMINATED BY '\n'
+        |  NULL DEFINED AS 'NULL'
+        |FROM testData
+      """.stripMargin,
+      ScriptTransformation(
+        Seq('a, 'b, 'c),
+        "cat",
+        Seq(AttributeReference("a", StringType)(),
+          AttributeReference("b", StringType)(),
+          AttributeReference("c", StringType)()),
+        UnresolvedRelation(TableIdentifier("testData")),
+        ScriptInputOutputSchema(
+          Seq(("TOK_TABLEROWFORMATFIELD", ","),
+            ("TOK_TABLEROWFORMATCOLLITEMS", "#"),
+            ("TOK_TABLEROWFORMATMAPKEYS", "@"),
+            ("TOK_TABLEROWFORMATLINES", "\n"),
+            ("TOK_TABLEROWFORMATNULL", "null")),
+          Seq(("TOK_TABLEROWFORMATFIELD", ","),
+            ("TOK_TABLEROWFORMATCOLLITEMS", "#"),
+            ("TOK_TABLEROWFORMATMAPKEYS", "@"),
+            ("TOK_TABLEROWFORMATLINES", "\n"),
+            ("TOK_TABLEROWFORMATNULL", "NULL")), None, None,
+          List.empty, List.empty, None, None, false)))
+  }
 }
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
index b97eb86..15a932f 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest, UnaryExecNode}
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
@@ -61,6 +62,14 @@ class ScriptTransformationSuite extends SparkPlanTest with 
SQLTestUtils with Tes
 
   private val uncaughtExceptionHandler = new TestUncaughtExceptionHandler
 
+  // In Hive 1.2, the string representation of a decimal omits trailing zeroes.
+  // But in Hive 2.3, it is always padded to 18 digits with trailing zeroes if 
necessary.
+  val decimalToString: Column => Column = if (HiveUtils.isHive23) {
+    c => c.cast("string")
+  } else {
+    c => c.cast("decimal(1, 0)").cast("string")
+  }
+
   protected override def beforeAll(): Unit = {
     super.beforeAll()
     defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler
@@ -212,13 +221,6 @@ class ScriptTransformationSuite extends SparkPlanTest with 
SQLTestUtils with Tes
           |FROM v
         """.stripMargin)
 
-      // In Hive 1.2, the string representation of a decimal omits trailing 
zeroes.
-      // But in Hive 2.3, it is always padded to 18 digits with trailing 
zeroes if necessary.
-      val decimalToString: Column => Column = if (HiveUtils.isHive23) {
-        c => c.cast("string")
-      } else {
-        c => c.cast("decimal(1, 0)").cast("string")
-      }
       checkAnswer(query, identity, df.select(
         'a.cast("string"),
         'b.cast("string"),
@@ -263,6 +265,67 @@ class ScriptTransformationSuite extends SparkPlanTest with 
SQLTestUtils with Tes
     assert(e.getMessage.contains("Subprocess exited with status"))
     assert(uncaughtExceptionHandler.exception.isEmpty)
   }
+
+
+  test("SPARK-32608: Script Transform ROW FORMAT DELIMIT value should format 
value") {
+    withTempView("v") {
+      val df = Seq(
+        (1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)),
+        (2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)),
+        (3, "3", 3.0, BigDecimal(3.0), new Timestamp(3))
+      ).toDF("a", "b", "c", "d", "e") // Note column d's data type is 
Decimal(38, 18)
+      df.createTempView("v")
+
+      // input/output with same delimit
+      checkAnswer(
+        sql(
+          s"""
+             |SELECT TRANSFORM(a, b, c, d, cast(e as string))
+             |  ROW FORMAT DELIMITED
+             |  FIELDS TERMINATED BY ','
+             |  COLLECTION ITEMS TERMINATED BY '#'
+             |  MAP KEYS TERMINATED BY '@'
+             |  LINES TERMINATED BY '\n'
+             |  NULL DEFINED AS 'null'
+             |  USING 'cat' AS (a, b, c, d, e)
+             |  ROW FORMAT DELIMITED
+             |  FIELDS TERMINATED BY ','
+             |  COLLECTION ITEMS TERMINATED BY '#'
+             |  MAP KEYS TERMINATED BY '@'
+             |  LINES TERMINATED BY '\n'
+             |  NULL DEFINED AS 'NULL'
+             |FROM v
+        """.stripMargin), identity, df.select(
+          'a.cast("string"),
+          'b.cast("string"),
+          'c.cast("string"),
+          decimalToString('d),
+          'e.cast("string")).collect())
+
+      // input/output with different delimit and show result
+      checkAnswer(
+        sql(
+          s"""
+             |SELECT TRANSFORM(a, b, c, d, cast(e as string))
+             |  ROW FORMAT DELIMITED
+             |  FIELDS TERMINATED BY ','
+             |  LINES TERMINATED BY '\n'
+             |  NULL DEFINED AS 'null'
+             |  USING 'cat' AS (value)
+             |  ROW FORMAT DELIMITED
+             |  FIELDS TERMINATED BY '&'
+             |  LINES TERMINATED BY '\n'
+             |  NULL DEFINED AS 'NULL'
+             |FROM v
+        """.stripMargin), identity, df.select(
+          concat_ws(",",
+            'a.cast("string"),
+            'b.cast("string"),
+            'c.cast("string"),
+            decimalToString('d),
+            'e.cast("string"))).collect())
+    }
+  }
 }
 
 private case class ExceptionInjectingOperator(child: SparkPlan) extends 
UnaryExecNode {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to