This is an automated email from the ASF dual-hosted git repository.

bhavanisudha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e0f5e5  [HUDI-852] adding check for table name for Append Save mode  
(#1580)
5e0f5e5 is described below

commit 5e0f5e5521c52a08c284a92a3a6a00e34805cce5
Author: AakashPradeep <[email protected]>
AuthorDate: Sun May 3 23:09:17 2020 -0700

    [HUDI-852] adding check for table name for Append Save mode  (#1580)
    
    * adding check for table name for Append Save mode
    
    * adding existing table validation for delete and upsert operation
    
    Co-authored-by: Aakash Pradeep <[email protected]>
---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  9 +++-
 .../apache/hudi/HoodieSparkSqlWriterSuite.scala    | 60 ++++++++++++++++++++--
 2 files changed, 64 insertions(+), 5 deletions(-)

diff --git 
a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala 
b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 66b6145..5456782 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -25,11 +25,11 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.client.{HoodieWriteClient, WriteStatus}
+import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecordPayload
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
-import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
@@ -83,6 +83,13 @@ private[hudi] object HoodieSparkSqlWriter {
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     var exists = fs.exists(new Path(basePath, 
HoodieTableMetaClient.METAFOLDER_NAME))
 
+    if (exists && mode == SaveMode.Append) {
+      val existingTableName = new 
HoodieTableMetaClient(sparkContext.hadoopConfiguration, 
path.get).getTableConfig.getTableName
+      if (!existingTableName.equals(tblName.get)) {
+        throw new HoodieException(s"hoodie table with name $existingTableName 
already exist at $basePath")
+      }
+    }
+
     val (writeStatuses, writeClient: 
HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
       if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
       // register classes & schemas
diff --git 
a/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala 
b/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
index 58ca984..bb82f8d 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
@@ -17,6 +17,9 @@
 
 package org.apache.hudi
 
+import java.util.{Date, UUID}
+
+import org.apache.commons.io.FileUtils
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
@@ -43,10 +46,59 @@ class HoodieSparkSqlWriterSuite extends FunSuite with 
Matchers {
 
   test("throw hoodie exception when invalid serializer") {
     val session = 
SparkSession.builder().appName("hoodie_test").master("local").getOrCreate()
-    val sqlContext = session.sqlContext
-    val options = Map("path" -> "hoodie/test/path", 
HoodieWriteConfig.TABLE_NAME -> "hoodie_test_tbl")
-    val e = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, 
SaveMode.ErrorIfExists, options, session.emptyDataFrame))
-    assert(e.getMessage.contains("spark.serializer"))
+    try {
+      val sqlContext = session.sqlContext
+      val options = Map("path" -> "hoodie/test/path", 
HoodieWriteConfig.TABLE_NAME -> "hoodie_test_tbl")
+      val e = 
intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, 
SaveMode.ErrorIfExists, options, session.emptyDataFrame))
+      assert(e.getMessage.contains("spark.serializer"))
+    } finally {
+      session.stop()
+    }
+  }
+
+
+  test("throw hoodie exception when there already exist a table with different 
name with Append Save mode") {
+
+    val session = SparkSession.builder()
+      .appName("test_append_mode")
+      .master("local[2]")
+      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .getOrCreate()
+    val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+    try {
+
+      val sqlContext = session.sqlContext
+      val hoodieFooTableName = "hoodie_foo_tbl"
+
+      //create a new table
+      val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+        HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+        "hoodie.insert.shuffle.parallelism" -> "4",
+        "hoodie.upsert.shuffle.parallelism" -> "4")
+      val fooTableParams = 
HoodieSparkSqlWriter.parametersWithWriteDefaults(fooTableModifier)
+      val dataFrame = 
session.createDataFrame(Seq(Test(UUID.randomUUID().toString, new 
Date().getTime)))
+      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, 
dataFrame)
+
+      //on the same path, try appending with a different ("hoodie_bar_tbl") table 
name, which should throw an exception
+      val barTableModifier = Map("path" -> path.toAbsolutePath.toString,
+        HoodieWriteConfig.TABLE_NAME -> "hoodie_bar_tbl",
+        "hoodie.insert.shuffle.parallelism" -> "4",
+        "hoodie.upsert.shuffle.parallelism" -> "4")
+      val barTableParams = 
HoodieSparkSqlWriter.parametersWithWriteDefaults(barTableModifier)
+      val dataFrame2 = 
session.createDataFrame(Seq(Test(UUID.randomUUID().toString, new 
Date().getTime)))
+      val tableAlreadyExistException = 
intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, 
SaveMode.Append, barTableParams, dataFrame2))
+      assert(tableAlreadyExistException.getMessage.contains("hoodie table with 
name " + hoodieFooTableName + " already exist"))
+
+      //on the same path, try appending with the delete operation and a 
different ("hoodie_bar_tbl") table name, which should throw an exception
+      val deleteTableParams = barTableParams ++ Map(OPERATION_OPT_KEY -> 
"delete")
+      val deleteCmdException = 
intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, 
SaveMode.Append, deleteTableParams, dataFrame2))
+      assert(deleteCmdException.getMessage.contains("hoodie table with name " 
+ hoodieFooTableName + " already exist"))
+    } finally {
+      session.stop()
+      FileUtils.deleteDirectory(path.toFile)
+    }
   }
 
+  case class Test(uuid: String, ts: Long)
+
 }

Reply via email to