mithalee commented on issue #3336:
URL: https://github.com/apache/hudi/issues/3336#issuecomment-892112742


   @codope Hi, I tried HoodieDeltaStreamer on the following EMR release:
   Release label: emr-6.3.0
   Hadoop distribution: Amazon 3.2.1
   Applications: Tez 0.9.2, Spark 3.1.1, Hive 3.1.2, JupyterHub 1.2.0, Sqoop 1.4.7, Zeppelin 0.9.0, Hue 4.9.0, Presto 0.245.1
   Hudi utilities bundle: https://mvnrepository.com/artifact/org.apache.hudi/hudi-utilities-bundle_2.12/0.8.0
   On EMR, HoodieDeltaStreamer works as expected (upserts and deletes). However, the same dataset and the same spark-submit configuration do not work with the Spark on Kubernetes binaries:
   https://spark.apache.org/docs/3.1.1/submitting-applications.html
   With the spark-submit command below I can perform the initial insert into the Hudi table, but upserts and deletes throw an error. Can you confirm whether upserts and deletes work with DeltaStreamer on Spark on K8s?
   
   **SPARK SUBMIT**
   spark-submit --master k8s://https://...sk1.us-west-1.eks.amazonaws.com \
   --deploy-mode cluster \
   --name spark-hudi \
   --jars https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.1/spark-avro_2.12-3.1.1.jar \
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
   --conf spark.executor.instances=1 \
   --conf spark.kubernetes.container.image=d.../spark:spark-hudi-0.2 \
   --conf spark.kubernetes.namespace=spark-k8 \
   --conf spark.kubernetes.container.image.pullSecrets=dockercloud-secret \
   --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
   --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
   --conf spark.hadoop.fs.s3a.endpoint=s3.us-west-1.amazonaws.com \
   --conf spark.hadoop.fs.s3a.access.key='A........' \
   --conf spark.hadoop.fs.s3a.secret.key='L.........' \
   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
   s3a://l...v/hudi-root/spark-submit-jars/hudi-utilities-bundle_2.12-0.8.0.jar \
   --table-type COPY_ON_WRITE --source-ordering-field one \
   --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
   --target-base-path s3a://......./hudi-root/transformed-tables/hudi_writer_mm7/ \
   --target-table test_table --base-file-format PARQUET \
   --hoodie-conf hoodie.datasource.write.recordkey.field=one \
   --hoodie-conf hoodie.datasource.write.partitionpath.field=two \
   --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3a://....../jen/example_4.parquet
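
When comparing the EMR and Kubernetes runs, it can help to generate the command from one place so that only the master and source path differ. The following is a minimal sketch using only the Python standard library. The helper name `build_deltastreamer_command` and every value in angle brackets are hypothetical placeholders, not values from this report; the flags and configuration keys are the ones used in the command above.

```python
import shlex


def build_deltastreamer_command(master, bundle_jar, spark_conf, app_args, hoodie_conf):
    """Assemble a spark-submit argument list for HoodieDeltaStreamer.

    Hypothetical helper for illustration; it only builds the argument list
    and does not run anything.
    """
    cmd = [
        "spark-submit",
        "--master", master,
        "--deploy-mode", "cluster",
        "--class", "org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer",
    ]
    # Spark options must come before the application jar.
    for key, value in spark_conf.items():
        cmd += ["--conf", f"{key}={value}"]
    cmd.append(bundle_jar)
    # Everything after the jar is passed to DeltaStreamer itself.
    for flag, value in app_args.items():
        cmd += [flag, value]
    for key, value in hoodie_conf.items():
        cmd += ["--hoodie-conf", f"{key}={value}"]
    return cmd


# Placeholder values (assumptions): substitute the real endpoint, bucket, and paths.
cmd = build_deltastreamer_command(
    master="k8s://https://<api-server>",
    bundle_jar="s3a://<bucket>/hudi-utilities-bundle_2.12-0.8.0.jar",
    spark_conf={
        "spark.executor.instances": "1",
        "spark.kubernetes.namespace": "spark-k8",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    },
    app_args={
        "--table-type": "COPY_ON_WRITE",
        "--source-ordering-field": "one",
        "--source-class": "org.apache.hudi.utilities.sources.ParquetDFSSource",
        "--target-base-path": "s3a://<bucket>/<table-path>/",
        "--target-table": "test_table",
    },
    hoodie_conf={
        "hoodie.datasource.write.recordkey.field": "one",
        "hoodie.datasource.write.partitionpath.field": "two",
        "hoodie.deltastreamer.source.dfs.root": "s3a://<bucket>/<source-path>",
    },
)

# Print a shell-quoted form that can be pasted into a terminal.
print(shlex.join(cmd))
```

For the delete run, only the `hoodie.deltastreamer.source.dfs.root` entry would change, which makes it easier to rule out accidental differences between the two submissions.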
   
   **Parquet file example_4.parquet:**
   import pandas as pd
   import pyarrow as pa
   import pyarrow.parquet as pq
   
   # Initial insert batch: no record is flagged for deletion.
   df = pd.DataFrame({'one': [-1, 3, 2.5],
                      'two': [100, 200, 300],
                      'three': [True, True, True],
                      '_hoodie_is_deleted': [False, False, False]},
                     index=list('abc'))
   table = pa.Table.from_pandas(df)
   print(table)
   pq.write_table(table, 'example_4.parquet')
   
   **Delete file:**
   import pandas as pd
   import pyarrow as pa
   import pyarrow.parquet as pq
   
   # Delete batch: same rows, but the record with key -1 is flagged for deletion.
   df = pd.DataFrame({'one': [-1, 3, 2.5],
                      'two': [100, 200, 300],
                      'three': [True, True, True],
                      '_hoodie_is_deleted': [True, False, False]},
                     index=list('abc'))
   table = pa.Table.from_pandas(df)
   print(table)
   pq.write_table(table, 'example_delete4.parquet')
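
To make the expected outcome of the two files explicit, here is a small pure-Python simulation of the intended semantics: rows are keyed on the record key field `one`, and a row whose `_hoodie_is_deleted` flag is true removes that key. This is only a sketch of the expected result, not Hudi's implementation; the helpers `rows` and `apply_batch` are hypothetical, and the simulation ignores the ordering field and partitioning.

```python
def rows(deleted_flags):
    """Build the three sample records with the given _hoodie_is_deleted flags."""
    ones = [-1, 3, 2.5]
    twos = [100, 200, 300]
    return [
        {"one": one, "two": two, "three": True, "_hoodie_is_deleted": deleted}
        for one, two, deleted in zip(ones, twos, deleted_flags)
    ]


def apply_batch(table, batch, key_field="one", delete_field="_hoodie_is_deleted"):
    """Return a new key -> record mapping after applying one upsert/delete batch."""
    result = dict(table)
    for record in batch:
        key = record[key_field]
        if record.get(delete_field, False):
            result.pop(key, None)  # flagged rows remove the key if it exists
        else:
            result[key] = record   # all other rows insert or overwrite
    return result


insert_batch = rows([False, False, False])  # mirrors example_4.parquet
delete_batch = rows([True, False, False])   # mirrors example_delete4.parquet

table = apply_batch({}, insert_batch)
table = apply_batch(table, delete_batch)
print(sorted(table))  # [2.5, 3]
```

After both batches, the table should hold only the records with keys 3 and 2.5, which is the result observed on EMR and the one the Kubernetes run should reproduce.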


