MihawkZoro opened a new issue, #7623:
URL: https://github.com/apache/hudi/issues/7623

   **Environment Description**
   
   * Hudi version :
   0.11.1
   * Spark version :
   3.2.1
   * Hadoop version :
   2.7.3
   * Storage :
   hdfs
   
   **Describe the problem you faced**
   
   I have a hudi table which is about 100GB, and I want to cluster it .
   I think I have given enough resources , but it clustered failed finally
   
   **To Reproduce**
   
   1. submit a clustering job
   ```
   nohup spark-submit --queue mid --master yarn --name shuiqing_test_clustering 
--driver-memory 4g --executor-cores 4 --num-executors 8 --executor-memory 36g 
--conf spark.dynamicAllocation.enabled=false --conf 
spark.executor.memoryOverhead=12g \
   --conf spark.executor.extraLibraryPath="-verbose:gc -XX:+PrintGCDetails 
-XX:+PrintGCDateStamps" \
   --class org.apache.hudi.utilities.HoodieClusteringJob 
hudi-utilities-bundle_2.12-0.11.1.jar \
   --props file:///home/work/test/clusteringjob.properties \
   --mode scheduleAndExecute \
   --base-path /user/work/testdb/bill_item \
   --table-name bill_item \
   --parallelism 8 \
   --spark-memory 36g &> clustering.log &
   ```
      clusteringjob.properties  details
   ```
   hoodie.clustering.async.enabled=true
   hoodie.clustering.async.max.commits=4
   hoodie.clustering.plan.strategy.target.file.max.bytes=805306368
   hoodie.clustering.plan.strategy.small.file.limit=268435456
   
hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
   hoodie.clustering.plan.strategy.sort.columns=id
   ```
   2. watch spark job detail 
   
![image](https://user-images.githubusercontent.com/32875366/211232986-e83e2408-62e3-484c-b854-8e9c911f2063.png)
   
   3. table struct
   ```
   _hoodie_commit_time  string
   _hoodie_commit_seqno string
   _hoodie_record_key   string
   _hoodie_partition_path       string
   _hoodie_file_name    string
   id                   string
   rek_id               string
   item_id              string
   .........
   item_flag            string
   ts                   bigint
   _key_partition       string
   # Partition Information
   # col_name           data_type               comment
   _key_partition       string
   ```
   
   **Stacktrace**
   
   ```
   org.apache.spark.shuffle.FetchFailedException
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1165)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:903)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:84)
        at 
org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
        at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:205)
        at 
org.apache.spark.util.collection.ExternalSorter.insertAllAndUpdateMetrics(ExternalSorter.scala:686)
        at 
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:134)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:106)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.RuntimeException: 
java.lang.UnsupportedOperationException
        at 
org.apache.spark.network.server.StreamManager.openStream(StreamManager.java:59)
        at 
org.apache.spark.network.server.TransportRequestHandler.processStreamRequest(TransportRequestHandler.java:136)
        at 
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:106)
        at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
        at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
        at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at 
org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
        at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        at java.lang.Thread.run(Thread.java:748)
        at 
org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:260)
        at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142)
        at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
        at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        ... 1 more
   ```
   
   ```
   org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
   org.apache.hudi.data.HoodieJavaRDD.collectAsList(HoodieJavaRDD.java:157)
   
org.apache.hudi.table.action.commit.BaseCommitActionExecutor.executeClustering(BaseCommitActionExecutor.java:250)
   
org.apache.hudi.table.action.cluster.SparkExecuteClusteringCommitActionExecutor.execute(SparkExecuteClusteringCommitActionExecutor.java:53)
   
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.cluster(HoodieSparkCopyOnWriteTable.java:188)
   
org.apache.hudi.client.SparkRDDWriteClient.cluster(SparkRDDWriteClient.java:363)
   
org.apache.hudi.utilities.HoodieClusteringJob.doScheduleAndCluster(HoodieClusteringJob.java:255)
   
org.apache.hudi.utilities.HoodieClusteringJob.lambda$cluster$0(HoodieClusteringJob.java:168)
   org.apache.hudi.utilities.UtilHelpers.retry(UtilHelpers.java:541)
   
org.apache.hudi.utilities.HoodieClusteringJob.cluster(HoodieClusteringJob.java:155)
   
org.apache.hudi.utilities.HoodieClusteringJob.main(HoodieClusteringJob.java:132)
   sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   java.lang.reflect.Method.invoke(Method.java:498)
   org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
   
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
   org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
   org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
   org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
   ```
   
   
   I want know what is the problem, thank you !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to