spark git commit: [SPARK-24214][SS] Fix toJSON for StreamingRelationV2/StreamingExecutionRelation/ContinuousExecutionRelation

2018-05-09 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 aba52f449 -> 8889d7864


[SPARK-24214][SS] Fix toJSON for 
StreamingRelationV2/StreamingExecutionRelation/ContinuousExecutionRelation

## What changes were proposed in this pull request?

We should overwrite "otherCopyArgs" to provide the SparkSession parameter 
otherwise TreeNode.toJSON cannot get the full constructor parameter list.

## How was this patch tested?

The new unit test.

Author: Shixiong Zhu 

Closes #21275 from zsxwing/SPARK-24214.

(cherry picked from commit fd1179c17273283d32f275d5cd5f97aaa2aca1f7)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8889d786
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8889d786
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8889d786

Branch: refs/heads/branch-2.3
Commit: 8889d78643154a0eb5ce81363ba471a80a1e64f1
Parents: aba52f4
Author: Shixiong Zhu 
Authored: Wed May 9 11:32:17 2018 -0700
Committer: Shixiong Zhu 
Committed: Wed May 9 11:32:27 2018 -0700

--
 .../sql/execution/streaming/StreamingRelation.scala  |  3 +++
 .../spark/sql/streaming/StreamingQuerySuite.scala| 15 +++
 2 files changed, 18 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8889d786/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index f02d3a2..24195b5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -66,6 +66,7 @@ case class StreamingExecutionRelation(
 output: Seq[Attribute])(session: SparkSession)
   extends LeafNode with MultiInstanceRelation {
 
+  override def otherCopyArgs: Seq[AnyRef] = session :: Nil
   override def isStreaming: Boolean = true
   override def toString: String = source.toString
 
@@ -97,6 +98,7 @@ case class StreamingRelationV2(
 output: Seq[Attribute],
 v1Relation: Option[StreamingRelation])(session: SparkSession)
   extends LeafNode with MultiInstanceRelation {
+  override def otherCopyArgs: Seq[AnyRef] = session :: Nil
   override def isStreaming: Boolean = true
   override def toString: String = sourceName
 
@@ -116,6 +118,7 @@ case class ContinuousExecutionRelation(
 output: Seq[Attribute])(session: SparkSession)
   extends LeafNode with MultiInstanceRelation {
 
+  override def otherCopyArgs: Seq[AnyRef] = session :: Nil
   override def isStreaming: Boolean = true
   override def toString: String = source.toString
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8889d786/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 2b0ab33..e3429b5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -687,6 +687,21 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
   CheckLastBatch(("A", 1)))
   }
 
+  
test("StreamingRelationV2/StreamingExecutionRelation/ContinuousExecutionRelation.toJSON
 " +
+"should not fail") {
+val df = spark.readStream.format("rate").load()
+assert(df.logicalPlan.toJSON.contains("StreamingRelationV2"))
+
+testStream(df)(
+  
AssertOnQuery(_.logicalPlan.toJSON.contains("StreamingExecutionRelation"))
+)
+
+testStream(df, useV2Sink = true)(
+  StartStream(trigger = Trigger.Continuous(100)),
+  
AssertOnQuery(_.logicalPlan.toJSON.contains("ContinuousExecutionRelation"))
+)
+  }
+
   /** Create a streaming DF that only execute one batch in which it returns 
the given static DF */
   private def createSingleTriggerStreamingDF(triggerDF: DataFrame): DataFrame 
= {
 require(!triggerDF.isStreaming)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-24214][SS] Fix toJSON for StreamingRelationV2/StreamingExecutionRelation/ContinuousExecutionRelation

2018-05-09 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 7aaa148f5 -> fd1179c17


[SPARK-24214][SS] Fix toJSON for 
StreamingRelationV2/StreamingExecutionRelation/ContinuousExecutionRelation

## What changes were proposed in this pull request?

We should overwrite "otherCopyArgs" to provide the SparkSession parameter 
otherwise TreeNode.toJSON cannot get the full constructor parameter list.

## How was this patch tested?

The new unit test.

Author: Shixiong Zhu 

Closes #21275 from zsxwing/SPARK-24214.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fd1179c1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fd1179c1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fd1179c1

Branch: refs/heads/master
Commit: fd1179c17273283d32f275d5cd5f97aaa2aca1f7
Parents: 7aaa148
Author: Shixiong Zhu 
Authored: Wed May 9 11:32:17 2018 -0700
Committer: Shixiong Zhu 
Committed: Wed May 9 11:32:17 2018 -0700

--
 .../sql/execution/streaming/StreamingRelation.scala  |  3 +++
 .../spark/sql/streaming/StreamingQuerySuite.scala| 15 +++
 2 files changed, 18 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fd1179c1/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index f02d3a2..24195b5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -66,6 +66,7 @@ case class StreamingExecutionRelation(
 output: Seq[Attribute])(session: SparkSession)
   extends LeafNode with MultiInstanceRelation {
 
+  override def otherCopyArgs: Seq[AnyRef] = session :: Nil
   override def isStreaming: Boolean = true
   override def toString: String = source.toString
 
@@ -97,6 +98,7 @@ case class StreamingRelationV2(
 output: Seq[Attribute],
 v1Relation: Option[StreamingRelation])(session: SparkSession)
   extends LeafNode with MultiInstanceRelation {
+  override def otherCopyArgs: Seq[AnyRef] = session :: Nil
   override def isStreaming: Boolean = true
   override def toString: String = sourceName
 
@@ -116,6 +118,7 @@ case class ContinuousExecutionRelation(
 output: Seq[Attribute])(session: SparkSession)
   extends LeafNode with MultiInstanceRelation {
 
+  override def otherCopyArgs: Seq[AnyRef] = session :: Nil
   override def isStreaming: Boolean = true
   override def toString: String = source.toString
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fd1179c1/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 0cb2375..5798699 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -831,6 +831,21 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
   CheckLastBatch(("A", 1)))
   }
 
+  
test("StreamingRelationV2/StreamingExecutionRelation/ContinuousExecutionRelation.toJSON
 " +
+"should not fail") {
+val df = spark.readStream.format("rate").load()
+assert(df.logicalPlan.toJSON.contains("StreamingRelationV2"))
+
+testStream(df)(
+  
AssertOnQuery(_.logicalPlan.toJSON.contains("StreamingExecutionRelation"))
+)
+
+testStream(df, useV2Sink = true)(
+  StartStream(trigger = Trigger.Continuous(100)),
+  
AssertOnQuery(_.logicalPlan.toJSON.contains("ContinuousExecutionRelation"))
+)
+  }
+
   /** Create a streaming DF that only execute one batch in which it returns 
the given static DF */
   private def createSingleTriggerStreamingDF(triggerDF: DataFrame): DataFrame 
= {
 require(!triggerDF.isStreaming)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org