This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new c37d7dec8aa [SPARK-44271][SQL] Move default values functions from StructType to ResolveDefaultColumns
c37d7dec8aa is described below

commit c37d7dec8aa4d703b6dac9b9d60ff25d9d5dc665
Author: Rui Wang <rui.w...@databricks.com>
AuthorDate: Mon Jul 10 06:23:03 2023 -0400

    [SPARK-44271][SQL] Move default values functions from StructType to ResolveDefaultColumns

    ### What changes were proposed in this pull request?

    Move the default-value functions from StructType to ResolveDefaultColumns.

    ### Why are the changes needed?

    To simplify the DataType interface.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Existing tests.

    Closes #41820 from amaliujia/clean_up_left_errors.

    Authored-by: Rui Wang <rui.w...@databricks.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../spark/sql/catalyst/csv/UnivocityParser.scala   |  2 +-
 .../spark/sql/catalyst/json/JacksonParser.scala    |  7 +++--
 .../catalyst/util/ResolveDefaultColumnsUtil.scala  | 32 ++++++++++++++++------
 .../org/apache/spark/sql/types/StructType.scala    |  8 ------
 .../apache/spark/sql/types/StructTypeSuite.scala   | 16 +++++------
 .../datasources/orc/OrcColumnarBatchReader.java    |  3 +-
 .../parquet/VectorizedParquetRecordReader.java     |  3 +-
 .../datasources/orc/OrcDeserializer.scala          | 15 ++++++----
 .../datasources/parquet/ParquetRowConverter.scala  | 15 ++++++----
 9 files changed, 58 insertions(+), 43 deletions(-)
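At its core, the change moves three accessors off of StructType and into the
ResolveDefaultColumns utility object, so call sites change roughly as follows
(an illustrative sketch only; `schema` stands in for any StructType carrying
default-value column metadata):

    import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
    import org.apache.spark.sql.types.StructType

    def defaultsFor(schema: StructType): Array[Any] = {
      // Before: schema.existenceDefaultValues (a lazy val cached on StructType).
      // After: computed on demand by the utility object.
      ResolveDefaultColumns.existenceDefaultValues(schema)
    }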
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index b58649da61c..a02d57c0bc7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -353,7 +353,7 @@ class UnivocityParser(
         case NonFatal(e) =>
           badRecordException = badRecordException.orElse(Some(e))
           // Use the corresponding DEFAULT value associated with the column, if any.
-          row.update(i, requiredSchema.existenceDefaultValues(i))
+          row.update(i, ResolveDefaultColumns.existenceDefaultValues(requiredSchema)(i))
       }
       i += 1
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 5286e16b088..03dce431837 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -448,14 +448,15 @@ class JacksonParser(

       var skipRow = false
       structFilters.reset()
-      resetExistenceDefaultsBitmask(schema)
+      lazy val bitmask = ResolveDefaultColumns.existenceDefaultsBitmask(schema)
+      resetExistenceDefaultsBitmask(schema, bitmask)
       while (!skipRow && nextUntil(parser, JsonToken.END_OBJECT)) {
         schema.getFieldIndex(parser.getCurrentName) match {
           case Some(index) =>
             try {
               row.update(index, fieldConverters(index).apply(parser))
               skipRow = structFilters.skipRow(row, index)
-              schema.existenceDefaultsBitmask(index) = false
+              bitmask(index) = false
             } catch {
               case e: SparkUpgradeException => throw e
               case NonFatal(e) if isRoot || enablePartialResults =>
@@ -469,7 +470,7 @@
       if (skipRow) {
         None
       } else if (badRecordException.isEmpty) {
-        applyExistenceDefaultValuesToRow(schema, row)
+        applyExistenceDefaultValuesToRow(schema, row, bitmask)
         Some(row)
       } else {
         throw PartialResultException(row, badRecordException.get)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index 26efa8c8df2..6489fb9aaaf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -396,27 +396,30 @@ object ResolveDefaultColumns extends QueryErrorsBase {
   * above, for convenience.
   */
  def getExistenceDefaultsBitmask(schema: StructType): Array[Boolean] = {
-    Array.fill[Boolean](schema.existenceDefaultValues.size)(true)
+    Array.fill[Boolean](existenceDefaultValues(schema).size)(true)
  }

  /**
   * Resets the elements of the array initially returned from [[getExistenceDefaultsBitmask]] above.
   * Afterwards, set element(s) to false before calling [[applyExistenceDefaultValuesToRow]] below.
   */
-  def resetExistenceDefaultsBitmask(schema: StructType): Unit = {
-    for (i <- 0 until schema.existenceDefaultValues.size) {
-      schema.existenceDefaultsBitmask(i) = (schema.existenceDefaultValues(i) != null)
+  def resetExistenceDefaultsBitmask(schema: StructType, bitmask: Array[Boolean]): Unit = {
+    val defaultValues = existenceDefaultValues(schema)
+    for (i <- 0 until defaultValues.size) {
+      bitmask(i) = (defaultValues(i) != null)
    }
  }

  /**
   * Updates a subset of columns in the row with default values from the metadata in the schema.
   */
-  def applyExistenceDefaultValuesToRow(schema: StructType, row: InternalRow): Unit = {
-    if (schema.hasExistenceDefaultValues) {
-      for (i <- 0 until schema.existenceDefaultValues.size) {
-        if (schema.existenceDefaultsBitmask(i)) {
-          row.update(i, schema.existenceDefaultValues(i))
+  def applyExistenceDefaultValuesToRow(schema: StructType, row: InternalRow,
+      bitmask: Array[Boolean]): Unit = {
+    val existingValues = existenceDefaultValues(schema)
+    if (hasExistenceDefaultValues(schema)) {
+      for (i <- 0 until existingValues.size) {
+        if (bitmask(i)) {
+          row.update(i, existingValues(i))
        }
      }
    }
@@ -437,6 +440,17 @@ object ResolveDefaultColumns extends QueryErrorsBase {
     rows.toSeq
   }

+  /**
+   * These define existence default values for the struct fields for efficiency purposes.
+   * The caller should avoid using such methods in a loop for efficiency.
+   */
+  def existenceDefaultValues(schema: StructType): Array[Any] =
+    getExistenceDefaultValues(schema)
+  def existenceDefaultsBitmask(schema: StructType): Array[Boolean] =
+    getExistenceDefaultsBitmask(schema)
+  def hasExistenceDefaultValues(schema: StructType): Boolean =
+    existenceDefaultValues(schema).exists(_ != null)
+
  /**
   * This is an Analyzer for processing default column values using built-in functions only.
   */
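With the three-argument applyExistenceDefaultValuesToRow, each reader now owns
its bitmask instead of mutating state on the schema. A minimal sketch of the
intended per-record flow (the ResolveDefaultColumns calls are the ones added
above; the toy `records` input and everything else is hypothetical):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
    import org.apache.spark.sql.types.StructType

    def convertRecords(schema: StructType, records: Seq[Map[String, Any]]): Seq[InternalRow] = {
      // Hoisted out of the loop: the defaults are re-derived from metadata on each call.
      val bitmask = ResolveDefaultColumns.existenceDefaultsBitmask(schema)
      records.map { record =>
        val row = new GenericInternalRow(schema.length)
        // Mark every column that has a default as "not yet assigned".
        ResolveDefaultColumns.resetExistenceDefaultsBitmask(schema, bitmask)
        record.foreach { case (name, value) =>
          val i = schema.fieldIndex(name)
          row.update(i, value)
          bitmask(i) = false // explicitly assigned, so no default needed
        }
        // Back-fill the remaining columns from their existence defaults.
        ResolveDefaultColumns.applyExistenceDefaultValuesToRow(schema, row, bitmask)
        row
      }
    }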
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 56e1356aec7..5eea207d15b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.parser.{DataTypeParser, LegacyTypeStringPar
 import org.apache.spark.sql.catalyst.trees.Origin
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.{SparkStringUtils, StringConcat}
-import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.util.collection.Utils

@@ -498,13 +497,6 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
   override private[spark] def existsRecursively(f: (DataType) => Boolean): Boolean = {
     f(this) || fields.exists(field => field.dataType.existsRecursively(f))
   }
-
-  /**
-   * These define and cache existence default values for the struct fields for efficiency purposes.
-   */
-  private[sql] lazy val existenceDefaultValues: Array[Any] = getExistenceDefaultValues(this)
-  private[sql] lazy val existenceDefaultsBitmask: Array[Boolean] = getExistenceDefaultsBitmask(this)
-  private[sql] lazy val hasExistenceDefaultValues = existenceDefaultValues.exists(_ != null)
 }

 /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
index 1f4d8311540..6eedd7f9b6f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
@@ -538,10 +538,10 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
         .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "'abc'")
         .build()),
       StructField("c3", BooleanType)))
-    assert(source1.existenceDefaultValues.size == 3)
-    assert(source1.existenceDefaultValues(0) == 42)
-    assert(source1.existenceDefaultValues(1) == UTF8String.fromString("abc"))
-    assert(source1.existenceDefaultValues(2) == null)
+    assert(ResolveDefaultColumns.existenceDefaultValues(source1).size == 3)
+    assert(ResolveDefaultColumns.existenceDefaultValues(source1)(0) == 42)
+    assert(ResolveDefaultColumns.existenceDefaultValues(source1)(1) == UTF8String.fromString("abc"))
+    assert(ResolveDefaultColumns.existenceDefaultValues(source1)(2) == null)

     // Positive test: StructType.defaultValues works because the existence default value parses and
     // resolves successfully, then evaluates to a non-literal expression: this is constant-folded at
@@ -553,8 +553,8 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
         .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "1 + 1")
         .build()))))
     val error = "fails to parse as a valid literal value"
-    assert(source2.existenceDefaultValues.size == 1)
-    assert(source2.existenceDefaultValues(0) == 2)
+    assert(ResolveDefaultColumns.existenceDefaultValues(source2).size == 1)
+    assert(ResolveDefaultColumns.existenceDefaultValues(source2)(0) == 2)

     // Negative test: StructType.defaultValues fails because the existence default value fails to
     // parse.
@@ -565,7 +565,7 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
         .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "invalid")
         .build()))))
     assert(intercept[AnalysisException] {
-      source3.existenceDefaultValues
+      ResolveDefaultColumns.existenceDefaultValues(source3)
     }.getMessage.contains(error))

     // Negative test: StructType.defaultValues fails because the existence default value fails to
@@ -581,7 +581,7 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
         "(SELECT 'abc' FROM missingtable)")
         .build()))))
     assert(intercept[AnalysisException] {
-      source4.existenceDefaultValues
+      ResolveDefaultColumns.existenceDefaultValues(source4)
     }.getMessage.contains(error))
   }
 }
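For context, the suite's `source1`-style schemas attach defaults through column
metadata. A trimmed-down sketch of building such a schema and reading its
defaults might look like the following (assuming the EXISTS_DEFAULT and
CURRENT_DEFAULT metadata keys that ResolveDefaultColumns defines; the expected
result mirrors the `source1` assertions above):

    import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
    import org.apache.spark.sql.types._

    val schema = StructType(Seq(
      StructField("c1", IntegerType, nullable = true,
        new MetadataBuilder()
          .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "42")
          .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "42")
          .build()),
      StructField("c2", StringType)))

    // Expected: Array(42, null) -- c2 carries no default.
    val defaults = ResolveDefaultColumns.existenceDefaultValues(schema)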
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
index 97f9d47d095..b6184baa2e0 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
@@ -32,6 +32,7 @@ import org.apache.orc.TypeDescription;
 import org.apache.orc.mapred.OrcInputFormat;

 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns;
 import org.apache.spark.sql.execution.datasources.orc.OrcShimUtils.VectorizedRowBatchWrap;
 import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
 import org.apache.spark.sql.execution.vectorized.ConstantColumnVector;
@@ -179,7 +180,7 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
           OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
           // Check if the missing column has an associated default value in the schema metadata.
           // If so, fill the corresponding column vector with the value.
-          Object defaultValue = requiredSchema.existenceDefaultValues()[i];
+          Object defaultValue = ResolveDefaultColumns.existenceDefaultValues(requiredSchema)[i];
           if (defaultValue == null) {
             missingCol.putNulls(0, capacity);
           } else if (!missingCol.appendObjects(capacity, defaultValue).isPresent()) {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 25712367c23..0f0455c0d8e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;

+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns;
 import scala.Option;
 import scala.collection.JavaConverters;

@@ -280,7 +281,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
     for (int i = 0; i < columnVectors.length; i++) {
       Object defaultValue = null;
       if (sparkRequestedSchema != null) {
-        defaultValue = sparkRequestedSchema.existenceDefaultValues()[i];
+        defaultValue = ResolveDefaultColumns.existenceDefaultValues(sparkRequestedSchema)[i];
       }
       columnVectors[i] = new ParquetColumnVector(parquetColumn.children().apply(i),
         (WritableColumnVector) vectors[i], capacity, memMode, missingColumns, true, defaultValue);
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
index 5bac404fd53..795c2618af1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
@@ -37,6 +37,8 @@ class OrcDeserializer(

   private val resultRow = new SpecificInternalRow(requiredSchema.map(_.dataType))

+  private lazy val bitmask = ResolveDefaultColumns.existenceDefaultsBitmask(requiredSchema)
+
   // `fieldWriters(index)` is
   // - null if the respective source column is missing, since the output value
   //   is always null in this case
@@ -46,13 +48,14 @@
   //   ADD COLUMN c DEFAULT <value>" on the Orc table, this adds one field to the Catalyst schema.
   //   Then if we query the old files with the new Catalyst schema, we should only apply the
   //   existence default value to the columns whose IDs are not explicitly requested.
-  if (requiredSchema.hasExistenceDefaultValues) {
-    for (i <- 0 until requiredSchema.existenceDefaultValues.size) {
-      requiredSchema.existenceDefaultsBitmask(i) =
+  val existingValues = ResolveDefaultColumns.existenceDefaultValues(requiredSchema)
+  if (ResolveDefaultColumns.hasExistenceDefaultValues(requiredSchema)) {
+    for (i <- 0 until existingValues.size) {
+      bitmask(i) =
         if (requestedColIds(i) != -1) {
           false
         } else {
-          requiredSchema.existenceDefaultValues(i) != null
+          existingValues(i) != null
         }
    }
  }
@@ -81,7 +84,7 @@ class OrcDeserializer(
       }
       targetColumnIndex += 1
     }
-    applyExistenceDefaultValuesToRow(requiredSchema, resultRow)
+    applyExistenceDefaultValuesToRow(requiredSchema, resultRow, bitmask)
     resultRow
   }

@@ -98,7 +101,7 @@ class OrcDeserializer(
       }
       targetColumnIndex += 1
     }
-    applyExistenceDefaultValuesToRow(requiredSchema, resultRow)
+    applyExistenceDefaultValuesToRow(requiredSchema, resultRow, bitmask)
     resultRow
   }
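The interplay with `requestedColIds` above is easiest to see with a small
worked example (all values hypothetical; the real bitmask is produced by the
loop in the diff):

    // An old ORC file has columns (a, b); the table later gained column c
    // with an existence default. Reading the old file yields:
    val requestedColIds = Array(0, 1, -1)            // -1: c is absent from the file
    val defaults: Array[Any] = Array(null, null, 42) // only c has a default

    val bitmask = defaults.indices.map { i =>
      requestedColIds(i) == -1 && defaults(i) != null
    }.toArray
    // bitmask: Array(false, false, true) -- only c is back-filled with its default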
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index ba0314f3be3..e257be3d189 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -35,7 +35,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types.{PhysicalByteType, PhysicalShortType}
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CaseInsensitiveMap, DateTimeUtils, GenericArrayData, ResolveDefaultColumns}
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -188,11 +188,13 @@ private[parquet] class ParquetRowConverter(

   private[this] val currentRow = new SpecificInternalRow(catalystType.map(_.dataType))

+  private[this] lazy val bitmask = ResolveDefaultColumns.existenceDefaultsBitmask(catalystType)
+
   /**
    * The [[InternalRow]] converted from an entire Parquet record.
    */
   def currentRecord: InternalRow = {
-    applyExistenceDefaultValuesToRow(catalystType, currentRow)
+    applyExistenceDefaultValuesToRow(catalystType, currentRow, bitmask)
     currentRow
   }

@@ -230,9 +232,10 @@ private[parquet] class ParquetRowConverter(
     }
     // If any fields in the Catalyst result schema have associated existence default values,
     // maintain a boolean array to track which fields have been explicitly assigned for each row.
-    if (catalystType.hasExistenceDefaultValues) {
-      for (i <- 0 until catalystType.existenceDefaultValues.size) {
-        catalystType.existenceDefaultsBitmask(i) =
+    if (ResolveDefaultColumns.hasExistenceDefaultValues(catalystType)) {
+      val existingValues = ResolveDefaultColumns.existenceDefaultValues(catalystType)
+      for (i <- 0 until existingValues.size) {
+        bitmask(i) =
           // Assume the schema for a Parquet file-based table contains N fields. Then if we later
           // run a command "ALTER TABLE t ADD COLUMN c DEFAULT <value>" on the Parquet table, this
           // adds one field to the Catalyst schema. Then if we query the old files with the new
@@ -240,7 +243,7 @@
           if (i < parquetType.getFieldCount) {
             false
           } else {
-            catalystType.existenceDefaultValues(i) != null
+            existingValues(i) != null
          }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org