Repository: spark
Updated Branches:
  refs/heads/master b6de0c98c -> 6f7ecb0f2
[SPARK-18342] Make rename failures fatal in HDFSBackedStateStore

## What changes were proposed in this pull request?

If the rename operation in the state store fails (`fs.rename` returns `false`), the StateStore should throw an exception and have the task retry. Currently if renames fail, nothing happens during execution immediately. However, you will observe that snapshot operations will fail, and then any attempt at recovery (executor failure / checkpoint recovery) also fails.

## How was this patch tested?

Unit test

Author: Burak Yavuz <[email protected]>

Closes #15804 from brkyvz/rename-state.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f7ecb0f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f7ecb0f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f7ecb0f

Branch: refs/heads/master
Commit: 6f7ecb0f2975d24a71e4240cf623f5bd8992bbeb
Parents: b6de0c9
Author: Burak Yavuz <[email protected]>
Authored: Tue Nov 8 15:08:09 2016 -0800
Committer: Tathagata Das <[email protected]>
Committed: Tue Nov 8 15:08:09 2016 -0800

----------------------------------------------------------------------
 .../state/HDFSBackedStateStoreProvider.scala | 6 ++-
 .../streaming/state/StateStoreSuite.scala | 41 +++++++++++++++++---
 2 files changed, 40 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/6f7ecb0f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index f1e7f1d..8087131 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -254,7 +254,9 @@ private[state] class HDFSBackedStateStoreProvider( private def commitUpdates(newVersion: Long, map: MapType, tempDeltaFile: Path): Path = { synchronized { val finalDeltaFile = deltaFile(newVersion) - fs.rename(tempDeltaFile, finalDeltaFile) + if (!fs.rename(tempDeltaFile, finalDeltaFile)) { + throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile") + } loadedMaps.put(newVersion, map) finalDeltaFile } @@ -525,7 +527,7 @@ private[state] class HDFSBackedStateStoreProvider( val deltaFiles = allFiles.filter { file => file.version > snapshotFile.version && file.version <= version - } + }.toList verify( deltaFiles.size == version - snapshotFile.version, s"Unexpected list of delta files for version $version for $this: $deltaFiles" http://git-wip-us.apache.org/repos/asf/spark/blob/6f7ecb0f/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index fcf300b..504a265 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -17,13 +17,14 @@ package org.apache.spark.sql.execution.streaming.state -import java.io.File +import java.io.{File, IOException} +import java.net.URI import scala.collection.mutable import scala.util.Random import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileStatus, Path, 
RawLocalFileSystem} import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ @@ -455,6 +456,18 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth } } + test("SPARK-18342: commit fails when rename fails") { + import RenameReturnsFalseFileSystem._ + val dir = scheme + "://" + Utils.createDirectory(tempDir, Random.nextString(5)).toString + val conf = new Configuration() + conf.set(s"fs.$scheme.impl", classOf[RenameReturnsFalseFileSystem].getName) + val provider = newStoreProvider(dir = dir, hadoopConf = conf) + val store = provider.getStore(0) + put(store, "a", 0) + val e = intercept[IllegalStateException](store.commit()) + assert(e.getCause.getMessage.contains("Failed to rename")) + } + def getDataFromFiles( provider: HDFSBackedStateStoreProvider, version: Int = -1): Set[(String, Int)] = { @@ -524,9 +537,10 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth def newStoreProvider( opId: Long = Random.nextLong, partition: Int = 0, - minDeltasForSnapshot: Int = SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get + minDeltasForSnapshot: Int = SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get, + dir: String = Utils.createDirectory(tempDir, Random.nextString(5)).toString, + hadoopConf: Configuration = new Configuration() ): HDFSBackedStateStoreProvider = { - val dir = Utils.createDirectory(tempDir, Random.nextString(5)).toString val sqlConf = new SQLConf() sqlConf.setConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT, minDeltasForSnapshot) new HDFSBackedStateStoreProvider( @@ -534,7 +548,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth keySchema, valueSchema, new StateStoreConf(sqlConf), - new Configuration()) + hadoopConf) } def remove(store: StateStore, condition: String => Boolean): Unit = { @@ -598,3 +612,20 @@ private[state] object StateStoreSuite { }}.toSet } } + 
+/** + * Fake FileSystem to test that the StateStore throws an exception while committing the + * delta file, when `fs.rename` returns `false`. + */ +class RenameReturnsFalseFileSystem extends RawLocalFileSystem { + import RenameReturnsFalseFileSystem._ + override def getUri: URI = { + URI.create(s"$scheme:///") + } + + override def rename(src: Path, dst: Path): Boolean = false +} + +object RenameReturnsFalseFileSystem { + val scheme = s"StateStoreSuite${math.abs(Random.nextInt)}fs" +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
