This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a90a49828f4 [SPARK-39211][SQL] Support JSON scans with DEFAULT values
a90a49828f4 is described below
commit a90a49828f4484fa6c3dcfe5183bd4181f7cfd91
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Tue May 24 21:31:17 2022 +0800
[SPARK-39211][SQL] Support JSON scans with DEFAULT values
### What changes were proposed in this pull request?
Support JSON scans when the table schema has associated DEFAULT column
values.
Example:
```
create table t(i int) using json;
insert into t values(42);
alter table t add column s string default concat('abc', 'def');
select * from t;
> 42, 'abcdef'
```
Interesting note: JSON does not distinguish between NULL values and the
absence of values. Therefore inserting NULL and then selecting back the same
column yields the default value (if any), since the insert did not change any
storage.
### Why are the changes needed?
This change makes it easier to build, query, and maintain tables backed by
JSON data.
### Does this PR introduce _any_ user-facing change?
Yes.
### How was this patch tested?
This PR includes new test coverage.
Closes #36583 from dtenedor/default-json.
Authored-by: Daniel Tenedorio <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../spark/sql/catalyst/csv/UnivocityParser.scala | 2 +-
.../spark/sql/catalyst/json/JacksonParser.scala | 5 +-
.../catalyst/util/ResolveDefaultColumnsUtil.scala | 63 ++++++++++++++++++++++
.../org/apache/spark/sql/types/StructType.scala | 30 +++--------
.../apache/spark/sql/types/StructTypeSuite.scala | 14 ++---
.../org/apache/spark/sql/sources/InsertSuite.scala | 35 +++++++++---
6 files changed, 110 insertions(+), 39 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index ff46672e67f..56ebfcc26c6 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -327,7 +327,7 @@ class UnivocityParser(
case NonFatal(e) =>
badRecordException = badRecordException.orElse(Some(e))
// Use the corresponding DEFAULT value associated with the column,
if any.
- row.update(i, requiredSchema.defaultValues(i))
+ row.update(i, requiredSchema.existenceDefaultValues(i))
}
i += 1
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index abcbdb83813..7004d2a8f16 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.{InternalRow,
NoopFilters, StructFilters}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
@@ -421,12 +422,14 @@ class JacksonParser(
var skipRow = false
structFilters.reset()
+ resetExistenceDefaultsBitmask(schema)
while (!skipRow && nextUntil(parser, JsonToken.END_OBJECT)) {
schema.getFieldIndex(parser.getCurrentName) match {
case Some(index) =>
try {
row.update(index, fieldConverters(index).apply(parser))
skipRow = structFilters.skipRow(row, index)
+ schema.existenceDefaultsBitmask(index) = false
} catch {
case e: SparkUpgradeException => throw e
case NonFatal(e) if isRoot =>
@@ -437,10 +440,10 @@ class JacksonParser(
parser.skipChildren()
}
}
-
if (skipRow) {
None
} else if (badRecordException.isEmpty) {
+ applyExistenceDefaultValuesToRow(schema, row)
Some(row)
} else {
throw PartialResultException(row, badRecordException.get)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index d2963a60409..262150174ea 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -18,11 +18,14 @@
package org.apache.spark.sql.catalyst.util
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.{Literal => ExprLiteral}
import org.apache.spark.sql.catalyst.optimizer.ConstantFolding
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -168,4 +171,64 @@ object ResolveDefaultColumns {
str.toLowerCase()
}
}
+
+ /**
+ * Parses the text representing constant-folded default column literal
values. These are known as
+ * "existence" default values because each one is the constant-folded result
of the original
+ * default value first assigned to the column at table/column creation time.
When scanning a field
+ * from any data source, if the corresponding value is not present in
storage, the output row
+ * returns this "existence" default value instead of NULL.
+ * @return a sequence of either (1) NULL, if the column had no default
value, or (2) an object of
+ * Any type suitable for assigning into a row using the
InternalRow.update method.
+ */
+ def getExistenceDefaultValues(schema: StructType): Array[Any] = {
+ schema.fields.map { field: StructField =>
+ val defaultValue: Option[String] = field.getExistenceDefaultValue()
+ defaultValue.map { text: String =>
+ val expr = try {
+ val expr = CatalystSqlParser.parseExpression(text)
+ expr match {
+ case _: ExprLiteral | _: AnsiCast | _: Cast => expr
+ }
+ } catch {
+ case _: ParseException | _: MatchError =>
+ throw
QueryCompilationErrors.failedToParseExistenceDefaultAsLiteral(field.name, text)
+ }
+ // The expression should be a literal value by this point, possibly
wrapped in a cast
+ // function. This is enforced by the execution of commands that assign
default values.
+ expr.eval()
+ }.orNull
+ }
+ }
+
+ /**
+ * Returns an array of boolean values equal in size to the result of
[[getExistenceDefaultValues]]
+ * above, for convenience.
+ */
+ def getExistenceDefaultsBitmask(schema: StructType): Array[Boolean] = {
+ Array.fill[Boolean](schema.existenceDefaultValues.size)(true)
+ }
+
+ /**
+ * Resets the elements of the array initially returned from
[[getExistenceDefaultsBitmask]] above.
+ * Afterwards, set element(s) to false before calling
[[applyExistenceDefaultValuesToRow]] below.
+ */
+ def resetExistenceDefaultsBitmask(schema: StructType): Unit = {
+ for (i <- 0 until schema.existenceDefaultValues.size) {
+ schema.existenceDefaultsBitmask(i) = (schema.existenceDefaultValues(i)
!= null)
+ }
+ }
+
+ /**
+ * Updates a subset of columns in the row with default values from the
metadata in the schema.
+ */
+ def applyExistenceDefaultValuesToRow(schema: StructType, row: InternalRow):
Unit = {
+ if (schema.hasExistenceDefaultValues) {
+ for (i <- 0 until schema.existenceDefaultValues.size) {
+ if (schema.existenceDefaultsBitmask(i)) {
+ row.update(i, schema.existenceDefaultValues(i))
+ }
+ }
+ }
+ }
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 464d1ba1ef9..06460513c8a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -25,10 +25,11 @@ import org.json4s.JsonDSL._
import org.apache.spark.annotation.Stable
import org.apache.spark.sql.catalyst.analysis.Resolver
-import org.apache.spark.sql.catalyst.expressions.{AnsiCast, Attribute,
AttributeReference, Cast, InterpretedOrdering, Literal => ExprLiteral}
-import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser,
LegacyTypeStringParser, ParseException}
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, InterpretedOrdering}
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser,
LegacyTypeStringParser}
import org.apache.spark.sql.catalyst.trees.Origin
import org.apache.spark.sql.catalyst.util.{truncatedString, StringUtils}
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
import org.apache.spark.sql.errors.{QueryCompilationErrors,
QueryExecutionErrors}
import org.apache.spark.sql.internal.SQLConf
@@ -513,28 +514,11 @@ case class StructType(fields: Array[StructField]) extends
DataType with Seq[Stru
InterpretedOrdering.forSchema(this.fields.map(_.dataType))
/**
- * Parses the text representing constant-folded default column literal
values.
- * @return a sequence of either (1) NULL, if the column had no default
value, or (2) an object of
- * Any type suitable for assigning into a row using the
InternalRow.update method.
+ * These define and cache existence default values for the struct fields for
efficiency purposes.
*/
- private [sql] lazy val defaultValues: Array[Any] =
- fields.map { field: StructField =>
- val defaultValue: Option[String] = field.getExistenceDefaultValue()
- defaultValue.map { text: String =>
- val expr = try {
- val expr = CatalystSqlParser.parseExpression(text)
- expr match {
- case _: ExprLiteral | _: AnsiCast | _: Cast => expr
- }
- } catch {
- case _: ParseException | _: MatchError =>
- throw
QueryCompilationErrors.failedToParseExistenceDefaultAsLiteral(field.name, text)
- }
- // The expression should be a literal value by this point, possibly
wrapped in a cast
- // function. This is enforced by the execution of commands that assign
default values.
- expr.eval()
- }.getOrElse(null)
- }
+ private[sql] lazy val existenceDefaultValues: Array[Any] =
getExistenceDefaultValues(this)
+ private[sql] lazy val existenceDefaultsBitmask: Array[Boolean] =
getExistenceDefaultsBitmask(this)
+ private[sql] lazy val hasExistenceDefaultValues =
existenceDefaultValues.exists(_ != null)
}
/**
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
index 3aca7b1e52e..940a8e5e2ec 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
@@ -457,10 +457,10 @@ class StructTypeSuite extends SparkFunSuite with
SQLHelper {
.putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "'abc'")
.build()),
StructField("c3", BooleanType)))
- assert(source1.defaultValues.size == 3)
- assert(source1.defaultValues(0) == 42)
- assert(source1.defaultValues(1) == UTF8String.fromString("abc"))
- assert(source1.defaultValues(2) == null)
+ assert(source1.existenceDefaultValues.size == 3)
+ assert(source1.existenceDefaultValues(0) == 42)
+ assert(source1.existenceDefaultValues(1) == UTF8String.fromString("abc"))
+ assert(source1.existenceDefaultValues(2) == null)
// Negative test: StructType.defaultValues fails because the existence
default value parses and
// resolves successfully, but evaluates to a non-literal expression.
@@ -472,7 +472,7 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
.build())))
val error = "fails to parse as a valid literal value"
assert(intercept[AnalysisException] {
- source2.defaultValues
+ source2.existenceDefaultValues
}.getMessage.contains(error))
// Negative test: StructType.defaultValues fails because the existence
default value fails to
@@ -484,7 +484,7 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
.putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "invalid")
.build())))
assert(intercept[AnalysisException] {
- source3.defaultValues
+ source3.existenceDefaultValues
}.getMessage.contains(error))
// Negative test: StructType.defaultValues fails because the existence
default value fails to
@@ -500,7 +500,7 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
"(SELECT 'abc' FROM missingtable)")
.build())))
assert(intercept[AnalysisException] {
- source4.defaultValues
+ source4.existenceDefaultValues
}.getMessage.contains(error))
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 1580a33a9eb..247c6bdb355 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -1572,23 +1572,44 @@ class InsertSuite extends DataSourceTest with
SharedSparkSession {
}
// This represents one test configuration over a data source.
- case class Config(dataSource: String, sqlConf: Seq[(String, String)] =
Seq())
+ case class Config(
+ dataSource: String,
+ sqlConf: Seq[Option[(String, String)]] = Seq())
+ // Run the test several times using each configuration.
Seq(
+ Config(dataSource = "json",
+ Seq(
+ Some(SQLConf.JSON_GENERATOR_IGNORE_NULL_FIELDS.key -> "false"))),
Config(dataSource = "csv",
Seq(
- SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "false"))
+ None,
+ Some(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "false")))
).foreach { config: Config =>
- // First run the test with default settings.
- runTest(config.dataSource)
- // Then run the test again with each pair of custom SQLConf values.
- config.sqlConf.foreach { kv: (String, String) =>
- withSQLConf(kv) {
+ config.sqlConf.foreach {
+ _.map { kv: (String, String) =>
+ withSQLConf(kv) {
+ // Run the test with the pair of custom SQLConf values.
+ runTest(config.dataSource)
+ }
+ }.getOrElse {
+ // Run the test with default settings.
runTest(config.dataSource)
}
}
}
}
+ test("SPARK-39211 INSERT into JSON table, ADD COLUMNS with DEFAULTs, then
SELECT them") {
+ // By default, INSERT commands into JSON tables do not store NULL values.
Therefore, if such
+ // destination table columns have DEFAULT values, SELECTing out the same
columns will return the
+ // default values (instead of NULL) since nothing is present in storage.
+ withTable("t") {
+ sql("create table t(a string default 'abc') using json")
+ sql("insert into t values(null)")
+ checkAnswer(spark.table("t"), Row("abc"))
+ }
+ }
+
test("Stop task set if FileAlreadyExistsException was thrown") {
Seq(true, false).foreach { fastFail =>
withSQLConf("fs.file.impl" ->
classOf[FileExistingTestFileSystem].getName,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]