This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f5f67b851e2 [SPARK-39143][SQL] Support CSV scans with DEFAULT values
f5f67b851e2 is described below

commit f5f67b851e28afd898a2e3844c088c3041a199fe
Author: Daniel Tenedorio <daniel.tenedo...@databricks.com>
AuthorDate: Wed May 18 08:38:43 2022 +0900

    [SPARK-39143][SQL] Support CSV scans with DEFAULT values
    
    ### What changes were proposed in this pull request?
    
    Support CSV scans when the table schema has associated DEFAULT column values.
    
    Example:
    
    ```
    create table t(i int) using csv;
    insert into t values(42);
    alter table t add column s string default concat('abc', 'def');
    select * from t;
    > 42, 'abcdef'
    ```
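    
    Rows written after the ALTER TABLE with an explicit NULL keep that NULL, while rows written before it surface the existence DEFAULT value. A minimal sketch of this distinction, mirroring the new InsertSuite coverage (the result lines shown are illustrative):
    
    ```
    create table t(i int) using csv;
    insert into t values(42);
    alter table t add column s string default concat('abc', 'def');
    insert into t values(99, null);
    select * from t;
    > 42, 'abcdef'
    > 99, NULL
    ```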
    
    ### Why are the changes needed?
    
    This change makes it easier to build, query, and maintain tables backed by CSV data.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes.
    
    ### How was this patch tested?
    
    This PR includes new test coverage.
    
    Closes #36501 from dtenedor/default-csv.
    
    Authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../spark/sql/catalyst/csv/UnivocityParser.scala   | 17 +++--
 .../spark/sql/errors/QueryCompilationErrors.scala  |  6 ++
 .../org/apache/spark/sql/types/StructField.scala   | 11 ++++
 .../org/apache/spark/sql/types/StructType.scala    | 28 ++++++++-
 .../apache/spark/sql/types/StructTypeSuite.scala   | 63 +++++++++++++++++++
 .../org/apache/spark/sql/sources/InsertSuite.scala | 73 ++++++++++++++++++++++
 6 files changed, 192 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 56166950e67..ff46672e67f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters}
 import org.apache.spark.sql.catalyst.expressions.{Cast, EmptyRow, ExprUtils, GenericInternalRow, Literal}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
@@ -67,9 +68,16 @@ class UnivocityParser(
   private val tokenIndexArr =
     requiredSchema.map(f => java.lang.Integer.valueOf(dataSchema.indexOf(f))).toArray
 
+  // True if we should inform the Univocity CSV parser to select which fields to read by their
+  // positions. Generally assigned by input configuration options, except when input column(s) have
+  // default values, in which case we omit the explicit indexes in order to know how many tokens
+  // were present in each line instead.
+  private def columnPruning: Boolean = options.columnPruning &&
+    !requiredSchema.exists(_.metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY))
+
   // When column pruning is enabled, the parser only parses the required columns based on
   // their positions in the data schema.
-  private val parsedSchema = if (options.columnPruning) requiredSchema else dataSchema
+  private val parsedSchema = if (columnPruning) requiredSchema else dataSchema
 
   val tokenizer: CsvParser = {
     val parserSetting = options.asParserSettings
@@ -266,7 +274,7 @@ class UnivocityParser(
    */
   val parse: String => Option[InternalRow] = {
     // This is intentionally a val to create a function once and reuse.
-    if (options.columnPruning && requiredSchema.isEmpty) {
+    if (columnPruning && requiredSchema.isEmpty) {
       // If `columnPruning` enabled and partition attributes scanned only,
       // `schema` gets empty.
       (_: String) => Some(InternalRow.empty)
@@ -276,7 +284,7 @@ class UnivocityParser(
     }
   }
 
-  private val getToken = if (options.columnPruning) {
+  private val getToken = if (columnPruning) {
     (tokens: Array[String], index: Int) => tokens(index)
   } else {
     (tokens: Array[String], index: Int) => tokens(tokenIndexArr(index))
@@ -318,7 +326,8 @@ class UnivocityParser(
         case e: SparkUpgradeException => throw e
         case NonFatal(e) =>
           badRecordException = badRecordException.orElse(Some(e))
-          row.setNullAt(i)
+          // Use the corresponding DEFAULT value associated with the column, if any.
+          row.update(i, requiredSchema.defaultValues(i))
       }
       i += 1
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 912f65aa58c..3d133d6cfab 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2445,4 +2445,10 @@ object QueryCompilationErrors extends QueryErrorsBase {
       "Failed to execute MERGE INTO command because one of its INSERT or 
UPDATE assignments " +
         "contains a DEFAULT column reference as part of another expression; 
this is not allowed")
   }
+
+  def failedToParseExistenceDefaultAsLiteral(fieldName: String, defaultValue: String): Throwable = {
+    throw new AnalysisException(
+      s"Invalid DEFAULT value for column $fieldName: $defaultValue fails to parse as a valid " +
+        "literal value")
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
index c80745ff6b5..1fdde3e5219 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
@@ -118,6 +118,17 @@ case class StructField(
     }
   }
 
+  /**
+   * Return the existence default value of this StructField.
+   */
+  private[sql] def getExistenceDefaultValue(): Option[String] = {
+    if (metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY)) {
+      Option(metadata.getString(EXISTS_DEFAULT_COLUMN_METADATA_KEY))
+    } else {
+      None
+    }
+  }
+
   private def getDDLComment = getComment()
     .map(escapeSingleQuotedString)
     .map(" COMMENT '" + _ + "'")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index ec08ee4838f..464d1ba1ef9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -25,8 +25,8 @@ import org.json4s.JsonDSL._
 
 import org.apache.spark.annotation.Stable
 import org.apache.spark.sql.catalyst.analysis.Resolver
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, InterpretedOrdering}
-import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, LegacyTypeStringParser}
+import org.apache.spark.sql.catalyst.expressions.{AnsiCast, Attribute, AttributeReference, Cast, InterpretedOrdering, Literal => ExprLiteral}
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, LegacyTypeStringParser, ParseException}
 import org.apache.spark.sql.catalyst.trees.Origin
 import org.apache.spark.sql.catalyst.util.{truncatedString, StringUtils}
 import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
@@ -511,6 +511,30 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
   @transient
   private[sql] lazy val interpretedOrdering =
     InterpretedOrdering.forSchema(this.fields.map(_.dataType))
+
+  /**
+   * Parses the text representing constant-folded default column literal values.
+   * @return a sequence of either (1) NULL, if the column had no default value, or (2) an object of
+   *         Any type suitable for assigning into a row using the InternalRow.update method.
+   */
+  private [sql] lazy val defaultValues: Array[Any] =
+    fields.map { field: StructField =>
+      val defaultValue: Option[String] = field.getExistenceDefaultValue()
+      defaultValue.map { text: String =>
+        val expr = try {
+          val expr = CatalystSqlParser.parseExpression(text)
+          expr match {
+            case _: ExprLiteral | _: AnsiCast | _: Cast => expr
+          }
+        } catch {
+          case _: ParseException | _: MatchError =>
+            throw QueryCompilationErrors.failedToParseExistenceDefaultAsLiteral(field.name, text)
+        }
+        // The expression should be a literal value by this point, possibly wrapped in a cast
+        // function. This is enforced by the execution of commands that assign default values.
+        expr.eval()
+      }.getOrElse(null)
+    }
 }
 
 /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
index 16f122334f3..ef29f7b9cbb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
@@ -22,12 +22,14 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{caseInsensitiveResolution, caseSensitiveResolution}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DayTimeIntervalType => DT}
 import org.apache.spark.sql.types.{YearMonthIntervalType => YM}
 import org.apache.spark.sql.types.DayTimeIntervalType._
 import org.apache.spark.sql.types.StructType.fromDDL
 import org.apache.spark.sql.types.YearMonthIntervalType._
+import org.apache.spark.unsafe.types.UTF8String
 
 class StructTypeSuite extends SparkFunSuite with SQLHelper {
 
@@ -436,4 +438,65 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
     }
     assert(e.getMessage.contains("Failed to merge decimal types"))
   }
+
+  test("SPARK-39143: Test parsing default column values out of struct types") {
+    // Positive test: the StructType.defaultValues evaluation is successful.
+    val source1 = StructType(Array(
+      StructField("c1", LongType, true,
+        new MetadataBuilder()
+          .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "CAST(42 AS BIGINT)")
+          .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "CAST(42 AS BIGINT)")
+          .build()),
+      StructField("c2", StringType, true,
+        new MetadataBuilder()
+          .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "'abc'")
+          .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "'abc'")
+          .build()),
+      StructField("c3", BooleanType)))
+    assert(source1.defaultValues.size == 3)
+    assert(source1.defaultValues(0) == 42)
+    assert(source1.defaultValues(1) == UTF8String.fromString("abc"))
+    assert(source1.defaultValues(2) == null)
+
+    // Negative test: StructType.defaultValues fails because the existence default value parses and
+    // resolves successfully, but evaluates to a non-literal expression.
+    val source2 = StructType(
+      Array(StructField("c1", IntegerType, true,
+        new MetadataBuilder()
+          .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "1 + 1")
+          .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "1 + 1")
+          .build())))
+    val error = "fails to parse as a valid literal value"
+    assert(intercept[AnalysisException] {
+      source2.defaultValues
+    }.getMessage.contains(error))
+
+    // Negative test: StructType.defaultValues fails because the existence default value fails to
+    // parse.
+    val source3 = StructType(Array(
+      StructField("c1", IntegerType, true,
+        new MetadataBuilder()
+          .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "invalid")
+          .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "invalid")
+          .build())))
+    assert(intercept[AnalysisException] {
+      source3.defaultValues
+    }.getMessage.contains(error))
+
+    // Negative test: StructType.defaultValues fails because the existence default value fails to
+    // resolve.
+    val source4 = StructType(Array(
+      StructField("c1", IntegerType, true,
+        new MetadataBuilder()
+          .putString(
+            ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY,
+            "(SELECT 'abc' FROM missingtable)")
+          .putString(
+            ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY,
+            "(SELECT 'abc' FROM missingtable)")
+          .build())))
+    assert(intercept[AnalysisException] {
+      source4.defaultValues
+    }.getMessage.contains(error))
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index a2237b377cf..8fbaafbead3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -1512,6 +1512,79 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
   }
 
+  test("INSERT rows, ALTER TABLE ADD COLUMNS with DEFAULTs, then SELECT them: 
Positive tests") {
+    def runTest(dataSource: String): Unit = {
+      val createTableIntCol = s"create table t(a string, i int) using $dataSource"
+      // Adding a column with a valid default value into a table containing existing data works
+      // successfully. Querying data from the altered table returns the new value.
+      withTable("t") {
+        sql(createTableIntCol)
+        sql("insert into t values('xyz', 42)")
+        sql("alter table t add column (s string default concat('abc', 'def'))")
+        checkAnswer(spark.table("t"), Row("xyz", 42, "abcdef"))
+        checkAnswer(sql("select i, s from t"), Row(42, "abcdef"))
+      }
+      // Same as above, but a following command alters the column to change the default value.
+      // This returns the previous value, not the new value, since the behavior semantics are
+      // the same as if the first command had performed a backfill of the new default value in
+      // the existing rows.
+      withTable("t") {
+        sql(createTableIntCol)
+        sql("insert into t values('xyz', 42)")
+        sql("alter table t add column (s string default concat('abc', 'def'))")
+        sql("alter table t alter column s set default concat('ghi', 'jkl')")
+        checkAnswer(spark.table("t"), Row("xyz", 42, "abcdef"))
+        checkAnswer(sql("select i, s from t"), Row(42, "abcdef"))
+      }
+      // Adding a column with a default value and then inserting explicit NULL values works.
+      // Querying data back from the table differentiates between the explicit NULL values and
+      // default values.
+      withTable("t") {
+        sql(createTableIntCol)
+        sql("insert into t values('xyz', 42)")
+        sql("alter table t add column (s string default concat('abc', 'def'))")
+        sql("insert into t values(null, null, null)")
+        sql("alter table t add column (x boolean default true)")
+        checkAnswer(spark.table("t"),
+          Seq(
+            Row("xyz", 42, "abcdef", true),
+            Row(null, null, null, true)))
+        checkAnswer(sql("select i, s, x from t"),
+          Seq(
+            Row(42, "abcdef", true),
+            Row(null, null, true)))
+      }
+      // Adding two columns where only the first has a valid default value works successfully.
+      // Querying data from the altered table returns the default value as well as NULL for the
+      // second column.
+      withTable("t") {
+        sql(createTableIntCol)
+        sql("insert into t values('xyz', 42)")
+        sql("alter table t add column (s string default concat('abc', 'def'))")
+        sql("alter table t add column (x string)")
+        checkAnswer(spark.table("t"), Row("xyz", 42, "abcdef", null))
+        checkAnswer(sql("select i, s, x from t"), Row(42, "abcdef", null))
+      }
+    }
+
+    // This represents one test configuration over a data source.
+    case class Config(dataSource: String, sqlConf: Seq[(String, String)] = Seq())
+    Seq(
+      Config(dataSource = "csv",
+        Seq(
+          SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "false"))
+    ).foreach { config: Config =>
+      // First run the test with default settings.
+      runTest(config.dataSource)
+      // Then run the test again with each pair of custom SQLConf values.
+      config.sqlConf.foreach { kv: (String, String) =>
+        withSQLConf(kv) {
+          runTest(config.dataSource)
+        }
+      }
+    }
+  }
+
   test("Stop task set if FileAlreadyExistsException was thrown") {
     Seq(true, false).foreach { fastFail =>
       withSQLConf("fs.file.impl" -> 
classOf[FileExistingTestFileSystem].getName,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
