MihawkZoro opened a new issue, #7623:
URL: https://github.com/apache/hudi/issues/7623
**Environment Description**
* Hudi version :
0.11.1
* Spark version :
3.2.1
* Hadoop version :
2.7.3
* Storage :
hdfs
**Describe the problem you faced**
I have a hudi table which is about 100GB, and I want to cluster it .
I think I have given enough resources , but it clustered failed finally
**To Reproduce**
1. submit a clustering job
```
nohup spark-submit --queue mid --master yarn --name shuiqing_test_clustering
--driver-memory 4g --executor-cores 4 --num-executors 8 --executor-memory 36g
--conf spark.dynamicAllocation.enabled=false --conf
spark.executor.memoryOverhead=12g \
--conf spark.executor.extraLibraryPath="-verbose:gc -XX:+PrintGCDetails
-XX:+PrintGCDateStamps" \
--class org.apache.hudi.utilities.HoodieClusteringJob
hudi-utilities-bundle_2.12-0.11.1.jar \
--props file:///home/work/test/clusteringjob.properties \
--mode scheduleAndExecute \
--base-path /user/work/testdb/bill_item \
--table-name bill_item \
--parallelism 8 \
--spark-memory 36g &> clustering.log &
```
clusteringjob.properties details
```
hoodie.clustering.async.enabled=true
hoodie.clustering.async.max.commits=4
hoodie.clustering.plan.strategy.target.file.max.bytes=805306368
hoodie.clustering.plan.strategy.small.file.limit=268435456
hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
hoodie.clustering.plan.strategy.sort.columns=id
```
2. watch spark job detail

3. table struct
```
_hoodie_commit_time string
_hoodie_commit_seqno string
_hoodie_record_key string
_hoodie_partition_path string
_hoodie_file_name string
id string
rek_id string
item_id string
.........
item_flag string
ts bigint
_key_partition string
# Partition Information
# col_name data_type comment
_key_partition string
```
**Stacktrace**
```
org.apache.spark.shuffle.FetchFailedException
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1165)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:903)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:84)
at
org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:205)
at
org.apache.spark.util.collection.ExternalSorter.insertAllAndUpdateMetrics(ExternalSorter.scala:686)
at
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:134)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:106)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException:
java.lang.UnsupportedOperationException
at
org.apache.spark.network.server.StreamManager.openStream(StreamManager.java:59)
at
org.apache.spark.network.server.TransportRequestHandler.processStreamRequest(TransportRequestHandler.java:136)
at
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:106)
at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at
org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at java.lang.Thread.run(Thread.java:748)
at
org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:260)
at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142)
at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more
```
```
org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
org.apache.hudi.data.HoodieJavaRDD.collectAsList(HoodieJavaRDD.java:157)
org.apache.hudi.table.action.commit.BaseCommitActionExecutor.executeClustering(BaseCommitActionExecutor.java:250)
org.apache.hudi.table.action.cluster.SparkExecuteClusteringCommitActionExecutor.execute(SparkExecuteClusteringCommitActionExecutor.java:53)
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.cluster(HoodieSparkCopyOnWriteTable.java:188)
org.apache.hudi.client.SparkRDDWriteClient.cluster(SparkRDDWriteClient.java:363)
org.apache.hudi.utilities.HoodieClusteringJob.doScheduleAndCluster(HoodieClusteringJob.java:255)
org.apache.hudi.utilities.HoodieClusteringJob.lambda$cluster$0(HoodieClusteringJob.java:168)
org.apache.hudi.utilities.UtilHelpers.retry(UtilHelpers.java:541)
org.apache.hudi.utilities.HoodieClusteringJob.cluster(HoodieClusteringJob.java:155)
org.apache.hudi.utilities.HoodieClusteringJob.main(HoodieClusteringJob.java:132)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
```
I want know what is the problem, thank you !
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]