This is an automated email from the ASF dual-hosted git repository. pwason pushed a commit to branch release-0.14.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 26cc766ded7f9b898554a346d1a0d4b6dc8837e9 Author: Shiyan Xu <[email protected]> AuthorDate: Thu Aug 31 21:57:11 2023 -0500 [HUDI-6579] Fix streaming write when meta cols dropped (#9589) --- .../main/scala/org/apache/hudi/DefaultSource.scala | 36 +++++++++++----------- .../org/apache/hudi/HoodieCreateRecordUtils.scala | 11 +++---- .../org/apache/hudi/HoodieSparkSqlWriter.scala | 14 ++++----- 3 files changed, 29 insertions(+), 32 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala index 5a0b0a53d33..f982fb1e1c3 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -19,17 +19,17 @@ package org.apache.hudi import org.apache.hadoop.fs.Path import org.apache.hudi.DataSourceReadOptions._ -import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION, RECORDKEY_FIELD, SPARK_SQL_WRITES_PREPPED_KEY, STREAMING_CHECKPOINT_IDENTIFIER} +import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION, STREAMING_CHECKPOINT_IDENTIFIER} import org.apache.hudi.cdc.CDCRelation import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ} -import org.apache.hudi.common.model.{HoodieRecord, WriteConcurrencyMode} +import org.apache.hudi.common.model.WriteConcurrencyMode import org.apache.hudi.common.table.timeline.HoodieInstant import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.util.ConfigUtils import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY -import org.apache.hudi.config.HoodieWriteConfig.{SPARK_SQL_MERGE_INTO_PREPPED_KEY, WRITE_CONCURRENCY_MODE} +import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE import org.apache.hudi.exception.HoodieException import org.apache.hudi.util.PathUtils import org.apache.spark.sql.execution.streaming.{Sink, Source} @@ -124,21 +124,21 @@ class DefaultSource extends RelationProvider } /** - * This DataSource API is used for writing the DataFrame at the destination. For now, we are returning a dummy - * relation here because Spark does not really make use of the relation returned, and just returns an empty - * dataset at [[org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run()]]. This saves us the cost - * of creating and returning a parquet relation here. - * - * TODO: Revisit to return a concrete relation here when we support CREATE TABLE AS for Hudi with DataSource API. - * That is the only case where Spark seems to actually need a relation to be returned here - * [[org.apache.spark.sql.execution.datasources.DataSource.writeAndRead()]] - * - * @param sqlContext Spark SQL Context - * @param mode Mode for saving the DataFrame at the destination - * @param optParams Parameters passed as part of the DataFrame write operation - * @param rawDf Spark DataFrame to be written - * @return Spark Relation - */ + * This DataSource API is used for writing the DataFrame at the destination. For now, we are returning a dummy + * relation here because Spark does not really make use of the relation returned, and just returns an empty + * dataset at [[org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run()]]. This saves us the cost + * of creating and returning a parquet relation here. + * + * TODO: Revisit to return a concrete relation here when we support CREATE TABLE AS for Hudi with DataSource API. + * That is the only case where Spark seems to actually need a relation to be returned here + * [[org.apache.spark.sql.execution.datasources.DataSource.writeAndRead()]] + * + * @param sqlContext Spark SQL Context + * @param mode Mode for saving the DataFrame at the destination + * @param optParams Parameters passed as part of the DataFrame write operation + * @param df Spark DataFrame to be written + * @return Spark Relation + */ override def createRelation(sqlContext: SQLContext, mode: SaveMode, optParams: Map[String, String], diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala index b7d9429331e..e9201cc66cc 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala @@ -24,21 +24,18 @@ import org.apache.hudi.DataSourceWriteOptions.{INSERT_DROP_DUPS, PAYLOAD_CLASS_N import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.common.config.TypedProperties import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.common.model.{HoodieKey, HoodieRecord, HoodieRecordLocation, HoodieSparkRecord, WriteOperationType} -import org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS -import org.apache.hudi.common.util.StringUtils +import org.apache.hudi.common.model._ import org.apache.hudi.config.HoodieWriteConfig -import org.apache.hudi.exception.HoodieException import org.apache.hudi.keygen.constant.KeyGeneratorOptions import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory -import org.apache.hudi.keygen.{BaseKeyGenerator, KeyGenUtils, KeyGenerator, SparkKeyGeneratorInterface} +import org.apache.hudi.keygen.{BaseKeyGenerator, KeyGenUtils, SparkKeyGeneratorInterface} import org.apache.spark.TaskContext import org.apache.spark.api.java.JavaRDD import org.apache.spark.rdd.RDD import org.apache.spark.sql.HoodieInternalRowUtils.getCachedUnsafeRowWriter -import org.apache.spark.sql.{DataFrame, HoodieInternalRowUtils} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{DataFrame, HoodieInternalRowUtils} import org.slf4j.LoggerFactory import scala.collection.JavaConversions.mapAsJavaMap @@ -98,7 +95,7 @@ object HoodieCreateRecordUtils { } } // we can skip key generator for prepped flow - val usePreppedInsteadOfKeyGen = preppedSparkSqlWrites && preppedWriteOperation + val usePreppedInsteadOfKeyGen = preppedSparkSqlWrites || preppedWriteOperation // NOTE: Avro's [[Schema]] can't be effectively serialized by JVM native serialization framework // (due to containing cyclic refs), therefore we have to convert it to string before diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 57baba29c92..cf78e514dda 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -146,12 +146,12 @@ object HoodieSparkSqlWriter { toReturn } - def writeInternal(sqlContext: SQLContext, - mode: SaveMode, - optParams: Map[String, String], - sourceDf: DataFrame, - streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty, - hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty): + private def writeInternal(sqlContext: SQLContext, + mode: SaveMode, + optParams: Map[String, String], + sourceDf: DataFrame, + streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty, + hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty): (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = { assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set") @@ -260,7 +260,7 @@ object HoodieSparkSqlWriter { val shouldReconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean val latestTableSchemaOpt = getLatestTableSchema(spark, tableIdentifier, tableMetaClient) - val df = if (preppedWriteOperation || preppedSparkSqlWrites || preppedSparkSqlMergeInto) { + val df = if (preppedWriteOperation || preppedSparkSqlWrites || preppedSparkSqlMergeInto || sourceDf.isStreaming) { sourceDf } else { sourceDf.drop(HoodieRecord.HOODIE_META_COLUMNS: _*)
