This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new cfd0c1ee34 [HUDI-4081][HUDI-4472] Addressing Spark SQL vs Spark DS performance gap (#6213)
cfd0c1ee34 is described below
commit cfd0c1ee34460332053491fd1e68c2607c14e958
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Thu Jul 28 15:36:03 2022 -0700
[HUDI-4081][HUDI-4472] Addressing Spark SQL vs Spark DS performance gap (#6213)
---
.../spark/sql/HoodieCatalystPlansUtils.scala | 23 +-
.../hudi/common/table/TableSchemaResolver.java | 21 ++
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 87 ++++----
.../spark/sql/hudi/HoodieSqlCommonUtils.scala | 4 +-
.../command/InsertIntoHoodieTableCommand.scala | 243 +++++++++++++--------
.../spark/sql/hudi/HoodieSparkSqlTestBase.scala | 18 +-
.../apache/spark/sql/hudi/TestDeleteTable.scala | 39 +++-
.../apache/spark/sql/hudi/TestInsertTable.scala | 31 +--
.../apache/spark/sql/hudi/TestMergeIntoTable.scala | 2 +-
.../apache/spark/sql/hudi/TestShowPartitions.scala | 20 +-
.../org/apache/spark/sql/hudi/TestSpark3DDL.scala | 29 ++-
.../apache/spark/sql/hudi/TestUpdateTable.scala | 28 ++-
.../spark/sql/HoodieSpark2CatalystPlanUtils.scala | 12 +-
.../spark/sql/HoodieSpark3CatalystPlanUtils.scala | 12 +-
14 files changed, 381 insertions(+), 188 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
index c277dcb3e6..7566458b1b 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
@@ -19,12 +19,33 @@ package org.apache.spark.sql
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.internal.SQLConf
trait HoodieCatalystPlansUtils {
+ /**
+ * Resolves output of the provided [[query]] against the [[expected]] list of [[Attribute]]s,
+ * and returns a new (reshaped) instance of the [[LogicalPlan]]
+ *
+ * @param tableName used purely for more human-readable error output (if any)
+ * @param expected list of attributes the output of the query has to adhere to
+ * @param query query whose output has to be reshaped
+ * @param byName whether the matching should occur by-name or positionally
+ * @param conf instance of [[SQLConf]]
+ * @return [[LogicalPlan]] whose output is aligned to match that of [[expected]]
+ */
+ def resolveOutputColumns(tableName: String,
+ expected: Seq[Attribute],
+ query: LogicalPlan,
+ byName: Boolean,
+ conf: SQLConf): LogicalPlan
+
+ /**
+ * Instantiates an [[Explain]] command
+ */
def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan
/**
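A minimal usage sketch of the new trait method, not part of this commit; `tableAttrs` and `incomingPlan` are hypothetical stand-ins, and the concrete Spark 2/3 implementations appear at the bottom of this diff:

    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.internal.SQLConf

    def reshape(planUtils: HoodieCatalystPlansUtils,
                tableAttrs: Seq[Attribute],
                incomingPlan: LogicalPlan,
                conf: SQLConf): LogicalPlan = {
      // Reshape the incoming plan's output to line up with the table's attributes,
      // matching columns by name rather than positionally
      planUtils.resolveOutputColumns("my_table", tableAttrs, incomingPlan, byName = true, conf)
    }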
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 5fc989e2e5..4ada97e35c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -316,6 +316,9 @@ public class TableSchemaResolver {
* @param oldSchema Older schema to check.
* @param newSchema Newer schema to check.
* @return True if the schema validation is successful
+ *
+ * TODO revisit this method: it's implemented incorrectly as it might be applying different criteria
+ *      to top-level record and nested record (for ex, if that nested record is contained w/in an array)
*/
public static boolean isSchemaCompatible(Schema oldSchema, Schema newSchema) {
if (oldSchema.getType() == newSchema.getType() && newSchema.getType() == Schema.Type.RECORD) {
@@ -366,13 +369,31 @@
return isSchemaCompatible(new Schema.Parser().parse(oldSchema), new Schema.Parser().parse(newSchema));
}
+ /**
+ * Returns table's latest Avro {@link Schema} iff table is non-empty (i.e. there's at least
+ * a single commit)
+ *
+ * This method differs from {@link #getTableAvroSchema(boolean)} in that it won't fall back
+ * to the table's schema used at creation
+ */
+ public Option<Schema> getTableAvroSchemaFromLatestCommit(boolean includeMetadataFields) throws Exception {
+ if (metaClient.isTimelineNonEmpty()) {
+ return Option.of(getTableAvroSchemaInternal(includeMetadataFields, Option.empty()));
+ }
+
+ return Option.empty();
+ }
+
/**
* Get latest schema either from incoming schema or table schema.
* @param writeSchema incoming batch's write schema.
* @param convertTableSchemaToAddNamespace {@code true} if table schema needs to be converted. {@code false} otherwise.
* @param converterFn converter function to be called over table schema (to add namespace may be). Each caller can decide if any conversion is required.
* @return the latest schema.
+ *
+ * @deprecated will be removed (HUDI-4472)
*/
+ @Deprecated
public Schema getLatestSchema(Schema writeSchema, boolean convertTableSchemaToAddNamespace,
Function1<Schema, Schema> converterFn) {
Schema latestSchema = writeSchema;
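The new accessor makes table emptiness explicit instead of silently falling back to the creation-time schema. A sketch of the intended call pattern, mirroring how HoodieSparkSqlWriter consumes it further down in this diff (`sourceSchema` stands for the incoming batch's schema):

    val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
    // Empty iff the table has no commits yet; the caller picks the fallback explicitly
    val latestTableSchema: Option[Schema] =
      toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false))
    val writerSchema: Schema = latestTableSchema.getOrElse(sourceSchema)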
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 7324a5ca5b..167001863d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -22,7 +22,7 @@ import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.HoodieConversionUtils.toProperties
+import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption}
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
@@ -31,7 +31,7 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model._
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
-import org.apache.hudi.common.util.{CommitUtils, StringUtils}
+import org.apache.hudi.common.util.{CommitUtils, Functions, StringUtils}
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
import org.apache.hudi.exception.HoodieException
@@ -72,8 +72,7 @@ object HoodieSparkSqlWriter {
hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty,
asyncCompactionTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty,
- asyncClusteringTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty
- )
+ asyncClusteringTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty)
: (Boolean, common.util.Option[String], common.util.Option[String], common.util.Option[String],
SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = {
@@ -241,39 +240,49 @@ object HoodieSparkSqlWriter {
sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData], classOf[org.apache.avro.Schema]))
- var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
- val lastestSchema = getLatestTableSchema(fs, basePath, sparkContext, schema)
+
+ // TODO(HUDI-4472) revisit and simplify schema handling
+ val sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
+ val latestTableSchema = getLatestTableSchema(fs, basePath, sparkContext).getOrElse(sourceSchema)
+
+ val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean
var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext)
- if (reconcileSchema && parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean
- && internalSchemaOpt.isEmpty) {
- // force apply full schema evolution.
- internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(schema))
- }
- if (reconcileSchema) {
- schema = lastestSchema
- }
- if (internalSchemaOpt.isDefined) {
- // Apply schema evolution.
- val mergedSparkSchema = if (!reconcileSchema) {
- AvroConversionUtils.convertAvroSchemaToStructType(AvroSchemaEvolutionUtils.canonicalizeColumnNullability(schema, lastestSchema))
+
+ val writerSchema: Schema =
+ if (reconcileSchema) {
+ // In case we need to reconcile the schema and schema evolution is enabled,
+ // we will force-apply schema evolution to the writer's schema
+ if (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) {
+ internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(sourceSchema))
+ }
+
+ if (internalSchemaOpt.isDefined) {
+ // Apply schema evolution, by auto-merging write schema and read schema
+ val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(sourceSchema, internalSchemaOpt.get)
+ AvroInternalSchemaConverter.convert(mergedInternalSchema, latestTableSchema.getName)
+ } else if (TableSchemaResolver.isSchemaCompatible(sourceSchema, latestTableSchema)) {
+ // In case schema reconciliation is enabled and source and latest table schemas
+ // are compatible (as defined by [[TableSchemaResolver#isSchemaCompatible]]), then we will
+ // pick latest table's schema as the writer's schema
+ latestTableSchema
+ } else {
+ // Otherwise fall back to the original source's schema
+ sourceSchema
+ }
} else {
- // Auto merge write schema and read schema.
- val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(schema, internalSchemaOpt.get)
- AvroConversionUtils.convertAvroSchemaToStructType(AvroInternalSchemaConverter.convert(mergedInternalSchema, lastestSchema.getName))
+ // In case reconciliation is disabled, we still have to do nullability attributes
+ // (minor) reconciliation, making sure schema of the incoming batch is in-line with
+ // the data already committed in the table
+ AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, latestTableSchema)
}
- schema = AvroConversionUtils.convertStructTypeToAvroSchema(mergedSparkSchema, structName, nameSpace)
- }
- if (reconcileSchema && internalSchemaOpt.isEmpty) {
- schema = lastestSchema
- }
- validateSchemaForHoodieIsDeleted(schema)
- sparkContext.getConf.registerAvroSchemas(schema)
- log.info(s"Registered avro schema : ${schema.toString(true)}")
+ validateSchemaForHoodieIsDeleted(writerSchema)
+ sparkContext.getConf.registerAvroSchemas(writerSchema)
+ log.info(s"Registered avro schema : ${writerSchema.toString(true)}")
// Convert to RDD[HoodieRecord]
val genericRecords: RDD[GenericRecord] =
HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
- org.apache.hudi.common.util.Option.of(schema))
+ org.apache.hudi.common.util.Option.of(writerSchema))
val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
operation.equals(WriteOperationType.UPSERT) ||
parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
@@ -295,10 +304,10 @@ object HoodieSparkSqlWriter {
hoodieRecord
}).toJavaRDD()
- val writeSchema = if (dropPartitionColumns) generateSchemaWithoutPartitionColumns(partitionColumns, schema) else schema
+ val writerDataSchema = if (dropPartitionColumns) generateSchemaWithoutPartitionColumns(partitionColumns, writerSchema) else writerSchema
// Create a HoodieWriteClient & issue the write.
- val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writeSchema.toString, path,
+ val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writerDataSchema.toString, path,
tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)
)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
@@ -388,14 +397,18 @@ object HoodieSparkSqlWriter {
* @param schema incoming record's schema.
* @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required.
*/
- def getLatestTableSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext, schema: Schema): Schema = {
- var latestSchema: Schema = schema
+ def getLatestTableSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext): Option[Schema] = {
if (FSUtils.isTableExists(basePath.toString, fs)) {
- val tableMetaClient = HoodieTableMetaClient.builder.setConf(sparkContext.hadoopConfiguration).setBasePath(basePath.toString).build()
+ val tableMetaClient = HoodieTableMetaClient.builder
+ .setConf(sparkContext.hadoopConfiguration)
+ .setBasePath(basePath.toString)
+ .build()
val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
- latestSchema = tableSchemaResolver.getLatestSchema(schema, false, null)
+
+ toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false))
+ } else {
+ None
}
- latestSchema
}
def registerKryoClassesAndGetGenericRecords(tblName: String, sparkContext: SparkContext, df: Dataset[Row],
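For context, a hedged sketch (not from this commit) of the write-path options that drive the branching above; the keys correspond to RECONCILE_SCHEMA and SCHEMA_EVOLUTION_ENABLED, and the values are illustrative:

    df.write.format("hudi")
      .option("hoodie.table.name", "my_table")
      // reconcileSchema: reconcile the incoming batch's schema against the table's
      .option("hoodie.datasource.write.reconcile.schema", "true")
      // schemaEvolutionEnabled: force-apply full schema evolution while reconciling
      .option("hoodie.schema.on.read.enable", "true")
      .mode("append")
      .save(basePath)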
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
index 8328882239..b02881bc3d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
@@ -317,8 +317,8 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
def castIfNeeded(child: Expression, dataType: DataType, conf: SQLConf): Expression = {
child match {
case Literal(nul, NullType) => Literal(nul, dataType)
- case _ => if (child.dataType != dataType)
- Cast(child, dataType, Option(conf.sessionLocalTimeZone)) else child
+ case expr if child.dataType != dataType => Cast(expr, dataType, Option(conf.sessionLocalTimeZone))
+ case _ => child
}
}
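The rewritten helper is behavior-preserving; the three cases it distinguishes, as an illustrative sketch (assuming `conf: SQLConf` is in scope):

    castIfNeeded(Literal(null, NullType), StringType, conf) // NULL literal: re-typed to the target type directly
    castIfNeeded(Literal(1), LongType, conf)                // mismatch: wrapped in Cast w/ the session time zone
    castIfNeeded(Literal("x"), StringType, conf)            // already matching: returned unchanged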
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index be1ad8e9b8..8bd81df3d2 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -17,27 +17,39 @@
package org.apache.spark.sql.hudi.command
-import org.apache.hudi.HoodieSparkSqlWriter
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.{HoodieSparkSqlWriter, SparkAdapterSupport}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql._
/**
- * Command for insert into hoodie table.
+ * Command for insert into Hudi table.
+ *
+ * This corresponds to Spark's native [[InsertIntoStatement]]
+ *
+ * @param logicalRelation the [[LogicalRelation]] representing the table to be written into.
+ * @param query the logical plan representing data to be written
+ * @param partitionSpec a map from the partition key to the partition value (optional).
+ *                      If the value is missing, dynamic partition insert will be performed.
+ *                      As an example, `INSERT INTO tbl PARTITION (a=1, b=2) AS` would have
+ *                      Map('a' -> Some('1'), 'b' -> Some('2')),
+ *                      and `INSERT INTO tbl PARTITION (a=1, b) AS ...`
+ *                      would have Map('a' -> Some('1'), 'b' -> None).
+ * @param overwrite overwrite existing table or partitions.
*/
-case class InsertIntoHoodieTableCommand(
- logicalRelation: LogicalRelation,
- query: LogicalPlan,
- partition: Map[String, Option[String]],
- overwrite: Boolean)
+case class InsertIntoHoodieTableCommand(logicalRelation: LogicalRelation,
+ query: LogicalPlan,
+ partitionSpec: Map[String, Option[String]],
+ overwrite: Boolean)
extends HoodieLeafRunnableCommand {
override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
@@ -45,18 +57,19 @@ case class InsertIntoHoodieTableCommand(
assert(logicalRelation.catalogTable.isDefined, "Missing catalog table")
val table = logicalRelation.catalogTable.get
- InsertIntoHoodieTableCommand.run(sparkSession, table, query, partition, overwrite)
+ InsertIntoHoodieTableCommand.run(sparkSession, table, query, partitionSpec, overwrite)
Seq.empty[Row]
}
}
-object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig {
+object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig with SparkAdapterSupport {
+
/**
* Run the insert query. We support both dynamic partition insert and static partition insert.
* @param sparkSession The spark session.
* @param table The insert table.
* @param query The insert query.
- * @param insertPartitions The specified insert partition map.
+ * @param partitionSpec The specified insert partition map.
* e.g. "insert into h(dt = '2021') select id, name
from src"
* "dt" is the key in the map and "2021" is the
partition value. If the
* partition value has not specified(in the case of
dynamic partition)
@@ -66,103 +79,139 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig {
* @param extraOptions Extra options for insert.
*/
def run(sparkSession: SparkSession,
- table: CatalogTable,
- query: LogicalPlan,
- insertPartitions: Map[String, Option[String]],
- overwrite: Boolean,
- refreshTable: Boolean = true,
- extraOptions: Map[String, String] = Map.empty): Boolean = {
-
- val hoodieCatalogTable = new HoodieCatalogTable(sparkSession, table)
- val config = buildHoodieInsertConfig(hoodieCatalogTable, sparkSession, overwrite, insertPartitions, extraOptions)
-
- val mode = if (overwrite && hoodieCatalogTable.partitionFields.isEmpty) {
- // insert overwrite non-partition table
+ table: CatalogTable,
+ query: LogicalPlan,
+ partitionSpec: Map[String, Option[String]],
+ overwrite: Boolean,
+ refreshTable: Boolean = true,
+ extraOptions: Map[String, String] = Map.empty): Boolean = {
+ val catalogTable = new HoodieCatalogTable(sparkSession, table)
+ val config = buildHoodieInsertConfig(catalogTable, sparkSession, overwrite, partitionSpec, extraOptions)
+
+ // NOTE: In case of partitioned table we override specified "overwrite" parameter
+ //       to instead append to the dataset
+ val mode = if (overwrite && catalogTable.partitionFields.isEmpty) {
SaveMode.Overwrite
} else {
- // for insert into or insert overwrite partition we use append mode.
SaveMode.Append
}
- val conf = sparkSession.sessionState.conf
- val alignedQuery = alignOutputFields(query, hoodieCatalogTable, insertPartitions, conf)
- // If we create dataframe using the Dataset.ofRows(sparkSession, alignedQuery),
- // The nullable attribute of fields will lost.
- // In order to pass the nullable attribute to the inputDF, we specify the schema
- // of the rdd.
- val inputDF = sparkSession.createDataFrame(
- Dataset.ofRows(sparkSession, alignedQuery).rdd, alignedQuery.schema)
- val success =
- HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, inputDF)._1
- if (success) {
- if (refreshTable) {
- sparkSession.catalog.refreshTable(table.identifier.unquotedString)
- }
- true
- } else {
- false
+
+ val alignedQuery = alignQueryOutput(query, catalogTable, partitionSpec, sparkSession.sessionState.conf)
+
+ val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, Dataset.ofRows(sparkSession, alignedQuery))
+
+ if (success && refreshTable) {
+ sparkSession.catalog.refreshTable(table.identifier.unquotedString)
}
+
+ success
}
/**
- * Aligned the type and name of query's output fields with the result table's fields.
- * @param query The insert query which to aligned.
- * @param hoodieCatalogTable The result hoodie catalog table.
- * @param insertPartitions The insert partition map.
- * @param conf The SQLConf.
- * @return
+ * Align provided [[query]]'s output with the expected [[catalogTable]] schema by
+ *
+ * <ul>
+ *   <li>Performing type coercion (casting corresponding outputs, where needed)</li>
+ *   <li>Adding aliases (matching column names) to corresponding outputs</li>
+ * </ul>
+ *
+ * @param query target query whose output is to be inserted
+ * @param catalogTable catalog table
+ * @param partitionsSpec partition spec specifying static/dynamic partition values
+ * @param conf Spark's [[SQLConf]]
*/
- private def alignOutputFields(
- query: LogicalPlan,
- hoodieCatalogTable: HoodieCatalogTable,
- insertPartitions: Map[String, Option[String]],
- conf: SQLConf): LogicalPlan = {
-
- val targetPartitionSchema = hoodieCatalogTable.partitionSchema
-
- val staticPartitionValues = insertPartitions.filter(p => p._2.isDefined).mapValues(_.get)
- assert(staticPartitionValues.isEmpty ||
- insertPartitions.size == targetPartitionSchema.size,
- s"Required partition columns is: ${targetPartitionSchema.json}, Current input partitions " +
- s"is: ${staticPartitionValues.mkString("," + "")}")
-
- val queryOutputWithoutMetaFields = removeMetaFields(query.output)
- assert(staticPartitionValues.size + queryOutputWithoutMetaFields.size
- == hoodieCatalogTable.tableSchemaWithoutMetaFields.size,
- s"Required select columns count: ${hoodieCatalogTable.tableSchemaWithoutMetaFields.size}, " +
- s"Current select columns(including static partition column) count: " +
- s"${staticPartitionValues.size + queryOutputWithoutMetaFields.size},columns: " +
- s"(${(queryOutputWithoutMetaFields.map(_.name) ++ staticPartitionValues.keys).mkString(",")})")
-
- val dataAndDynamicPartitionSchemaWithoutMetaFields = StructType(
- hoodieCatalogTable.tableSchemaWithoutMetaFields.filterNot(f => staticPartitionValues.contains(f.name)))
- val dataProjectsWithoutMetaFields = getTableFieldsAlias(queryOutputWithoutMetaFields,
- dataAndDynamicPartitionSchemaWithoutMetaFields.fields, conf)
-
- val partitionProjects = targetPartitionSchema.fields.filter(f => staticPartitionValues.contains(f.name))
- .map(f => {
- val staticPartitionValue = staticPartitionValues.getOrElse(f.name,
- s"Missing static partition value for: ${f.name}")
- val castAttr = castIfNeeded(Literal.create(staticPartitionValue), f.dataType, conf)
- Alias(castAttr, f.name)()
- })
+ private def alignQueryOutput(query: LogicalPlan,
+ catalogTable: HoodieCatalogTable,
+ partitionsSpec: Map[String, Option[String]],
+ conf: SQLConf): LogicalPlan = {
+
+ val targetPartitionSchema = catalogTable.partitionSchema
+ val staticPartitionValues = filterStaticPartitionValues(partitionsSpec)
+
+ validate(removeMetaFields(query.schema), partitionsSpec, catalogTable)
+ // Make sure we strip out meta-fields from the incoming dataset (these will have to be discarded anyway)
+ val cleanedQuery = stripMetaFields(query)
+ // To properly validate and align the output of the query, we simply filter out partition columns with already
+ // provided static values from the table's schema
+ //
+ // NOTE: This is a crucial step: since coercion might rely on either a) name-based or b) positional
+ //       matching, it's important to strip out partition columns having static values provided in the partition spec,
+ //       since such columns wouldn't be otherwise specified w/in the query itself and therefore couldn't be matched
+ //       positionally, for example
+ val expectedQueryColumns = catalogTable.tableSchemaWithoutMetaFields.filterNot(f => staticPartitionValues.contains(f.name))
+ val coercedQueryOutput = coerceQueryOutputColumns(StructType(expectedQueryColumns), cleanedQuery, catalogTable, conf)
+
+ val staticPartitionValuesExprs = createStaticPartitionValuesExpressions(staticPartitionValues, targetPartitionSchema, conf)
+
+ Project(coercedQueryOutput.output ++ staticPartitionValuesExprs, coercedQueryOutput)
+ }
+
+ private def coerceQueryOutputColumns(expectedSchema: StructType,
+ query: LogicalPlan,
+ catalogTable: HoodieCatalogTable,
+ conf: SQLConf): LogicalPlan = {
+ val planUtils = sparkAdapter.getCatalystPlanUtils
+ try {
+ planUtils.resolveOutputColumns(catalogTable.catalogTableName, expectedSchema.toAttributes, query, byName = true, conf)
+ } catch {
+ // NOTE: In case matching by name didn't match the query output, we will attempt positional matching
+ case ae: AnalysisException if ae.getMessage().startsWith("Cannot write incompatible data to table") =>
+ planUtils.resolveOutputColumns(catalogTable.catalogTableName, expectedSchema.toAttributes, query, byName = false, conf)
+ }
+ }
- Project(dataProjectsWithoutMetaFields ++ partitionProjects, query)
+ private def validate(queryOutputSchema: StructType, partitionsSpec: Map[String, Option[String]], catalogTable: HoodieCatalogTable): Unit = {
+ // Validate that partition-spec has proper format (it could be empty if all of the partition values are dynamic,
+ // i.e. there are no static partition-values specified)
+ if (partitionsSpec.nonEmpty && partitionsSpec.size != catalogTable.partitionSchema.size) {
+ throw new HoodieException(s"Required partition schema is: ${catalogTable.partitionSchema.fieldNames.mkString("[", ", ", "]")}, " +
+ s"partition spec is: ${partitionsSpec.mkString("[", ", ", "]")}")
+ }
+
+ val staticPartitionValues = filterStaticPartitionValues(partitionsSpec)
+ val fullQueryOutputSchema = StructType(queryOutputSchema.fields ++ staticPartitionValues.keys.map(StructField(_, StringType)))
+
+ // Assert that query provides all the required columns
+ if (!conforms(fullQueryOutputSchema, catalogTable.tableSchemaWithoutMetaFields)) {
+ throw new HoodieException(s"Expected table's schema: ${catalogTable.tableSchemaWithoutMetaFields.fields.mkString("[", ", ", "]")}, " +
+ s"query's output (including static partition values): ${fullQueryOutputSchema.fields.mkString("[", ", ", "]")}")
+ }
+ }
+
+ private def createStaticPartitionValuesExpressions(staticPartitionValues: Map[String, String],
+                                                    partitionSchema: StructType,
+                                                    conf: SQLConf): Seq[NamedExpression] = {
+ partitionSchema.fields
+ .filter(pf => staticPartitionValues.contains(pf.name))
+ .map(pf => {
+ val staticPartitionValue = staticPartitionValues(pf.name)
+ val castExpr = castIfNeeded(Literal.create(staticPartitionValue), pf.dataType, conf)
+
+ Alias(castExpr, pf.name)()
+ })
}
- private def getTableFieldsAlias(
- queryOutputWithoutMetaFields: Seq[Attribute],
- schemaWithoutMetaFields: Seq[StructField],
- conf: SQLConf): Seq[Alias] = {
- queryOutputWithoutMetaFields.zip(schemaWithoutMetaFields).map { case (dataAttr, dataField) =>
- val targetAttrOption = if (dataAttr.name.startsWith("col")) {
- None
- } else {
- queryOutputWithoutMetaFields.find(_.name.equals(dataField.name))
+ private def conforms(sourceSchema: StructType, targetSchema: StructType): Boolean = {
+ if (sourceSchema.fields.length != targetSchema.fields.length) {
+ false
+ } else {
+ targetSchema.fields.zip(sourceSchema).forall {
+ case (targetColumn, sourceColumn) =>
+ // Make sure we can cast source column to the target column type
+ Cast.canCast(sourceColumn.dataType, targetColumn.dataType)
}
- val targetAttr = targetAttrOption.getOrElse(dataAttr)
- val castAttr = castIfNeeded(targetAttr.withNullability(dataField.nullable),
- dataField.dataType, conf)
- Alias(castAttr, dataField.name)()
}
}
+
+ def stripMetaFields(query: LogicalPlan): LogicalPlan = {
+ val filteredOutput = query.output.filterNot(attr => isMetaField(attr.name))
+ if (filteredOutput == query.output) {
+ query
+ } else {
+ Project(filteredOutput, query)
+ }
+ }
+
+ private def filterStaticPartitionValues(partitionsSpec: Map[String, Option[String]]): Map[String, String] =
+ partitionsSpec.filter(p => p._2.isDefined).mapValues(_.get)
}
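To make the new flow concrete, a hypothetical walk-through of `alignQueryOutput` for a static-partition insert (table and values invented for illustration):

    // INSERT INTO t PARTITION (dt = '2021-06-20') SELECT 1, 'a1'
    // against t(id INT, name STRING) PARTITIONED BY (dt STRING):
    //   1. validate(...) checks the partition spec and the combined column count
    //   2. coerceQueryOutputColumns(...) aligns the SELECT output to [id: int, name: string],
    //      resolving by name first and retrying positionally on AnalysisException
    //   3. the static partition value is appended as a cast, aliased literal:
    val dtExpr = Alias(castIfNeeded(Literal.create("2021-06-20"), StringType, conf), "dt")()
    Project(coercedQueryOutput.output :+ dtExpr, coercedQueryOutput)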
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
index 5e2afd7490..e7848320ff 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
@@ -145,11 +145,23 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
assertResult(true)(hasException)
}
+ def dropTypeLiteralPrefix(value: Any): Any = {
+ value match {
+ case s: String =>
+ s.stripPrefix("DATE").stripPrefix("TIMESTAMP").stripPrefix("X")
+ case _ => value
+ }
+ }
- protected def removeQuotes(value: Any): Any = {
+ protected def extractRawValue(value: Any): Any = {
value match {
- case s: String => s.stripPrefix("'").stripSuffix("'")
- case _=> value
+ case s: String =>
+ // We need to strip out data-type prefixes like "DATE", "TIMESTAMP"
+ dropTypeLiteralPrefix(s)
+ .asInstanceOf[String]
+ .stripPrefix("'")
+ .stripSuffix("'")
+ case _ => value
}
}
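A quick illustration of the renamed helpers, with expected results inferred from the implementations above:

    dropTypeLiteralPrefix("DATE'2021-05-20'")         // "'2021-05-20'"
    extractRawValue("TIMESTAMP'2021-05-20 00:00:00'") // "2021-05-20 00:00:00"
    extractRawValue(42)                               // 42 (non-strings pass through)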
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala
index 4c7c626966..3ab52a0bac 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hudi
import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSparkUtils.isSpark2
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.SimpleKeyGenerator
import org.apache.spark.sql.SaveMode
@@ -93,11 +94,20 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
""".stripMargin)
// insert data to table
- spark.sql(
- s"""
- |insert into $tableName
- |values (1, 'a1', 10.0, 1000), (2, 'a2', 20.0, 1000), (3, 'a2', 30.0, 1000)
- """.stripMargin)
+ if (isSpark2) {
+ spark.sql(
+ s"""
+ |insert into $tableName
+ |values (1, 'a1', cast(10.0 as double), 1000), (2, 'a2', cast(20.0 as double), 1000), (3, 'a2', cast(30.0 as double), 1000)
+ |""".stripMargin)
+ } else {
+ spark.sql(
+ s"""
+ |insert into $tableName
+ |values (1, 'a1', 10.0, 1000), (2, 'a2', 20.0, 1000), (3, 'a2', 30.0, 1000)
+ |""".stripMargin)
+ }
+
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 20.0, 1000),
@@ -132,11 +142,20 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
""".stripMargin)
// insert data to table
- spark.sql(
- s"""
- |insert into $ptTableName
- |values (1, 'a1', 10.0, 1000, "2021"), (2, 'a2', 20.0, 1000,
"2021"), (3, 'a2', 30.0, 1000, "2022")
- """.stripMargin)
+ if (isSpark2) {
+ spark.sql(
+ s"""
+ |insert into $ptTableName
+ |values (1, 'a1', cast(10.0 as double), 1000, "2021"), (2, 'a2', cast(20.0 as double), 1000, "2021"), (3, 'a2', cast(30.0 as double), 1000, "2022")
+ |""".stripMargin)
+ } else {
+ spark.sql(
+ s"""
+ |insert into $ptTableName
+ |values (1, 'a1', 10.0, 1000, "2021"), (2, 'a2', 20.0, 1000,
"2021"), (3, 'a2', 30.0, 1000, "2022")
+ |""".stripMargin)
+ }
+
checkAnswer(s"select id, name, price, ts, pt from $ptTableName")(
Seq(1, "a1", 10.0, 1000, "2021"),
Seq(2, "a2", 20.0, 1000, "2021"),
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 8d21fe32ea..ced6fef72d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -24,6 +24,7 @@ import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieDuplicateKeyException
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.internal.SQLConf
import java.io.File
@@ -396,8 +397,8 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
("string", "'1000'"),
("int", 1000),
("bigint", 10000),
- ("timestamp", "'2021-05-20 00:00:00'"),
- ("date", "'2021-05-20'")
+ ("timestamp", "TIMESTAMP'2021-05-20 00:00:00'"),
+ ("date", "DATE'2021-05-20'")
)
typeAndValue.foreach { case (partitionType, partitionValue) =>
val tableName = generateTableName
@@ -409,8 +410,8 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
test("Test TimestampType Partition Column With Consistent Logical Timestamp
Enabled") {
withTempDir { tmp =>
val typeAndValue = Seq(
- ("timestamp", "'2021-05-20 00:00:00'"),
- ("date", "'2021-05-20'")
+ ("timestamp", "TIMESTAMP'2021-05-20 00:00:00'"),
+ ("date", "DATE'2021-05-20'")
)
typeAndValue.foreach { case (partitionType, partitionValue) =>
val tableName = generateTableName
@@ -433,11 +434,12 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
| partitioned by (dt)
| location '${tmp.getCanonicalPath}/$tableName'
""".stripMargin)
- spark.sql(s"insert into $tableName partition(dt = $partitionValue) select
1, 'a1', 10")
+ // NOTE: We have to drop type-literal prefix since Spark doesn't parse
type literals appropriately
+ spark.sql(s"insert into $tableName partition(dt =
${dropTypeLiteralPrefix(partitionValue)}) select 1, 'a1', 10")
spark.sql(s"insert into $tableName select 2, 'a2', 10, $partitionValue")
checkAnswer(s"select id, name, price, cast(dt as string) from $tableName
order by id")(
- Seq(1, "a1", 10, removeQuotes(partitionValue).toString),
- Seq(2, "a2", 10, removeQuotes(partitionValue).toString)
+ Seq(1, "a1", 10, extractRawValue(partitionValue).toString),
+ Seq(2, "a2", 10, extractRawValue(partitionValue).toString)
)
}
@@ -481,14 +483,17 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
| tblproperties (primaryKey = 'id')
| partitioned by (dt)
""".stripMargin)
- checkException(s"insert into $tableName partition(dt = '2021-06-20')" +
- s" select 1, 'a1', 10, '2021-06-20'") (
- "assertion failed: Required select columns count: 4, Current select columns(including static partition column)" +
- " count: 5,columns: (1,a1,10,2021-06-20,dt)"
+ checkException(s"insert into $tableName partition(dt = '2021-06-20') select 1, 'a1', 10, '2021-06-20'") (
+ "Expected table's schema: " +
+ "[StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(price,DoubleType,true), StructField(dt,StringType,true)], " +
+ "query's output (including static partition values): " +
+ "[StructField(1,IntegerType,false), StructField(a1,StringType,false), StructField(10,IntegerType,false), StructField(2021-06-20,StringType,false), StructField(dt,StringType,true)]"
)
checkException(s"insert into $tableName select 1, 'a1', 10")(
- "assertion failed: Required select columns count: 4, Current select columns(including static partition column)" +
- " count: 3,columns: (1,a1,10)"
+ "Expected table's schema: " +
+ "[StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(price,DoubleType,true), StructField(dt,StringType,true)], " +
+ "query's output (including static partition values): " +
+ "[StructField(1,IntegerType,false), StructField(a1,StringType,false), StructField(10,IntegerType,false)]"
)
spark.sql("set hoodie.sql.bulk.insert.enable = true")
spark.sql("set hoodie.sql.insert.mode = strict")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
index ac11f83d53..58c808d28a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
@@ -908,7 +908,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase {
| when not matched then insert *
|""".stripMargin)
checkAnswer(s"select id, name, cast(value as string), ts from
$tableName")(
- Seq(1, "a1", removeQuotes(dataValue), 1000)
+ Seq(1, "a1", extractRawValue(dataValue), 1000)
)
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala
index 005d5fed71..59ee642861 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.hudi
+import org.apache.hudi.HoodieSparkUtils.isSpark2
import org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH
class TestShowPartitions extends HoodieSparkSqlTestBase {
@@ -84,11 +85,22 @@ class TestShowPartitions extends HoodieSparkSqlTestBase {
checkAnswer(s"show partitions $tableName
partition(dt='2021-01-02')")(Seq("dt=2021-01-02"))
// Insert into null partition
- spark.sql(
- s"""
- | insert into $tableName
- | select 3 as id, 'a3' as name, 10 as price, 1000 as ts, null as dt
+ if (isSpark2) {
+ // Spark 2 isn't able to convert NullType to any other type w/ appropriate nullability, so
+ // explicit cast is required
+ spark.sql(
+ s"""
+ | insert into $tableName
+ | select 3 as id, 'a3' as name, 10 as price, 1000 as ts, cast(null as string) as dt
""".stripMargin)
+ } else {
+ spark.sql(
+ s"""
+ | insert into $tableName
+ | select 3 as id, 'a3' as name, 10 as price, 1000 as ts, null as dt
+ """.stripMargin)
+ }
+
checkAnswer(s"show partitions $tableName")(
Seq("dt=2021-01-01"), Seq("dt=2021-01-02"),
Seq("dt=%s".format(DEFAULT_PARTITION_PATH))
)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index b64d386f1f..65357b903b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -55,11 +55,11 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
spark.sql(
s"""
| insert into $tableName values
- | (1,1,11,100001,101.01,1001.0001,100001.0001,'a000001','2021-12-25','2021-12-25 12:01:01',true,'a01','2021-12-25'),
- | (2,2,12,100002,102.02,1002.0002,100002.0002,'a000002','2021-12-25','2021-12-25 12:02:02',true,'a02','2021-12-25'),
- | (3,3,13,100003,103.03,1003.0003,100003.0003,'a000003','2021-12-25','2021-12-25 12:03:03',false,'a03','2021-12-25'),
- | (4,4,14,100004,104.04,1004.0004,100004.0004,'a000004','2021-12-26','2021-12-26 12:04:04',true,'a04','2021-12-26'),
- | (5,5,15,100005,105.05,1005.0005,100005.0005,'a000005','2021-12-26','2021-12-26 12:05:05',false,'a05','2021-12-26')
+ | (1,1,11,100001,101.01,1001.0001,100001.0001,'a000001',DATE'2021-12-25',TIMESTAMP'2021-12-25 12:01:01',true,X'a01',TIMESTAMP'2021-12-25'),
+ | (2,2,12,100002,102.02,1002.0002,100002.0002,'a000002',DATE'2021-12-25',TIMESTAMP'2021-12-25 12:02:02',true,X'a02',TIMESTAMP'2021-12-25'),
+ | (3,3,13,100003,103.03,1003.0003,100003.0003,'a000003',DATE'2021-12-25',TIMESTAMP'2021-12-25 12:03:03',false,X'a03',TIMESTAMP'2021-12-25'),
+ | (4,4,14,100004,104.04,1004.0004,100004.0004,'a000004',DATE'2021-12-26',TIMESTAMP'2021-12-26 12:04:04',true,X'a04',TIMESTAMP'2021-12-26'),
+ | (5,5,15,100005,105.05,1005.0005,100005.0005,'a000005',DATE'2021-12-26',TIMESTAMP'2021-12-26 12:05:05',false,X'a05',TIMESTAMP'2021-12-26')
|""".stripMargin)
}
@@ -70,6 +70,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
if (HoodieSparkUtils.gteqSpark3_1) {
spark.sql("set hoodie.schema.on.read.enable=true")
+ // NOTE: This is required since these tests use type coercions which were only permitted in Spark 2.x
+ //       and are disallowed now by default in Spark 3.x
+ spark.sql("set spark.sql.storeAssignmentPolicy=legacy")
createAndPreparePartitionTable(spark, tableName, tablePath, tableType)
// date -> string -> date
spark.sql(s"alter table $tableName alter column col6 type String")
@@ -138,6 +141,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
if (HoodieSparkUtils.gteqSpark3_1) {
spark.sql("set hoodie.schema.on.read.enable=true")
+ // NOTE: This is required since these tests use type coercions which were only permitted in Spark 2.x
+ //       and are disallowed now by default in Spark 3.x
+ spark.sql("set spark.sql.storeAssignmentPolicy=legacy")
createAndPreparePartitionTable(spark, tableName, tablePath, tableType)
// float -> double -> decimal -> String
spark.sql(s"alter table $tableName alter column col2 type double")
@@ -172,6 +178,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
if (HoodieSparkUtils.gteqSpark3_1) {
spark.sql("set hoodie.schema.on.read.enable=true")
+ // NOTE: This is required since these tests use type coercions which were only permitted in Spark 2.x
+ //       and are disallowed now by default in Spark 3.x
+ spark.sql("set spark.sql.storeAssignmentPolicy=legacy")
createAndPreparePartitionTable(spark, tableName, tablePath, tableType)
// test set properties
@@ -402,7 +411,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
spark.sql(s"alter table $tableName alter column members.value.a
first")
- spark.sql(s"insert into ${tableName} values(1, 'jack', map('k1',
struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStruct', 29, 100),
1000)")
+ spark.sql(s"insert into ${tableName} values(1, 'jack', map('k1',
struct(100, 'v1'), 'k2', struct(200, 'v2')), struct('jackStruct', 29, 100),
1000)")
// rename column
spark.sql(s"alter table ${tableName} rename column user to userx")
@@ -424,7 +433,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
checkAnswer(spark.sql(s"select name, userx.name, userx.score from
${tableName}").collect())(Seq(null, null, null))
// insert again
- spark.sql(s"insert into ${tableName} values(2 , map('k1',
struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStructNew', 291 ,
101), 'jacknew', 1000)")
+ spark.sql(s"insert into ${tableName} values(2 , map('k1',
struct(100, 'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 ,
101), 'jacknew', 1000)")
// check again
checkAnswer(spark.sql(s"select name, userx.name as uxname,
userx.score as uxs from ${tableName} order by id").collect())(
@@ -440,9 +449,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
Seq(291, 2, "jacknew"))
// test map value type change
spark.sql(s"alter table ${tableName} add columns(mxp map<String,
int>)")
- spark.sql(s"insert into ${tableName} values(2 , map('k1',
struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStructNew', 291 ,
101), 'jacknew', 1000, map('t1', 9))")
+ spark.sql(s"insert into ${tableName} values(2, map('k1', struct(100,
'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 , 101), 'jacknew',
1000, map('t1', 9))")
spark.sql(s"alter table ${tableName} alter column mxp.value type
double")
- spark.sql(s"insert into ${tableName} values(2 , map('k1',
struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStructNew', 291 ,
101), 'jacknew', 1000, map('t1', 10))")
+ spark.sql(s"insert into ${tableName} values(2, map('k1', struct(100,
'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 , 101), 'jacknew',
1000, map('t1', 10))")
spark.sql(s"select * from $tableName").show(false)
checkAnswer(spark.sql(s"select mxp from ${tableName} order by
id").collect())(
Seq(null),
@@ -453,7 +462,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
spark.sql(s"alter table ${tableName} rename column userx to us")
spark.sql(s"alter table ${tableName} rename column us.age to age1")
- spark.sql(s"insert into ${tableName} values(2 , map('k1',
struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStructNew', 291 ,
101), 'jacknew', 1000, map('t1', 10))")
+ spark.sql(s"insert into ${tableName} values(2, map('k1', struct(100,
'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 , 101), 'jacknew',
1000, map('t1', 10))")
spark.sql(s"select mem.value.nn, us.age1 from $tableName order by
id").show()
checkAnswer(spark.sql(s"select mem.value.nn, us.age1 from $tableName
order by id").collect())(
Seq(null, 29),
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
index 8c709ab37a..2d8d6ceca7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.hudi
+import org.apache.hudi.HoodieSparkUtils.isSpark2
+
class TestUpdateTable extends HoodieSparkSqlTestBase {
test("Test Update Table") {
@@ -84,7 +86,12 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
""".stripMargin)
// insert data to table
- spark.sql(s"insert into $tableName values (1, 'a1', 10.0, 1000), (2,
'a2', 20.0, 1000)")
+ if (isSpark2) {
+ spark.sql(s"insert into $tableName values (1, 'a1', cast(10.0 as
double), 1000), (2, 'a2', cast(20.0 as double), 1000)")
+ } else {
+ spark.sql(s"insert into $tableName values (1, 'a1', 10.0, 1000), (2,
'a2', 20.0, 1000)")
+ }
+
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 20.0, 1000)
@@ -119,11 +126,20 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
""".stripMargin)
// insert data to table
- spark.sql(
- s"""
- |insert into $ptTableName
- |values (1, 'a1', 10.0, 1000, "2021"), (2, 'a2', 20.0, 1000, "2021"), (3, 'a2', 30.0, 1000, "2022")
- """.stripMargin)
+ if (isSpark2) {
+ spark.sql(
+ s"""
+ |insert into $ptTableName
+ |values (1, 'a1', cast(10.0 as double), 1000, "2021"), (2, 'a2', cast(20.0 as double), 1000, "2021"), (3, 'a2', cast(30.0 as double), 1000, "2022")
+ |""".stripMargin)
+ } else {
+ spark.sql(
+ s"""
+ |insert into $ptTableName
+ |values (1, 'a1', 10.0, 1000, "2021"), (2, 'a2', 20.0, 1000, "2021"), (3, 'a2', 30.0, 1000, "2022")
+ |""".stripMargin)
+ }
+
checkAnswer(s"select id, name, price, ts, pt from $ptTableName")(
Seq(1, "a1", 10.0, 1000, "2021"),
Seq(2, "a2", 20.0, 1000, "2021"),
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
index 2797b8caa1..cf54504d0d 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
@@ -18,14 +18,22 @@
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions.{Expression, Like}
+import org.apache.spark.sql.catalyst.analysis.{SimpleAnalyzer, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Like}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan}
import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.internal.SQLConf
object HoodieSpark2CatalystPlanUtils extends HoodieCatalystPlansUtils {
+ def resolveOutputColumns(tableName: String,
+ expected: Seq[Attribute],
+ query: LogicalPlan,
+ byName: Boolean,
+ conf: SQLConf): LogicalPlan =
+ SimpleAnalyzer.ResolveOutputRelation.resolveOutputColumns(tableName, expected, query, byName)
+
def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan =
ExplainCommand(plan, extended = extended)
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
index 0cdf5782c0..abece34dea 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
@@ -18,17 +18,25 @@
package org.apache.spark.sql
import org.apache.hudi.spark3.internal.ReflectUtil
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions.{Expression, Like}
+import org.apache.spark.sql.catalyst.analysis.{TableOutputResolver, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Like}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan}
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.{ExtendedMode, SimpleMode}
+import org.apache.spark.sql.internal.SQLConf
abstract class HoodieSpark3CatalystPlanUtils extends HoodieCatalystPlansUtils {
+ def resolveOutputColumns(tableName: String,
+ expected: Seq[Attribute],
+ query: LogicalPlan,
+ byName: Boolean,
+ conf: SQLConf): LogicalPlan =
+ TableOutputResolver.resolveOutputColumns(tableName, expected, query, byName, conf)
+
def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan =
ExplainCommand(plan, mode = if (extended) ExtendedMode else SimpleMode)
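Taken together, the two adapters expose the same trait method, so version-agnostic callers such as `InsertIntoHoodieTableCommand` never touch `SimpleAnalyzer` or `TableOutputResolver` directly (note that the Spark 2 delegate ignores `conf`, which `SimpleAnalyzer`'s resolver doesn't accept). A hedged sketch, with `expectedAttrs` and `queryPlan` as hypothetical stand-ins:

    val planUtils = sparkAdapter.getCatalystPlanUtils // Spark 2 or Spark 3 implementation
    val resolved = planUtils.resolveOutputColumns("tbl", expectedAttrs, queryPlan, byName = true, conf)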