[
https://issues.apache.org/jira/browse/SPARK-16746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15397520#comment-15397520
]
Cody Koeninger commented on SPARK-16746:
----------------------------------------
From conversation on the mailing list, it wasn't clear whether this was using the
official Kafka integration or dibbhatt's package from spark-packages. If the latter,
it should probably be brought up with him first.
> Spark Streaming loses data when ReceiverTracker's BlockInfo write to HDFS
> times out
> -------------------------------------------------------------------------------
>
> Key: SPARK-16746
> URL: https://issues.apache.org/jira/browse/SPARK-16746
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.6.1
> Reporter: Hongyao Zhao
> Priority: Minor
>
> I wrote a Spark Streaming program which consumes 1000 messages from one topic
> of Kafka, does some transformation, and writes the result back to another
> topic, but I found only 988 messages in the second topic. I checked the logs
> and confirmed that all messages were received by the receivers, but I also
> found an HDFS write timeout message printed from the class
> BatchedWriteAheadLog.
>
> I checked the source code and found code like this:
>
> {code:borderStyle=solid}
> /** Add received block. This event will get written to the write ahead
>   * log (if enabled). */
> def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
>   try {
>     val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
>     if (writeResult) {
>       synchronized {
>         getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
>       }
>       logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
>         s"block ${receivedBlockInfo.blockStoreResult.blockId}")
>     } else {
>       logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
>         s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
>     }
>     writeResult
>   } catch {
>     case NonFatal(e) =>
>       logError(s"Error adding block $receivedBlockInfo", e)
>       false
>   }
> }
> {code}
>
> It seems that ReceiverTracker tries to write the block info to HDFS, but the
> write operation times out. This causes the writeToLog function to return
> false, so the statement "getReceivedBlockQueue(receivedBlockInfo.streamId) +=
> receivedBlockInfo" is skipped and the block info is lost.
> The Spark version I use is 1.6.1 and I did not turn on
> spark.streaming.receiver.writeAheadLog.enable.
>
> I want to know whether or not this is the intended behaviour.
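The failure mode described above can be reproduced in isolation. The following is a minimal, self-contained sketch (not the actual Spark code; BlockInfo, writeToLog, and the walSucceeds flag are hypothetical stand-ins) showing how a WAL write that returns false causes the block info to be silently dropped from the tracking queue:

```scala
import scala.collection.mutable.Queue
import scala.util.control.NonFatal

// Hypothetical model of ReceivedBlockTracker.addBlock: when the simulated
// WAL write "times out" (returns false), the queue append is skipped.
object WalDropSketch {
  case class BlockInfo(streamId: Int, blockId: String)

  val receivedBlockQueue = Queue.empty[BlockInfo]

  // Stand-in for BatchedWriteAheadLog.write; `succeed = false`
  // simulates the HDFS write timing out.
  def writeToLog(info: BlockInfo, succeed: Boolean): Boolean = succeed

  def addBlock(info: BlockInfo, walSucceeds: Boolean): Boolean = {
    try {
      val writeResult = writeToLog(info, walSucceeds)
      if (writeResult) {
        // Only blocks whose WAL write succeeded are ever tracked.
        synchronized { receivedBlockQueue += info }
      }
      writeResult
    } catch {
      case NonFatal(e) => false
    }
  }

  def main(args: Array[String]): Unit = {
    addBlock(BlockInfo(0, "block-1"), walSucceeds = true)
    addBlock(BlockInfo(0, "block-2"), walSucceeds = false) // timed out
    // block-2 was received by the receiver but is never tracked,
    // so it is never assigned to a batch.
    println(receivedBlockQueue.map(_.blockId).mkString(","))
  }
}
```

Running the sketch prints only `block-1`, mirroring the report: the timed-out block is acknowledged nowhere and simply disappears from downstream processing.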
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)