This is an automated email from the ASF dual-hosted git repository.
bhavanisudha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5e0f5e5 [HUDI-852] adding check for table name for Append Save mode (#1580)
5e0f5e5 is described below
commit 5e0f5e5521c52a08c284a92a3a6a00e34805cce5
Author: AakashPradeep <[email protected]>
AuthorDate: Sun May 3 23:09:17 2020 -0700
[HUDI-852] adding check for table name for Append Save mode (#1580)
* adding check for table name for Append Save mode
* adding existing table validation for delete and upsert operation
Co-authored-by: Aakash Pradeep <[email protected]>
---
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 9 +++-
.../apache/hudi/HoodieSparkSqlWriterSuite.scala | 60 ++++++++++++++++++++--
2 files changed, 64 insertions(+), 5 deletions(-)
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 66b6145..5456782 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -25,11 +25,11 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.client.{HoodieWriteClient, WriteStatus}
+import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecordPayload
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
-import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
@@ -83,6 +83,13 @@ private[hudi] object HoodieSparkSqlWriter {
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
var exists = fs.exists(new Path(basePath,
HoodieTableMetaClient.METAFOLDER_NAME))
+ if (exists && mode == SaveMode.Append) {
+ val existingTableName = new
HoodieTableMetaClient(sparkContext.hadoopConfiguration,
path.get).getTableConfig.getTableName
+ if (!existingTableName.equals(tblName.get)) {
+ throw new HoodieException(s"hoodie table with name $existingTableName
already exist at $basePath")
+ }
+ }
+
val (writeStatuses, writeClient:
HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
// register classes & schemas
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala b/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
index 58ca984..bb82f8d 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
@@ -17,6 +17,9 @@
package org.apache.hudi
+import java.util.{Date, UUID}
+
+import org.apache.commons.io.FileUtils
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
@@ -43,10 +46,59 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
test("throw hoodie exception when invalid serializer") {
val session =
SparkSession.builder().appName("hoodie_test").master("local").getOrCreate()
- val sqlContext = session.sqlContext
- val options = Map("path" -> "hoodie/test/path",
HoodieWriteConfig.TABLE_NAME -> "hoodie_test_tbl")
- val e = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext,
SaveMode.ErrorIfExists, options, session.emptyDataFrame))
- assert(e.getMessage.contains("spark.serializer"))
+ try {
+ val sqlContext = session.sqlContext
+ val options = Map("path" -> "hoodie/test/path",
HoodieWriteConfig.TABLE_NAME -> "hoodie_test_tbl")
+ val e =
intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext,
SaveMode.ErrorIfExists, options, session.emptyDataFrame))
+ assert(e.getMessage.contains("spark.serializer"))
+ } finally {
+ session.stop()
+ }
+ }
+
+
+ test("throw hoodie exception when there already exist a table with different
name with Append Save mode") {
+
+ val session = SparkSession.builder()
+ .appName("test_append_mode")
+ .master("local[2]")
+ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .getOrCreate()
+ val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+ try {
+
+ val sqlContext = session.sqlContext
+ val hoodieFooTableName = "hoodie_foo_tbl"
+
+ //create a new table
+ val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+ HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4")
+ val fooTableParams =
HoodieSparkSqlWriter.parametersWithWriteDefaults(fooTableModifier)
+ val dataFrame =
session.createDataFrame(Seq(Test(UUID.randomUUID().toString, new
Date().getTime)))
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams,
dataFrame)
+
+ //on same path try append with different("hoodie_bar_tbl") table name
which should throw an exception
+ val barTableModifier = Map("path" -> path.toAbsolutePath.toString,
+ HoodieWriteConfig.TABLE_NAME -> "hoodie_bar_tbl",
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4")
+ val barTableParams =
HoodieSparkSqlWriter.parametersWithWriteDefaults(barTableModifier)
+ val dataFrame2 =
session.createDataFrame(Seq(Test(UUID.randomUUID().toString, new
Date().getTime)))
+ val tableAlreadyExistException =
intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext,
SaveMode.Append, barTableParams, dataFrame2))
+ assert(tableAlreadyExistException.getMessage.contains("hoodie table with
name " + hoodieFooTableName + " already exist"))
+
+ //on same path try append with delete operation and
different("hoodie_bar_tbl") table name which should throw an exception
+ val deleteTableParams = barTableParams ++ Map(OPERATION_OPT_KEY ->
"delete")
+ val deleteCmdException =
intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext,
SaveMode.Append, deleteTableParams, dataFrame2))
+ assert(deleteCmdException.getMessage.contains("hoodie table with name "
+ hoodieFooTableName + " already exist"))
+ } finally {
+ session.stop()
+ FileUtils.deleteDirectory(path.toFile)
+ }
}
+ case class Test(uuid: String, ts: Long)
+
}