Jim Huang created SPARK-31995:
---------------------------------

             Summary: Spark Structured Streaming CheckpointFileManager ERROR 
when HDFS DFSOutputStream.completeFile fails with IOException: unable to close 
file because the last block does not have enough number of replicas
                 Key: SPARK-31995
                 URL: https://issues.apache.org/jira/browse/SPARK-31995
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.4.5
         Environment: Apache Spark 2.4.5 without Hadoop

Hadoop 2.7.3 - YARN cluster

delta-core_2.11:0.6.1

 
            Reporter: Jim Huang


I am running Spark 2.4.5's Structured Streaming on a YARN cluster on Hadoop 
2.7.3.  I have been using Spark Structured Streaming in this runtime 
environment for several months without issue, until this new corner case left 
my Structured Streaming job in a partially working state.

 

I have included the ERROR message and stack trace below.  A quick search for 
the string "MicroBatchExecution: Query terminated with error" did not turn up 
any existing Jira with a matching stack trace.

 

Based on a naive reading of this error message and stack trace: could Spark's 
CheckpointFileManager handle this HDFS exception more gracefully, by simply 
waiting a little longer for HDFS's write pipeline to finish replicating the 
last block?

 

Being new to this code, where can I find the configuration parameter that sets 
the replica count for `streaming.HDFSMetadataLog`?  I am trying to understand 
whether the current code already provides holistic tuning variable(s) that 
would let it handle this IOException more gracefully.  Hopefully experts can 
provide some pointers or directions.
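
From what I can tell, the relevant knobs live on the HDFS client side rather than in Spark itself, and Spark forwards any `spark.hadoop.*` entry into the Hadoop `Configuration` it uses.  A hedged PySpark sketch (the `dfs.*` keys below are HDFS client settings and should be verified against the Hadoop version in use; the values are examples, not recommendations):

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("checkpoint-tuning-sketch")
    # Replica count for files this client writes (HDFS-wide default is 3).
    .config("spark.hadoop.dfs.replication", "3")
    # If I read the DFS client code right, this bounds how many times
    # completeFile() retries while waiting for the last block's replicas.
    .config("spark.hadoop.dfs.client.block.write.locateFollowingBlock.retries", "10")
    .getOrCreate()
)
```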

 

```

20/06/12 20:14:15 ERROR MicroBatchExecution: Query [id = yarn-job-id-redacted, runId = run-id-redacted] terminated with error
java.io.IOException: Unable to close file because the last block does not have enough number of replicas.
    at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2511)
    at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2472)
    at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2437)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:145)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatchToFile(HDFSMetadataLog.scala:126)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply$mcZ$sp(HDFSMetadataLog.scala:112)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply$mcV$sp(MicroBatchExecution.scala:547)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:545)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:545)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:545)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)

```

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
