[ 
https://issues.apache.org/jira/browse/SPARK-23991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16490729#comment-16490729
 ] 

Apache Spark commented on SPARK-23991:
--------------------------------------

User 'gaborgsomogyi' has created a pull request for this issue:
https://github.com/apache/spark/pull/21430

> data loss when allocateBlocksToBatch
> ------------------------------------
>
>                 Key: SPARK-23991
>                 URL: https://issues.apache.org/jira/browse/SPARK-23991
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams, Input/Output
>    Affects Versions: 2.2.0
>         Environment: spark 2.11
>            Reporter: kevin fu
>            Priority: Major
>
> with checkpoint and WAL enabled, driver will write the allocation of blocks 
> to batch into hdfs. however, if it fails as following, the blocks of this 
> batch cannot be computed by the DAG. Because the blocks have been dequeued 
> from the receivedBlockQueue and get lost.
> {panel:title=error log}
> 18/04/15 11:11:25 WARN ReceivedBlockTracker: Exception thrown while writing 
> record: BatchAllocationEvent(1523765480000 ms,AllocatedBlocks(Map(0 -> 
> ArrayBuffer()))) to the WriteAheadLog. org.apache.spark.SparkException: 
> Exception thrown in awaitResult: at 
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194) at 
> org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83)
>  at 
> org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234)
>  at 
> org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:118)
>  at 
> org.apache.spark.streaming.scheduler.ReceiverTracker.allocateBlocksToBatch(ReceiverTracker.scala:213)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
>  at scala.util.Try$.apply(Try.scala:192) at 
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
>  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Caused 
> by: java.util.concurrent.TimeoutException: Futures timed out after [5000 
> milliseconds] at 
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at 
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at 
> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) at 
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>  at scala.concurrent.Await$.result(package.scala:190) at 
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190) ... 12 
> more 18/04/15 11:11:25 INFO ReceivedBlockTracker: Possibly processed batch 
> 1523765480000 ms needs to be processed again in WAL recovery{panel}
> the concerning codes are showed below:
> {code}
>   /**
>    * Allocate all unallocated blocks to the given batch.
>    * This event will get written to the write ahead log (if enabled).
>    */
>   def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
>     if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) 
> {
>       val streamIdToBlocks = streamIds.map { streamId =>
>           (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
>       }.toMap
>       val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
>       if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
>         timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
>         lastAllocatedBatchTime = batchTime
>       } else {
>         logInfo(s"Possibly processed batch $batchTime needs to be processed 
> again in WAL recovery")
>       }
>     } else {
>       // This situation occurs when:
>       // 1. WAL is ended with BatchAllocationEvent, but without 
> BatchCleanupEvent,
>       // possibly processed batch job or half-processed batch job need to be 
> processed again,
>       // so the batchTime will be equal to lastAllocatedBatchTime.
>       // 2. Slow checkpointing makes recovered batch time older than WAL 
> recovered
>       // lastAllocatedBatchTime.
>       // This situation will only occurs in recovery time.
>       logInfo(s"Possibly processed batch $batchTime needs to be processed 
> again in WAL recovery")
>     }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to