This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 544b7e1  [SPARK-35998][SQL] Make from_csv/to_csv to handle year-month 
intervals properly
544b7e1 is described below

commit 544b7e16acf51b7c2a95588885fb4ebe7b19a00e
Author: Kousuke Saruta <saru...@oss.nttdata.com>
AuthorDate: Mon Jul 5 13:10:50 2021 +0300

    [SPARK-35998][SQL] Make from_csv/to_csv to handle year-month intervals 
properly
    
    ### What changes were proposed in this pull request?
    
    This PR fixes an issue that `from_csv/to_csv` doesn't handle year-month 
intervals properly.
    `from_csv` throws an exception if year-month interval types are given.
    ```
    spark-sql> select from_csv("interval '1-2' year to month", "a interval year 
to month");
    21/07/03 04:32:24 ERROR SparkSQLDriver: Failed in [select 
from_csv("interval '1-2' year to month", "a interval year to month")]
    java.lang.Exception: Unsupported type: interval year to month
        at 
org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedTypeError(QueryExecutionErrors.scala:775)
        at 
org.apache.spark.sql.catalyst.csv.UnivocityParser.makeConverter(UnivocityParser.scala:224)
        at 
org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$valueConverters$1(UnivocityParser.scala:134)
    ```
    
    Also, `to_csv` doesn't handle year-month interval types properly even though 
no exception is thrown.
    The result of `to_csv` for year-month interval types is not in the 
ANSI-compliant interval form.
    
    ```
    spark-sql> select to_csv(named_struct("a", interval '1-2' year to month));
    14
    ```
    The result above should be `INTERVAL '1-2' YEAR TO MONTH`.
    
    ### Why are the changes needed?
    
    Bug fix.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New tests.
    
    Closes #33210 from sarutak/csv-yminterval.
    
    Authored-by: Kousuke Saruta <saru...@oss.nttdata.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
    (cherry picked from commit f4237aff7ebece0b8d61e680ecbe759850f25af8)
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../sql/catalyst/csv/UnivocityGenerator.scala      |  7 +++++-
 .../spark/sql/catalyst/csv/UnivocityParser.scala   |  7 +++++-
 .../org/apache/spark/sql/CsvFunctionsSuite.scala   | 29 ++++++++++++++++++++++
 3 files changed, 41 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
index 11b31ce..5d70ccb 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
@@ -22,7 +22,7 @@ import java.io.Writer
 import com.univocity.parsers.csv.CsvWriter
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
+import org.apache.spark.sql.catalyst.util.{DateFormatter, 
IntervalStringStyles, IntervalUtils, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.types._
 
@@ -61,6 +61,11 @@ class UnivocityGenerator(
     case TimestampType =>
       (row: InternalRow, ordinal: Int) => 
timestampFormatter.format(row.getLong(ordinal))
 
+    case YearMonthIntervalType(start, end) =>
+      (row: InternalRow, ordinal: Int) =>
+        IntervalUtils.toYearMonthIntervalString(
+          row.getInt(ordinal), IntervalStringStyles.ANSI_STYLE, start, end)
+
     case udt: UserDefinedType[_] => makeConverter(udt.sqlType)
 
     case dt: DataType =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 672d133..3ec1ea0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -26,7 +26,7 @@ import com.univocity.parsers.csv.CsvParser
 import org.apache.spark.SparkUpgradeException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters}
-import org.apache.spark.sql.catalyst.expressions.{ExprUtils, 
GenericInternalRow}
+import org.apache.spark.sql.catalyst.expressions.{Cast, EmptyRow, ExprUtils, 
GenericInternalRow, Literal}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -217,6 +217,11 @@ class UnivocityParser(
         IntervalUtils.safeStringToInterval(UTF8String.fromString(datum))
       }
 
+    case ym: YearMonthIntervalType => (d: String) =>
+      nullSafeDatum(d, name, nullable, options) { datum =>
+        Cast(Literal(datum), ym).eval(EmptyRow)
+      }
+
     case udt: UserDefinedType[_] =>
       makeConverter(name, udt.sqlType, nullable)
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index 6ae57cb..c6afb25 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import java.text.SimpleDateFormat
+import java.time.Period
 import java.util.Locale
 
 import scala.collection.JavaConverters._
@@ -27,6 +28,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.YearMonthIntervalType.{MONTH, YEAR}
 
 class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
   import testImplicits._
@@ -279,4 +281,31 @@ class CsvFunctionsSuite extends QueryTest with 
SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-35998: Make from_csv/to_csv to handle year-month intervals 
properly") {
+    val ymDF = Seq(Period.of(1, 2, 0)).toDF
+    Seq(
+      (YearMonthIntervalType(), "INTERVAL '1-2' YEAR TO MONTH", Period.of(1, 
2, 0)),
+      (YearMonthIntervalType(YEAR), "INTERVAL '1' YEAR", Period.of(1, 0, 0)),
+      (YearMonthIntervalType(MONTH), "INTERVAL '14' MONTH", Period.of(1, 2, 0))
+    ).foreach { case (toCsvDtype, toCsvExpected, fromCsvExpected) =>
+      val toCsvDF = ymDF.select(to_csv(struct($"value" cast toCsvDtype)) as 
"csv")
+      checkAnswer(toCsvDF, Row(toCsvExpected))
+
+      DataTypeTestUtils.yearMonthIntervalTypes.foreach { fromCsvDtype =>
+        val fromCsvDF = toCsvDF
+          .select(
+            from_csv(
+              $"csv",
+              StructType(StructField("a", fromCsvDtype) :: Nil),
+              Map.empty[String, String]) as "value")
+          .selectExpr("value.a")
+        if (toCsvDtype == fromCsvDtype) {
+          checkAnswer(fromCsvDF, Row(fromCsvExpected))
+        } else {
+          checkAnswer(fromCsvDF, Row(null))
+        }
+      }
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to