[ https://issues.apache.org/jira/browse/SPARK-19645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15872921#comment-15872921 ]
Feng Gui edited comment on SPARK-19645 at 2/20/17 6:39 AM:
-----------------------------------------------------------
[~zsxwing] OK, I think the solution is to check whether the delta file for the
current versionId already exists before renaming the temporary delta file. If
the file exists, just skip the rename. PR: https://github.com/apache/spark/pull/16980
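
To illustrate the idea, here is a minimal sketch in Python against the local filesystem. It is not the code from the PR and it does not use the HDFS API; the function name commit_delta, the temp file names, and the choice to delete the leftover temp file are assumptions made only for this example.

```python
import os
import tempfile
from pathlib import Path


def commit_delta(temp_file: Path, state_dir: Path, version: int) -> bool:
    """Rename temp_file to <state_dir>/<version>.delta unless it already exists.

    Returns True if the rename happened, False if it was skipped.
    """
    final_file = state_dir / f"{version}.delta"
    if final_file.exists():
        # This version was already committed (for example before a restart),
        # so skip the rename. Deleting the temp file is an assumption of this
        # sketch, not something stated in the issue.
        temp_file.unlink()
        return False
    os.rename(temp_file, final_file)
    return True


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        state_dir = Path(d)

        first = state_dir / "temp-1"
        first.write_text("state for version 2")
        print(commit_delta(first, state_dir, 2))   # first commit does the rename

        second = state_dir / "temp-2"
        second.write_text("state for version 2, recomputed after restart")
        print(commit_delta(second, state_dir, 2))  # recomputed commit is skipped
```

Note that checking for the file and then renaming is not atomic, so this sketch assumes only one writer commits a given version at a time.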
was (Author: [email protected]):
[~zsxwing] OK, I think the solution is to check whether the delta file for the
current versionId already exists before renaming the temporary delta file. If
the file exists, just skip the rename (we may also need to delete the current
versionId's snapshot).
> structured streaming job restart bug
> ------------------------------------
>
> Key: SPARK-19645
> URL: https://issues.apache.org/jira/browse/SPARK-19645
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.1.0
> Reporter: Feng Gui
> Priority: Critical
>
> We are trying to use Structured Streaming in production; however, there is
> currently a bug in the streaming job restart process.
> The concrete error message is as follows:
> {quote}
> Caused by: java.lang.IllegalStateException: Error committing version 2 into HDFSStateStore[id = (op=0, part=136), dir = /tmp/Pipeline_112346-continueagg-bxaxs/state/0/136]
> at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:162)
> at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(StatefulAggregate.scala:173)
> at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(StatefulAggregate.scala:123)
> at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Failed to rename /tmp/Pipeline_112346-continueagg-bxaxs/state/0/136/temp--5345709896617324284 to /tmp/Pipeline_112346-continueagg-bxaxs/state/0/136/2.delta
> at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$commitUpdates(HDFSBackedStateStoreProvider.scala:259)
> at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:156)
> ... 14 more
> {quote}
> The bug can be reproduced easily: set {color:red} val
> metadataRoot = "hdfs://localhost:8020/tmp/checkpoint" {color} in StreamTest
> and then run the test {color:red} sort after aggregate in complete mode
> {color} in StreamingAggregationSuite. The main cause is that when a
> streaming job restarts, Spark recomputes the WAL offsets and generates the
> same HDFS delta file again (the latest delta file generated before the
> restart, named "currentBatchId.delta"), so the rename onto the already
> existing file fails. In my opinion, this is a bug. If you also consider it
> a bug, I can fix it.