santoshraj123 opened a new issue, #6504:
URL: https://github.com/apache/hudi/issues/6504

   
   Hello, we are facing an issue when running the spark-submit command after
   performing a DELETE operation on the Postgres database. The Spark command,
   schemas, and properties file are given below. The Spark command successfully
   generates the target Hudi tables after a row is INSERTed into the database.
   But when it is run after a DELETE, it fails with a "rolled-back"
   HoodieException. We are using EMR version 6.7.0 with Hudi 0.11.0-amzn-0.
   The source of the data is a Postgres database. AWS DMS generates the Parquet
   files from Postgres and lands the datasets in the S3 landing zone. We tried
   both COPY_ON_WRITE and MERGE_ON_READ, yet DELETEs fail.
   
   Environment information:
   ----------------------
   Hudi version : 0.11.0-amzn-0
   Spark version : version 3.2.1-amzn-0
   Hive version :  3.1.3
   Scala version : 2.12.15
   Hadoop version : xxx
   Storage (HDFS/S3/GCS..) : S3
   Running on Docker? (yes/no) : no
   DMS Engine Version: 3.4.7
   
   **Spark command**
   -----------------
   sudo spark-submit --jars /usr/lib/spark/external/lib/spark-avro.jar,/usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/hudi/hudi-utilities-bundle.jar \
   --master yarn \
   --deploy-mode client \
   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
   --conf spark.sql.hive.convertMetastoreParquet=false \
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /usr/lib/hudi/hudi-utilities-bundle.jar \
   --table-type MERGE_ON_READ \
   --source-ordering-field order_id \
   --props s3_url/hoodie-glue.properties \
   --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
   --target-base-path s3_url/v_hudi_orders \
   --target-table v_hudi_orders --payload-class org.apache.hudi.common.model.AWSDmsAvroPayload \
   --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
   --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
   --hoodie-conf hoodie.deltastreamer.transformer.sql="SELECT Op, dms_received_ts, col1, col2, col3, CASE WHEN a.Op = 'D' THEN true ELSE false END as _hoodie_is_deleted FROM <SRC> a" \
   --op BULK_INSERT
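
For readers unfamiliar with the transformer SQL above, here is a minimal sketch in plain Python of what its CASE expression computes for each record. The function name `add_delete_flag` and the sample rows are hypothetical and only illustrate the mapping; this is not how Hudi or Spark evaluate the query internally.

```python
from typing import Any, Dict, List


def add_delete_flag(record: Dict[str, Any]) -> Dict[str, Any]:
    """Mimic: CASE WHEN a.Op = 'D' THEN true ELSE false END AS _hoodie_is_deleted."""
    flagged = dict(record)  # copy so the input record is left untouched
    # A missing or null Op falls through to the ELSE branch, i.e. False.
    flagged["_hoodie_is_deleted"] = record.get("Op") == "D"
    return flagged


# Hypothetical sample rows with the DMS change markers I / U / D.
rows: List[Dict[str, Any]] = [
    {"Op": "I", "col1": 1},
    {"Op": "U", "col1": 1},
    {"Op": "D", "col1": 1},
]
flags = [add_delete_flag(r)["_hoodie_is_deleted"] for r in rows]
print(flags)
```

Only the row with Op = 'D' is flagged for deletion; all other rows, including rows with no Op value, keep the flag set to false.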
   
   **Stacktrace**:
   -----------
   22/08/23 15:15:46 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
   22/08/23 15:15:46 INFO SparkContext: Successfully stopped SparkContext
   Exception in thread "main" org.apache.hudi.exception.HoodieException: Commit 20220823151531894 failed and rolled-back !
           at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:649)
           at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:331)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:200)
           at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:198)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:549)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
           at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1000)
           at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
           at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
           at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
           at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1089)
           at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1098)
           at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   22/08/23 15:15:46 INFO ShutdownHookManager: Shutdown hook called
   
   Properties file
   -----------------
   hoodie.table.name=t_hudi_able
   hoodie.table.type=MERGE_ON_READ
   hoodie.deltastreamer.source.dfs.root=s3_URL
   hoodie.datasource.write.recordkey.field=col1 (pk)
   hoodie.datasource.write.partitionpath.field=col3
   hoodie.datasource.write.precombine.field=ts (DMS generated)
   hoodie.datasource.hive_sync.enable=true
   hoodie.datasource.hive_sync.table=t_hudi_able
   hoodie.datasource.hive_sync.database=default
   hoodie.datasource.write.hive_style_partitioning=true
   hoodie.datasource.hive_sync.partition_fields=col3
   hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
   hoodie.datasource.hive_sync.use_jdbc=false
   hoodie.deltastreamer.schemaprovider.source.schema.file=s3_url/source_schema.avsc
   hoodie.deltastreamer.schemaprovider.target.schema.file=s3_url/target_schema.avsc
   hoodie.datasource.hive_sync.mode=hms
   
   Target schema file
   -----------------
   {
     "type": "record",
     "name": "orders",
     "fields": [
       {
         "name": "Op",
         "type": "string"
       },
       {
         "name": "ts",
         "type": [ "null", "string" ]
       },
       {
         "name": "col1",
         "type": [ "null", "int" ]
       },
       {
         "name": "col12",
         "type": [ "null", "int" ]
       },
       {
         "name": "col3",
         "type": [ "null", "string" ]
       },
       {
         "name": "_hoodie_is_deleted",
         "type": "boolean"
       }
     ]
   }
   
   Source schema file
   -----------------
   {
     "type": "record",
     "name": "orders",
     "fields": [
       {
         "name": "Op",
         "type": "string"
       },
       {
         "name": "ts",
         "type": [ "null", "string" ]
       },
       {
         "name": "col1",
         "type": [ "null", "int" ]
       },
       {
         "name": "col12",
         "type": [ "null", "int" ]
       },
       {
         "name": "col3",
         "type": [ "null", "string" ]
       }
     ]
   }
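
As a quick consistency check, the sketch below compares the column names selected by the transformer SQL in the Spark command with the field names in the target schema file. The lists are copied by hand from this report, and the helper name `name_mismatches` is hypothetical. Since the column names appear to have been anonymized for posting, any difference it reports may be an artifact of that rather than the cause of the failure.

```python
from typing import List, Tuple

# Field names from the target schema file in this report.
TARGET_FIELDS = ["Op", "ts", "col1", "col12", "col3", "_hoodie_is_deleted"]

# Output columns of the transformer SQL in the Spark command.
SQL_COLUMNS = ["Op", "dms_received_ts", "col1", "col2", "col3", "_hoodie_is_deleted"]


def name_mismatches(schema: List[str], query: List[str]) -> Tuple[List[str], List[str]]:
    """Return (fields in the schema but not the query, columns in the query but not the schema)."""
    missing_from_query = sorted(set(schema) - set(query))
    extra_in_query = sorted(set(query) - set(schema))
    return missing_from_query, extra_in_query


missing, extra = name_mismatches(TARGET_FIELDS, SQL_COLUMNS)
print("in schema, not in SQL:", missing)
print("in SQL, not in schema:", extra)
```

If the real (non-anonymized) names show the same kind of difference, aligning the SQL output columns with the target schema would be a reasonable first thing to rule out.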
   
   
   

