nikoshet opened a new issue, #8343:
URL: https://github.com/apache/hudi/issues/8343

   **Describe the problem you faced**
   
   Hello, 
   I am experimenting with an AWS DMS -> Hudi architecture using `DeltaStreamer` 
with Parquet. I want to partition the files into folders by year, month and 
day, where the partition field is a column named _created_at_ of type 
_TIMESTAMP_. When I launch the Spark job with a command that partitions by 
year only, it works as expected, and the folders inside the S3 bucket are 
partitioned correctly. 
   
   The command is:
   ```bash
   spark-submit \
   --jars local:///opt/spark/work-dir/hudi-spark3.3-bundle_2.12-0.13.0.jar,local:///opt/spark/work-dir/hudi-aws-bundle-0.13.0.jar,local:///opt/spark/work-dir/aws-java-sdk-bundle-1.12.398.jar,local:///opt/spark/work-dir/hadoop-aws-3.3.4.jar \
   --master k8s://http://localhost:8001 --deploy-mode cluster \
   --conf spark.kubernetes.container.image=stathisq/spark-hudi:3.3.1-0.13.0-slim \
   --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
   --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
   --conf spark.kubernetes.namespace=spark \
   --conf spark.kubernetes.executor.podTemplateFile=$(pwd)/pod-templates/podTemplateExecutor.yaml \
   --conf spark.kubernetes.driver.podTemplateFile=$(pwd)/pod-templates/podTemplateDriver.yaml \
   --conf spark.kubernetes.file.upload.path=s3a://cdc-spike/spark \
   --conf spark.ui.port=4040 \
   --conf spark.driver.extraJavaOptions="-Divy.cache.dir=/tmp -Divy.home=/tmp" \
   --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
   --class "org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer" local:///opt/spark/work-dir/hudi-utilities-slim-bundle_2.12-0.13.0.jar \
   --table-type COPY_ON_WRITE --op BULK_INSERT \
   --target-base-path s3a://cdc-spike/hudi/postgres/employee \
   --target-table employee \
   --min-sync-interval-seconds 60 \
   --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
   --payload-class "org.apache.hudi.payload.AWSDmsAvroPayload" \
   --hoodie-conf "hoodie.deltastreamer.source.dfs.root=s3a://cdc-spike/dms/public/employee/" \
   --source-ordering-field _dms_ingestion_timestamp \
   --hoodie-conf auto.offset.reset=earliest \
   --hoodie-conf hoodie.datasource.write.recordkey.field=id \
   --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
   --hoodie-conf hoodie.datasource.write.partitionpath.field=created_at:TIMESTAMP \
   --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator \
   --hoodie-conf hoodie.deltastreamer.keygen.timebased.output.dateformat="yyyy" \
   --hoodie-conf hoodie.deltastreamer.keygen.timebased.timestamp.type="SCALAR" \
   --hoodie-conf hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit="microseconds"
   ```
   
   Also, when I start a Spark SQL client and load the Hudi table, it is read 
without any errors.
   
   
   However, the problem appears when I run the command with year, month and 
day partitioning, i.e. with this line changed:
   ```bash
   --hoodie-conf hoodie.deltastreamer.keygen.timebased.output.dateformat="yyyy/MM/dd" \
   ```
   
   <details>
   <summary>Full command:</summary>
   <br>
   
   ```bash
   spark-submit \
   --jars local:///opt/spark/work-dir/hudi-spark3.3-bundle_2.12-0.13.0.jar,local:///opt/spark/work-dir/hudi-aws-bundle-0.13.0.jar,local:///opt/spark/work-dir/aws-java-sdk-bundle-1.12.398.jar,local:///opt/spark/work-dir/hadoop-aws-3.3.4.jar \
   --master k8s://http://localhost:8001 --deploy-mode cluster \
   --conf spark.kubernetes.container.image=stathisq/spark-hudi:3.3.1-0.13.0-slim \
   --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
   --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
   --conf spark.kubernetes.namespace=spark \
   --conf spark.kubernetes.executor.podTemplateFile=$(pwd)/pod-templates/podTemplateExecutor.yaml \
   --conf spark.kubernetes.driver.podTemplateFile=$(pwd)/pod-templates/podTemplateDriver.yaml \
   --conf spark.kubernetes.file.upload.path=s3a://cdc-spike/spark \
   --conf spark.ui.port=4040 \
   --conf spark.driver.extraJavaOptions="-Divy.cache.dir=/tmp -Divy.home=/tmp" \
   --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
   --class "org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer" local:///opt/spark/work-dir/hudi-utilities-slim-bundle_2.12-0.13.0.jar \
   --table-type COPY_ON_WRITE --op BULK_INSERT \
   --target-base-path s3a://cdc-spike/hudi/postgres/employee \
   --target-table employee \
   --min-sync-interval-seconds 60 \
   --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
   --payload-class "org.apache.hudi.payload.AWSDmsAvroPayload" \
   --hoodie-conf "hoodie.deltastreamer.source.dfs.root=s3a://cdc-spike/dms/public/employee/" \
   --source-ordering-field _dms_ingestion_timestamp \
   --hoodie-conf auto.offset.reset=earliest \
   --hoodie-conf hoodie.datasource.write.recordkey.field=id \
   --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
   --hoodie-conf hoodie.datasource.write.partitionpath.field=created_at:TIMESTAMP \
   --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator \
   --hoodie-conf hoodie.deltastreamer.keygen.timebased.output.dateformat="yyyy/MM/dd" \
   --hoodie-conf hoodie.deltastreamer.keygen.timebased.timestamp.type="SCALAR" \
   --hoodie-conf hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit="microseconds"
   ```
   
   </br>
   </details>
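
   To make the difference between the two commands concrete, here is a minimal 
shell sketch of how a microsecond epoch value maps to a partition path under 
each output format. It is only an illustration and does not call any Hudi code. 
It assumes GNU `date`, translates the Java patterns `yyyy` and `yyyy/MM/dd` to 
the strftime patterns `%Y` and `%Y/%m/%d`, and uses a hypothetical helper named 
`format_partition`. The epoch value is the _created_at_ of the sample rows 
(2023-03-31 14:13:40 UTC), converted by hand.

   ```bash
   #!/bin/sh
   # Hypothetical helper: format a microsecond epoch value as a partition
   # path segment. Requires GNU date for the "-d @SECONDS" form.
   format_partition() {
     micros="$1"
     pattern="$2"
     seconds=$(( micros / 1000000 ))   # SCALAR timestamp in microseconds
     date -u -d "@${seconds}" "+${pattern}"
   }

   # 2023-03-31 14:13:40.973882 UTC in microseconds since the epoch
   created_at_micros=1680272020973882

   # The "created_at=" prefix mimics hive_style_partitioning=true.
   echo "created_at=$(format_partition "$created_at_micros" '%Y')"
   echo "created_at=$(format_partition "$created_at_micros" '%Y/%m/%d')"
   ```

   With the second pattern the partition value contains slashes, so it spans 
three folder levels under a single `created_at=` prefix.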
   
   The folders are split correctly on the S3 bucket, as shown below:
   
   ```bash
   aws s3 ls s3://cdc-spike/hudi/postgres/employee --recursive
   2023-03-31 18:22:27          0 hudi/postgres/employee/.hoodie/.aux/.bootstrap/.fileids/
   2023-03-31 18:22:26          0 hudi/postgres/employee/.hoodie/.aux/.bootstrap/.partitions/
   2023-03-31 18:22:22          0 hudi/postgres/employee/.hoodie/.schema/
   2023-03-31 18:23:51          0 hudi/postgres/employee/.hoodie/.temp/
   2023-03-31 18:23:49       1721 hudi/postgres/employee/.hoodie/20230331152242137.commit
   2023-03-31 18:22:45          0 hudi/postgres/employee/.hoodie/20230331152242137.commit.requested
   2023-03-31 18:23:10          0 hudi/postgres/employee/.hoodie/20230331152242137.inflight
   2023-03-31 18:22:23          0 hudi/postgres/employee/.hoodie/archived/
   2023-03-31 18:23:07        697 hudi/postgres/employee/.hoodie/hoodie.properties
   2023-03-31 18:22:55          0 hudi/postgres/employee/.hoodie/metadata/.hoodie/.aux/.bootstrap/.fileids/
   2023-03-31 18:22:54          0 hudi/postgres/employee/.hoodie/metadata/.hoodie/.aux/.bootstrap/.partitions/
   2023-03-31 18:22:50          0 hudi/postgres/employee/.hoodie/metadata/.hoodie/.schema/
   2023-03-31 18:23:45          0 hudi/postgres/employee/.hoodie/metadata/.hoodie/.temp/
   2023-03-31 18:23:04       5615 hudi/postgres/employee/.hoodie/metadata/.hoodie/00000000000000.deltacommit
   2023-03-31 18:23:03        121 hudi/postgres/employee/.hoodie/metadata/.hoodie/00000000000000.deltacommit.inflight
   2023-03-31 18:23:00          0 hudi/postgres/employee/.hoodie/metadata/.hoodie/00000000000000.deltacommit.requested
   2023-03-31 18:23:42       6712 hudi/postgres/employee/.hoodie/metadata/.hoodie/20230331152242137.deltacommit
   2023-03-31 18:23:34       1502 hudi/postgres/employee/.hoodie/metadata/.hoodie/20230331152242137.deltacommit.inflight
   2023-03-31 18:23:30          0 hudi/postgres/employee/.hoodie/metadata/.hoodie/20230331152242137.deltacommit.requested
   2023-03-31 18:22:51          0 hudi/postgres/employee/.hoodie/metadata/.hoodie/archived/
   2023-03-31 18:22:56        672 hudi/postgres/employee/.hoodie/metadata/.hoodie/hoodie.properties
   2023-03-31 18:22:58        124 hudi/postgres/employee/.hoodie/metadata/files/.files-0000_00000000000000.log.1_0-0-0
   2023-03-31 18:23:40      11050 hudi/postgres/employee/.hoodie/metadata/files/.files-0000_00000000000000.log.2_0-10-10
   2023-03-31 18:23:36         93 hudi/postgres/employee/.hoodie/metadata/files/.hoodie_partition_metadata
   2023-03-31 18:23:15         96 hudi/postgres/employee/created_at=2023/03/31/.hoodie_partition_metadata
   2023-03-31 18:23:19     436276 hudi/postgres/employee/created_at=2023/03/31/30e66d5f-d5da-4206-9961-2b1c9f2967d1-0_0-3-3_20230331152242137.parquet
   ```
   
   However, when I query the table with the Spark SQL client, I get the following error:
   ```bash
   java.lang.ClassCastException: class org.apache.spark.unsafe.types.UTF8String cannot be cast to class java.lang.Long (org.apache.spark.unsafe.types.UTF8String is in unnamed module of loader 'app'; java.lang.Long is in module java.base of loader 'bootstrap')
        at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:107)
        at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong(rows.scala:42)
        at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong$(rows.scala:42)
        at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getLong(rows.scala:195)
        at org.apache.spark.sql.execution.vectorized.ColumnVectorUtils.populate(ColumnVectorUtils.java:100)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:269)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:280)
        at org.apache.spark.sql.execution.datasources.parquet.Spark32PlusHoodieParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(Spark32PlusHoodieParquetFileFormat.scala:309)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:209)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:270)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
        at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:554)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
   ```
   
   The same error occurs when loading the table as a DataFrame in the Spark 
shell.
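
   One possible reading of the trace, offered as an assumption rather than a 
confirmed diagnosis: the failing frame is `ColumnVectorUtils.populate`, which 
fills partition columns from the partition values, and it calls `getLong` on a 
value that is a `UTF8String`. The sketch below only shows why the value taken 
from the object key cannot be an integer. The helpers `partition_value` and 
`is_integer` are hypothetical and exist just for this illustration.

   ```bash
   #!/bin/sh
   # Hypothetical helper: return the text after "<column>=" in an object key,
   # without the trailing file name.
   partition_value() {
     key="$1"
     column="$2"
     rest="${key#*"${column}"=}"    # drop everything through "created_at="
     printf '%s\n' "${rest%/*}"     # drop the last path component (file name)
   }

   # Hypothetical helper: succeed only for a non-empty string of digits,
   # such as an epoch timestamp.
   is_integer() {
     case "$1" in
       ''|*[!0-9]*) return 1 ;;
       *) return 0 ;;
     esac
   }

   key='hudi/postgres/employee/created_at=2023/03/31/.hoodie_partition_metadata'
   value="$(partition_value "$key" created_at)"

   if is_integer "$value"; then
     echo "${value}: integer"
   else
     echo "${value}: not an integer"
   fi
   ```

   For the key above the extracted value is `2023/03/31`, which is not an 
integer, whereas a year-only value such as `2023` is.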
   
   A sample of the data in the Hudi Parquet file, which seems to be fine, is:
   <details>
   <summary> drop down</summary>
   
   <br>
   
   ```bash
   
   +-----------------------+------------------------+----------------------+--------------------------+------------------------------------------------------------------------+------+----------------------------+------+-------------+----------+----------------------------------+
   |   _hoodie_commit_time |   _hoodie_commit_seqno |   _hoodie_record_key | _hoodie_partition_path   | _hoodie_file_name                                                      | Op   | _dms_ingestion_timestamp   |   id | name        |   salary | created_at                       |
   |-----------------------+------------------------+----------------------+--------------------------+------------------------------------------------------------------------+------+----------------------------+------+-------------+----------+----------------------------------|
   |     20230331152242137 |  20230331152242137_0_0 |                    1 | created_at=2023/03/31    | 30e66d5f-d5da-4206-9961-2b1c9f2967d1-0_0-3-3_20230331152242137.parquet | I    | 2023-03-31 13:06:13.492681 |    1 | Employee 1  |     2000 | 2023-03-31 14:13:40.973882+00:00 |
   |     20230331152242137 |  20230331152242137_0_1 |                    2 | created_at=2023/03/31    | 30e66d5f-d5da-4206-9961-2b1c9f2967d1-0_0-3-3_20230331152242137.parquet | I    | 2023-03-31 13:06:13.492721 |    2 | Employee 2  |     5000 | 2023-03-31 14:13:40.973882+00:00 |
   |     20230331152242137 |  20230331152242137_0_2 |                    3 | created_at=2023/03/31    | 30e66d5f-d5da-4206-9961-2b1c9f2967d1-0_0-3-3_20230331152242137.parquet | I    | 2023-03-31 13:06:13.492727 |    3 | Employee 3  |     1000 | 2023-03-31 14:13:40.973882+00:00 |
   ```
   
   </br>
   
   </details>
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Create a DMS instance that reads from an RDS and writes to S3
   2. Start a Hudi DeltaStreamer Job for a table with a _created_at_ column of 
type TIMESTAMP
   3. Start a Spark SQL client and load the Hudi table
   4. Run a `select` statement on the table
   
   **Expected behavior**
   
   I expected the Spark SQL client to show the table without any errors, since 
the data are partitioned correctly on S3.
   
   **Environment Description**
   
   * Hudi version : `0.13.0`
   
   * Spark version : `3.3.1`
   
   * Hive version : - 
   
   * Hadoop version : - 
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : yes, _Kubernetes_
   
   

