This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a364cc0 [SPARK-38336][SQL] Support INSERT INTO commands into tables
with DEFAULT columns
a364cc0 is described below
commit a364cc01716279d19eb5b43bde4961b04f5102af
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Thu Mar 31 14:29:58 2022 +0800
[SPARK-38336][SQL] Support INSERT INTO commands into tables with DEFAULT
columns
### What changes were proposed in this pull request?
Extend INSERT INTO statements to support omitting default values or
referring to them explicitly with the DEFAULT keyword, in which case the Spark
analyzer will automatically insert the appropriate corresponding values in the
right places.
Example:
```
CREATE TABLE T(a INT DEFAULT 4, b INT NOT NULL DEFAULT 5);
INSERT INTO T VALUES (1);
INSERT INTO T VALUES (1, DEFAULT);
INSERT INTO T VALUES (DEFAULT, 6);
SELECT * FROM T;
(1, 5)
(1, 5)
(4, 6)
```
### Why are the changes needed?
This helps users issue INSERT INTO statements with less effort, and helps
people creating or updating tables to add custom optional columns for use in
specific circumstances as desired.
### How was this patch tested?
This change is covered by new and existing unit test coverage as well as
new INSERT INTO query test cases covering a variety of positive and negative
scenarios.
Closes #35982 from dtenedor/default-columns-insert-into.
Authored-by: Daniel Tenedorio <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../spark/sql/catalyst/parser/SqlBaseParser.g4 | 1 +
.../spark/sql/catalyst/analysis/Analyzer.scala | 1 +
.../catalyst/analysis/ResolveDefaultColumns.scala | 250 ++++++++++++++++++++
.../spark/sql/catalyst/parser/AstBuilder.scala | 5 +
.../sql/catalyst/rules/RuleIdCollection.scala | 1 +
...lumns.scala => ResolveDefaultColumnsUtil.scala} | 52 +++--
.../spark/sql/errors/QueryParsingErrors.scala | 5 +
.../org/apache/spark/sql/internal/SQLConf.scala | 14 ++
.../org/apache/spark/sql/SQLInsertTestSuite.scala | 30 ++-
.../org/apache/spark/sql/sources/InsertSuite.scala | 258 ++++++++++++++++++++-
.../org/apache/spark/sql/hive/InsertSuite.scala | 91 ++++----
11 files changed, 638 insertions(+), 70 deletions(-)
diff --git
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 17c3395..872ea53 100644
---
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -322,6 +322,7 @@ partitionSpec
partitionVal
: identifier (EQ constant)?
+ | identifier EQ DEFAULT
;
namespace
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index f69f17d..bd437c3 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -311,6 +311,7 @@ class Analyzer(override val catalogManager: CatalogManager)
ResolveAggregateFunctions ::
TimeWindowing ::
SessionWindowing ::
+ ResolveDefaultColumns(this, v1SessionCatalog) ::
ResolveInlineTables ::
ResolveLambdaVariables ::
ResolveTimeZone ::
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
new file mode 100644
index 0000000..f4502c9
--- /dev/null
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{SessionCatalog,
UnresolvedCatalogRelation}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+/**
+ * This is a rule to process DEFAULT columns in statements such as
CREATE/REPLACE TABLE.
+ *
+ * Background: CREATE TABLE and ALTER TABLE invocations support setting column
default values for
+ * later operations. Following INSERT, and INSERT MERGE commands may then
reference the value
+ * using the DEFAULT keyword as needed.
+ *
+ * Example:
+ * CREATE TABLE T(a INT DEFAULT 4, b INT NOT NULL DEFAULT 5);
+ * INSERT INTO T VALUES (1, 2);
+ * INSERT INTO T VALUES (1, DEFAULT);
+ * INSERT INTO T VALUES (DEFAULT, 6);
+ * SELECT * FROM T;
+ * (1, 2)
+ * (1, 5)
+ * (4, 6)
+ *
+ * @param analyzer analyzer to use for processing DEFAULT values stored as
text.
+ * @param catalog the catalog to use for looking up the schema of INSERT INTO
table objects.
+ */
+case class ResolveDefaultColumns(
+ analyzer: Analyzer,
+ catalog: SessionCatalog) extends Rule[LogicalPlan] {
+
+ // This field stores the enclosing INSERT INTO command, once we find one.
+ var enclosingInsert: Option[InsertIntoStatement] = None
+
+ override def apply(plan: LogicalPlan): LogicalPlan =
plan.resolveOperatorsWithPruning(
+ (_ => SQLConf.get.enableDefaultColumns), ruleId) {
+ case i@InsertIntoStatement(_, _, _, _, _, _)
+ if i.query.collectFirst { case u: UnresolvedInlineTable => u }.isDefined
=>
+ enclosingInsert = Some(i)
+ i
+
+ case table: UnresolvedInlineTable
+ if enclosingInsert.isDefined &&
+ table.rows.nonEmpty && table.rows.forall(_.size == table.rows(0).size)
=>
+ val expanded: UnresolvedInlineTable =
addMissingDefaultColumnValues(table).getOrElse(table)
+ replaceExplicitDefaultColumnValues(analyzer, expanded).getOrElse(table)
+
+ case i@InsertIntoStatement(_, _, _, project: Project, _, _) =>
+ enclosingInsert = Some(i)
+ val expanded: Project =
addMissingDefaultColumnValues(project).getOrElse(project)
+ val replaced: Option[LogicalPlan] =
replaceExplicitDefaultColumnValues(analyzer, expanded)
+ if (replaced.isDefined) i.copy(query = replaced.get) else i
+ }
+
+ // Helper method to check if an expression is an explicit DEFAULT column
reference.
+ private def isExplicitDefaultColumn(expr: Expression): Boolean = expr match {
+ case u: UnresolvedAttribute if
u.name.equalsIgnoreCase(CURRENT_DEFAULT_COLUMN_NAME) => true
+ case _ => false
+ }
+
+ /**
+ * Updates an inline table to generate missing default column values.
+ */
+ private def addMissingDefaultColumnValues(
+ table: UnresolvedInlineTable): Option[UnresolvedInlineTable] = {
+ assert(enclosingInsert.isDefined)
+ val numQueryOutputs: Int = table.rows(0).size
+ val schema = getInsertTableSchemaWithoutPartitionColumns.getOrElse(return
None)
+ val newDefaultExpressions: Seq[Expression] =
getDefaultExpressions(numQueryOutputs, schema)
+ val newNames: Seq[String] = schema.fields.drop(numQueryOutputs).map {
_.name }
+ if (newDefaultExpressions.nonEmpty) {
+ Some(table.copy(
+ names = table.names ++ newNames,
+ rows = table.rows.map { row => row ++ newDefaultExpressions }))
+ } else {
+ None
+ }
+ }
+
+ /**
+ * Adds a new expressions to a projection to generate missing default column
values.
+ */
+ private def addMissingDefaultColumnValues(project: Project): Option[Project]
= {
+ val numQueryOutputs: Int = project.projectList.size
+ val schema = getInsertTableSchemaWithoutPartitionColumns.getOrElse(return
None)
+ val newDefaultExpressions: Seq[Expression] =
getDefaultExpressions(numQueryOutputs, schema)
+ val newAliases: Seq[NamedExpression] =
+ newDefaultExpressions.zip(schema.fields).map {
+ case (expr, field) => Alias(expr, field.name)()
+ }
+ if (newDefaultExpressions.nonEmpty) {
+ Some(project.copy(projectList = project.projectList ++ newAliases))
+ } else {
+ None
+ }
+ }
+
+ /**
+ * This is a helper for the addMissingDefaultColumnValues methods above.
+ */
+ private def getDefaultExpressions(numQueryOutputs: Int, schema: StructType):
Seq[Expression] = {
+ val remainingFields: Seq[StructField] = schema.fields.drop(numQueryOutputs)
+ val numDefaultExpressionsToAdd: Int = {
+ if (SQLConf.get.useNullsForMissingDefaultColumnValues) {
+ remainingFields.size
+ } else {
+
remainingFields.takeWhile(_.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)).size
+ }
+ }
+
Seq.fill(numDefaultExpressionsToAdd)(UnresolvedAttribute(CURRENT_DEFAULT_COLUMN_NAME))
+ }
+
+ /**
+ * Replaces unresolved DEFAULT column references with corresponding values
in a logical plan.
+ */
+ private def replaceExplicitDefaultColumnValues(
+ analyzer: Analyzer,
+ input: LogicalPlan): Option[LogicalPlan] = {
+ assert(enclosingInsert.isDefined)
+ val schema = getInsertTableSchemaWithoutPartitionColumns.getOrElse(return
None)
+ val columnNames: Seq[String] = schema.fields.map { _.name }
+ val defaultExpressions: Seq[Expression] = schema.fields.map {
+ case f if f.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY) =>
+ analyze(analyzer, f, "INSERT")
+ case _ => Literal(null)
+ }
+ // Check the type of `input` and replace its expressions accordingly.
+ // If necessary, return a more descriptive error message if the user tries
to nest the DEFAULT
+ // column reference inside some other expression, such as DEFAULT + 1
(this is not allowed).
+ //
+ // Note that we don't need to check if
"SQLConf.get.useNullsForMissingDefaultColumnValues"
+ // after this point because this method only takes responsibility to
replace *existing*
+ // DEFAULT references. In contrast, the "getDefaultExpressions" method
will check that config
+ // and add new NULLs if needed.
+ input match {
+ case table: UnresolvedInlineTable =>
+ replaceExplicitDefaultColumnValues(defaultExpressions, table)
+ case project: Project =>
+ replaceExplicitDefaultColumnValues(defaultExpressions, columnNames,
project)
+ }
+ }
+
+ /**
+ * Replaces unresolved DEFAULT column references with corresponding values
in an inline table.
+ */
+ private def replaceExplicitDefaultColumnValues(
+ defaultExpressions: Seq[Expression],
+ table: UnresolvedInlineTable): Option[LogicalPlan] = {
+ var replaced = false
+ val newRows: Seq[Seq[Expression]] = {
+ table.rows.map { row: Seq[Expression] =>
+ for {
+ i <- 0 until row.size
+ expr = row(i)
+ defaultExpr = if (i < defaultExpressions.size) defaultExpressions(i)
else Literal(null)
+ } yield expr match {
+ case u: UnresolvedAttribute if isExplicitDefaultColumn(u) =>
+ replaced = true
+ defaultExpr
+ case expr@_ if expr.find { isExplicitDefaultColumn }.isDefined =>
+ throw new AnalysisException(DEFAULTS_IN_EXPRESSIONS_ERROR)
+ case _ => expr
+ }
+ }
+ }
+ if (replaced) {
+ Some(table.copy(rows = newRows))
+ } else {
+ None
+ }
+ }
+
+ /**
+ * Replaces unresolved DEFAULT column references with corresponding values
in an projection.
+ */
+ private def replaceExplicitDefaultColumnValues(
+ defaultExpressions: Seq[Expression],
+ colNames: Seq[String],
+ project: Project): Option[LogicalPlan] = {
+ var replaced = false
+ val updated: Seq[NamedExpression] = {
+ for {
+ i <- 0 until project.projectList.size
+ projectExpr = project.projectList(i)
+ defaultExpr = if (i < defaultExpressions.size) defaultExpressions(i)
else Literal(null)
+ colName = if (i < colNames.size) colNames(i) else ""
+ } yield projectExpr match {
+ case Alias(u: UnresolvedAttribute, _) if isExplicitDefaultColumn(u) =>
+ replaced = true
+ Alias(defaultExpr, colName)()
+ case u: UnresolvedAttribute if isExplicitDefaultColumn(u) =>
+ replaced = true
+ Alias(defaultExpr, colName)()
+ case expr@_ if expr.find { isExplicitDefaultColumn }.isDefined =>
+ throw new AnalysisException(DEFAULTS_IN_EXPRESSIONS_ERROR)
+ case _ => projectExpr
+ }
+ }
+ if (replaced) {
+ Some(project.copy(projectList = updated))
+ } else {
+ None
+ }
+ }
+
+ /**
+ * Looks up the schema for the table object of an INSERT INTO statement from
the catalog.
+ */
+ private def getInsertTableSchemaWithoutPartitionColumns: Option[StructType]
= {
+ assert(enclosingInsert.isDefined)
+ try {
+ val tableName = enclosingInsert.get.table match {
+ case r: UnresolvedRelation => TableIdentifier(r.name)
+ case r: UnresolvedCatalogRelation => r.tableMeta.identifier
+ case _ => return None
+ }
+ val lookup = catalog.lookupRelation(tableName)
+ lookup match {
+ case SubqueryAlias(_, r: UnresolvedCatalogRelation) =>
+ Some(StructType(r.tableMeta.schema.fields.dropRight(
+ enclosingInsert.get.partitionSpec.size)))
+ case _ => None
+ }
+ } catch {
+ case _: NoSuchTableException => None
+ }
+ }
+}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index c7925c9..c435e31 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -480,6 +480,11 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef]
with SQLConfHelper wit
val legacyNullAsString =
conf.getConf(SQLConf.LEGACY_PARSE_NULL_PARTITION_SPEC_AS_STRING_LITERAL)
val parts = ctx.partitionVal.asScala.map { pVal =>
+ // Check if the query attempted to refer to a DEFAULT column value
within the PARTITION clause
+ // and return a specific error to help guide the user, since this is not
allowed.
+ if (pVal.DEFAULT != null) {
+ throw
QueryParsingErrors.defaultColumnReferencesNotAllowedInPartitionSpec(ctx)
+ }
val name = pVal.identifier.getText
val value = Option(pVal.constant).map(v => visitStringConstant(v,
legacyNullAsString))
name -> value
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
index e36a76b..1204fa8 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
@@ -81,6 +81,7 @@ object RuleIdCollection {
"org.apache.spark.sql.catalyst.analysis.DeduplicateRelations" ::
"org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases" ::
"org.apache.spark.sql.catalyst.analysis.EliminateUnions" ::
+ "org.apache.spark.sql.catalyst.analysis.ResolveDefaultColumns" ::
"org.apache.spark.sql.catalyst.analysis.ResolveExpressionsWithNamePlaceholders"
::
"org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveCoalesceHints" ::
"org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveJoinStrategyHints"
::
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumns.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
similarity index 78%
rename from
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumns.scala
rename to
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index 1e40756..b7db9be 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumns.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -38,20 +38,28 @@ object ResolveDefaultColumns {
val CURRENT_DEFAULT_COLUMN_METADATA_KEY = "CURRENT_DEFAULT"
// This column metadata represents the default value for all existing rows
in a table after a
// column has been added. This value is determined at time of CREATE TABLE,
REPLACE TABLE, or
- // ALTER TABLE ADD COLUMN, and never changes thereafter. The intent is for
this "exist default"
- // to be used by any scan when the columns in the source row are missing
data. For example,
- // consider the following sequence:
+ // ALTER TABLE ADD COLUMN, and never changes thereafter. The intent is for
this "exist default" to
+ // be used by any scan when the columns in the source row are missing data.
For example, consider
+ // the following sequence:
// CREATE TABLE t (c1 INT)
// INSERT INTO t VALUES (42)
// ALTER TABLE t ADD COLUMNS (c2 INT DEFAULT 43)
// SELECT c1, c2 FROM t
// In this case, the final query is expected to return 42, 43. The ALTER
TABLE ADD COLUMNS command
- // executed after there was already data in the table, so in order to
enforce this invariant,
- // we need either (1) an expensive backfill of value 43 at column c2 into
all previous rows, or
- // (2) indicate to each data source that selected columns missing data are
to generate the
+ // executed after there was already data in the table, so in order to
enforce this invariant, we
+ // need either (1) an expensive backfill of value 43 at column c2 into all
previous rows, or (2)
+ // indicate to each data source that selected columns missing data are to
generate the
// corresponding DEFAULT value instead. We choose option (2) for efficiency,
and represent this
// value as the text representation of a folded constant in the
"EXISTS_DEFAULT" column metadata.
val EXISTS_DEFAULT_COLUMN_METADATA_KEY = "EXISTS_DEFAULT"
+ // Name of attributes representing explicit references to the value stored
in the above
+ // CURRENT_DEFAULT_COLUMN_METADATA.
+ val CURRENT_DEFAULT_COLUMN_NAME = "DEFAULT"
+ // Return a more descriptive error message if the user tries to nest the
DEFAULT column reference
+ // inside some other expression, such as DEFAULT + 1 (this is not allowed).
+ val DEFAULTS_IN_EXPRESSIONS_ERROR = "Failed to execute INSERT INTO command
because the " +
+ "VALUES list contains a DEFAULT column reference as part of another
expression; this is " +
+ "not allowed"
/**
* Finds "current default" expressions in CREATE/REPLACE TABLE columns and
constant-folds them.
@@ -75,8 +83,8 @@ object ResolveDefaultColumns {
* data source then takes responsibility to provide the constant-folded
value in the
* EXISTS_DEFAULT metadata for such columns where the value is not present
in storage.
*
- * @param analyzer used for analyzing the result of parsing the column
expression stored as text.
- * @param tableSchema represents the names and types of the columns of the
statement to process.
+ * @param analyzer used for analyzing the result of parsing the
expression stored as text.
+ * @param tableSchema represents the names and types of the columns of the
statement to process.
* @param statementType name of the statement being processed, such as
INSERT; useful for errors.
* @return a copy of `tableSchema` with field metadata updated with the
constant-folded values.
*/
@@ -84,26 +92,28 @@ object ResolveDefaultColumns {
analyzer: Analyzer,
tableSchema: StructType,
statementType: String): StructType = {
- if (!SQLConf.get.enableDefaultColumns) {
- return tableSchema
- }
- val newFields: Seq[StructField] = tableSchema.fields.map { field =>
- if (field.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
- val analyzed: Expression = analyze(analyzer, field, statementType)
- val newMetadata: Metadata = new
MetadataBuilder().withMetadata(field.metadata)
- .putString(EXISTS_DEFAULT_COLUMN_METADATA_KEY, analyzed.sql).build()
- field.copy(metadata = newMetadata)
- } else {
- field
+ if (SQLConf.get.enableDefaultColumns) {
+ val newFields: Seq[StructField] = tableSchema.fields.map { field =>
+ if (field.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
+ val analyzed: Expression = analyze(analyzer, field, statementType)
+ val newMetadata: Metadata = new
MetadataBuilder().withMetadata(field.metadata)
+ .putString(EXISTS_DEFAULT_COLUMN_METADATA_KEY,
analyzed.sql).build()
+ field.copy(metadata = newMetadata)
+ } else {
+ field
+ }
}
+ StructType(newFields)
+ } else {
+ tableSchema
}
- StructType(newFields)
}
/**
* Parses and analyzes the DEFAULT column text in `field`, returning an
error upon failure.
*
- * @param field represents the DEFAULT column value whose "default" metadata
to parse and analyze.
+ * @param field represents the DEFAULT column value whose "default"
metadata to parse
+ * and analyze.
* @param statementType which type of statement we are running, such as
INSERT; useful for errors.
* @return Result of the analysis and constant-folding operation.
*/
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
index b13a530..69e118d 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
@@ -452,4 +452,9 @@ object QueryParsingErrors {
def defaultColumnNotEnabledError(ctx: ParserRuleContext): Throwable = {
new ParseException("Support for DEFAULT column values is not allowed", ctx)
}
+
+ def defaultColumnReferencesNotAllowedInPartitionSpec(ctx:
ParserRuleContext): Throwable = {
+ new ParseException(
+ "References to DEFAULT column values are not allowed within the
PARTITION clause", ctx)
+ }
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3e1872a..9aad649 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2816,6 +2816,17 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES =
+ buildConf("spark.sql.defaultColumn.useNullsForMissingDefautValues")
+ .internal()
+ .doc("When true, and DEFAULT columns are enabled, allow column
definitions lacking " +
+ "explicit default values to behave as if they had specified DEFAULT
NULL instead. " +
+ "For example, this allows most INSERT INTO statements to specify only
a prefix of the " +
+ "columns in the target table, and the remaining columns will receive
NULL values.")
+ .version("3.4.0")
+ .booleanConf
+ .createWithDefault(false)
+
val ENFORCE_RESERVED_KEYWORDS =
buildConf("spark.sql.ansi.enforceReservedKeywords")
.doc(s"When true and '${ANSI_ENABLED.key}' is true, the Spark SQL parser
enforces the ANSI " +
"reserved keywords and forbids SQL queries that use reserved keywords as
alias names " +
@@ -4318,6 +4329,9 @@ class SQLConf extends Serializable with Logging {
def enableDefaultColumns: Boolean = getConf(SQLConf.ENABLE_DEFAULT_COLUMNS)
+ def useNullsForMissingDefaultColumnValues: Boolean =
+ getConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES)
+
def enforceReservedKeywords: Boolean = ansiEnabled &&
getConf(ENFORCE_RESERVED_KEYWORDS)
def strictIndexOperator: Boolean = ansiEnabled &&
getConf(ANSI_STRICT_INDEX_OPERATOR)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
index fad01db..274da57 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
@@ -171,25 +171,36 @@ trait SQLInsertTestSuite extends QueryTest with
SQLTestUtils {
}
test("insert with column list - mismatched column list size") {
- val msg = "Cannot write to table due to mismatched user specified column
size"
- withTable("t1") {
- val cols = Seq("c1", "c2", "c3")
- createTable("t1", cols, Seq("int", "long", "string"))
- val e1 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1, c2)
values(1, 2, 3)"))
- assert(e1.getMessage.contains(msg))
- val e2 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1, c2, c3)
values(1, 2)"))
- assert(e2.getMessage.contains(msg))
+ val msgs = Seq("Cannot write to table due to mismatched user specified
column size",
+ "expected 3 columns but found")
+ def test: Unit = {
+ withTable("t1") {
+ val cols = Seq("c1", "c2", "c3")
+ createTable("t1", cols, Seq("int", "long", "string"))
+ val e1 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1, c2)
values(1, 2, 3)"))
+ assert(e1.getMessage.contains(msgs(0)) ||
e1.getMessage.contains(msgs(1)))
+ val e2 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1, c2,
c3) values(1, 2)"))
+ assert(e2.getMessage.contains(msgs(0)) ||
e2.getMessage.contains(msgs(1)))
+ }
+ }
+ withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
+ test
+ }
+ withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "true") {
+ test
}
}
test("insert with column list - mismatched target table out size after
rewritten query") {
- val v2Msg = "Cannot write to 'testcat.t1', not enough data columns:"
+ val v2Msg = "expected 2 columns but found"
val cols = Seq("c1", "c2", "c3", "c4")
withTable("t1") {
createTable("t1", cols, Seq.fill(4)("int"))
val e1 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1)
values(1)"))
assert(e1.getMessage.contains("target table has 4 column(s) but the
inserted data has 1") ||
+ e1.getMessage.contains("expected 4 columns but found 1") ||
+ e1.getMessage.contains("not enough data columns") ||
e1.getMessage.contains(v2Msg))
}
@@ -199,6 +210,7 @@ trait SQLInsertTestSuite extends QueryTest with
SQLTestUtils {
sql(s"INSERT INTO t1 partition(c3=3, c4=4) (c1) values(1)")
}
assert(e1.getMessage.contains("target table has 4 column(s) but the
inserted data has 3") ||
+ e1.getMessage.contains("not enough data columns") ||
e1.getMessage.contains(v2Msg))
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 1fb4737..2483055 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -27,7 +27,9 @@ import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat,
CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
@@ -405,7 +407,7 @@ class InsertSuite extends DataSourceTest with
SharedSparkSession {
}
test("SPARK-15824 - Execute an INSERT wrapped in a WITH statement
immediately") {
- withTable("target", "target2") {
+ def test: Unit = withTable("target", "target2") {
sql(s"CREATE TABLE target(a INT, b STRING) USING JSON")
sql("WITH tbl AS (SELECT * FROM jt) INSERT OVERWRITE TABLE target SELECT
a, b FROM tbl")
checkAnswer(
@@ -426,6 +428,12 @@ class InsertSuite extends DataSourceTest with
SharedSparkSession {
sql("SELECT a, b FROM jt")
)
}
+ withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "true") {
+ test
+ }
+ withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
+ test
+ }
}
test("SPARK-21203 wrong results of insertion of Array of Struct") {
@@ -849,6 +857,254 @@ class InsertSuite extends DataSourceTest with
SharedSparkSession {
}
}
+ test("INSERT INTO statements with tables with default columns: positive
tests") {
+ // When the USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES configuration is
enabled, and no
+ // explicit DEFAULT value is available when the INSERT INTO statement
provides fewer
+ // values than expected, NULL values are appended in their place.
+ withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key ->
"true") {
+ withTable("t") {
+ sql("create table t(i boolean, s bigint) using parquet")
+ sql("insert into t values(true)")
+ checkAnswer(sql("select s from t where i = true"), Seq(Row(null)))
+ }
+ }
+ // The default value for the DEFAULT keyword is the NULL literal.
+ withTable("t") {
+ sql("create table t(i boolean, s bigint) using parquet")
+ sql("insert into t values(true, default)")
+ checkAnswer(sql("select s from t where i = true"), Seq(null).map(i =>
Row(i)))
+ }
+ // There is a complex expression in the default value.
+ withTable("t") {
+ sql("create table t(i boolean, s string default concat('abc', 'def'))
using parquet")
+ sql("insert into t values(true, default)")
+ checkAnswer(sql("select s from t where i = true"), Seq("abcdef").map(i
=> Row(i)))
+ }
+ // The default value parses correctly and the provided value type is
different but coercible.
+ withTable("t") {
+ sql("create table t(i boolean, s bigint default 42) using parquet")
+ sql("insert into t values(false)")
+ checkAnswer(sql("select s from t where i = false"), Seq(42L).map(i =>
Row(i)))
+ }
+ // There are two trailing default values referenced implicitly by the
INSERT INTO statement.
+ withTable("t") {
+ sql("create table t(i int, s bigint default 42, x bigint default 43)
using parquet")
+ sql("insert into t values(1)")
+ checkAnswer(sql("select s + x from t where i = 1"), Seq(85L).map(i =>
Row(i)))
+ }
+ // The table has a partitioning column and a default value is injected.
+ withTable("t") {
+ sql("create table t(i boolean, s bigint, q int default 42 ) using
parquet partitioned by (i)")
+ sql("insert into t partition(i='true') values(5, default)")
+ checkAnswer(sql("select s from t where i = true"), Seq(5).map(i =>
Row(i)))
+ }
+ // The table has a partitioning column and a default value is added per an
explicit reference.
+ withTable("t") {
+ sql("create table t(i boolean, s bigint default 42) using parquet
partitioned by (i)")
+ sql("insert into t partition(i='true') values(default)")
+ checkAnswer(sql("select s from t where i = true"), Seq(42L).map(i =>
Row(i)))
+ }
+ // The default value parses correctly as a constant but non-literal
expression.
+ withTable("t") {
+ sql("create table t(i boolean, s bigint default 41 + 1) using parquet")
+ sql("insert into t values(false, default)")
+ checkAnswer(sql("select s from t where i = false"), Seq(42L).map(i =>
Row(i)))
+ }
+ // Explicit defaults may appear in different positions within the inline
table provided as input
+ // to the INSERT INTO statement.
+ withTable("t") {
+ sql("create table t(i boolean default false, s bigint default 42) using
parquet")
+ sql("insert into t values(false, default), (default, 42)")
+ checkAnswer(sql("select s from t where i = false"), Seq(42L, 42L).map(i
=> Row(i)))
+ }
+ // There is an explicit default value provided in the INSERT INTO
statement in the VALUES,
+ // with an alias over the VALUES.
+ withTable("t") {
+ sql("create table t(i boolean, s bigint default 42) using parquet")
+ sql("insert into t select * from values (false, default) as tab(col,
other)")
+ checkAnswer(sql("select s from t where i = false"), Seq(42L).map(i =>
Row(i)))
+ }
+ // The explicit default value arrives first before the other value.
+ withTable("t") {
+ sql("create table t(i boolean default false, s bigint) using parquet")
+ sql("insert into t values (default, 43)")
+ checkAnswer(sql("select s from t where i = false"), Seq(43L).map(i =>
Row(i)))
+ }
+ // The 'create table' statement provides the default parameter first.
+ withTable("t") {
+ sql("create table t(i boolean default false, s bigint) using parquet")
+ sql("insert into t values (default, 43)")
+ checkAnswer(sql("select s from t where i = false"), Seq(43L).map(i =>
Row(i)))
+ }
+ // The explicit default value is provided in the wrong order (first
instead of second), but
+ // this is OK because the provided default value evaluates to literal NULL.
+ withTable("t") {
+ sql("create table t(i boolean, s bigint default 42) using parquet")
+ sql("insert into t values (default, 43)")
+ checkAnswer(sql("select s from t where i is null"), Seq(43L).map(i =>
Row(i)))
+ }
+ // There is an explicit default value provided in the INSERT INTO
statement as a SELECT.
+ // This is supported.
+ withTable("t") {
+ sql("create table t(i boolean, s bigint default 42) using parquet")
+ sql("insert into t select false, default")
+ checkAnswer(sql("select s from t where i = false"), Seq(42L).map(i =>
Row(i)))
+ }
+ // There is a complex query plan in the SELECT query in the INSERT INTO
statement.
+ withTable("t") {
+ sql("create table t(i boolean default false, s bigint default 42) using
parquet")
+ sql("insert into t select col, count(*) from values (default, default) "
+
+ "as tab(col, other) group by 1")
+ checkAnswer(sql("select s from t where i = false"), Seq(1).map(i =>
Row(i)))
+ }
+ // The explicit default reference resolves successfully with nested table
subqueries.
+ withTable("t") {
+ sql("create table t(i boolean default false, s bigint) using parquet")
+ sql("insert into t select * from (select * from values(default, 42))")
+ checkAnswer(sql("select s from t where i = false"), Seq(42L).map(i =>
Row(i)))
+ }
+ // There are three column types exercising various combinations of
implicit and explicit
+ // default column value references in the 'insert into' statements. Note
these tests depend on
+ // enabling the configuration to use NULLs for missing DEFAULT column
values.
+ withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key ->
"true") {
+ withTable("t1", "t2") {
+ sql("create table t1(j int, s bigint default 42, x bigint default 43)
using parquet")
+ sql("insert into t1 values(1)")
+ sql("insert into t1 values(2, default)")
+ sql("insert into t1 values(3, default, default)")
+ sql("insert into t1 values(4, 44)")
+ sql("insert into t1 values(5, 44, 45)")
+ sql("create table t2(j int, s bigint default 42, x bigint default 43)
using parquet")
+ sql("insert into t2 select j from t1 where j = 1")
+ sql("insert into t2 select j, default from t1 where j = 2")
+ sql("insert into t2 select j, default, default from t1 where j = 3")
+ sql("insert into t2 select j, s from t1 where j = 4")
+ sql("insert into t2 select j, s, default from t1 where j = 5")
+ val resultSchema = new StructType()
+ .add("s", LongType, false)
+ .add("x", LongType, false)
+ checkAnswer(
+ sql("select j, s, x from t2 order by j, s, x"),
+ Seq(
+ new GenericRowWithSchema(Array(1, 42L, 43L), resultSchema),
+ new GenericRowWithSchema(Array(2, 42L, 43L), resultSchema),
+ new GenericRowWithSchema(Array(3, 42L, 43L), resultSchema),
+ new GenericRowWithSchema(Array(4, 44L, 43L), resultSchema),
+ new GenericRowWithSchema(Array(5, 44L, 43L), resultSchema)))
+ }
+ }
+ }
+
+ test("INSERT INTO statements with tables with default columns: negative
tests") {
+ object Errors {
+ val COMMON_SUBSTRING = " has a DEFAULT value"
+ val COLUMN_DEFAULT_NOT_FOUND = "Column 'default' does not exist"
+ val BAD_SUBQUERY = "cannot evaluate expression scalarsubquery() in
inline table definition"
+ }
+ // The default value fails to analyze.
+ withTable("t") {
+ sql("create table t(i boolean, s bigint default badvalue) using parquet")
+ assert(intercept[AnalysisException] {
+ sql("insert into t values (default, default)")
+ }.getMessage.contains(Errors.COMMON_SUBSTRING))
+ }
+ // The default value analyzes to a table not in the catalog.
+ withTable("t") {
+ sql("create table t(i boolean, s bigint default (select min(x) from
badtable)) using parquet")
+ assert(intercept[AnalysisException] {
+ sql("insert into t values (default, default)")
+ }.getMessage.contains(Errors.COMMON_SUBSTRING))
+ }
+ // The default value parses but refers to a table from the catalog.
+ withTable("t", "other") {
+ sql("create table other(x string) using parquet")
+ sql("create table t(i boolean, s bigint default (select min(x) from
other)) using parquet")
+ assert(intercept[AnalysisException] {
+ sql("insert into t values (default, default)")
+ }.getMessage.contains(Errors.COMMON_SUBSTRING))
+ }
+ // The default value has an explicit alias. It fails to evaluate when
inlined into the VALUES
+ // list at the INSERT INTO time.
+ withTable("t") {
+ sql("create table t(i boolean default (select false as alias), s bigint)
using parquet")
+ assert(intercept[AnalysisException] {
+ sql("insert into t values (default, default)")
+ }.getMessage.contains(Errors.BAD_SUBQUERY))
+ }
+ // Explicit default values may not participate in complex expressions in
the VALUES list.
+ withTable("t") {
+ sql("create table t(i boolean, s bigint default 42) using parquet")
+ assert(intercept[AnalysisException] {
+ sql("insert into t values(false, default + 1)")
+
}.getMessage.contains(ResolveDefaultColumns.DEFAULTS_IN_EXPRESSIONS_ERROR))
+ }
+ // Explicit default values may not participate in complex expressions in
the SELECT query.
+ withTable("t") {
+ sql("create table t(i boolean, s bigint default 42) using parquet")
+ assert(intercept[AnalysisException] {
+ sql("insert into t select false, default + 1")
+
}.getMessage.contains(ResolveDefaultColumns.DEFAULTS_IN_EXPRESSIONS_ERROR))
+ }
+ // Explicit default values have a reasonable error path if the table is
not found.
+ withTable("t") {
+ assert(intercept[AnalysisException] {
+ sql("insert into t values(false, default)")
+ }.getMessage.contains(Errors.COLUMN_DEFAULT_NOT_FOUND))
+ }
+ // The default value parses but the type is not coercible.
+ withTable("t") {
+ sql("create table t(i boolean, s bigint default false) using parquet")
+ assert(intercept[AnalysisException] {
+ sql("insert into t values (default, default)")
+ }.getMessage.contains("provided a value of incompatible type"))
+ }
+ // The number of columns in the INSERT INTO statement is greater than the
number of columns in
+ // the table.
+ withTable("t") {
+ sql("create table num_data(id int, val decimal(38,10)) using parquet")
+ sql("create table t(id1 int, int2 int, result decimal(38,10)) using
parquet")
+ assert(intercept[AnalysisException] {
+ sql("insert into t select t1.id, t2.id, t1.val, t2.val, t1.val *
t2.val " +
+ "from num_data t1, num_data t2")
+ }.getMessage.contains(
+ "requires that the data to be inserted have the same number of columns
as the target"))
+ }
+ // The default value is disabled per configuration.
+ withTable("t") {
+ withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
+ assert(intercept[AnalysisException] {
+ sql("create table t(i boolean, s bigint default 42L) using parquet")
+ }.getMessage.contains("Support for DEFAULT column values is not
allowed"))
+ }
+ }
+ // There is one trailing default value referenced implicitly by the INSERT
INTO statement.
+ withTable("t") {
+ sql("create table t(i int, s bigint default 42, x bigint) using parquet")
+ assert(intercept[AnalysisException] {
+ sql("insert into t values(1)")
+ }.getMessage.contains("expected 3 columns but found"))
+ }
+ // The table has a partitioning column with a default value; this is not
allowed.
+ withTable("t") {
+ sql("create table t(i boolean default true, s bigint, q int default 42 )
" +
+ "using parquet partitioned by (i)")
+ assert(intercept[ParseException] {
+ sql("insert into t partition(i=default) values(5, default)")
+ }.getMessage.contains(
+ "References to DEFAULT column values are not allowed within the
PARTITION clause"))
+ }
+ // The configuration option to append missing NULL values to the end of
the INSERT INTO
+ // statement is not enabled.
+ withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key ->
"false") {
+ withTable("t") {
+ sql("create table t(i boolean, s bigint) using parquet")
+ assert(intercept[AnalysisException] {
+ sql("insert into t values(true)")
+ }.getMessage.contains("target table has 2 column(s) but the inserted
data has 1 column(s)"))
+ }
+ }
+ }
+
test("Stop task set if FileAlreadyExistsException was thrown") {
Seq(true, false).foreach { fastFail =>
withSQLConf("fs.file.impl" ->
classOf[FileExistingTestFileSystem].getName,
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
index 4dc08a0e..a7148e9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -857,52 +857,65 @@ class InsertSuite extends QueryTest with
TestHiveSingleton with BeforeAndAfter
}
test("SPARK-35531: Insert data with different cases of bucket column") {
- withTable("test1") {
- Seq(true, false).foreach { isHiveTable =>
- val createSpark = if (isHiveTable) {
- """
- |CREATE TABLE TEST1(
- |v1 BIGINT,
- |s1 INT)
- |PARTITIONED BY (pk BIGINT)
- |CLUSTERED BY (v1)
- |SORTED BY (s1)
- |INTO 200 BUCKETS
- |STORED AS PARQUET
- """.stripMargin
- } else {
- """
- |CREATE TABLE test1(
- |v1 BIGINT,
- |s1 INT)
- |USING PARQUET
- |PARTITIONED BY (pk BIGINT)
- |CLUSTERED BY (v1)
- |SORTED BY (s1)
- |INTO 200 BUCKETS
- """.stripMargin
- }
+ def testDefaultColumn: Unit = {
+ withTable("test1") {
+ Seq(true, false).foreach { isHiveTable =>
+ val createSpark = if (isHiveTable) {
+ """
+ |CREATE TABLE TEST1(
+ |v1 BIGINT,
+ |s1 INT)
+ |PARTITIONED BY (pk BIGINT)
+ |CLUSTERED BY (v1)
+ |SORTED BY (s1)
+ |INTO 200 BUCKETS
+ |STORED AS PARQUET
+ """.stripMargin
+ } else {
+ """
+ |CREATE TABLE test1(
+ |v1 BIGINT,
+ |s1 INT)
+ |USING PARQUET
+ |PARTITIONED BY (pk BIGINT)
+ |CLUSTERED BY (v1)
+ |SORTED BY (s1)
+ |INTO 200 BUCKETS
+ """.stripMargin
+ }
- val insertString =
- """
- |INSERT INTO test1
- |SELECT * FROM VALUES(1,1,1)
- """.stripMargin
+ val insertString =
+ """
+ |INSERT INTO test1
+ |SELECT * FROM VALUES(1,1,1)
+ """.stripMargin
- val dropString = "DROP TABLE IF EXISTS test1"
+ val dropString = "DROP TABLE IF EXISTS test1"
- sql(dropString)
- sql(createSpark.toLowerCase(Locale.ROOT))
+ sql(dropString)
+ sql(createSpark.toLowerCase(Locale.ROOT))
- sql(insertString.toLowerCase(Locale.ROOT))
- sql(insertString.toUpperCase(Locale.ROOT))
+ sql(insertString.toLowerCase(Locale.ROOT))
+ sql(insertString.toUpperCase(Locale.ROOT))
- sql(dropString)
- sql(createSpark.toUpperCase(Locale.ROOT))
+ sql(dropString)
+ sql(createSpark.toUpperCase(Locale.ROOT))
- sql(insertString.toLowerCase(Locale.ROOT))
- sql(insertString.toUpperCase(Locale.ROOT))
+ sql(insertString.toLowerCase(Locale.ROOT))
+ sql(insertString.toUpperCase(Locale.ROOT))
+ }
}
}
+ withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
+ testDefaultColumn
+ }
+ withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "true",
+ SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
+ testDefaultColumn
+ }
+ withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "true",
+ SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
+ testDefaultColumn
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]