This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2b245aa7098 Fix race condition in HoodieSparkSqlWriter (#9749)
2b245aa7098 is described below
commit 2b245aa709852f8023a703bd4d574b0491f2c96b
Author: StreamingFlames <[email protected]>
AuthorDate: Sun Oct 22 05:24:16 2023 -0500
Fix race condition in HoodieSparkSqlWriter (#9749)
---
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 47 ++++++++++++++++++++--
.../TestSparkDataSourceDAGExecution.scala | 2 +-
2 files changed, 44 insertions(+), 5 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 581cf2bdc69..49ee4c4b670 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -27,6 +27,7 @@ import org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTable
import org.apache.hudi.DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption}
+import org.apache.hudi.HoodieSparkSqlWriter.{CANONICALIZE_NULLABLE, SQL_MERGE_INTO_WRITES, StreamingWriteParams}
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.avro.AvroSchemaUtils.{canProject, isCompatibleProjectionOf, isSchemaCompatible, resolveNullableSchema}
import org.apache.hudi.avro.HoodieAvroUtils
@@ -110,6 +111,48 @@ object HoodieSparkSqlWriter {
*/
val SPARK_STREAMING_BATCH_ID = "hoodie.internal.spark.streaming.batch.id"
+ def write(sqlContext: SQLContext,
+ mode: SaveMode,
+ optParams: Map[String, String],
+ sourceDf: DataFrame,
+            streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty,
+            hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty):
+  (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = {
+    new HoodieSparkSqlWriterInternal().write(sqlContext, mode, optParams, sourceDf, streamingWritesParamsOpt, hoodieWriteClient)
+ }
+
+ def bootstrap(sqlContext: SQLContext,
+ mode: SaveMode,
+ optParams: Map[String, String],
+ df: DataFrame,
+ hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
+                streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty,
+                hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty): Boolean = {
+    new HoodieSparkSqlWriterInternal().bootstrap(sqlContext, mode, optParams, df, hoodieTableConfigOpt, streamingWritesParamsOpt, hoodieWriteClient)
+ }
+
+ /**
+ * Deduces writer's schema based on
+ * <ul>
+ * <li>Source's schema</li>
+   *   <li>Target table's schema (including Hudi's [[InternalSchema]] representation)</li>
+ * </ul>
+ */
+ def deduceWriterSchema(sourceSchema: Schema,
+ latestTableSchemaOpt: Option[Schema],
+ internalSchemaOpt: Option[InternalSchema],
+ opts: Map[String, String]): Schema = {
+    new HoodieSparkSqlWriterInternal().deduceWriterSchema(sourceSchema, latestTableSchemaOpt, internalSchemaOpt, opts)
+ }
+
+ def cleanup(): Unit = {
+ Metrics.shutdownAllMetrics()
+ }
+
+}
+
+class HoodieSparkSqlWriterInternal {
+
private val log = LoggerFactory.getLogger(getClass)
private var tableExists: Boolean = false
private var asyncCompactionTriggerFnDefined: Boolean = false
@@ -949,10 +992,6 @@ object HoodieSparkSqlWriter {
}
}
- def cleanup() : Unit = {
- Metrics.shutdownAllMetrics()
- }
-
   private def handleSaveModes(spark: SparkSession, mode: SaveMode, tablePath: Path, tableConfig: HoodieTableConfig, tableName: String,
                               operation: WriteOperationType, fs: FileSystem): Unit = {
if (mode == SaveMode.Append && tableExists) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSourceDAGExecution.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSourceDAGExecution.scala
index 52e1ae812c9..15b4cda243d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSourceDAGExecution.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSourceDAGExecution.scala
@@ -89,7 +89,7 @@ class TestSparkDataSourceDAGExecution extends HoodieSparkClientTestBase with Sca
@CsvSource(Array(
"upsert,org.apache.hudi.client.SparkRDDWriteClient.commit",
"insert,org.apache.hudi.client.SparkRDDWriteClient.commit",
- "bulk_insert,org.apache.hudi.HoodieSparkSqlWriter$.bulkInsertAsRow"))
+    "bulk_insert,org.apache.hudi.HoodieSparkSqlWriterInternal.bulkInsertAsRow"))
   def testWriteOperationDoesNotTriggerRepeatedDAG(operation: String, event: String): Unit = {
// register stage event listeners
val stageListener = new StageListener(event)