novakov-alexey opened a new issue #3759: URL: https://github.com/apache/hudi/issues/3759
**_Tips before filing an issue_**

- Have you gone through our [FAQs](https://cwiki.apache.org/confluence/display/HUDI/FAQ)? Yes. The correct address, by the way, is [https://hudi.apache.org/learn/faq/](https://hudi.apache.org/learn/faq/).

**Describe the problem you faced**

A write operation using bulk_insert fails when writing to a non-empty Hudi table. It does not fail if the table is empty.

**To Reproduce**

Steps to reproduce the behavior:

1. Create a new table and write some data with the **bulk_insert** option.
2. Write the same data batch to this table with the **bulk_insert** option.

Hudi settings:

```scala
// Imports assumed for the constants used below (Hudi 0.9):
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.hive.NonPartitionedExtractor
import org.apache.hudi.keygen.NonpartitionedKeyGenerator

val unpartitionDataConfig = Map(
  HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[NonPartitionedExtractor].getName,
  KEYGENERATOR_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getName
)

private def options(
    table: String,
    primaryKey: String,
    database: String,
    operation: String
): Map[String, String] =
  Map(
    OPERATION.key -> operation,
    PRECOMBINE_FIELD.key -> EventTimestampColumn,
    RECORDKEY_FIELD.key -> primaryKey,
    TABLE_TYPE.key -> COW_TABLE_TYPE_OPT_VAL,
    TBL_NAME.key -> table,
    "hoodie.consistency.check.enabled" -> "true",
    HIVE_SYNC_MODE.key -> "jdbc",
    HIVE_SYNC_ENABLED.key -> "true",
    HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
    HIVE_DATABASE.key -> database,
    HIVE_TABLE.key -> table,
    UPSERT_PARALLELISM_VALUE.key -> "4",
    DELETE_PARALLELISM_VALUE.key -> "4",
    BULKINSERT_PARALLELISM_VALUE.key -> "4"
  ) ++ unpartitionDataConfig

def writerOptions(
    table: String,
    primaryKey: String,
    database: String
): Map[String, String] = {
  val operation = BULK_INSERT_OPERATION_OPT_VAL
  // Note: unpartitionDataConfig is already appended inside options,
  // so appending it again here is redundant but harmless.
  options(table, primaryKey, database, operation) ++ unpartitionDataConfig
}
```

Spark main code:

```scala
val options = writerOptions(tableName, primaryKey, database)

session.read
  .format("parquet")
  .load(inputPath)
  .write
  .format("hudi")
  .options(options)
  .mode(SaveMode.Overwrite)
  .save(targetPath)
```
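Since the `HoodieKeyException` in the stack trace below complains about a null record key, a quick sanity check on the input data may help narrow this down. This is only a sketch; `session`, `inputPath`, and `primaryKey` refer to the values used in the snippets above:

```scala
// Sketch: count input rows whose record key column is null before writing.
// The key generator fails on any such row, so a non-zero count here
// would point at the source data rather than at Hudi.
val input = session.read.format("parquet").load(inputPath)
val nullKeyCount = input.filter(input.col(primaryKey).isNull).count()
println(s"Rows with null record key '$primaryKey': $nullKeyCount")
```

Since the very same batch bulk_inserts fine into an empty table (step 1), the key column itself should not contain nulls, which makes the failure on the second write more puzzling.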
**Expected behavior**

Data is overwritten once the second step finishes, with no logical duplicates left in the table.

**Environment Description**

* Hudi version : 0.9 (`"org.apache.hudi" %% "hudi-spark3-bundle" % "0.9.0"`, self-packaged in a fat JAR with the Spark app)
* Spark version : 3.1.2 (EMR)
* Hive version : AWS Glue
* Hadoop version : Hadoop 3.2.1 (EMR)
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no

**Additional context**

1. The issue is intermittent: sometimes everything works fine and my Spark job successfully overwrites the table with bulk_insert.
2. I am writing several tables in parallel from the same Spark application using JVM concurrency in Scala (see the sketch below). Perhaps that triggers a Hudi / Spark thread-safety issue?
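For reference, the concurrency pattern looks roughly like this. `tables` and `writeTable` are illustrative placeholders, not the real names from the job:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Sketch: each table gets its own Future on the shared SparkSession,
// so several Hudi bulk_insert writes run concurrently inside one JVM.
// `tables` and `writeTable` stand in for the actual job code.
val jobs: Seq[Future[Unit]] = tables.map { table =>
  Future {
    writeTable(table) // builds options via writerOptions and runs df.write...save
  }
}
Await.result(Future.sequence(jobs), Duration.Inf)
```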
**Stacktrace**

```
21/10/07 12:03:18 INFO YarnScheduler: Killing all running tasks in stage 17: Stage cancelled
21/10/07 12:03:18 INFO DAGScheduler: ResultStage 17 (save at HoodieSparkSqlWriter.scala:463) failed in 3.282 s due to Job aborted due to stage failure: Task 0 in stage 17.0 failed 4 times, most recent failure: Lost task 0.3 in stage 17.0 (TID 32) (ip-10-100-160-252.syncron.local executor 1): org.apache.spark.SparkException: Failed to execute user defined function(UDFRegistration$$Lambda$2098/1888531409: (struct<here comes my table schema in struct format.... it has many columns and they have different logical types>) => string)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at org.apache.spark.util.random.SamplingUtils$.reservoirSampleAndCount(SamplingUtils.scala:42)
	at org.apache.spark.RangePartitioner$.$anonfun$sketch$1(Partitioner.scala:306)
	at org.apache.spark.RangePartitioner$.$anonfun$sketch$1$adapted(Partitioner.scala:304)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieKeyException: recordKey value: "null" for field: "my_primary_key_column_here" cannot be null or empty.
	at org.apache.hudi.keygen.KeyGenUtils.getRecordKey(KeyGenUtils.java:141)
	at org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator.getRecordKey(NonpartitionedAvroKeyGenerator.java:60)
	at org.apache.hudi.keygen.NonpartitionedKeyGenerator.getRecordKey(NonpartitionedKeyGenerator.java:50)
	at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:62)
	at org.apache.hudi.keygen.BuiltinKeyGenerator.getRecordKey(BuiltinKeyGenerator.java:75)
	at org.apache.spark.sql.UDFRegistration.$anonfun$register$352(UDFRegistration.scala:777)
	... 22 more
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]