Repository: spark Updated Branches: refs/heads/master e1aaab1e2 -> 271175e2b
[SPARK-20716][SS] StateStore.abort() should not throw exceptions ## What changes were proposed in this pull request? StateStore.abort() should make a best-effort attempt to clean up temporary resources. It should not throw errors, especially because it's called in a TaskCompletionListener, where a thrown error could hide previous real errors in the task. ## How was this patch tested? No unit test. Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #17958 from tdas/SPARK-20716. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/271175e2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/271175e2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/271175e2 Branch: refs/heads/master Commit: 271175e2bd0f7887a068db92de73eff60f5ef2b2 Parents: e1aaab1 Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Mon May 15 10:46:38 2017 -0700 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Mon May 15 10:46:38 2017 -0700 ---------------------------------------------------------------------- .../state/HDFSBackedStateStoreProvider.scala | 22 ++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/271175e2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 1426728..fb2bf47 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala 
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.streaming.state import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, IOException} +import java.nio.channels.ClosedChannelException import java.util.Locale import scala.collection.JavaConverters._ @@ -202,13 +203,22 @@ private[state] class HDFSBackedStateStoreProvider( /** Abort all the updates made on this store. This store will not be usable any more. */ override def abort(): Unit = { verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed") + try { + state = ABORTED + if (tempDeltaFileStream != null) { + tempDeltaFileStream.close() + } + if (tempDeltaFile != null) { + fs.delete(tempDeltaFile, true) + } + } catch { + case c: ClosedChannelException => + // This can happen when underlying file output stream has been closed before the + // compression stream. + logDebug(s"Error aborting version $newVersion into $this", c) - state = ABORTED - if (tempDeltaFileStream != null) { - tempDeltaFileStream.close() - } - if (tempDeltaFile != null) { - fs.delete(tempDeltaFile, true) + case e: Exception => + logWarning(s"Error aborting version $newVersion into $this", e) } logInfo(s"Aborted version $newVersion for $this") } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org