Repository: spark Updated Branches: refs/heads/master b5f8c36e3 -> 11fa8741c
[SQL][HOTFIX] Fix flakiness in StateStoreRDDSuite ## What changes were proposed in this pull request? StateStoreCoordinator.reportActiveInstance is async, so subsequence state checks must be in eventually. ## How was this patch tested? Jenkins tests Author: Tathagata Das <[email protected]> Closes #11924 from tdas/state-store-flaky-fix. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/11fa8741 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/11fa8741 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/11fa8741 Branch: refs/heads/master Commit: 11fa8741ca5b550e4f373c5d6e520c64f5118d0c Parents: b5f8c36 Author: Tathagata Das <[email protected]> Authored: Fri Mar 25 12:04:47 2016 -0700 Committer: Shixiong Zhu <[email protected]> Committed: Fri Mar 25 12:04:47 2016 -0700 ---------------------------------------------------------------------- .../streaming/state/StateStoreCoordinatorSuite.scala | 1 - .../execution/streaming/state/StateStoreRDDSuite.scala | 11 ++++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/11fa8741/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala index c99c2f5..a7e3262 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala @@ -71,7 +71,6 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext { assert(coordinatorRef.verifyIfInstanceActive(id1, exec) === true) assert(coordinatorRef.verifyIfInstanceActive(id2, exec) === true) assert(coordinatorRef.verifyIfInstanceActive(id3, exec) === true) - } coordinatorRef.deactivateInstances("x") http://git-wip-us.apache.org/repos/asf/spark/blob/11fa8741/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala index 24cec30..df50cbd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala @@ -23,6 +23,8 @@ import java.nio.file.Files import scala.util.Random import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} +import org.scalatest.concurrent.Eventually._ +import org.scalatest.time.SpanSugar._ import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.LocalSparkContext._ @@ -121,9 +123,12 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter with BeforeAn val coordinatorRef = sqlContext.streams.stateStoreCoordinator coordinatorRef.reportActiveInstance(StateStoreId(path, opId, 0), "host1", "exec1") coordinatorRef.reportActiveInstance(StateStoreId(path, opId, 1), "host2", "exec2") - assert( - coordinatorRef.getLocation(StateStoreId(path, opId, 0)) === - Some(ExecutorCacheTaskLocation("host1", "exec1").toString)) + + eventually(timeout(10 seconds)) { + assert( + coordinatorRef.getLocation(StateStoreId(path, opId, 0)) === + Some(ExecutorCacheTaskLocation("host1", "exec1").toString)) + } val rdd = makeRDD(sc, Seq("a", "b", "a")).mapPartitionWithStateStore( increment, path, opId, storeVersion = 0, keySchema, valueSchema) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
