haripriyarhp opened a new issue, #8153:
URL: https://github.com/apache/hudi/issues/8153

   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
   
   - Join the mailing list to engage in conversations and get faster support at dev-subscr...@hudi.apache.org.
   
   - If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   
   **Describe the problem you faced**
   
   I have a Spark Structured Streaming job that reads from Kafka and writes to S3 as a Hudi table. Following https://hudi.apache.org/docs/clustering/#spark-structured-streaming, I set up asynchronous clustering in the job. With a MoR table, async clustering fails with the error below. I verified the same setup with a CoW table, and there async clustering works fine.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1.
   2.
   3.
   4.
   
   **Expected behavior**
   
   I expected async clustering to also succeed for the MoR table.
   
   **Environment Description**
   
   * Hudi version : 0.13.0
   
   * Spark version : 3.1.2
   
   * Hive version :
   
   * Hadoop version : 
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   Map[String, String](
     "hoodie.table.name" -> tableName,
     "path" -> "s3a://path to table/tableName",
     "hoodie.datasource.write.table.name" -> tableName,
     "hoodie.datasource.write.table.type" -> MERGE_ON_READ,
     "hoodie.datasource.write.operation" -> "upsert",
     "hoodie.datasource.write.recordkey.field" -> "col4,col5,col6,col7",
     "hoodie.datasource.write.partitionpath.field" -> "col1,col2,col3",
     "hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.ComplexKeyGenerator",
     "hoodie.datasource.write.hive_style_partitioning" -> "true",
     // Cleaning options
     "hoodie.clean.automatic" -> "true",
     "hoodie.clean.async" -> "true",
     // Hive sync options
     "hoodie.datasource.hive_sync.partition_fields" -> "col1,col2,col3",
     "hoodie.datasource.hive_sync.database" -> dbName,
     "hoodie.datasource.hive_sync.table" -> tableName,
     "hoodie.datasource.hive_sync.enable" -> "true",
     "hoodie.datasource.hive_sync.mode" -> "hms",
     "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
     // Clustering options
     "hoodie.write.lock.provider" -> "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider",
     "hoodie.write.lock.dynamodb.table" -> "hudi_occ",
     "hoodie.clustering.plan.strategy.target.file.max.bytes" -> "52428800",
     "hoodie.clustering.async.enabled" -> "true",
     "hoodie.clustering.async.max.commits" -> "4",
     "hoodie.clustering.execution.strategy.class" -> "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy",
     "hoodie.upsert.shuffle.parallelism" -> "200",
     "hoodie.insert.shuffle.parallelism" -> "200",
     "hoodie.datasource.compaction.async.enable" -> "false",
     "hoodie.compact.inline.max.delta.commits" -> "5",
     "hoodie.index.type" -> "BLOOM"
   )
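   
   For reference, below is a minimal sketch of how an option map like the one above is handed to the Hudi streaming sink. This is not the actual job: the SparkSession setup, Kafka source settings, payload parsing, checkpoint location, and trigger interval are placeholders.
   
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.streaming.Trigger
   
   val spark = SparkSession.builder().appName("hudi-streaming-ingest").getOrCreate()
   
   // The Map[String, String] listed above
   val hudiOptions: Map[String, String] = Map( /* option map shown above */ )
   
   // Kafka source; bootstrap servers and topic are placeholders
   val kafkaDf = spark.readStream
     .format("kafka")
     .option("kafka.bootstrap.servers", "broker:9092")
     .option("subscribe", "input-topic")
     .load()
   
   // ... parse the Kafka value column into the table schema here ...
   
   // Hudi streaming sink; "path" is already present in the option map,
   // so start() needs no explicit path. Checkpoint location is a placeholder.
   kafkaDf.writeStream
     .format("hudi")
     .options(hudiOptions)
     .option("checkpointLocation", "s3a://bucket/checkpoints/tableName")
     .outputMode("append")
     .trigger(Trigger.ProcessingTime("60 seconds"))
     .start()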
   
   **Stacktrace**
   
   23/03/08 16:43:15 INFO AbstractTableFileSystemView: addFilesToView: NumFiles=1, NumFileGroups=1, FileGroupsCreationTime=0, StoreTimeTaken=0
   23/03/08 16:43:15 INFO ClusteringUtils: Found 30 files in pending clustering operations
   23/03/08 16:43:15 INFO AbstractTableFileSystemView: addFilesToView: NumFiles=1, NumFileGroups=1, FileGroupsCreationTime=0, StoreTimeTaken=0
   23/03/08 16:43:15 ERROR AsyncClusteringService: Clustering executor failed
   java.util.concurrent.CompletionException: java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.PartitionedFile.<init>(Lorg/apache/spark/sql/catalyst/InternalRow;Ljava/lang/String;JJ[Ljava/lang/String;)V
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
        at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
   Caused by: java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.PartitionedFile.<init>(Lorg/apache/spark/sql/catalyst/InternalRow;Ljava/lang/String;JJ[Ljava/lang/String;)V
        at org.apache.hudi.BaseMergeOnReadSnapshotRelation.$anonfun$buildSplits$2(MergeOnReadSnapshotRelation.scala:238)
        at scala.Option.map(Option.scala:230)
        at org.apache.hudi.BaseMergeOnReadSnapshotRelation.$anonfun$buildSplits$1(MergeOnReadSnapshotRelation.scala:236)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at org.apache.hudi.BaseMergeOnReadSnapshotRelation.buildSplits(MergeOnReadSnapshotRelation.scala:232)
        at org.apache.hudi.BaseMergeOnReadSnapshotRelation.collectFileSplits(MergeOnReadSnapshotRelation.scala:227)
        at org.apache.hudi.BaseMergeOnReadSnapshotRelation.collectFileSplits(MergeOnReadSnapshotRelation.scala:64)
        at org.apache.hudi.HoodieBaseRelation.buildScan(HoodieBaseRelation.scala:346)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$apply$4(DataSourceStrategy.scala:351)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$pruneFilterProject$1(DataSourceStrategy.scala:384)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:439)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:383)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:351)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
        at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:162)
        at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:162)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
        at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:162)
        at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:162)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
        at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:162)
        at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:162)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
        at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:431)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:104)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:163)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:163)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:104)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:97)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:117)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:163)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:163)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:114)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:110)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
        at org.apache.hudi.HoodieDatasetBulkInsertHelper$.bulkInsert(HoodieDatasetBulkInsertHelper.scala:142)
        at org.apache.hudi.HoodieDatasetBulkInsertHelper.bulkInsert(HoodieDatasetBulkInsertHelper.scala)
        at org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy.performClusteringWithRecordsAsRow(SparkSortAndSizeExecutionStrategy.java:72)
        at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.lambda$runClusteringForGroupAsyncAsRow$6(MultipleSparkJobExecutionStrategy.java:249)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
        ... 5 more
   23/03/08 16:43:15 INFO HoodieStreamingSink: Async clustering service shutdown. Errored ? true
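   
   Not part of the job above, only to illustrate the failure: the NoSuchMethodError means the PartitionedFile constructor with signature (InternalRow, String, long, long, String[]) is not present in the Spark runtime on the classpath. A quick way to see which constructors the Spark build actually exposes, e.g. from spark-shell:
   
   // List the PartitionedFile constructor signatures available at runtime,
   // to compare against the signature named in the NoSuchMethodError above.
   import org.apache.spark.sql.execution.datasources.PartitionedFile
   
   classOf[PartitionedFile].getConstructors.foreach(println)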
   
   

