This is an automated email from the ASF dual-hosted git repository.
akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8de53571e0 [HUDI-5346][HUDI-5320] Fixing Create Table as Select (CTAS)
performance gaps (#7370)
8de53571e0 is described below
commit 8de53571e0e1e7ea63c7ecf7b646da64eceaef8c
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Thu Dec 8 18:52:47 2022 -0800
[HUDI-5346][HUDI-5320] Fixing Create Table as Select (CTAS) performance
gaps (#7370)
This PR is addressing some of the performance traps detected while
stress-testing Spark SQL's Create Table as Select command:
Avoids reordering of the columns within CTAS (there's no need for it;
InsertIntoTableCommand will be resolving columns anyway)
Fixing the validation sequence within InsertIntoTableCommand to first resolve the
columns and then run validation (currently it's done the other way around)
Propagating properties specified in CTAS to the HoodieSparkSqlWriter (for
example, currently there's no way to disable the metadata table when using CTAS,
precisely because these properties are not propagated)
Additionally, the following improvements to HoodieBulkInsertHelper were made:
Now, if meta-fields are disabled, we won't be dereferencing the incoming Dataset
into an RDD, and will instead simply add stubbed-out meta-fields through an
additional Projection
---
.../hudi/HoodieDatasetBulkInsertHelper.scala | 68 ++++++++++---------
.../org/apache/spark/sql/HoodieUnsafeUtils.scala | 12 +++-
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 7 +-
.../command/CreateHoodieTableAsSelectCommand.scala | 76 +++++++++-------------
.../command/InsertIntoHoodieTableCommand.scala | 3 +-
.../spark/sql/hudi/HoodieSparkSqlTestBase.scala | 4 +-
.../apache/spark/sql/hudi/TestInsertTable.scala | 22 ++++---
.../spark/sql/HoodieSpark2CatalystPlanUtils.scala | 11 +++-
8 files changed, 106 insertions(+), 97 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index 296abaf4f5..79fa67acdb 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -33,7 +33,9 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath,
getNestedInternalRowValue}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.Project
+import org.apache.spark.sql.types.{DataType, StringType, StructField,
StructType}
import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeUtils, Row}
import org.apache.spark.unsafe.types.UTF8String
@@ -58,31 +60,6 @@ object HoodieDatasetBulkInsertHelper extends Logging {
val populateMetaFields = config.populateMetaFields()
val schema = df.schema
- val keyGeneratorClassName =
config.getStringOrThrow(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME,
- "Key-generator class name is required")
-
- val prependedRdd: RDD[InternalRow] =
- df.queryExecution.toRdd.mapPartitions { iter =>
- val keyGenerator =
- ReflectionUtils.loadClass(keyGeneratorClassName, new
TypedProperties(config.getProps))
- .asInstanceOf[SparkKeyGeneratorInterface]
-
- iter.map { row =>
- val (recordKey, partitionPath) =
- if (populateMetaFields) {
- (keyGenerator.getRecordKey(row, schema),
keyGenerator.getPartitionPath(row, schema))
- } else {
- (UTF8String.EMPTY_UTF8, UTF8String.EMPTY_UTF8)
- }
- val commitTimestamp = UTF8String.EMPTY_UTF8
- val commitSeqNo = UTF8String.EMPTY_UTF8
- val filename = UTF8String.EMPTY_UTF8
-
- // TODO use mutable row, avoid re-allocating
- new HoodieInternalRow(commitTimestamp, commitSeqNo, recordKey,
partitionPath, filename, row, false)
- }
- }
-
val metaFields = Seq(
StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, StringType),
StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, StringType),
@@ -92,11 +69,44 @@ object HoodieDatasetBulkInsertHelper extends Logging {
val updatedSchema = StructType(metaFields ++ schema.fields)
- val updatedDF = if (populateMetaFields &&
config.shouldCombineBeforeInsert) {
- val dedupedRdd = dedupeRows(prependedRdd, updatedSchema,
config.getPreCombineField, SparkHoodieIndexFactory.isGlobalIndex(config))
+ val updatedDF = if (populateMetaFields) {
+ val keyGeneratorClassName =
config.getStringOrThrow(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME,
+ "Key-generator class name is required")
+
+ val prependedRdd: RDD[InternalRow] =
+ df.queryExecution.toRdd.mapPartitions { iter =>
+ val keyGenerator =
+ ReflectionUtils.loadClass(keyGeneratorClassName, new
TypedProperties(config.getProps))
+ .asInstanceOf[SparkKeyGeneratorInterface]
+
+ iter.map { row =>
+ val recordKey = keyGenerator.getRecordKey(row, schema)
+ val partitionPath = keyGenerator.getPartitionPath(row, schema)
+ val commitTimestamp = UTF8String.EMPTY_UTF8
+ val commitSeqNo = UTF8String.EMPTY_UTF8
+ val filename = UTF8String.EMPTY_UTF8
+
+ // TODO use mutable row, avoid re-allocating
+ new HoodieInternalRow(commitTimestamp, commitSeqNo, recordKey,
partitionPath, filename, row, false)
+ }
+ }
+
+ val dedupedRdd = if (config.shouldCombineBeforeInsert) {
+ dedupeRows(prependedRdd, updatedSchema, config.getPreCombineField,
SparkHoodieIndexFactory.isGlobalIndex(config))
+ } else {
+ prependedRdd
+ }
+
HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, dedupedRdd,
updatedSchema)
} else {
- HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, prependedRdd,
updatedSchema)
+ // NOTE: In cases when we're not populating meta-fields we actually don't
+ // need access to the [[InternalRow]] and therefore can avoid the
need
+ // to dereference [[DataFrame]] into [[RDD]]
+ val query = df.queryExecution.logical
+ val metaFieldsStubs = metaFields.map(f =>
Alias(Literal(UTF8String.EMPTY_UTF8, dataType = StringType), f.name)())
+ val prependedQuery = Project(metaFieldsStubs ++ query.output, query)
+
+ HoodieUnsafeUtils.createDataFrameFrom(df.sparkSession, prependedQuery)
}
val trimmedDF = if (shouldDropPartitionColumns) {
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
index edf05f2db2..c981cd8113 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
@@ -30,6 +30,15 @@ import org.apache.spark.util.MutablePair
*/
object HoodieUnsafeUtils {
+ /**
+ * Creates [[DataFrame]] from provided [[plan]]
+ *
+ * @param spark spark's session
+ * @param plan given plan to wrap into [[DataFrame]]
+ */
+ def createDataFrameFrom(spark: SparkSession, plan: LogicalPlan): DataFrame =
+ Dataset.ofRows(spark, plan)
+
/**
* Creates [[DataFrame]] from the in-memory [[Seq]] of [[Row]]s with
provided [[schema]]
*
@@ -39,7 +48,6 @@ object HoodieUnsafeUtils {
* @param spark spark's session
* @param rows collection of rows to base [[DataFrame]] on
* @param schema target [[DataFrame]]'s schema
- * @return
*/
def createDataFrameFromRows(spark: SparkSession, rows: Seq[Row], schema:
StructType): DataFrame =
Dataset.ofRows(spark, LocalRelation.fromExternalRows(schema.toAttributes,
rows))
@@ -53,7 +61,6 @@ object HoodieUnsafeUtils {
* @param spark spark's session
* @param rows collection of rows to base [[DataFrame]] on
* @param schema target [[DataFrame]]'s schema
- * @return
*/
def createDataFrameFromInternalRows(spark: SparkSession, rows:
Seq[InternalRow], schema: StructType): DataFrame =
Dataset.ofRows(spark, LocalRelation(schema.toAttributes, rows))
@@ -65,7 +72,6 @@ object HoodieUnsafeUtils {
* @param spark spark's session
* @param rdd RDD w/ [[Row]]s to base [[DataFrame]] on
* @param schema target [[DataFrame]]'s schema
- * @return
*/
def createDataFrameFromRDD(spark: SparkSession, rdd: RDD[InternalRow],
schema: StructType): DataFrame =
spark.internalCreateDataFrame(rdd, schema)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 97c2c805cc..faa6fada13 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -66,7 +66,6 @@ import scala.collection.JavaConversions._
import scala.collection.JavaConverters.setAsJavaSetConverter
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
-import scala.util.matching.Regex
object HoodieSparkSqlWriter {
@@ -121,6 +120,7 @@ object HoodieSparkSqlWriter {
}
val tableType = HoodieTableType.valueOf(hoodieConfig.getString(TABLE_TYPE))
var operation =
WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
+ // TODO clean up
// It does not make sense to allow upsert() operation if INSERT_DROP_DUPS
is true
// Auto-correct the operation to "insert" if OPERATION is set to "upsert"
wrongly
// or not set (in which case it will be set as "upsert" by
parametersWithWriteDefaults()) .
@@ -749,8 +749,7 @@ object HoodieSparkSqlWriter {
val userDefinedBulkInsertPartitionerOpt =
DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
if (userDefinedBulkInsertPartitionerOpt.isPresent) {
userDefinedBulkInsertPartitionerOpt.get
- }
- else {
+ } else {
BulkInsertInternalPartitionerWithRowsFactory.get(writeConfig.getBulkInsertSortMode)
}
} else {
@@ -842,7 +841,7 @@ object HoodieSparkSqlWriter {
properties.put(HoodieSyncConfig.META_SYNC_SPARK_VERSION.key,
SPARK_VERSION)
properties.put(HoodieSyncConfig.META_SYNC_USE_FILE_LISTING_FROM_METADATA.key,
hoodieConfig.getBoolean(HoodieMetadataConfig.ENABLE))
- //Collect exceptions in list because we want all sync to run. Then we
can throw
+ // Collect exceptions in list because we want all sync to run. Then we
can throw
val metaSyncExceptions = new ListBuffer[HoodieException]()
syncClientToolClassSet.foreach(impl => {
try {
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
index 6a93f0c7af..1f8d009530 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
@@ -19,16 +19,15 @@ package org.apache.spark.sql.hudi.command
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-
import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.hive.HiveSyncConfigHolder
import org.apache.hudi.sql.InsertMode
import org.apache.hudi.sync.common.util.ConfigUtils
-
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat,
CatalogTable, CatalogTableType, HoodieCatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType,
HoodieCatalogTable}
import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
import scala.collection.JavaConverters._
@@ -43,8 +42,8 @@ case class CreateHoodieTableAsSelectCommand(
override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
override def run(sparkSession: SparkSession): Seq[Row] = {
- assert(table.tableType != CatalogTableType.VIEW)
- assert(table.provider.isDefined)
+ checkState(table.tableType != CatalogTableType.VIEW)
+ checkState(table.provider.isDefined)
val hasQueryAsProp = (table.storage.properties ++
table.properties).contains(ConfigUtils.IS_QUERY_AS_RO_TABLE)
if (hasQueryAsProp) {
@@ -53,11 +52,11 @@ case class CreateHoodieTableAsSelectCommand(
val sessionState = sparkSession.sessionState
val db =
table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
- val tableIdentWithDB = table.identifier.copy(database = Some(db))
- val tableName = tableIdentWithDB.unquotedString
+ val qualifiedTableIdentifier = table.identifier.copy(database = Some(db))
+ val tableName = qualifiedTableIdentifier.unquotedString
- if (sessionState.catalog.tableExists(tableIdentWithDB)) {
- assert(mode != SaveMode.Overwrite,
+ if (sessionState.catalog.tableExists(qualifiedTableIdentifier)) {
+ checkState(mode != SaveMode.Overwrite,
s"Expect the table $tableName has been dropped when the save mode is
Overwrite")
if (mode == SaveMode.ErrorIfExists) {
@@ -72,47 +71,44 @@ case class CreateHoodieTableAsSelectCommand(
}
}
- // ReOrder the query which move the partition columns to the last of the
project list
- val reOrderedQuery = reOrderPartitionColumn(query,
table.partitionColumnNames)
// Remove some properties should not be used
- val newStorage = new CatalogStorageFormat(
- table.storage.locationUri,
- table.storage.inputFormat,
- table.storage.outputFormat,
- table.storage.serde,
- table.storage.compressed,
- table.storage.properties.--(needFilterProps))
- val newTable = table.copy(
- identifier = tableIdentWithDB,
- storage = newStorage,
- schema = reOrderedQuery.schema,
- properties = table.properties.--(needFilterProps)
+ val updatedStorageFormat = table.storage.copy(
+ properties = table.storage.properties -- needFilterProps)
+
+ val updatedTable = table.copy(
+ identifier = qualifiedTableIdentifier,
+ storage = updatedStorageFormat,
+ // TODO need to add meta-fields here
+ schema = query.schema,
+ properties = table.properties -- needFilterProps
)
- val hoodieCatalogTable = HoodieCatalogTable(sparkSession, newTable)
+ val hoodieCatalogTable = HoodieCatalogTable(sparkSession, updatedTable)
val tablePath = hoodieCatalogTable.tableLocation
val hadoopConf = sparkSession.sessionState.newHadoopConf()
- // Execute the insert query
try {
- // init hoodie table
+ // Init hoodie table
hoodieCatalogTable.initHoodieTable()
- val tblProperties = hoodieCatalogTable.catalogProperties
- val options = Map(
+ val tableProperties = hoodieCatalogTable.catalogProperties
+ // NOTE: Users might be specifying write-configuration (inadvertently)
as options or table properties
+ // in CTAS, therefore we need to make sure that these are
appropriately propagated to the
+ // write operation
+ val options = tableProperties ++ Map(
HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE.key -> (table.tableType
== CatalogTableType.MANAGED).toString,
- HiveSyncConfigHolder.HIVE_TABLE_SERDE_PROPERTIES.key ->
ConfigUtils.configToString(tblProperties.asJava),
- HiveSyncConfigHolder.HIVE_TABLE_PROPERTIES.key ->
ConfigUtils.configToString(newTable.properties.asJava),
+ HiveSyncConfigHolder.HIVE_TABLE_SERDE_PROPERTIES.key ->
ConfigUtils.configToString(tableProperties.asJava),
+ HiveSyncConfigHolder.HIVE_TABLE_PROPERTIES.key ->
ConfigUtils.configToString(updatedTable.properties.asJava),
DataSourceWriteOptions.SQL_INSERT_MODE.key ->
InsertMode.NON_STRICT.value(),
DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key -> "true"
)
- val partitionSpec = newTable.partitionColumnNames.map((_, None)).toMap
- val success = InsertIntoHoodieTableCommand.run(sparkSession, newTable,
reOrderedQuery, partitionSpec,
+ val partitionSpec = updatedTable.partitionColumnNames.map((_,
None)).toMap
+ val success = InsertIntoHoodieTableCommand.run(sparkSession,
updatedTable, query, partitionSpec,
mode == SaveMode.Overwrite, refreshTable = false, extraOptions =
options)
if (success) {
// If write success, create the table in catalog if it has not synced
to the
// catalog by the meta sync.
- if (!sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
+ if
(!sparkSession.sessionState.catalog.tableExists(qualifiedTableIdentifier)) {
// create catalog table for this hoodie table
CreateHoodieTableCommand.createTableInCatalog(sparkSession,
hoodieCatalogTable, mode == SaveMode.Ignore)
}
@@ -132,16 +128,4 @@ case class CreateHoodieTableAsSelectCommand(
val fs = path.getFileSystem(conf)
fs.delete(path, true)
}
-
- private def reOrderPartitionColumn(query: LogicalPlan,
- partitionColumns: Seq[String]): LogicalPlan = {
- if (partitionColumns.isEmpty) {
- query
- } else {
- val nonPartitionAttrs = query.output.filter(p =>
!partitionColumns.contains(p.name))
- val partitionAttrs = query.output.filter(p =>
partitionColumns.contains(p.name))
- val reorderAttrs = nonPartitionAttrs ++ partitionAttrs
- Project(reorderAttrs, query)
- }
- }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 125e802802..0228e5ddcf 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -132,7 +132,6 @@ object InsertIntoHoodieTableCommand extends Logging with
ProvidesHoodieConfig wi
val targetPartitionSchema = catalogTable.partitionSchema
val staticPartitionValues = filterStaticPartitionValues(partitionsSpec)
- validate(removeMetaFields(query.schema), partitionsSpec, catalogTable)
// Make sure we strip out meta-fields from the incoming dataset (these
will have to be discarded anyway)
val cleanedQuery = stripMetaFields(query)
// To validate and align properly output of the query, we simply filter
out partition columns with already
@@ -144,6 +143,8 @@ object InsertIntoHoodieTableCommand extends Logging with
ProvidesHoodieConfig wi
// positionally for example
val expectedQueryColumns =
catalogTable.tableSchemaWithoutMetaFields.filterNot(f =>
staticPartitionValues.contains(f.name))
val coercedQueryOutput =
coerceQueryOutputColumns(StructType(expectedQueryColumns), cleanedQuery,
catalogTable, conf)
+ // After potential reshaping validate that the output of the query
conforms to the table's schema
+ validate(removeMetaFields(coercedQueryOutput.schema), partitionsSpec,
catalogTable)
val staticPartitionValuesExprs =
createStaticPartitionValuesExpressions(staticPartitionValues,
targetPartitionSchema, conf)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
index e7848320ff..13800e6c8b 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
@@ -128,7 +128,7 @@ class HoodieSparkSqlTestBase extends FunSuite with
BeforeAndAfterAll {
spark.sql(sql)
} catch {
case e: Throwable =>
- assertResult(errorMsg)(e.getMessage)
+ assertResult(errorMsg.trim)(e.getMessage.trim)
hasException = true
}
assertResult(true)(hasException)
@@ -139,7 +139,7 @@ class HoodieSparkSqlTestBase extends FunSuite with
BeforeAndAfterAll {
try {
spark.sql(sql)
} catch {
- case e: Throwable if e.getMessage.contains(errorMsg) => hasException =
true
+ case e: Throwable if e.getMessage.trim.contains(errorMsg.trim) =>
hasException = true
case f: Throwable => fail("Exception should contain: " + errorMsg + ",
error message: " + f.getMessage, f)
}
assertResult(true)(hasException)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 2fa6b939ac..4c2bec48a6 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -529,17 +529,19 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
| tblproperties (primaryKey = 'id')
| partitioned by (dt)
""".stripMargin)
- checkException(s"insert into $tableName partition(dt = '2021-06-20')
select 1, 'a1', 10, '2021-06-20'") (
- "Expected table's schema: " +
- "[StructField(id,IntegerType,true), StructField(name,StringType,true),
StructField(price,DoubleType,true), StructField(dt,StringType,true)], " +
- "query's output (including static partition values): " +
- "[StructField(1,IntegerType,false), StructField(a1,StringType,false),
StructField(10,IntegerType,false), StructField(2021-06-20,StringType,false),
StructField(dt,StringType,true)]"
+ checkExceptionContain(s"insert into $tableName partition(dt =
'2021-06-20') select 1, 'a1', 10, '2021-06-20'") (
+ """
+ |too many data columns:
+ |Table columns: 'id', 'name', 'price'
+ |Data columns: '1', 'a1', '10', '2021-06-20'
+ |""".stripMargin
)
- checkException(s"insert into $tableName select 1, 'a1', 10")(
- "Expected table's schema: " +
- "[StructField(id,IntegerType,true), StructField(name,StringType,true),
StructField(price,DoubleType,true), StructField(dt,StringType,true)], " +
- "query's output (including static partition values): " +
- "[StructField(1,IntegerType,false), StructField(a1,StringType,false),
StructField(10,IntegerType,false)]"
+ checkExceptionContain(s"insert into $tableName select 1, 'a1', 10")(
+ """
+ |not enough data columns:
+ |Table columns: 'id', 'name', 'price', 'dt'
+ |Data columns: '1', 'a1', '10'
+ |""".stripMargin
)
spark.sql("set hoodie.sql.bulk.insert.enable = true")
spark.sql("set hoodie.sql.insert.mode = strict")
diff --git
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
index 2672e2c4cb..9be7198e6d 100644
---
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{SimpleAnalyzer,
UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Like}
+import org.apache.spark.sql.catalyst.optimizer.SimplifyCasts
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join,
LogicalPlan}
import
org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand,
ExplainCommand}
@@ -31,8 +32,14 @@ object HoodieSpark2CatalystPlanUtils extends
HoodieCatalystPlansUtils {
expected: Seq[Attribute],
query: LogicalPlan,
byName: Boolean,
- conf: SQLConf): LogicalPlan =
- SimpleAnalyzer.ResolveOutputRelation.resolveOutputColumns(tableName,
expected, query, byName)
+ conf: SQLConf): LogicalPlan = {
+ // NOTE: We have to apply [[ResolveUpCast]] and [[SimplifyCasts]] rules
since by default Spark 2.x will
+ // always be wrapping matched attributes into [[UpCast]]s which
aren't resolvable and render some
+ // APIs like [[QueryPlan.schema]] unusable
+ SimplifyCasts.apply(
+ SimpleAnalyzer.ResolveUpCast.apply(
+ SimpleAnalyzer.ResolveOutputRelation.resolveOutputColumns(tableName,
expected, query, byName)))
+ }
def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan =
ExplainCommand(plan, extended = extended)