This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d9f9d7  [HUDI-1991] Fixing drop dups exception in bulk insert row 
writer path (#3055)
7d9f9d7 is described below

commit 7d9f9d7d8241bfb70d50c557b0194cc8a87b6ee7
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sun Jun 13 21:55:52 2021 -0400

    [HUDI-1991] Fixing drop dups exception in bulk insert row writer path 
(#3055)
---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  3 ++
 .../functional/HoodieSparkSqlWriterSuite.scala     | 53 ++++++++++++++++++----
 2 files changed, 46 insertions(+), 10 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 794ea6f..f992a97 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -333,6 +333,9 @@ object HoodieSparkSqlWriter {
     val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, 
structName, nameSpace)
     sparkContext.getConf.registerAvroSchemas(schema)
     log.info(s"Registered avro schema : ${schema.toString(true)}")
+    if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
+      throw new HoodieException("Dropping duplicates with bulk_insert in row 
writer path is not supported yet")
+    }
     val params = parameters.updated(HoodieWriteConfig.AVRO_SCHEMA, 
schema.toString)
     val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, 
path.get, tblName, mapAsJavaMap(params))
     val hoodieDF = 
HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, 
writeConfig, df, structName, nameSpace)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index f4b1d82..10141fb 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -172,17 +172,50 @@ class HoodieSparkSqlWriterSuite extends FunSuite with 
Matchers {
       }
     })
 
+  test("test drop duplicates row writing for bulk_insert") {
+    initSparkContext("test_append_mode")
+    val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+    try {
+
+      val hoodieFooTableName = "hoodie_foo_tbl"
+
+      //create a new table
+      val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+        HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+        DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> 
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
+        "hoodie.bulkinsert.shuffle.parallelism" -> "4",
+        DataSourceWriteOptions.OPERATION_OPT_KEY -> 
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
+        DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY -> "true",
+        INSERT_DROP_DUPS_OPT_KEY -> "true",
+        DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+        DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+        DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> 
"org.apache.hudi.keygen.SimpleKeyGenerator")
+      val fooTableParams = 
HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
+
+      // generate the inserts
+      val schema = DataSourceTestUtils.getStructTypeExampleSchema
+      val structType = 
AvroConversionUtils.convertAvroSchemaToStructType(schema)
+      val records = DataSourceTestUtils.generateRandomRows(100)
+      val recordsSeq = convertRowListToSeq(records)
+      val df = 
spark.createDataFrame(spark.sparkContext.parallelize(recordsSeq), structType)
+      // write to Hudi
+      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, 
df)
+      fail("Drop duplicates with bulk insert in row writing should have thrown 
exception")
+    } catch {
+      case e: HoodieException => assertTrue(e.getMessage.contains("Dropping 
duplicates with bulk_insert in row writer path is not supported yet"))
+    } finally {
+      spark.stop()
+      FileUtils.deleteDirectory(path.toFile)
+    }
+  }
+
   test("test insert dataset without precombine field") {
-    val session = SparkSession.builder()
-      .appName("test_insert_without_precombine")
-      .master("local[2]")
-      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .getOrCreate()
+    initSparkContext("test_bulk_insert_datasource")
     val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
     try {
 
-      val sqlContext = session.sqlContext
-      val sc = session.sparkContext
+      val sqlContext = spark.sqlContext
+      val sc = spark.sparkContext
       val hoodieFooTableName = "hoodie_foo_tbl"
 
       //create a new table
@@ -201,7 +234,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with 
Matchers {
       val structType = 
AvroConversionUtils.convertAvroSchemaToStructType(schema)
       val records = DataSourceTestUtils.generateRandomRows(100)
       val recordsSeq = convertRowListToSeq(records)
-      val df = session.createDataFrame(sc.parallelize(recordsSeq), structType)
+      val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
       // write to Hudi
       HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams - 
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, df)
 
@@ -215,7 +248,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with 
Matchers {
       }
 
       // fetch all records from parquet files generated from write to hudi
-      val actualDf = session.sqlContext.read.parquet(fullPartitionPaths(0), 
fullPartitionPaths(1), fullPartitionPaths(2))
+      val actualDf = spark.sqlContext.read.parquet(fullPartitionPaths(0), 
fullPartitionPaths(1), fullPartitionPaths(2))
 
       // remove metadata columns so that expected and actual DFs can be 
compared as is
       val trimmedDf = 
actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
@@ -224,7 +257,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with 
Matchers {
 
       assert(df.except(trimmedDf).count() == 0)
     } finally {
-      session.stop()
+      spark.stop()
       FileUtils.deleteDirectory(path.toFile)
     }
   }

Reply via email to