novakov-alexey opened a new issue #3759:
URL: https://github.com/apache/hudi/issues/3759


   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://cwiki.apache.org/confluence/display/HUDI/FAQ)?
   Yes. Note: the correct address is [https://hudi.apache.org/learn/faq/](https://hudi.apache.org/learn/faq/)
   
   **Describe the problem you faced**
   
   A write operation using bulk_insert fails when writing to a non-empty Hudi table. It does not fail if the table is empty.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Create a new table and write some data with the **bulk_insert** operation.
   2. Write the same data batch to this table with **bulk_insert** again.
   
   Hudi settings:
   
   ```scala
    val unpartitionDataConfig = Map(
      HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[NonPartitionedExtractor].getName,
      KEYGENERATOR_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getName
    )

    private def options(
        table: String,
        primaryKey: String,
        database: String,
        operation: String
    ): Map[String, String] =
      Map(
        OPERATION.key -> operation,
        PRECOMBINE_FIELD.key -> EventTimestampColumn,
        RECORDKEY_FIELD.key -> primaryKey,
        TABLE_TYPE.key -> COW_TABLE_TYPE_OPT_VAL,
        TBL_NAME.key -> table,
        "hoodie.consistency.check.enabled" -> "true",
        HIVE_SYNC_MODE.key -> "jdbc",
        HIVE_SYNC_ENABLED.key -> "true",
        HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
        HIVE_DATABASE.key -> database,
        HIVE_TABLE.key -> table,
        UPSERT_PARALLELISM_VALUE.key -> "4",
        DELETE_PARALLELISM_VALUE.key -> "4",
        BULKINSERT_PARALLELISM_VALUE.key -> "4"
      ) ++ unpartitionDataConfig

    def writerOptions(
        table: String,
        primaryKey: String,
        database: String
    ): Map[String, String] = {
      val operation = BULK_INSERT_OPERATION_OPT_VAL
      options(table, primaryKey, database, operation) ++ unpartitionDataConfig
    }
   ```
   
   Spark main code:
   
   ```scala
    val options = writerOptions(tableName, primaryKey, database)

    session.read
      .format("parquet")
      .load(inputPath)
      .write
      .format("hudi")
      .options(options)
      .mode(SaveMode.Overwrite)
      .save(targetPath)
   ```
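   
   Since the failure reported in the stack trace is a `HoodieKeyException` about a null record key, a pre-write sanity check on the primary-key column can localise the problem before Hudi's key generator sees it. This is a sketch I did not run against the real job: plain Scala collections stand in for the DataFrame, and in Spark the equivalent would be something like `df.filter(col(primaryKey).isNotNull && col(primaryKey) =!= "")`.
   
   ```scala
   // Hypothetical pre-write check, not part of the original job.
   // Record and splitByValidKey are illustrative names.
   case class Record(key: Option[String], payload: String)
   
   // Partition rows into (writable, rejected): Hudi rejects record keys
   // that are null or empty, so both cases go to the rejected side.
   def splitByValidKey(rows: Seq[Record]): (Seq[Record], Seq[Record]) =
     rows.partition(_.key.exists(_.nonEmpty))
   
   val rows = Seq(
     Record(Some("id-1"), "a"),
     Record(None, "b"),    // null key: would trigger HoodieKeyException
     Record(Some(""), "c") // empty key is rejected as well
   )
   val (writable, rejected) = splitByValidKey(rows)
   // writable has 1 record, rejected has 2
   ```
   
   Logging or failing fast on the `rejected` side would show whether null keys are actually present in the input batch, or whether they only appear under the intermittent failure.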
   
   **Expected behavior**
   
   The data is overwritten once the second step finishes, leaving no logical duplicates in the table.
   
   **Environment Description**
   
   * Hudi version : 0.9 ("org.apache.hudi" %% "hudi-spark3-bundle" % "0.9.0"), self-packaged in a fat JAR with the Spark app.
   
   * Spark version : 3.1.2 (EMR)
   
   * Hive version : AWS Glue
   
   * Hadoop version : Hadoop 3.2.1 (EMR)
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   1. The issue is intermittent. Sometimes everything works fine and my Spark job successfully overwrites the table with bulk_insert.
   2. I am using JVM concurrency from Scala Spark code, writing several tables via Spark in parallel. Perhaps that leads to some Hudi / Spark thread-safety issue?
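   
   For context, the parallel-write pattern from point 2 looks roughly like the sketch below (illustrative only; `writeTable` stands in for the real Spark/Hudi write, and table names are made up). If the null-key failure only shows up under this pattern, shared mutable state between the concurrent writes would be a plausible suspect.
   
   ```scala
   import scala.concurrent.{Await, Future}
   import scala.concurrent.ExecutionContext.Implicits.global
   import scala.concurrent.duration._
   
   // Stand-in for the per-table Hudi write shown in the Spark main code.
   def writeTable(table: String): String = s"wrote $table"
   
   // Kick off one Future per table and wait for all of them; results come
   // back in input order because Future.traverse preserves ordering.
   def writeAll(tables: Seq[String]): Seq[String] =
     Await.result(Future.traverse(tables)(t => Future(writeTable(t))), 1.minute)
   
   val results = writeAll(Seq("orders", "customers"))
   ```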
   
   **Stacktrace**
   ```
    21/10/07 12:03:18 INFO YarnScheduler: Killing all running tasks in stage 17: Stage cancelled
    21/10/07 12:03:18 INFO DAGScheduler: ResultStage 17 (save at HoodieSparkSqlWriter.scala:463) failed in 3.282 s due to Job aborted due to stage failure: Task 0 in stage 17.0 failed 4 times, most recent failure: Lost task 0.3 in stage 17.0 (TID 32) (ip-10-100-160-252.syncron.local executor 1): org.apache.spark.SparkException: Failed to execute user defined function(UDFRegistration$$Lambda$2098/1888531409: (struct<here comes my table schema in struct format.... it has many columns and they have different logical types>) => string)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at org.apache.spark.util.random.SamplingUtils$.reservoirSampleAndCount(SamplingUtils.scala:42)
        at org.apache.spark.RangePartitioner$.$anonfun$sketch$1(Partitioner.scala:306)
        at org.apache.spark.RangePartitioner$.$anonfun$sketch$1$adapted(Partitioner.scala:304)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.hudi.exception.HoodieKeyException: recordKey value: "null" for field: "my_primary_key_column_here" cannot be null or empty.
        at org.apache.hudi.keygen.KeyGenUtils.getRecordKey(KeyGenUtils.java:141)
        at org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator.getRecordKey(NonpartitionedAvroKeyGenerator.java:60)
        at org.apache.hudi.keygen.NonpartitionedKeyGenerator.getRecordKey(NonpartitionedKeyGenerator.java:50)
        at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:62)
        at org.apache.hudi.keygen.BuiltinKeyGenerator.getRecordKey(BuiltinKeyGenerator.java:75)
        at org.apache.spark.sql.UDFRegistration.$anonfun$register$352(UDFRegistration.scala:777)
        ... 22 more
   ```
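   
   The `"null"` in the message suggests the key value was stringified before the check, so a genuinely-null column renders as the literal string `null`. A rough mirror of that check (illustrative only, not Hudi's actual `KeyGenUtils` internals) would be:
   
   ```scala
   // Hypothetical sketch of the null/empty record-key check behind the
   // exception above. recordKeyOrThrow is an invented name.
   def recordKeyOrThrow(field: String, value: Any): String = {
     // A null column stringifies to "null", matching the message in the trace.
     val s = Option(value).map(_.toString).getOrElse("null")
     if (s == "null" || s.isEmpty)
       throw new IllegalStateException(
         s"""recordKey value: "$s" for field: "$field" cannot be null or empty.""")
     s
   }
   ```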
   
   

