This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 26cc766ded7f9b898554a346d1a0d4b6dc8837e9
Author: Shiyan Xu <[email protected]>
AuthorDate: Thu Aug 31 21:57:11 2023 -0500

    [HUDI-6579] Fix streaming write when meta cols dropped (#9589)
---
 .../main/scala/org/apache/hudi/DefaultSource.scala | 36 +++++++++++-----------
 .../org/apache/hudi/HoodieCreateRecordUtils.scala  | 11 +++----
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 14 ++++-----
 3 files changed, 29 insertions(+), 32 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 5a0b0a53d33..f982fb1e1c3 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -19,17 +19,17 @@ package org.apache.hudi
 
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceReadOptions._
-import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, 
OPERATION, RECORDKEY_FIELD, SPARK_SQL_WRITES_PREPPED_KEY, 
STREAMING_CHECKPOINT_IDENTIFIER}
+import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, 
OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
 import org.apache.hudi.cdc.CDCRelation
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, 
MERGE_ON_READ}
-import org.apache.hudi.common.model.{HoodieRecord, WriteConcurrencyMode}
+import org.apache.hudi.common.model.WriteConcurrencyMode
 import org.apache.hudi.common.table.timeline.HoodieInstant
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.util.ConfigUtils
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
-import 
org.apache.hudi.config.HoodieWriteConfig.{SPARK_SQL_MERGE_INTO_PREPPED_KEY, 
WRITE_CONCURRENCY_MODE}
+import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.util.PathUtils
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
@@ -124,21 +124,21 @@ class DefaultSource extends RelationProvider
   }
 
   /**
-    * This DataSource API is used for writing the DataFrame at the 
destination. For now, we are returning a dummy
-    * relation here because Spark does not really make use of the relation 
returned, and just returns an empty
-    * dataset at 
[[org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run()]]. 
This saves us the cost
-    * of creating and returning a parquet relation here.
-    *
-    * TODO: Revisit to return a concrete relation here when we support CREATE 
TABLE AS for Hudi with DataSource API.
-    *       That is the only case where Spark seems to actually need a 
relation to be returned here
-    *       
[[org.apache.spark.sql.execution.datasources.DataSource.writeAndRead()]]
-    *
-    * @param sqlContext Spark SQL Context
-    * @param mode Mode for saving the DataFrame at the destination
-    * @param optParams Parameters passed as part of the DataFrame write 
operation
-    * @param rawDf Spark DataFrame to be written
-    * @return Spark Relation
-    */
+   * This DataSource API is used for writing the DataFrame at the destination. 
For now, we are returning a dummy
+   * relation here because Spark does not really make use of the relation 
returned, and just returns an empty
+   * dataset at 
[[org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run()]]. 
This saves us the cost
+   * of creating and returning a parquet relation here.
+   *
+   * TODO: Revisit to return a concrete relation here when we support CREATE 
TABLE AS for Hudi with DataSource API.
+   * That is the only case where Spark seems to actually need a relation to be 
returned here
+   * [[org.apache.spark.sql.execution.datasources.DataSource.writeAndRead()]]
+   *
+   * @param sqlContext Spark SQL Context
+   * @param mode       Mode for saving the DataFrame at the destination
+   * @param optParams  Parameters passed as part of the DataFrame write 
operation
+   * @param df         Spark DataFrame to be written
+   * @return Spark Relation
+   */
   override def createRelation(sqlContext: SQLContext,
                               mode: SaveMode,
                               optParams: Map[String, String],
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
index b7d9429331e..e9201cc66cc 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
@@ -24,21 +24,18 @@ import 
org.apache.hudi.DataSourceWriteOptions.{INSERT_DROP_DUPS, PAYLOAD_CLASS_N
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{HoodieKey, HoodieRecord, 
HoodieRecordLocation, HoodieSparkRecord, WriteOperationType}
-import 
org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS
-import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.common.model._
 import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
-import org.apache.hudi.keygen.{BaseKeyGenerator, KeyGenUtils, KeyGenerator, 
SparkKeyGeneratorInterface}
+import org.apache.hudi.keygen.{BaseKeyGenerator, KeyGenUtils, 
SparkKeyGeneratorInterface}
 import org.apache.spark.TaskContext
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.HoodieInternalRowUtils.getCachedUnsafeRowWriter
-import org.apache.spark.sql.{DataFrame, HoodieInternalRowUtils}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, HoodieInternalRowUtils}
 import org.slf4j.LoggerFactory
 
 import scala.collection.JavaConversions.mapAsJavaMap
@@ -98,7 +95,7 @@ object HoodieCreateRecordUtils {
       }
     }
     // we can skip key generator for prepped flow
-    val usePreppedInsteadOfKeyGen = preppedSparkSqlWrites && 
preppedWriteOperation
+    val usePreppedInsteadOfKeyGen = preppedSparkSqlWrites || 
preppedWriteOperation
 
     // NOTE: Avro's [[Schema]] can't be effectively serialized by JVM native 
serialization framework
     //       (due to containing cyclic refs), therefore we have to convert it 
to string before
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 57baba29c92..cf78e514dda 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -146,12 +146,12 @@ object HoodieSparkSqlWriter {
     toReturn
   }
 
-  def writeInternal(sqlContext: SQLContext,
-                    mode: SaveMode,
-                    optParams: Map[String, String],
-                    sourceDf: DataFrame,
-                    streamingWritesParamsOpt: Option[StreamingWriteParams] = 
Option.empty,
-                    hoodieWriteClient: Option[SparkRDDWriteClient[_]] = 
Option.empty):
+  private def writeInternal(sqlContext: SQLContext,
+                            mode: SaveMode,
+                            optParams: Map[String, String],
+                            sourceDf: DataFrame,
+                            streamingWritesParamsOpt: 
Option[StreamingWriteParams] = Option.empty,
+                            hoodieWriteClient: Option[SparkRDDWriteClient[_]] 
= Option.empty):
   (Boolean, HOption[String], HOption[String], HOption[String], 
SparkRDDWriteClient[_], HoodieTableConfig) = {
 
     assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), 
"'path' must be set")
@@ -260,7 +260,7 @@ object HoodieSparkSqlWriter {
 
       val shouldReconcileSchema = 
parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
       val latestTableSchemaOpt = getLatestTableSchema(spark, tableIdentifier, 
tableMetaClient)
-      val df = if (preppedWriteOperation || preppedSparkSqlWrites || 
preppedSparkSqlMergeInto) {
+      val df = if (preppedWriteOperation || preppedSparkSqlWrites || 
preppedSparkSqlMergeInto || sourceDf.isStreaming) {
         sourceDf
       } else {
         sourceDf.drop(HoodieRecord.HOODIE_META_COLUMNS: _*)

Reply via email to