rohit-m-99 opened a new issue, #6015:
URL: https://github.com/apache/hudi/issues/6015

   **Describe the problem you faced**
   
   After upgrading to 0.11.1, the deltastreamer fails to write to a 6 GB bucket. It hangs at the
   
   `Building workload profile`
   
   stage.
   
   <img width="1431" alt="image" src="https://user-images.githubusercontent.com/84733594/176726541-e833e7a3-9724-454b-8c95-02d6ceddd298.png">
   
   **To Reproduce**
   
   ```
   #!/bin/bash
   spark-submit \
   --jars /opt/spark/jars/hudi-spark3-bundle.jar,/opt/spark/jars/hadoop-aws.jar,/opt/spark/jars/aws-java-sdk.jar,/opt/spark/jars/spark-avro.jar \
   --master spark://spark-master:7077 \
   --total-executor-cores 40 \
   --executor-memory 4g \
   --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
   --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider \
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /opt/spark/jars/hudi-utilities-bundle.jar \
   --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
   --target-table per_tick_stats \
   --table-type COPY_ON_WRITE \
   --min-sync-interval-seconds 30 \
   --source-limit 25000000 \
   --continuous \
   --source-ordering-field STATOVYGIYLUMVSF6YLU \
   --target-base-path s3a://simian-example-prod-output/stats/querying \
   --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3a://example-prod-output/stats/ingesting \
   --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
   --hoodie-conf hoodie.datasource.write.recordkey.field=STATONUW25LMMF2GS33OL5ZHK3S7NFSA____,STATONUW2X3UNFWWK___ \
   --hoodie-conf hoodie.datasource.write.precombine.field=STATOVYGIYLUMVSF6YLU \
   --hoodie-conf hoodie.clustering.plan.strategy.sort.columns=STATONUW25LMMF2GS33OL5ZHK3S7NFSA____,STATMJQXIY3IL5ZHK3S7NFSA____ \
   --hoodie-conf hoodie.datasource.write.partitionpath.field= \
   --hoodie-conf hoodie.clustering.inline=false \
   --hoodie-conf hoodie.clustering.plan.strategy.small.file.limit=100000000 \
   --hoodie-conf hoodie.clustering.inline.max.commits=4 \
   --hoodie-conf hoodie.metadata.enable=false \
   --hoodie-conf hoodie.metadata.index.column.stats.enable=false
   ```
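   Because `--hoodie-conf` values are passed as literal `key=value` strings, stray whitespace around `=` (or inside a comma-separated column list) silently becomes part of the key or value and produces column names that don't exist. A small sanity check can catch this before submitting; `check_hoodie_conf` is a hypothetical helper for illustration, not part of Hudi:

   ```python
   # Hypothetical helper (not part of Hudi): flag --hoodie-conf pairs that are
   # missing '=' or carry stray whitespace in the key or value.
   def check_hoodie_conf(pairs):
       problems = []
       for pair in pairs:
           key, sep, value = pair.partition("=")
           if not sep:
               problems.append(f"missing '=': {pair!r}")
           elif key != key.strip() or value != value.strip():
               problems.append(f"stray whitespace: {pair!r}")
       return problems

   confs = [
       "hoodie.datasource.write.precombine.field=STATOVYGIYLUMVSF6YLU",
       "hoodie.clustering.plan.strategy.sort.columns= COL_A,COL_B",  # leading space in value
   ]
   print(check_hoodie_conf(confs))  # flags only the second pair
   ```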
   
   
   **Expected behavior**
   
   The deltastreamer should be able to ingest fairly quickly, given the configured source limit.
   
   **Environment Description**
   
   * Hudi version : 0.11.1
   
   * Spark version : 3.2.1
   
   * Hive version :
   
   * Hadoop version : 3.3.1
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : Yes
   
   
   **Additional context**
   
   
   **Stacktrace**
   
   ```
   22/06/30 00:34:32 WARN BlockManagerMaster: Failed to remove broadcast 17 with removeFromMaster = true - Cannot receive any reply from /10.10.228.207:58666 in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
   org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply from /10.10.228.207:58666 in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
           at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
           at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
           at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
           at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
           at scala.util.Failure.recover(Try.scala:234)
           at scala.concurrent.Future.$anonfun$recover$1(Future.scala:395)
           at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
           at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
           at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
           at org.apache.spark.util.ThreadUtils$$anon$1.execute(ThreadUtils.scala:99)
           at scala.concurrent.impl.ExecutionContextImpl$$anon$4.execute(ExecutionContextImpl.scala:138)
           at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
           at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
           at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
           at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
           at scala.concurrent.Promise.complete(Promise.scala:53)
           at scala.concurrent.Promise.complete$(Promise.scala:52)
           at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
           at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
           at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
           at scala.concurrent.BatchingExecutor$Batch.processBatch$1(BatchingExecutor.scala:67)
           at scala.concurrent.BatchingExecutor$Batch.$anonfun$run$1(BatchingExecutor.scala:82)
           at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
           at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
           at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:59)
           at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:875)
           at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:110)
           at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:107)
           at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:873)
           at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
           at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
           at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
           at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
           at scala.concurrent.Promise.tryFailure(Promise.scala:112)
           at scala.concurrent.Promise.tryFailure$(Promise.scala:112)
           at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:187)
           at org.apache.spark.rpc.netty.NettyRpcEnv.org$apache$spark$rpc$netty$NettyRpcEnv$$onFailure$1(NettyRpcEnv.scala:214)
           at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:264)
           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
           at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply from /10.10.228.207:58666 in 120 seconds
           at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:265)
           ... 7 more
   22/06/30 00:34:32 ERROR ContextCleaner: Error cleaning broadcast 17
   org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply from /10.10.228.207:58666 in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
           (identical stack trace to the WARN above)
   Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply from /10.10.228.207:58666 in 120 seconds
           at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:265)
           ... 7 more
   
   ```
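   As the trace itself notes, the failing RPC is governed by `spark.rpc.askTimeout` (effectively 120 s here). While diagnosing the underlying memory/GC pressure on the driver or executors, one workaround is to raise the Spark timeouts via `--conf`; the values below are illustrative, not recommendations:

   ```
   --conf spark.rpc.askTimeout=600s \
   --conf spark.network.timeout=600s \
   --conf spark.driver.memory=8g
   ```

   If the timeout merely moves later in the job, the root cause is likely the workload-profile build itself rather than the RPC layer.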
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
