Repository: spark
Updated Branches:
  refs/heads/master e41acb757 -> 0b7d4966c


[SPARK-14316][SQL] StateStoreCoordinator should extend ThreadSafeRpcEndpoint

## What changes were proposed in this pull request?

RpcEndpoint is not thread safe and allows multiple messages to be processed at 
the same time. StateStoreCoordinator should use ThreadSafeRpcEndpoint.

## How was this patch tested?

Existing unit tests.

Author: Shixiong Zhu <[email protected]>

Closes #12100 from zsxwing/fix-StateStoreCoordinator.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0b7d4966
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0b7d4966
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0b7d4966

Branch: refs/heads/master
Commit: 0b7d4966ca7e02f351c4b92a74789cef4799fcb1
Parents: e41acb7
Author: Shixiong Zhu <[email protected]>
Authored: Fri Apr 1 15:00:38 2016 -0700
Committer: Shixiong Zhu <[email protected]>
Committed: Fri Apr 1 15:00:38 2016 -0700

----------------------------------------------------------------------
 .../execution/streaming/state/StateStoreCoordinator.scala    | 4 ++--
 .../sql/execution/streaming/state/StateStoreRDDSuite.scala   | 8 +++-----
 2 files changed, 5 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0b7d4966/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
index 5aa0636..812e1b0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
-import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, 
RpcEnv}
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, 
ThreadSafeRpcEndpoint}
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.util.RpcUtils
 
@@ -112,7 +112,7 @@ private[sql] class StateStoreCoordinatorRef 
private(rpcEndpointRef: RpcEndpointR
  * Class for coordinating instances of [[StateStore]]s loaded in executors 
across the cluster,
  * and get their locations for job scheduling.
  */
-private class StateStoreCoordinator(override val rpcEnv: RpcEnv) extends 
RpcEndpoint {
+private class StateStoreCoordinator(override val rpcEnv: RpcEnv) extends 
ThreadSafeRpcEndpoint {
   private val instances = new mutable.HashMap[StateStoreId, 
ExecutorCacheTaskLocation]
 
   override def receive: PartialFunction[Any, Unit] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/0b7d4966/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
index df50cbd..85db051 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
@@ -124,11 +124,9 @@ class StateStoreRDDSuite extends SparkFunSuite with 
BeforeAndAfter with BeforeAn
         coordinatorRef.reportActiveInstance(StateStoreId(path, opId, 0), 
"host1", "exec1")
         coordinatorRef.reportActiveInstance(StateStoreId(path, opId, 1), 
"host2", "exec2")
 
-        eventually(timeout(10 seconds)) {
-          assert(
-            coordinatorRef.getLocation(StateStoreId(path, opId, 0)) ===
-              Some(ExecutorCacheTaskLocation("host1", "exec1").toString))
-        }
+        assert(
+          coordinatorRef.getLocation(StateStoreId(path, opId, 0)) ===
+            Some(ExecutorCacheTaskLocation("host1", "exec1").toString))
 
         val rdd = makeRDD(sc, Seq("a", "b", "a")).mapPartitionWithStateStore(
           increment, path, opId, storeVersion = 0, keySchema, valueSchema)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to