santoshraj123 opened a new issue, #6504:
URL: https://github.com/apache/hudi/issues/6504
Hello, we are facing an issue when running the spark-submit command after
performing a DELETE operation on the Postgres database. The Spark command,
schemas, and properties file are given below. The Spark command successfully
generates the target Hudi tables after an INSERT of a row into the database,
but after a DELETE it fails with a "rolled-back" HoodieException. We are using
EMR 6.7.0 with Hudi 0.11.0-amzn-0. The source of the data is a Postgres
database; AWS DMS generates Parquet files from Postgres and lands the datasets
in an S3 landing zone. We tried both COPY_ON_WRITE and MERGE_ON_READ table
types, yet DELETEs fail.
**Environment information**
----------------------
Hudi version : 0.11.0-amzn-0
Spark version : version 3.2.1-amzn-0
Hive version : 3.1.3
Scala version : 2.12.15
Hadoop version : xxx
Storage (HDFS/S3/GCS..) : S3
Running on Docker? (yes/no) : no
DMS Engine Version: 3.4.7
**Spark command**
-----------------
sudo spark-submit \
  --jars /usr/lib/spark/external/lib/spark-avro.jar,/usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/hudi/hudi-utilities-bundle.jar \
  --master yarn \
  --deploy-mode client \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /usr/lib/hudi/hudi-utilities-bundle.jar \
  --table-type MERGE_ON_READ \
  --source-ordering-field order_id \
  --props s3_url/hoodie-glue.properties \
  --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
  --target-base-path s3_url/v_hudi_orders \
  --target-table v_hudi_orders \
  --payload-class org.apache.hudi.common.model.AWSDmsAvroPayload \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
  --hoodie-conf hoodie.deltastreamer.transformer.sql="SELECT Op, dms_received_ts, col1, col2, col3, CASE WHEN a.Op = 'D' THEN true ELSE false END AS _hoodie_is_deleted FROM <SRC> a" \
  --op BULK_INSERT
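For context, the CASE expression in the transformer SQL above maps the DMS `Op`
column onto Hudi's `_hoodie_is_deleted` soft-delete flag. A minimal pure-Python
sketch of that mapping (illustration of the expression's semantics only, not
Hudi code; column names taken from the command above):

```python
# Sketch of what the SqlQueryBasedTransformer's CASE expression computes:
# rows with Op = 'D' (a DMS delete) get _hoodie_is_deleted = True, all
# other operations get False. Pure-Python illustration, not Hudi code.

def add_soft_delete_flag(rows):
    """Mimic: CASE WHEN Op = 'D' THEN true ELSE false END AS _hoodie_is_deleted."""
    return [{**row, "_hoodie_is_deleted": row["Op"] == "D"} for row in rows]

rows = [
    {"Op": "I", "col1": 1},  # insert captured by DMS
    {"Op": "U", "col1": 1},  # update
    {"Op": "D", "col1": 1},  # delete -> should be flagged
]
flagged = add_soft_delete_flag(rows)
```

Only rows whose flag ends up true are candidates for deletion on the Hudi side.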
**Stacktrace**:
-----------
22/08/23 15:15:46 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/08/23 15:15:46 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" org.apache.hudi.exception.HoodieException: Commit 20220823151531894 failed and rolled-back !
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:649)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:331)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:200)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:198)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:549)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1000)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1089)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1098)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
22/08/23 15:15:46 INFO ShutdownHookManager: Shutdown hook called
**Properties file**
-----------------
hoodie.table.name=t_hudi_able
hoodie.table.type=MERGE_ON_READ
hoodie.deltastreamer.source.dfs.root=s3_URL
# col1 is the primary key
hoodie.datasource.write.recordkey.field=col1
hoodie.datasource.write.partitionpath.field=col3
# ts is DMS generated
hoodie.datasource.write.precombine.field=ts
hoodie.datasource.hive_sync.enable=true
hoodie.datasource.hive_sync.table=t_hudi_able
hoodie.datasource.hive_sync.database=default
hoodie.datasource.write.hive_style_partitioning=true
hoodie.datasource.hive_sync.partition_fields=col3
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.datasource.hive_sync.use_jdbc=false
hoodie.deltastreamer.schemaprovider.source.schema.file=s3_url/source_schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=s3_url/target_schema.avsc
hoodie.datasource.hive_sync.mode=hms
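To show what we expect the record key and precombine settings above to do, here
is a pure-Python sketch of how upsert-style resolution conceptually works: keep
the latest version of each record key by the precombine field, then drop rows
whose soft-delete flag is set. This is an illustration under the field names in
our properties file (col1 as record key, ts as precombine), not Hudi's actual
implementation:

```python
# Conceptual sketch of record resolution with a soft-delete flag.
# Not Hudi code; field names follow the properties file in this issue.

def resolve(records, key="col1", precombine="ts"):
    """Keep the latest record per key (max precombine), then drop soft-deleted rows."""
    latest = {}
    for rec in records:
        k = rec[key]
        if k not in latest or rec[precombine] > latest[k][precombine]:
            latest[k] = rec
    return [r for r in latest.values() if not r.get("_hoodie_is_deleted", False)]

history = [
    {"col1": 1, "ts": "2022-08-23T10:00:00", "_hoodie_is_deleted": False},
    {"col1": 1, "ts": "2022-08-23T11:00:00", "_hoodie_is_deleted": True},  # later delete wins
    {"col1": 2, "ts": "2022-08-23T10:30:00", "_hoodie_is_deleted": False},
]
live = resolve(history)
```

Under this model, key 1 is removed by its later delete and only key 2 survives,
which is the behaviour we expected from the DELETE on Postgres.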
**Target schema file**
-----------------
{
  "type": "record",
  "name": "orders",
  "fields": [
    { "name": "Op", "type": "string" },
    { "name": "ts", "type": ["null", "string"] },
    { "name": "col1", "type": ["null", "int"] },
    { "name": "col12", "type": ["null", "int"] },
    { "name": "col3", "type": ["null", "string"] },
    { "name": "_hoodie_is_deleted", "type": "boolean" }
  ]
}
**Source schema file**
-----------------
{
  "type": "record",
  "name": "orders",
  "fields": [
    { "name": "Op", "type": "string" },
    { "name": "ts", "type": ["null", "string"] },
    { "name": "col1", "type": ["null", "int"] },
    { "name": "col12", "type": ["null", "int"] },
    { "name": "col3", "type": ["null", "string"] }
  ]
}
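As a quick sanity check on the two schema files above (inlined verbatim here;
illustration only), the target schema should be exactly the source schema plus
the non-nullable `_hoodie_is_deleted` boolean that the transformer populates.
Incidentally, the transformer SQL selects `col2` while both schemas declare
`col12`; if that is not just a redaction slip in this report, the transform
would reference a missing column.

```python
import json

# The two Avro schemas from this issue, verbatim, parsed for comparison.
source = json.loads("""
{"type": "record", "name": "orders", "fields": [
  {"name": "Op", "type": "string"},
  {"name": "ts", "type": ["null", "string"]},
  {"name": "col1", "type": ["null", "int"]},
  {"name": "col12", "type": ["null", "int"]},
  {"name": "col3", "type": ["null", "string"]}]}
""")
target = json.loads("""
{"type": "record", "name": "orders", "fields": [
  {"name": "Op", "type": "string"},
  {"name": "ts", "type": ["null", "string"]},
  {"name": "col1", "type": ["null", "int"]},
  {"name": "col12", "type": ["null", "int"]},
  {"name": "col3", "type": ["null", "string"]},
  {"name": "_hoodie_is_deleted", "type": "boolean"}]}
""")

source_names = [f["name"] for f in source["fields"]]
target_names = [f["name"] for f in target["fields"]]

# The only column the target adds over the source should be the soft-delete flag.
extra = [n for n in target_names if n not in source_names]
```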
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]