spark git commit: [SPARK-26042][SS][TESTS] Fix a potential hang in KafkaContinuousSourceTopicDeletionSuite

2018-11-14 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 5f11e8c4c -> 4035c98a0


[SPARK-26042][SS][TESTS] Fix a potential hang in 
KafkaContinuousSourceTopicDeletionSuite

## What changes were proposed in this pull request?

As initializing lazy vals shares the same lock, a thread that tries to 
initialize `executedPlan` while `toRdd` is running will hang 
forever.

This PR just materializes `executedPlan` before calling `toRdd`, so that 
accessing it while `toRdd` is running doesn't need to wait for a lock.
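
The locking behaviour described above can be reproduced outside Spark. The following is a minimal sketch, assuming Scala 2.x (which Spark 2.4 builds against), where lazy val initializers on one instance synchronize on that instance. `FakeExecution`, `LazyValLockDemo` and `readsWhileRunning` are hypothetical names used only for illustration and are not part of Spark.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical stand-in for a query execution object: two lazy vals on one instance.
class FakeExecution(started: CountDownLatch, release: CountDownLatch) {
  lazy val executedPlan: String = "plan"

  // Simulates a long-running `toRdd`: the initializer blocks until released.
  lazy val toRdd: String = {
    started.countDown()
    release.await()
    "rdd"
  }
}

object LazyValLockDemo {
  // Starts a daemon thread so a stuck thread can never keep the JVM alive.
  private def daemon(body: => Unit): Thread = {
    val t = new Thread(new Runnable { def run(): Unit = body })
    t.setDaemon(true)
    t.start()
    t
  }

  /** Returns true if `executedPlan` could be read while `toRdd` was still initializing. */
  def readsWhileRunning(materializeFirst: Boolean): Boolean = {
    val started = new CountDownLatch(1)
    val release = new CountDownLatch(1)
    val exec = new FakeExecution(started, release)

    // The fix in this commit: force `executedPlan` before `toRdd` starts.
    if (materializeFirst) exec.executedPlan

    val runner = daemon { exec.toRdd; () }
    started.await() // `toRdd` is now inside its initializer

    val done = new CountDownLatch(1)
    val reader = daemon { exec.executedPlan; done.countDown() }
    val finished = done.await(500, TimeUnit.MILLISECONDS)

    // Let the simulated `toRdd` finish so both threads can exit.
    release.countDown()
    runner.join()
    reader.join()
    finished
  }

  def main(args: Array[String]): Unit = {
    println(s"without materializing first: ${readsWhileRunning(materializeFirst = false)}")
    println(s"with materializing first: ${readsWhileRunning(materializeFirst = true)}")
  }
}
```

On Scala 2.x the first call is expected to report that the read did not complete in time, while the second reports that it did, because an already initialized lazy val is read without taking the lock. Other Scala versions may implement lazy vals differently, so the first result should not be relied on there.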

## How was this patch tested?

Jenkins

Closes #23023 from zsxwing/SPARK-26042.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4035c98a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4035c98a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4035c98a

Branch: refs/heads/master
Commit: 4035c98a0c03cf61d1fb9a9916df513ab1081a9b
Parents: 5f11e8c
Author: Shixiong Zhu 
Authored: Wed Nov 14 10:19:20 2018 -0800
Committer: Shixiong Zhu 
Committed: Wed Nov 14 10:19:20 2018 -0800

--
 .../execution/streaming/continuous/ContinuousExecution.scala  | 7 ++-
 1 file changed, 6 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4035c98a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index f009c52..4a7df73 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -262,7 +262,12 @@ class ContinuousExecution(
 
   reportTimeTaken("runContinuous") {
 SQLExecution.withNewExecutionId(
-  sparkSessionForQuery, lastExecution)(lastExecution.toRdd)
+  sparkSessionForQuery, lastExecution) {
+  // Materialize `executedPlan` so that accessing it when `toRdd` is running doesn't need to
+  // wait for a lock
+  lastExecution.executedPlan
+  lastExecution.toRdd
+}
   }
 } catch {
   case t: Throwable if StreamExecution.isInterruptionException(t, sparkSession.sparkContext) &&


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-26042][SS][TESTS] Fix a potential hang in KafkaContinuousSourceTopicDeletionSuite

2018-11-14 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.4 e2e1f0ad8 -> ca426bfa5


[SPARK-26042][SS][TESTS] Fix a potential hang in 
KafkaContinuousSourceTopicDeletionSuite

## What changes were proposed in this pull request?

As initializing lazy vals shares the same lock, a thread that tries to 
initialize `executedPlan` while `toRdd` is running will hang 
forever.

This PR just materializes `executedPlan` before calling `toRdd`, so that 
accessing it while `toRdd` is running doesn't need to wait for a lock.

## How was this patch tested?

Jenkins

Closes #23023 from zsxwing/SPARK-26042.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
(cherry picked from commit 4035c98a0c03cf61d1fb9a9916df513ab1081a9b)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca426bfa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca426bfa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca426bfa

Branch: refs/heads/branch-2.4
Commit: ca426bfa56045f01de0ea14480a375753073e025
Parents: e2e1f0a
Author: Shixiong Zhu 
Authored: Wed Nov 14 10:19:20 2018 -0800
Committer: Shixiong Zhu 
Committed: Wed Nov 14 10:19:37 2018 -0800

--
 .../execution/streaming/continuous/ContinuousExecution.scala  | 7 ++-
 1 file changed, 6 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ca426bfa/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index f104422..2e24fa6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -259,7 +259,12 @@ class ContinuousExecution(
 
   reportTimeTaken("runContinuous") {
 SQLExecution.withNewExecutionId(
-  sparkSessionForQuery, lastExecution)(lastExecution.toRdd)
+  sparkSessionForQuery, lastExecution) {
+  // Materialize `executedPlan` so that accessing it when `toRdd` is running doesn't need to
+  // wait for a lock
+  lastExecution.executedPlan
+  lastExecution.toRdd
+}
   }
 } catch {
   case t: Throwable

