This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 288bc4dd809521af896d0538ec2ae87bc034df7b
Author: AakashPradeep <[email protected]>
AuthorDate: Sun May 3 23:09:17 2020 -0700

    [HUDI-852] adding check for table name for Append Save mode (#1580)

    * adding check for table name for Append Save mode

    * adding existing table validation for delete and upsert operation

    Co-authored-by: Aakash Pradeep <[email protected]>
---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  7 +++
 .../apache/hudi/HoodieSparkSqlWriterSuite.scala    | 60 ++++++++++++++++++++--
 2 files changed, 63 insertions(+), 4 deletions(-)

diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 326595f..e5886eb 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -82,6 +82,13 @@ private[hudi] object HoodieSparkSqlWriter {
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     var exists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
 
+    if (exists && mode == SaveMode.Append) {
+      val existingTableName = new HoodieTableMetaClient(sparkContext.hadoopConfiguration, path.get).getTableConfig.getTableName
+      if (!existingTableName.equals(tblName.get)) {
+        throw new HoodieException(s"hoodie table with name $existingTableName already exist at $basePath")
+      }
+    }
+
     val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
       if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
         // register classes & schemas
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala b/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
index 58ca984..bb82f8d 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
@@ -17,6 +17,9 @@
 
 package org.apache.hudi
 
+import java.util.{Date, UUID}
+
+import org.apache.commons.io.FileUtils
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
@@ -43,10 +46,59 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
 
   test("throw hoodie exception when invalid serializer") {
     val session = SparkSession.builder().appName("hoodie_test").master("local").getOrCreate()
-    val sqlContext = session.sqlContext
-    val options = Map("path" -> "hoodie/test/path", HoodieWriteConfig.TABLE_NAME -> "hoodie_test_tbl")
-    val e = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.ErrorIfExists, options, session.emptyDataFrame))
-    assert(e.getMessage.contains("spark.serializer"))
+    try {
+      val sqlContext = session.sqlContext
+      val options = Map("path" -> "hoodie/test/path", HoodieWriteConfig.TABLE_NAME -> "hoodie_test_tbl")
+      val e = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.ErrorIfExists, options, session.emptyDataFrame))
+      assert(e.getMessage.contains("spark.serializer"))
+    } finally {
+      session.stop()
+    }
+  }
+
+
+  test("throw hoodie exception when there already exist a table with different name with Append Save mode") {
+
+    val session = SparkSession.builder()
+      .appName("test_append_mode")
+      .master("local[2]")
+      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .getOrCreate()
+    val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+    try {
+
+      val sqlContext = session.sqlContext
+      val hoodieFooTableName = "hoodie_foo_tbl"
+
+      //create a new table
+      val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+        HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+        "hoodie.insert.shuffle.parallelism" -> "4",
+        "hoodie.upsert.shuffle.parallelism" -> "4")
+      val fooTableParams = HoodieSparkSqlWriter.parametersWithWriteDefaults(fooTableModifier)
+      val dataFrame = session.createDataFrame(Seq(Test(UUID.randomUUID().toString, new Date().getTime)))
+      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, dataFrame)
+
+      //on same path try append with different("hoodie_bar_tbl") table name which should throw an exception
+      val barTableModifier = Map("path" -> path.toAbsolutePath.toString,
+        HoodieWriteConfig.TABLE_NAME -> "hoodie_bar_tbl",
+        "hoodie.insert.shuffle.parallelism" -> "4",
+        "hoodie.upsert.shuffle.parallelism" -> "4")
+      val barTableParams = HoodieSparkSqlWriter.parametersWithWriteDefaults(barTableModifier)
+      val dataFrame2 = session.createDataFrame(Seq(Test(UUID.randomUUID().toString, new Date().getTime)))
+      val tableAlreadyExistException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, barTableParams, dataFrame2))
+      assert(tableAlreadyExistException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
+
+      //on same path try append with delete operation and different("hoodie_bar_tbl") table name which should throw an exception
+      val deleteTableParams = barTableParams ++ Map(OPERATION_OPT_KEY -> "delete")
+      val deleteCmdException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, deleteTableParams, dataFrame2))
+      assert(deleteCmdException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
+    } finally {
+      session.stop()
+      FileUtils.deleteDirectory(path.toFile)
+    }
   }
 
+  case class Test(uuid: String, ts: Long)
+
 }
