This is an automated email from the ASF dual-hosted git repository. vinoth pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new de42adc [HUDI-1520] add configure for spark sql overwrite use INSERT_OVERWRITE_TABLE (#2428) de42adc is described below commit de42adc2302528541145a714078cc6d10cbb8d9a Author: lw0090 <lw309637...@gmail.com> AuthorDate: Tue Jan 12 01:07:47 2021 +0800 [HUDI-1520] add configure for spark sql overwrite use INSERT_OVERWRITE_TABLE (#2428) --- .../main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 13 ++++++------- .../org/apache/hudi/functional/TestCOWDataSource.scala | 3 ++- .../org/apache/hudi/functional/TestMORDataSource.scala | 1 - 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 472f450..4e9caa5 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -94,13 +94,6 @@ private[hudi] object HoodieSparkSqlWriter { operation = WriteOperationType.INSERT } - // If the mode is Overwrite, can set operation to INSERT_OVERWRITE_TABLE. - // Then in DataSourceUtils.doWriteOperation will use client.insertOverwriteTable to overwrite - // the table. This will replace the old fs.delete(tablepath) mode. - if (mode == SaveMode.Overwrite && operation != WriteOperationType.INSERT_OVERWRITE_TABLE) { - operation = WriteOperationType.INSERT_OVERWRITE_TABLE - } - val jsc = new JavaSparkContext(sparkContext) val basePath = new Path(path.get) val instantTime = HoodieActiveTimeline.createNewInstantTime() @@ -340,6 +333,12 @@ private[hudi] object HoodieSparkSqlWriter { if (operation != WriteOperationType.DELETE) { if (mode == SaveMode.ErrorIfExists && tableExists) { throw new HoodieException(s"hoodie table at $tablePath already exists.") + } else if (mode == SaveMode.Overwrite && tableExists && operation != WriteOperationType.INSERT_OVERWRITE_TABLE) { + // When user set operation as INSERT_OVERWRITE_TABLE, + // overwrite will use INSERT_OVERWRITE_TABLE operator in doWriteOperation + log.warn(s"hoodie table at $tablePath already exists. Deleting existing data & overwriting with new data.") + fs.delete(tablePath, true) + tableExists = false } } else { // Delete Operation only supports Append mode diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 730b7d2..b15a7d4 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -202,7 +202,7 @@ class TestCOWDataSource extends HoodieClientTestBase { val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2)) inputDF2.write.format("org.apache.hudi") .options(commonOpts) - .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL) .mode(SaveMode.Overwrite) .save(basePath) @@ -229,6 +229,7 @@ class TestCOWDataSource extends HoodieClientTestBase { val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2)) inputDF2.write.format("org.apache.hudi") .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL) .mode(SaveMode.Overwrite) .save(basePath) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index 121957e..1ea6ceb 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -278,7 +278,6 @@ class TestMORDataSource extends HoodieClientTestBase { val inputDF5: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records5, 2)) inputDF5.write.format("org.apache.hudi") .options(commonOpts) - .option("hoodie.compact.inline", "true") .mode(SaveMode.Append) .save(basePath) val commit5Time = HoodieDataSourceHelpers.latestCommit(fs, basePath)