This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7d9f9d7 [HUDI-1991] Fixing drop dups exception in bulk insert row writer path (#3055)
7d9f9d7 is described below
commit 7d9f9d7d8241bfb70d50c557b0194cc8a87b6ee7
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sun Jun 13 21:55:52 2021 -0400
[HUDI-1991] Fixing drop dups exception in bulk insert row writer path (#3055)
---
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 3 ++
.../functional/HoodieSparkSqlWriterSuite.scala | 53 ++++++++++++++++++----
2 files changed, 46 insertions(+), 10 deletions(-)
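For context, here is a minimal sketch (not part of the patch) of the datasource write that this guard now rejects up front. The table name, path, and input rows are illustrative; the string option keys are the usual Hudi datasource configs corresponding to the OPT_KEY constants used in the test below.

    import org.apache.spark.sql.{SaveMode, SparkSession}

    val spark = SparkSession.builder()
      .appName("bulk-insert-drop-dups-repro")
      .master("local[2]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
    import spark.implicits._

    // two rows sharing the same record key, i.e. the duplicates a user might hope to drop
    val df = Seq(("k1", "p1", 1L), ("k1", "p1", 2L)).toDF("_row_key", "partition", "ts")

    df.write.format("hudi")
      .option("hoodie.table.name", "hoodie_foo_tbl")
      .option("hoodie.datasource.write.operation", "bulk_insert")
      .option("hoodie.datasource.write.row.writer.enable", "true")
      // the unsupported combination this patch guards against
      .option("hoodie.datasource.write.insert.drop.duplicates", "true")
      .option("hoodie.datasource.write.recordkey.field", "_row_key")
      .option("hoodie.datasource.write.partitionpath.field", "partition")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .mode(SaveMode.Append)
      .save("/tmp/hoodie_foo_tbl")
    // With this patch the write fails fast with:
    //   HoodieException: Dropping duplicates with bulk_insert in row writer path is not supported yet
    // instead of failing with an unrelated error deeper in the row writer path.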
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 794ea6f..f992a97 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -333,6 +333,9 @@ object HoodieSparkSqlWriter {
val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
sparkContext.getConf.registerAvroSchemas(schema)
log.info(s"Registered avro schema : ${schema.toString(true)}")
+ if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
+ throw new HoodieException("Dropping duplicates with bulk_insert in row writer path is not supported yet")
+ }
val params = parameters.updated(HoodieWriteConfig.AVRO_SCHEMA, schema.toString)
val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path.get, tblName, mapAsJavaMap(params))
val hoodieDF = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index f4b1d82..10141fb 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -172,17 +172,50 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
}
})
+ test("test drop duplicates row writing for bulk_insert") {
+ initSparkContext("test_append_mode")
+ val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+ try {
+
+ val hoodieFooTableName = "hoodie_foo_tbl"
+
+ //create a new table
+ val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+ HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+ DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
+ "hoodie.bulkinsert.shuffle.parallelism" -> "4",
+ DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
+ DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY -> "true",
+ INSERT_DROP_DUPS_OPT_KEY -> "true",
+ DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+ DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.SimpleKeyGenerator")
+ val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
+
+ // generate the inserts
+ val schema = DataSourceTestUtils.getStructTypeExampleSchema
+ val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+ val records = DataSourceTestUtils.generateRandomRows(100)
+ val recordsSeq = convertRowListToSeq(records)
+ val df = spark.createDataFrame(spark.sparkContext.parallelize(recordsSeq), structType)
+ // write to Hudi
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
+ fail("Drop duplicates with bulk insert in row writing should have thrown
exception")
+ } catch {
+ case e: HoodieException => assertTrue(e.getMessage.contains("Dropping duplicates with bulk_insert in row writer path is not supported yet"))
+ } finally {
+ spark.stop()
+ FileUtils.deleteDirectory(path.toFile)
+ }
+ }
+
test("test insert dataset without precombine field") {
- val session = SparkSession.builder()
- .appName("test_insert_without_precombine")
- .master("local[2]")
- .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- .getOrCreate()
+ initSparkContext("test_bulk_insert_datasource")
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
try {
- val sqlContext = session.sqlContext
- val sc = session.sparkContext
+ val sqlContext = spark.sqlContext
+ val sc = spark.sparkContext
val hoodieFooTableName = "hoodie_foo_tbl"
//create a new table
@@ -201,7 +234,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
val records = DataSourceTestUtils.generateRandomRows(100)
val recordsSeq = convertRowListToSeq(records)
- val df = session.createDataFrame(sc.parallelize(recordsSeq), structType)
+ val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
// write to Hudi
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams - DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, df)
@@ -215,7 +248,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
}
// fetch all records from parquet files generated from write to hudi
- val actualDf = session.sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
+ val actualDf = spark.sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
// remove metadata columns so that expected and actual DFs can be compared as is
val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
@@ -224,7 +257,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
assert(df.except(trimmedDf).count() == 0)
} finally {
- session.stop()
+ spark.stop()
FileUtils.deleteDirectory(path.toFile)
}
}
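Until drop duplicates is supported on the bulk_insert row writer path, one illustrative workaround (not part of this patch, and assuming the record key and partition path columns used above) is to deduplicate in Spark before writing and leave hoodie.datasource.write.insert.drop.duplicates off:

    // illustrative only: pre-deduplicate on record key and partition path,
    // then bulk_insert with drop duplicates disabled
    val deduped = df.dropDuplicates("_row_key", "partition")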