CrazyBeeline opened a new issue, #7102: URL: https://github.com/apache/hudi/issues/7102
**Describe the problem you faced**

While HoodieDeltaStreamer ingests a MERGE_ON_READ table from Kafka in continuous mode, repeatedly querying the Hive-synced `_rt` view of the same table from spark-shell intermittently fails with `java.io.FileNotFoundException` on a parquet base file under the table path.

**To Reproduce**

Steps to reproduce the behavior:

1. Extract data from Kafka with HoodieDeltaStreamer:

```shell
spark-submit --master yarn \
  --deploy-mode client \
  -c spark.driver.memory=${spark_driver_memory} \
  -c spark.driver.cores=${spark_driver_cores} \
  -c spark.driver.memoryOverhead=${spark_driver_memoryOverhead} \
  -c spark.executor.memory=${spark_executor_memory} \
  -c spark.executor.cores=${spark_executor_cores} \
  -c spark.executor.instances=${spark_executor_num} \
  -c spark.executor.memoryOverhead=${spark_executor_memoryOverhead} \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer ${jar_path}/${jar_file} \
  --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --target-base-path ${table_file_dir}/test_aa \
  --continuous \
  --schemaprovider-class org.apache.hudi.utilities.schema.MySchemaProvider \
  --enable-sync \
  --target-table test_aa \
  --table-type MERGE_ON_READ \
  --min-sync-interval-seconds 10 \
  --source-limit 100 \
  --op UPSERT
```

Hudi conf:

```properties
hoodie.upsert.shuffle.parallelism=10
hoodie.insert.shuffle.parallelism=10
hoodie.bulkinsert.shuffle.parallelism=10
hoodie.delete.shuffle.parallelism=10
hoodie.rollback.parallelism=10
hoodie.cleaner.parallelism=10
hoodie.datasource.write.partitionpath.field=dt:SIMPLE
hoodie.datasource.write.hive_style_partitioning=true
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator
hoodie.datasource.write.recordkey.field=id,accessTime
hoodie.datasource.write.precombine.field=accessTime
hoodie.deltastreamer.source.kafka.topic=${kafka_topic}
bootstrap.servers=${kafka_servers}
auto.offset.reset=earliest
hoodie.write.commit.callback.on=true
hoodie.write.commit.callback.class=org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback
hoodie.write.commit.callback.kafka.topic=hudi_commit_callback
hoodie.write.commit.callback.kafka.retries=3
hoodie.write.commit.callback.kafka.acks=all
hoodie.write.commit.callback.kafka.bootstrap.servers=${kafka_servers}
hoodie.metadata.enable=false
hoodie.index.type=BLOOM
hoodie.database.name=default
hoodie.datasource.hive_sync.database=default
hoodie.datasource.hive_sync.mode=hms
hoodie.datasource.hive_sync.sync_comment_info=true
hoodie.datasource.hive_sync.table=ods_test_aa
hoodie.datasource.hive_sync.username=${hive_user}
hoodie.datasource.hive_sync.password=${hive_password}
hoodie.datasource.hive_sync.partition_fields=dt
hoodie.datasource.hive_sync.support_timestamp=false
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.HiveStylePartitionValueExtractor
hoodie.deltastreamer.transformer.sql=select a.*,from_unixtime(qsdiAccessTime/1000,'yyyy-MM-dd') as dt from <SRC> a
```

2. Read the Hive table repeatedly from spark-shell:

```scala
spark.sql("select count(1) from ods_test_aa_rt").show
spark.sql("select count(1) from ods_test_aa_rt").show
spark.sql("select count(1) from ods_test_aa_rt").show
...
```
spark-defaults.conf:

```properties
spark.cleaner.periodicGC.interval 5min
spark.driver.cores 1
spark.driver.maxResultSize 1g
spark.eventLog.dir hdfs:///spark-history/
spark.eventLog.enabled true
spark.executor.extraJavaOptions -XX:+UseNUMA
spark.executor.memory 1g
spark.hadoop.cacheConf false
spark.history.fs.cleaner.enabled true
spark.history.fs.cleaner.interval 7d
spark.history.fs.cleaner.maxAge 90d
spark.history.fs.logDirectory hdfs:///spark-history/
spark.history.kerberos.keytab none
spark.history.kerberos.principal none
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
spark.history.store.path /var/lib/spark/shs_db
spark.history.ui.port 18081
spark.io.compression.lz4.blockSize 128kb
spark.master yarn
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.shuffle.file.buffer 1m
spark.shuffle.io.backLog 8192
spark.sql.adaptive.advisoryPartitionSizeInBytes 128M
spark.sql.adaptive.autoBroadcastJoinThreshold 10MB
spark.sql.adaptive.coalescePartitions.enabled true
spark.sql.adaptive.coalescePartitions.initialPartitionNum 8192
spark.sql.adaptive.coalescePartitions.minPartitionSize 1MB
spark.sql.adaptive.coalescePartitions.parallelismFirst true
spark.sql.adaptive.enabled true
spark.sql.adaptive.forceOptimizeSkewedJoin false
spark.sql.adaptive.localShuffleReader.enabled true
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled true
spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 0.2
spark.sql.adaptive.skewJoin.enabled true
spark.sql.adaptive.skewJoin.skewedPartitionFactor 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 256MB
spark.sql.autoBroadcastJoinThreshold 10MB
spark.sql.catalog.spark_catalog org.apache.spark.sql.hudi.catalog.HoodieCatalog
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.sql.hive.convertMetastoreOrc true
spark.sql.hive.metastore.jars /usr/lib/hive/lib/*
spark.sql.hive.metastore.version 3.1.3
spark.sql.orc.filterPushdown true
spark.sql.statistics.fallBackToHdfs true
spark.sql.warehouse.dir /apps/hive/warehouse
spark.submit.deployMode client
spark.yarn.driver.memory 2g
spark.yarn.executor.failuresValidityInterval 2h
spark.yarn.historyServer.address bigdata.hadoop.master01:18081
spark.yarn.maxAppAttempts 1
spark.yarn.queue default
```

3. The query fails with:

```
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.io.FileNotFoundException: File does not exist: hdfs://bigdata.hadoop.master01:8020/hudi/kafka/test_aa/dt=2022-10-30/1f8a2a89-17ea-43c7-8d4b-e22205448a57-0_0-18225-14227_20221101115201467.parquet
	at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1757)
	at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1750)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1765)
	at org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:39)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:39)
	at org.apache.spark.sql.execution.datasources.parquet.Spark32PlusHoodieParquetFileFormat.footerFileMetaData$lzycompute$1(Spark32PlusHoodieParquetFileFormat.scala:161)
	at org.apache.spark.sql.execution.datasources.parquet.Spark32PlusHoodieParquetFileFormat.footerFileMetaData$1(Spark32PlusHoodieParquetFileFormat.scala:160)
	at org.apache.spark.sql.execution.datasources.parquet.Spark32PlusHoodieParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(Spark32PlusHoodieParquetFileFormat.scala:164)
	at org.apache.hudi.HoodieDataSourceHelper$.$anonfun$buildHoodieParquetReader$1(HoodieDataSourceHelper.scala:67)
	at org.apache.hudi.HoodieBaseRelation.$anonfun$createBaseFileReader$1(HoodieBaseRelation.scala:590)
	at org.apache.hudi.HoodieBaseRelation$BaseFileReader.apply(HoodieBaseRelation.scala:652)
	at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:121)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

**Expected behavior**

Querying the `_rt` table from spark-shell should succeed while the continuous DeltaStreamer job is running.

**Environment Description**

* Hudi version : 0.12.0
* Spark version : 3.3.1
* Hive version : 3.1.3
* Hadoop version : 3.3.4
* Storage (HDFS/S3/GCS..) : HDFS
* Running on Docker? (yes/no) : no
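Not part of the original report, but possibly useful context: this failure pattern, where a reader sharing the table with a continuous writer fails on a base file that no longer exists, often points at the cleaner removing files that the reader's cached file listing still references. A hedged sketch of knobs commonly adjusted in that situation; the value below is an illustrative assumption, not taken from the report:

```properties
# Retain more commits so base files referenced by long-running or
# repeated external queries are not cleaned up underneath them.
# (The Hudi default is 10; 20 is only an illustration.)
hoodie.cleaner.commits.retained=20
```

Refreshing the table between queries from the spark-shell session (`spark.catalog.refreshTable("ods_test_aa_rt")`) may also help, since it invalidates Spark's cached metadata for the table and forces a fresh file listing.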
