CrazyBeeline opened a new issue, #7102:
URL: https://github.com/apache/hudi/issues/7102

   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
   
   - Join the mailing list to engage in conversations and get faster support at [email protected].
   
   - If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   
   **Describe the problem you faced**
   
   Queries against the `_rt` table of a MERGE_ON_READ table intermittently fail with `java.io.FileNotFoundException` on a parquet base file while HoodieDeltaStreamer is writing to the same table in continuous mode.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Extract data from Kafka with HoodieDeltaStreamer:
   
   ```
   spark-submit --master yarn \
     --deploy-mode client \
     -c spark.driver.memory=${spark_driver_memory} \
     -c spark.driver.cores=${spark_driver_cores} \
     -c spark.driver.memoryOverhead=${spark_driver_memoryOverhead} \
     -c spark.executor.memory=${spark_executor_memory} \
     -c spark.executor.cores=${spark_executor_cores} \
     -c spark.executor.instances=${spark_executor_num} \
     -c spark.executor.memoryOverhead=${spark_executor_memoryOverhead} \
     --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer ${jar_path}/${jar_file} \
     --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
     --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
     --target-base-path ${table_file_dir}/test_aa \
     --continuous \
     --schemaprovider-class org.apache.hudi.utilities.schema.MySchemaProvider \
     --enable-sync \
     --target-table test_aa \
     --table-type MERGE_ON_READ \
     --min-sync-interval-seconds 10 \
     --source-limit 100 \
     --op UPSERT
   ```
   
   Hudi configuration:
   
   ```
   "hoodie.upsert.shuffle.parallelism=10"
   "hoodie.insert.shuffle.parallelism=10"
   "hoodie.bulkinsert.shuffle.parallelism=10"
   "hoodie.delete.shuffle.parallelism=10"
   "hoodie.rollback.parallelism=10"
   "hoodie.cleaner.parallelism=10"
   "hoodie.datasource.write.partitionpath.field=dt:SIMPLE"
   "hoodie.datasource.write.hive_style_partitioning=true"
   "hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator"
   "hoodie.datasource.write.recordkey.field=id,accessTime"
   "hoodie.datasource.write.precombine.field=accessTime"
   "hoodie.deltastreamer.source.kafka.topic=${kafka_topic}"
   "bootstrap.servers=${kafka_servers}"
   "auto.offset.reset=earliest"
   "hoodie.write.commit.callback.on=true"
   "hoodie.write.commit.callback.class=org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback"
   "hoodie.write.commit.callback.kafka.topic=hudi_commit_callback"
   "hoodie.write.commit.callback.kafka.retries=3"
   "hoodie.write.commit.callback.kafka.acks=all"
   "hoodie.write.commit.callback.kafka.bootstrap.servers=${kafka_servers}"
   "hoodie.metadata.enable=false"
   "hoodie.index.type=BLOOM"
   "hoodie.database.name=default"
   "hoodie.datasource.hive_sync.database=default"
   "hoodie.datasource.hive_sync.mode=hms"
   "hoodie.datasource.hive_sync.sync_comment_info=true"
   "hoodie.datasource.hive_sync.table=ods_test_aa"
   "hoodie.datasource.hive_sync.username=${hive_user}"
   "hoodie.datasource.hive_sync.password=${hive_password}"
   "hoodie.datasource.hive_sync.partition_fields=dt"
   "hoodie.datasource.hive_sync.support_timestamp=false"
   "hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.HiveStylePartitionValueExtractor"
   "hoodie.deltastreamer.transformer.sql=select a.*, from_unixtime(qsdiAccessTime/1000,'yyyy-MM-dd') as dt from <SRC> a"
   ```
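   The transformer SQL above derives the `dt` partition column from the epoch-millisecond field `qsdiAccessTime`. As a minimal sketch (plain Scala, not Spark SQL; the zone argument is an illustration, since `from_unixtime` actually uses the session time zone), the conversion it performs is:
   
   ```scala
   import java.time.{Instant, ZoneId}
   import java.time.format.DateTimeFormatter
   
   // Mirrors from_unixtime(qsdiAccessTime/1000, 'yyyy-MM-dd'):
   // epoch milliseconds -> epoch seconds -> date string used as the dt partition value.
   def toDtPartition(qsdiAccessTimeMillis: Long, zone: ZoneId): String = {
     val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(zone)
     fmt.format(Instant.ofEpochSecond(qsdiAccessTimeMillis / 1000))
   }
   ```
   
   This is why the failing file below sits under a `dt=2022-10-30` partition directory.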
   
   2. Read the Hive table from spark-shell, repeatedly:
   
   ```scala
   spark.sql("select count(1) from ods_test_aa_rt").show
   spark.sql("select count(1) from ods_test_aa_rt").show
   spark.sql("select count(1) from ods_test_aa_rt").show
   // ...repeated while the DeltaStreamer job keeps running
   ```
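   Since these snapshot reads race with the continuous writer, one transient workaround (a hypothetical helper, not a Hudi or Spark API) is to simply retry the query when it fails with an error such as the `FileNotFoundException` below:
   
   ```scala
   // Hypothetical retry helper: re-run a block (e.g. a count on the _rt table)
   // a few times if it fails with a transient exception, sleeping between attempts
   // so a fresh attempt re-plans against the current file listing.
   def withRetries[T](attempts: Int, delayMs: Long = 100L)(block: => T): T =
     try block
     catch {
       case _: Exception if attempts > 1 =>
         Thread.sleep(delayMs)
         withRetries(attempts - 1, delayMs)(block)
     }
   
   // Usage in spark-shell:
   // withRetries(3) { spark.sql("select count(1) from ods_test_aa_rt").show }
   ```
   
   This only masks the symptom; the underlying race between the reader and the writer/cleaner remains.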
   
   spark-defaults.conf:
   
   ```
   spark.cleaner.periodicGC.interval 5min
   spark.driver.cores 1
   spark.driver.maxResultSize 1g
   spark.eventLog.dir hdfs:///spark-history/
   spark.eventLog.enabled true
   spark.executor.extraJavaOptions -XX:+UseNUMA
   spark.executor.memory 1g
   spark.hadoop.cacheConf false
   spark.history.fs.cleaner.enabled true
   spark.history.fs.cleaner.interval 7d
   spark.history.fs.cleaner.maxAge 90d
   spark.history.fs.logDirectory hdfs:///spark-history/
   spark.history.kerberos.keytab none
   spark.history.kerberos.principal none
   spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
   spark.history.store.path /var/lib/spark/shs_db
   spark.history.ui.port 18081
   spark.io.compression.lz4.blockSize 128kb
   spark.master yarn
   spark.serializer org.apache.spark.serializer.KryoSerializer
   spark.shuffle.file.buffer 1m
   spark.shuffle.io.backLog 8192
   spark.sql.adaptive.advisoryPartitionSizeInBytes 128M
   spark.sql.adaptive.autoBroadcastJoinThreshold 10MB
   spark.sql.adaptive.coalescePartitions.enabled true
   spark.sql.adaptive.coalescePartitions.initialPartitionNum 8192
   spark.sql.adaptive.coalescePartitions.minPartitionSize 1MB
   spark.sql.adaptive.coalescePartitions.parallelismFirst true
   spark.sql.adaptive.enabled true
   spark.sql.adaptive.forceOptimizeSkewedJoin false
   spark.sql.adaptive.localShuffleReader.enabled true
   spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled true
   spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 0.2
   spark.sql.adaptive.skewJoin.enabled true
   spark.sql.adaptive.skewJoin.skewedPartitionFactor 5
   spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 256MB
   spark.sql.autoBroadcastJoinThreshold 10MB
   spark.sql.catalog.spark_catalog org.apache.spark.sql.hudi.catalog.HoodieCatalog
   spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
   spark.sql.hive.convertMetastoreOrc true
   spark.sql.hive.metastore.jars /usr/lib/hive/lib/*
   spark.sql.hive.metastore.version 3.1.3
   spark.sql.orc.filterPushdown true
   spark.sql.statistics.fallBackToHdfs true
   spark.sql.warehouse.dir /apps/hive/warehouse
   spark.submit.deployMode client
   spark.yarn.driver.memory 2g
   spark.yarn.executor.failuresValidityInterval 2h
   spark.yarn.historyServer.address bigdata.hadoop.master01:18081
   spark.yarn.maxAppAttempts 1
   spark.yarn.queue default
   ```
   
   
   3. Observe the error:
   
   ```
   Driver stacktrace:
     at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
     at scala.Option.foreach(Option.scala:407)
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
   Caused by: java.io.FileNotFoundException: File does not exist: hdfs://bigdata.hadoop.master01:8020/hudi/kafka/test_aa/dt=2022-10-30/1f8a2a89-17ea-43c7-8d4b-e22205448a57-0_0-18225-14227_20221101115201467.parquet
     at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1757)
     at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1750)
     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
     at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1765)
     at org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:39)
     at org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:39)
     at org.apache.spark.sql.execution.datasources.parquet.Spark32PlusHoodieParquetFileFormat.footerFileMetaData$lzycompute$1(Spark32PlusHoodieParquetFileFormat.scala:161)
     at org.apache.spark.sql.execution.datasources.parquet.Spark32PlusHoodieParquetFileFormat.footerFileMetaData$1(Spark32PlusHoodieParquetFileFormat.scala:160)
     at org.apache.spark.sql.execution.datasources.parquet.Spark32PlusHoodieParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(Spark32PlusHoodieParquetFileFormat.scala:164)
     at org.apache.hudi.HoodieDataSourceHelper$.$anonfun$buildHoodieParquetReader$1(HoodieDataSourceHelper.scala:67)
     at org.apache.hudi.HoodieBaseRelation.$anonfun$createBaseFileReader$1(HoodieBaseRelation.scala:590)
     at org.apache.hudi.HoodieBaseRelation$BaseFileReader.apply(HoodieBaseRelation.scala:652)
     at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:121)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
     at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
     at org.apache.spark.scheduler.Task.run(Task.scala:136)
     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
     at java.lang.Thread.run(Thread.java:748)
   ```
   
   
   **Expected behavior**
   
   Concurrent snapshot queries on the `_rt` table should keep succeeding while the continuous DeltaStreamer job is writing; they should not fail with `FileNotFoundException`.
   
   **Environment Description**
   
   * Hudi version : 0.12.0
   
   * Spark version : 3.3.1
   
   * Hive version : 3.1.3
   
   * Hadoop version : 3.3.4
   
   * Storage (HDFS/S3/GCS..) : HDFS
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   The failure occurs while the DeltaStreamer job is writing to the table continuously and the `_rt` table is queried at the same time from spark-shell.
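   One plausible explanation (an assumption, not confirmed for this case) is that the Hudi cleaner removed a base file that an in-flight `_rt` query had already planned to read. If that is the cause, retaining more commits gives long-running readers time to finish, e.g. adding to the Hudi configuration above:
   
   ```
   "hoodie.cleaner.commits.retained=20"
   ```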
   
   **Stacktrace**
   
   See the driver stacktrace under step 3 above.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
