Repository: spark
Updated Branches:
  refs/heads/master 8c5b34c42 -> ad640a5af


[SPARK-23303][SQL] improve the explain result for data source v2 relations

## What changes were proposed in this pull request?

The proposed explain format:
**[streaming header] [RelationV2/ScanV2] [data source name] [output] [pushed filters] [options]**

**streaming header**: if it's a streaming relation, put "Streaming" at the beginning.
**RelationV2/ScanV2**: "RelationV2" for a logical plan, "ScanV2" for a physical plan.
**data source name**: the short name if the data source implements `DataSourceRegister`, otherwise the simple class name of the data source implementation.
**output**: a string of the plan output attributes.
**pushed filters**: a string of all the filters that have been pushed to this data source.
**options**: all the options used to create the data source reader.
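
The format above can be sketched in plain Scala. This is a minimal illustration that mirrors the `metadataString` method added in this commit, not the Spark implementation itself: `ExplainFormatSketch`, `NodeKind`, `explainString`, and the local `abbreviate` helper are hypothetical names, and option redaction and Spark's `Utils.truncatedString` are left out.

```scala
object ExplainFormatSketch {
  // Hypothetical stand-ins for the three plan nodes touched by this commit.
  sealed trait NodeKind { def prefix: String }
  case object Relation extends NodeKind { val prefix = "RelationV2 " }
  case object StreamingRelation extends NodeKind { val prefix = "Streaming RelationV2 " }
  case object Scan extends NodeKind { val prefix = "ScanV2 " }

  // Simplified stand-in for commons-lang StringUtils.abbreviate(value, max).
  def abbreviate(s: String, max: Int): String =
    if (s.length <= max) s else s.take(max - 3) + "..."

  def explainString(
      kind: NodeKind,
      sourceName: String,
      output: Seq[String],
      filters: Seq[String] = Nil,
      options: Map[String, String] = Map.empty): String = {
    // Optional entries are only rendered when non-empty.
    // The diff labels the first entry "Filters"; the sample output in this
    // description shows "Pushed Filters".
    val entries: Seq[(String, String)] = Seq(
      if (filters.nonEmpty) Some("Filters" -> filters.mkString("[", ", ", "]")) else None,
      if (options.nonEmpty) {
        Some("Options" -> options.map { case (k, v) => s"$k=$v" }.mkString("[", ",", "]"))
      } else {
        None
      }
    ).flatten

    val outputStr = output.mkString("[", ", ", "]")
    val entriesStr =
      if (entries.isEmpty) ""
      else entries.map { case (k, v) => s"$k: ${abbreviate(v, 100)}" }.mkString(" (", ", ", ")")

    s"${kind.prefix}$sourceName$outputStr$entriesStr"
  }
}
```

For example, `ExplainFormatSketch.explainString(ExplainFormatSketch.StreamingRelation, "MemoryStreamDataSource", Seq("value#25"))` yields `Streaming RelationV2 MemoryStreamDataSource[value#25]`, the same shape as the streaming example later in this description.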

The current explain result for a data source v2 relation is hard to read:
```
== Parsed Logical Plan ==
'Filter ('i > 6)
+- AnalysisBarrier
      +- Project [j#1]
         +- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader3b415940

== Analyzed Logical Plan ==
j: int
Project [j#1]
+- Filter (i#0 > 6)
   +- Project [j#1, i#0]
      +- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader3b415940

== Optimized Logical Plan ==
Project [j#1]
+- Filter isnotnull(i#0)
   +- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader3b415940

== Physical Plan ==
*(1) Project [j#1]
+- *(1) Filter isnotnull(i#0)
   +- *(1) DataSourceV2Scan [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader3b415940
```

After this PR:
```
== Parsed Logical Plan ==
'Project [unresolvedalias('j, None)]
+- AnalysisBarrier
      +- RelationV2 AdvancedDataSourceV2[i#0, j#1]

== Analyzed Logical Plan ==
j: int
Project [j#1]
+- RelationV2 AdvancedDataSourceV2[i#0, j#1]

== Optimized Logical Plan ==
RelationV2 AdvancedDataSourceV2[j#1]

== Physical Plan ==
*(1) ScanV2 AdvancedDataSourceV2[j#1]
```
-------
```
== Analyzed Logical Plan ==
i: int, j: int
Filter (i#88 > 3)
+- RelationV2 JavaAdvancedDataSourceV2[i#88, j#89]

== Optimized Logical Plan ==
Filter isnotnull(i#88)
+- RelationV2 JavaAdvancedDataSourceV2[i#88, j#89] (Pushed Filters: [GreaterThan(i,3)])

== Physical Plan ==
*(1) Filter isnotnull(i#88)
+- *(1) ScanV2 JavaAdvancedDataSourceV2[i#88, j#89] (Pushed Filters: [GreaterThan(i,3)])
```

An example for a streaming query:
```
== Parsed Logical Plan ==
Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
      +- DeserializeToObject cast(value#25 as string).toString, obj#4: java.lang.String
         +- Streaming RelationV2 MemoryStreamDataSource[value#25]

== Analyzed Logical Plan ==
value: string, count(1): bigint
Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
      +- DeserializeToObject cast(value#25 as string).toString, obj#4: java.lang.String
         +- Streaming RelationV2 MemoryStreamDataSource[value#25]

== Optimized Logical Plan ==
Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
      +- DeserializeToObject value#25.toString, obj#4: java.lang.String
         +- Streaming RelationV2 MemoryStreamDataSource[value#25]

== Physical Plan ==
*(4) HashAggregate(keys=[value#6], functions=[count(1)], output=[value#6, count(1)#11L])
+- StateStoreSave [value#6], state info [ checkpoint = *********(redacted)/cloud/dev/spark/target/tmp/temporary-549f264b-2531-4fcb-a52f-433c77347c12/state, runId = f84d9da9-2f8c-45c1-9ea1-70791be684de, opId = 0, ver = 0, numPartitions = 5], Complete, 0
   +- *(3) HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#16L])
      +- StateStoreRestore [value#6], state info [ checkpoint = *********(redacted)/cloud/dev/spark/target/tmp/temporary-549f264b-2531-4fcb-a52f-433c77347c12/state, runId = f84d9da9-2f8c-45c1-9ea1-70791be684de, opId = 0, ver = 0, numPartitions = 5]
         +- *(2) HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#16L])
            +- Exchange hashpartitioning(value#6, 5)
               +- *(1) HashAggregate(keys=[value#6], functions=[partial_count(1)], output=[value#6, count#16L])
                  +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
                     +- *(1) MapElements <function1>, obj#5: java.lang.String
                        +- *(1) DeserializeToObject value#25.toString, obj#4: java.lang.String
                           +- *(1) ScanV2 MemoryStreamDataSource[value#25]
```
## How was this patch tested?

N/A
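
Although no new tests are listed, the diff updates the assertions in `StreamSuite` to count how often the new node names appear in the explain output. A minimal sketch of that counting technique follows. It uses a hand-written, abridged explain string rather than a real query, and `ExplainCountSketch`, `sampleExplain`, and `occurrences` are hypothetical names.

```scala
object ExplainCountSketch {
  // Hand-written, abridged explain text (an assumption for illustration):
  // three logical plans followed by one physical plan.
  val sampleExplain: String =
    """== Parsed Logical Plan ==
      |+- Streaming RelationV2 MemoryStreamDataSource[value#25]
      |
      |== Analyzed Logical Plan ==
      |+- Streaming RelationV2 MemoryStreamDataSource[value#25]
      |
      |== Optimized Logical Plan ==
      |+- Streaming RelationV2 MemoryStreamDataSource[value#25]
      |
      |== Physical Plan ==
      |+- *(1) ScanV2 MemoryStreamDataSource[value#25]
      |""".stripMargin

  // Counts non-overlapping matches of `pattern` (interpreted as a regex) in `text`.
  def occurrences(pattern: String, text: String): Int =
    pattern.r.findAllMatchIn(text).size
}
```

With this sample, the streaming relation name is found three times and the scan name once. The node names here contain no regex metacharacters; a name that did would need `scala.util.matching.Regex.quote` before being compiled as a pattern.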

Author: Wenchen Fan <wenc...@databricks.com>

Closes #20647 from cloud-fan/explain.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad640a5a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad640a5a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad640a5a

Branch: refs/heads/master
Commit: ad640a5affceaaf3979e25848628fb1dfcdf932a
Parents: 8c5b34c
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Mon Mar 5 20:35:14 2018 -0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Mon Mar 5 20:35:14 2018 -0800

----------------------------------------------------------------------
 .../kafka010/KafkaContinuousSourceSuite.scala   |  2 +-
 .../sql/kafka010/KafkaContinuousTest.scala      |  2 +-
 .../kafka010/KafkaMicroBatchSourceSuite.scala   |  2 +-
 .../datasources/v2/DataSourceReaderHolder.scala | 64 -------------
 .../datasources/v2/DataSourceV2Relation.scala   | 34 ++++---
 .../datasources/v2/DataSourceV2ScanExec.scala   | 18 +++-
 .../datasources/v2/DataSourceV2Strategy.scala   |  8 +-
 .../v2/DataSourceV2StringFormat.scala           | 94 ++++++++++++++++++++
 .../streaming/MicroBatchExecution.scala         | 29 ++++--
 .../continuous/ContinuousExecution.scala        |  8 +-
 .../spark/sql/streaming/StreamSuite.scala       | 12 ++-
 .../apache/spark/sql/streaming/StreamTest.scala |  4 +-
 .../streaming/continuous/ContinuousSuite.scala  | 11 +--
 13 files changed, 183 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ad640a5a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index f679e9b..aab8ec4 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -60,7 +60,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
         eventually(timeout(streamingTimeout)) {
           assert(
             query.lastExecution.logical.collectFirst {
-              case StreamingDataSourceV2Relation(_, r: KafkaContinuousReader) => r
+              case StreamingDataSourceV2Relation(_, _, _, r: KafkaContinuousReader) => r
             }.exists { r =>
               // Ensure the new topic is present and the old topic is gone.
               r.knownPartitions.exists(_.topic == topic2)

http://git-wip-us.apache.org/repos/asf/spark/blob/ad640a5a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
index 48ac3fc..fa1468a 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
@@ -47,7 +47,7 @@ trait KafkaContinuousTest extends KafkaSourceTest {
     eventually(timeout(streamingTimeout)) {
       assert(
         query.lastExecution.logical.collectFirst {
-          case StreamingDataSourceV2Relation(_, r: KafkaContinuousReader) => r
+          case StreamingDataSourceV2Relation(_, _, _, r: KafkaContinuousReader) => r
         }.exists(_.knownPartitions.size == newCount),
         s"query never reconfigured to $newCount partitions")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ad640a5a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index f2b3ff7..e017fd9 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -124,7 +124,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
         } ++ (query.get.lastExecution match {
           case null => Seq()
           case e => e.logical.collect {
-            case StreamingDataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
+            case StreamingDataSourceV2Relation(_, _, _, reader: KafkaContinuousReader) => reader
           }
         })
       }.distinct

http://git-wip-us.apache.org/repos/asf/spark/blob/ad640a5a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala
deleted file mode 100644
index 81219e9..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.v2
-
-import java.util.Objects
-
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.sources.v2.reader._
-
-/**
- * A base class for data source reader holder with customized equals/hashCode methods.
- */
-trait DataSourceReaderHolder {
-
-  /**
-   * The output of the data source reader, w.r.t. column pruning.
-   */
-  def output: Seq[Attribute]
-
-  /**
-   * The held data source reader.
-   */
-  def reader: DataSourceReader
-
-  /**
-   * The metadata of this data source reader that can be used for equality test.
-   */
-  private def metadata: Seq[Any] = {
-    val filters: Any = reader match {
-      case s: SupportsPushDownCatalystFilters => s.pushedCatalystFilters().toSet
-      case s: SupportsPushDownFilters => s.pushedFilters().toSet
-      case _ => Nil
-    }
-    Seq(output, reader.getClass, filters)
-  }
-
-  def canEqual(other: Any): Boolean
-
-  override def equals(other: Any): Boolean = other match {
-    case other: DataSourceReaderHolder =>
-      canEqual(other) && metadata.length == other.metadata.length &&
-        metadata.zip(other.metadata).forall { case (l, r) => l == r }
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    metadata.map(Objects.hashCode).foldLeft(0)((a, b) => 31 * a + b)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/ad640a5a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index cc6cb63..2b282ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -35,15 +35,12 @@ case class DataSourceV2Relation(
     options: Map[String, String],
     projection: Seq[AttributeReference],
     filters: Option[Seq[Expression]] = None,
-    userSpecifiedSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation {
+    userSpecifiedSchema: Option[StructType] = None)
+  extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {
 
   import DataSourceV2Relation._
 
-  override def simpleString: String = {
-    s"DataSourceV2Relation(source=${source.name}, " +
-      s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " +
-      s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
-  }
+  override def simpleString: String = "RelationV2 " + metadataString
 
   override lazy val schema: StructType = reader.readSchema()
 
@@ -107,19 +104,36 @@ case class DataSourceV2Relation(
 }
 
 /**
- * A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwise identical
- * to the non-streaming relation.
+ * A specialization of [[DataSourceV2Relation]] with the streaming bit set to true.
+ *
+ * Note that, this plan has a mutable reader, so Spark won't apply operator push-down for this plan,
+ * to avoid making the plan mutable. We should consolidate this plan and [[DataSourceV2Relation]]
+ * after we figure out how to apply operator push-down for streaming data sources.
  */
 case class StreamingDataSourceV2Relation(
     output: Seq[AttributeReference],
+    source: DataSourceV2,
+    options: Map[String, String],
     reader: DataSourceReader)
-    extends LeafNode with DataSourceReaderHolder with MultiInstanceRelation {
+  extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {
+
   override def isStreaming: Boolean = true
 
-  override def canEqual(other: Any): Boolean = other.isInstanceOf[StreamingDataSourceV2Relation]
+  override def simpleString: String = "Streaming RelationV2 " + metadataString
 
   override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))
 
+  // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
+  override def equals(other: Any): Boolean = other match {
+    case other: StreamingDataSourceV2Relation =>
+      output == other.output && reader.getClass == other.reader.getClass && options == other.options
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    Seq(output, source, options).hashCode()
+  }
+
   override def computeStats(): Statistics = reader match {
     case r: SupportsReportStatistics =>
       Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))

http://git-wip-us.apache.org/repos/asf/spark/blob/ad640a5a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index 7d9581b..cb691ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical
 import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.sources.v2.DataSourceV2
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader
 import org.apache.spark.sql.types.StructType
@@ -36,10 +37,23 @@ import org.apache.spark.sql.types.StructType
  */
 case class DataSourceV2ScanExec(
     output: Seq[AttributeReference],
+    @transient source: DataSourceV2,
+    @transient options: Map[String, String],
     @transient reader: DataSourceReader)
-  extends LeafExecNode with DataSourceReaderHolder with ColumnarBatchScan {
+  extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan {
 
-  override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2ScanExec]
+  override def simpleString: String = "ScanV2 " + metadataString
+
+  // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
+  override def equals(other: Any): Boolean = other match {
+    case other: DataSourceV2ScanExec =>
+      output == other.output && reader.getClass == other.reader.getClass && options == other.options
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    Seq(output, source, options).hashCode()
+  }
 
   override def outputPartitioning: physical.Partitioning = reader match {
     case s: SupportsReportPartitioning =>

http://git-wip-us.apache.org/repos/asf/spark/blob/ad640a5a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index c4e7644..1ac9572 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -23,11 +23,11 @@ import org.apache.spark.sql.execution.SparkPlan
 
 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case relation: DataSourceV2Relation =>
-      DataSourceV2ScanExec(relation.output, relation.reader) :: Nil
+    case r: DataSourceV2Relation =>
+      DataSourceV2ScanExec(r.output, r.source, r.options, r.reader) :: Nil
 
-    case relation: StreamingDataSourceV2Relation =>
-      DataSourceV2ScanExec(relation.output, relation.reader) :: Nil
+    case r: StreamingDataSourceV2Relation =>
+      DataSourceV2ScanExec(r.output, r.source, r.options, r.reader) :: Nil
 
     case WriteToDataSourceV2(writer, query) =>
       WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/ad640a5a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala
new file mode 100644
index 0000000..aed55a4
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.DataSourceV2
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.util.Utils
+
+/**
+ * A trait that can be used by data source v2 related query plans(both logical and physical), to
+ * provide a string format of the data source information for explain.
+ */
+trait DataSourceV2StringFormat {
+
+  /**
+   * The instance of this data source implementation. Note that we only consider its class in
+   * equals/hashCode, not the instance itself.
+   */
+  def source: DataSourceV2
+
+  /**
+   * The output of the data source reader, w.r.t. column pruning.
+   */
+  def output: Seq[Attribute]
+
+  /**
+   * The options for this data source reader.
+   */
+  def options: Map[String, String]
+
+  /**
+   * The created data source reader. Here we use it to get the filters that has been pushed down
+   * so far, itself doesn't take part in the equals/hashCode.
+   */
+  def reader: DataSourceReader
+
+  private lazy val filters = reader match {
+    case s: SupportsPushDownCatalystFilters => s.pushedCatalystFilters().toSet
+    case s: SupportsPushDownFilters => s.pushedFilters().toSet
+    case _ => Set.empty
+  }
+
+  private def sourceName: String = source match {
+    case registered: DataSourceRegister => registered.shortName()
+    case _ => source.getClass.getSimpleName.stripSuffix("$")
+  }
+
+  def metadataString: String = {
+    val entries = scala.collection.mutable.ArrayBuffer.empty[(String, String)]
+
+    if (filters.nonEmpty) {
+      entries += "Filters" -> filters.mkString("[", ", ", "]")
+    }
+
+    // TODO: we should only display some standard options like path, table, etc.
+    if (options.nonEmpty) {
+      entries += "Options" -> Utils.redact(options).map {
+        case (k, v) => s"$k=$v"
+      }.mkString("[", ",", "]")
+    }
+
+    val outputStr = Utils.truncatedString(output, "[", ", ", "]")
+
+    val entriesStr = if (entries.nonEmpty) {
+      Utils.truncatedString(entries.map {
+        case (key, value) => key + ": " + StringUtils.abbreviate(value, 100)
+      }, " (", ", ", ")")
+    } else {
+      ""
+    }
+
+    s"$sourceName$outputStr$entriesStr"
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ad640a5a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index ff4be9c..6e23197 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -20,16 +20,16 @@ package org.apache.spark.sql.execution.streaming
 import java.util.Optional
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+import scala.collection.mutable.{Map => MutableMap}
 
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
 import org.apache.spark.sql.execution.streaming.sources.{InternalRowMicroBatchWriter, MicroBatchWriter}
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, MicroBatchReadSupport, StreamWriteSupport}
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2}
 import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
@@ -52,6 +52,9 @@ class MicroBatchExecution(
 
   @volatile protected var sources: Seq[BaseStreamingSource] = Seq.empty
 
+  private val readerToDataSourceMap =
+    MutableMap.empty[MicroBatchReader, (DataSourceV2, Map[String, String])]
+
   private val triggerExecutor = trigger match {
     case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
     case OneTimeTrigger => OneTimeExecutor()
@@ -97,6 +100,7 @@ class MicroBatchExecution(
             metadataPath,
             new DataSourceOptions(options.asJava))
           nextSourceId += 1
+          readerToDataSourceMap(reader) = dataSourceV2 -> options
           logInfo(s"Using MicroBatchReader [$reader] from " +
             s"DataSourceV2 named '$sourceName' [$dataSourceV2]")
           StreamingExecutionRelation(reader, output)(sparkSession)
@@ -419,8 +423,19 @@ class MicroBatchExecution(
             toJava(current),
             Optional.of(availableV2))
           logDebug(s"Retrieving data from $reader: $current -> $availableV2")
-          Some(reader ->
-            new StreamingDataSourceV2Relation(reader.readSchema().toAttributes, reader))
+
+          val (source, options) = reader match {
+            // `MemoryStream` is special. It's for test only and doesn't have a `DataSourceV2`
+            // implementation. We provide a fake one here for explain.
+            case _: MemoryStream[_] => MemoryStreamDataSource -> Map.empty[String, String]
+            // Provide a fake value here just in case something went wrong, e.g. the reader gives
+            // a wrong `equals` implementation.
+            // a wrong `equals` implementation.
+            case _ => readerToDataSourceMap.getOrElse(reader, {
+              FakeDataSourceV2 -> Map.empty[String, String]
+            })
+          }
+          Some(reader -> StreamingDataSourceV2Relation(
+            reader.readSchema().toAttributes, source, options, reader))
         case _ => None
       }
     }
@@ -525,3 +540,7 @@ class MicroBatchExecution(
 object MicroBatchExecution {
   val BATCH_ID_KEY = "streaming.sql.batchId"
 }
+
+object MemoryStreamDataSource extends DataSourceV2
+
+object FakeDataSourceV2 extends DataSourceV2

http://git-wip-us.apache.org/repos/asf/spark/blob/ad640a5a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index daebd1d..1758b38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, StreamingDataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
 import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
 import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, StreamWriteSupport}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, PartitionOffset}
@@ -167,7 +167,7 @@ class ContinuousExecution(
 
     var insertedSourceId = 0
     val withNewSources = logicalPlan transform {
-      case ContinuousExecutionRelation(_, _, output) =>
+      case ContinuousExecutionRelation(source, options, output) =>
         val reader = continuousSources(insertedSourceId)
         insertedSourceId += 1
         val newOutput = reader.readSchema().toAttributes
@@ -180,7 +180,7 @@ class ContinuousExecution(
         val loggedOffset = offsets.offsets(0)
         val realOffset = loggedOffset.map(off => reader.deserializeOffset(off.json))
         reader.setStartOffset(java.util.Optional.ofNullable(realOffset.orNull))
-        new StreamingDataSourceV2Relation(newOutput, reader)
+        StreamingDataSourceV2Relation(newOutput, source, options, reader)
     }
 
     // Rewire the plan to use the new attributes that were returned by the source.
@@ -201,7 +201,7 @@ class ContinuousExecution(
     val withSink = WriteToDataSourceV2(writer, triggerLogicalPlan)
 
     val reader = withSink.collect {
-      case StreamingDataSourceV2Relation(_, r: ContinuousReader) => r
+      case StreamingDataSourceV2Relation(_, _, _, r: ContinuousReader) => r
     }.head
 
     reportTimeTaken("queryPlanning") {

http://git-wip-us.apache.org/repos/asf/spark/blob/ad640a5a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index d1a0483..c1ec1eb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -492,16 +492,20 @@ class StreamSuite extends StreamTest {
 
       val explainWithoutExtended = q.explainInternal(false)
       // `extended = false` only displays the physical plan.
-      assert("StreamingDataSourceV2Relation".r.findAllMatchIn(explainWithoutExtended).size === 0)
-      assert("DataSourceV2Scan".r.findAllMatchIn(explainWithoutExtended).size === 1)
+      assert("Streaming RelationV2 MemoryStreamDataSource".r
+        .findAllMatchIn(explainWithoutExtended).size === 0)
+      assert("ScanV2 MemoryStreamDataSource".r
+        .findAllMatchIn(explainWithoutExtended).size === 1)
       // Use "StateStoreRestore" to verify that it does output a streaming physical plan
       assert(explainWithoutExtended.contains("StateStoreRestore"))
 
       val explainWithExtended = q.explainInternal(true)
       // `extended = true` displays 3 logical plans (Parsed/Optimized/Optimized) and 1 physical
       // plan.
-      assert("StreamingDataSourceV2Relation".r.findAllMatchIn(explainWithExtended).size === 3)
-      assert("DataSourceV2Scan".r.findAllMatchIn(explainWithExtended).size === 1)
+      assert("Streaming RelationV2 MemoryStreamDataSource".r
+        .findAllMatchIn(explainWithExtended).size === 3)
+      assert("ScanV2 MemoryStreamDataSource".r
+        .findAllMatchIn(explainWithExtended).size === 1)
       // Use "StateStoreRestore" to verify that it does output a streaming physical plan
       assert(explainWithExtended.contains("StateStoreRestore"))
     } finally {

http://git-wip-us.apache.org/repos/asf/spark/blob/ad640a5a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 08f722e..e44aef0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -629,8 +629,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
             def findSourceIndex(plan: LogicalPlan): Option[Int] = {
               plan
                 .collect {
-                  case StreamingExecutionRelation(s, _) => s
-                  case StreamingDataSourceV2Relation(_, r) => r
+                  case r: StreamingExecutionRelation => r.source
+                  case r: StreamingDataSourceV2Relation => r.reader
                 }
                 .zipWithIndex
                 .find(_._1 == source)

http://git-wip-us.apache.org/repos/asf/spark/blob/ad640a5a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index 4b4ed82..f5884b9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -17,15 +17,12 @@
 
 package org.apache.spark.sql.streaming.continuous
 
-import java.util.UUID
-
-import org.apache.spark.{SparkContext, SparkEnv, SparkException}
-import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskStart}
+import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, WriteToDataSourceV2Exec}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous._
-import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.streaming.{StreamTest, Trigger}
 import org.apache.spark.sql.test.TestSparkSession
@@ -43,7 +40,7 @@ class ContinuousSuiteBase extends StreamTest {
       case s: ContinuousExecution =>
         assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure query is initialized")
         val reader = s.lastExecution.executedPlan.collectFirst {
-          case DataSourceV2ScanExec(_, r: RateStreamContinuousReader) => r
+          case DataSourceV2ScanExec(_, _, _, r: RateStreamContinuousReader) => r
         }.get
 
         val deltaMs = numTriggers * 1000 + 300

