This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 02533d71806e [SPARK-46777][SS] Refactor `StreamingDataSourceV2Relation` catalyst structure to be more on-par with the batch version
02533d71806e is described below
commit 02533d71806ec0be97ec793d680189093c9a0ecb
Author: jackierwzhang <[email protected]>
AuthorDate: Mon Jan 22 18:58:55 2024 +0900
[SPARK-46777][SS] Refactor `StreamingDataSourceV2Relation` catalyst structure to be more on-par with the batch version
### What changes were proposed in this pull request?
This PR refactors `StreamingDataSourceV2Relation` into `StreamingDataSourceV2Relation` and `StreamingDataSourceV2ScanRelation` to achieve better parity with the batch version. This prepares the codebase so that certain V2 optimization rules (e.g. `V2ScanRelationPushDown`) can later be extended to streaming as well.
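For orientation, here is a minimal, self-contained sketch of the resulting node pairing. It uses simplified stand-in types rather than the real catalyst classes (the actual definitions are in the `DataSourceV2Relation.scala` diff below):

```scala
// Simplified stand-ins for the catalyst nodes touched by this PR; the real
// classes also carry the table, output attributes, catalog, identifier, etc.
object RelationParitySketch extends App {
  // Batch side (pre-existing): a pure relation plus a scan relation.
  final case class DataSourceV2Relation(tableName: String)
  final case class DataSourceV2ScanRelation(relation: DataSourceV2Relation)

  // Streaming side (after this PR): the same two-node shape. The scan
  // relation wraps the relation and additionally tracks microbatch offsets.
  final case class StreamingDataSourceV2Relation(tableName: String)
  final case class StreamingDataSourceV2ScanRelation(
      relation: StreamingDataSourceV2Relation,
      startOffset: Option[Long] = None, // stand-in for a streaming Offset
      endOffset: Option[Long] = None)

  // Code that used to match on StreamingDataSourceV2Relation now matches the
  // scan relation, mirroring the test changes in the diff below.
  def describe(plan: Any): String = plan match {
    case r: StreamingDataSourceV2ScanRelation => s"streaming scan of ${r.relation.tableName}"
    case r: DataSourceV2ScanRelation => s"batch scan of ${r.relation.tableName}"
    case _ => "other"
  }

  println(describe(StreamingDataSourceV2ScanRelation(StreamingDataSourceV2Relation("rate"))))
}
```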
### Why are the changes needed?
As described above, we would like to start reusing certain V2 batch optimization rules for streaming relations.
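As a rough illustration of the intended payoff (a hypothetical sketch with simplified types, not Spark's actual `V2ScanRelationPushDown` rule): once batch and streaming expose the same relation-then-scan shape, a single planning step can, in principle, serve both sides.

```scala
// Hypothetical sketch: one "push down and plan the scan" step shared by
// batch and streaming relations once both have the same shape.
object RuleReuseSketch extends App {
  sealed trait Relation { def tableName: String }
  final case class BatchRelation(tableName: String) extends Relation
  final case class StreamingRelation(tableName: String) extends Relation

  // Stand-in for a push-down result: the relation plus the predicates that
  // were pushed into the scan.
  final case class ScanRelation(relation: Relation, pushedFilters: Seq[String])

  // One rule body, with no streaming special case.
  def pushDown(r: Relation, filters: Seq[String]): ScanRelation =
    ScanRelation(r, pushedFilters = filters)

  println(pushDown(BatchRelation("parquet_tbl"), Seq("id = 1")))
  println(pushDown(StreamingRelation("kafka_topic"), Seq("ts > 0")))
}
```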
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
This is a pure refactoring; existing tests should be sufficient.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44818 from jackierwzhang/spark-46777.
Authored-by: jackierwzhang <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 7 +-
.../catalyst/streaming/StreamingRelationV2.scala | 4 +-
.../datasources/v2/DataSourceV2Relation.scala | 83 ++++++++++++++++------
.../datasources/v2/DataSourceV2Strategy.scala | 4 +-
.../execution/streaming/MicroBatchExecution.scala | 12 ++--
.../sql/execution/streaming/ProgressReporter.scala | 4 +-
.../streaming/continuous/ContinuousExecution.scala | 14 ++--
.../sources/RateStreamProviderSuite.scala | 4 +-
.../streaming/sources/TextSocketStreamSuite.scala | 4 +-
.../apache/spark/sql/streaming/StreamSuite.scala | 8 +--
.../apache/spark/sql/streaming/StreamTest.scala | 4 +-
.../sql/streaming/StreamingQueryManagerSuite.scala | 4 +-
.../streaming/test/DataStreamTableAPISuite.scala | 2 +-
13 files changed, 99 insertions(+), 55 deletions(-)
diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index cee0d9a3dd72..fb5e71a1e7b8 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -40,7 +40,7 @@ import org.apache.spark.TestUtils
import org.apache.spark.sql.{Dataset, ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.read.streaming.SparkDataStream
-import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2ScanRelation
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, ASYNC_PROGRESS_TRACKING_ENABLED}
@@ -125,7 +125,8 @@ abstract class KafkaSourceTest extends StreamTest with SharedSparkSession with K
val sources: Seq[SparkDataStream] = {
query.get.logicalPlan.collect {
case StreamingExecutionRelation(source: KafkaSource, _, _) => source
-      case r: StreamingDataSourceV2Relation if r.stream.isInstanceOf[KafkaMicroBatchStream] ||
+ case r: StreamingDataSourceV2ScanRelation
+ if r.stream.isInstanceOf[KafkaMicroBatchStream] ||
r.stream.isInstanceOf[KafkaContinuousStream] =>
r.stream
}
@@ -1654,7 +1655,7 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
makeSureGetOffsetCalled,
AssertOnQuery { query =>
query.logicalPlan.exists {
-        case r: StreamingDataSourceV2Relation => r.stream.isInstanceOf[KafkaMicroBatchStream]
+        case r: StreamingDataSourceV2ScanRelation => r.stream.isInstanceOf[KafkaMicroBatchStream]
case _ => false
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala
index ab0352b606e5..c1d7daa6cfcf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.streaming
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{ExposesMetadataColumns, LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, SupportsMetadataColumns, Table, TableProvider}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits
@@ -36,7 +36,7 @@ case class StreamingRelationV2(
sourceName: String,
table: Table,
extraOptions: CaseInsensitiveStringMap,
- output: Seq[Attribute],
+ output: Seq[AttributeReference],
catalog: Option[CatalogPlugin],
identifier: Option[Identifier],
v1Relation: Option[LogicalPlan])
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 9c7d776edc65..556283243f63 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -32,21 +32,21 @@ import org.apache.spark.util.Utils
/**
* A logical plan representing a data source v2 table.
*
- * @param table The table that this relation represents.
- * @param output the output attributes of this relation.
+ * @param table The table that this relation represents.
+ * @param output The output attributes of this relation.
* @param catalog catalogPlugin for the table. None if no catalog is specified.
- * @param identifier the identifier for the table. None if no identifier is defined.
+ * @param identifier The identifier for the table. None if no identifier is defined.
 * @param options The options for this table operation. It's used to create fresh
* [[org.apache.spark.sql.connector.read.ScanBuilder]] and
* [[org.apache.spark.sql.connector.write.WriteBuilder]].
*/
-case class DataSourceV2Relation(
+abstract class DataSourceV2RelationBase(
table: Table,
output: Seq[AttributeReference],
catalog: Option[CatalogPlugin],
identifier: Option[Identifier],
options: CaseInsensitiveStringMap)
-  extends LeafNode with MultiInstanceRelation with NamedRelation with ExposesMetadataColumns {
+ extends LeafNode with MultiInstanceRelation with NamedRelation {
import DataSourceV2Implicits._
@@ -54,14 +54,6 @@ case class DataSourceV2Relation(
case c: FunctionCatalog => c
}
- override lazy val metadataOutput: Seq[AttributeReference] = table match {
- case hasMeta: SupportsMetadataColumns =>
- metadataOutputWithOutConflicts(
-        hasMeta.metadataColumns.toAttributes, hasMeta.canRenameConflictingMetadataColumns)
- case _ =>
- Nil
- }
-
override def name: String = {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
(catalog, identifier) match {
@@ -98,11 +90,34 @@ case class DataSourceV2Relation(
}
}
}
+}
+
+/**
+ * A specialization of [[DataSourceV2RelationBase]] that supports batch scan.
+ */
+case class DataSourceV2Relation(
+ table: Table,
+ override val output: Seq[AttributeReference],
+ catalog: Option[CatalogPlugin],
+ identifier: Option[Identifier],
+ options: CaseInsensitiveStringMap)
+ extends DataSourceV2RelationBase(table, output, catalog, identifier, options)
+ with ExposesMetadataColumns {
+
+ import DataSourceV2Implicits._
override def newInstance(): DataSourceV2Relation = {
copy(output = output.map(_.newInstance()))
}
+ override lazy val metadataOutput: Seq[AttributeReference] = table match {
+ case hasMeta: SupportsMetadataColumns =>
+ metadataOutputWithOutConflicts(
+        hasMeta.metadataColumns.toAttributes, hasMeta.canRenameConflictingMetadataColumns)
+ case _ =>
+ Nil
+ }
+
def withMetadataColumns(): DataSourceV2Relation = {
val newMetadata = metadataOutput.filterNot(outputSet.contains)
if (newMetadata.nonEmpty) {
@@ -152,21 +167,45 @@ case class DataSourceV2ScanRelation(
}
/**
- * A specialization of [[DataSourceV2Relation]] with the streaming bit set to true.
- *
- * Note that, this plan has a mutable reader, so Spark won't apply operator push-down for this plan,
- * to avoid making the plan mutable. We should consolidate this plan and [[DataSourceV2Relation]]
- * after we figure out how to apply operator push-down for streaming data sources.
+ * A specialization of [[DataSourceV2RelationBase]] that supports streaming scan.
+ * It will be transformed to [[StreamingDataSourceV2ScanRelation]] during the planning phase of
+ * [[MicroBatchExecution]].
*/
case class StreamingDataSourceV2Relation(
- output: Seq[Attribute],
- scan: Scan,
- stream: SparkDataStream,
+ table: Table,
+ override val output: Seq[AttributeReference],
catalog: Option[CatalogPlugin],
identifier: Option[Identifier],
+ options: CaseInsensitiveStringMap,
+ metadataPath: String)
+  extends DataSourceV2RelationBase(table, output, catalog, identifier, options) {
+
+ override def isStreaming: Boolean = true
+
+ override def newInstance(): StreamingDataSourceV2Relation = {
+ copy(output = output.map(_.newInstance()))
+ }
+}
+/**
+ * A specialization of [[DataSourceV2ScanRelation]] with the streaming bit set to true, as well
+ * as start and end offsets for Microbatch processing.
+ */
+case class StreamingDataSourceV2ScanRelation(
+ relation: StreamingDataSourceV2Relation,
+ scan: Scan,
+ output: Seq[AttributeReference],
+ stream: SparkDataStream,
startOffset: Option[Offset] = None,
endOffset: Option[Offset] = None)
- extends LeafNode with MultiInstanceRelation {
+ extends LeafNode with MultiInstanceRelation with NamedRelation {
+
+ val (catalog, identifier) = (relation.catalog, relation.identifier)
+
+ override def name: String = relation.table.name()
+
+ override def simpleString(maxFields: Int): String = {
+ s"StreamingDataSourceV2ScanRelation${truncatedString(output, "[", ", ",
"]", maxFields)} $name"
+ }
override def isStreaming: Boolean = true
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index fe3140c8030a..3cf311017e5e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -147,7 +147,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
StoragePartitionJoinParams(relation.keyGroupedPartitioning))
      withProjectAndFilter(project, postScanFilters, batchExec, !batchExec.supportsColumnar) :: Nil
- case PhysicalOperation(p, f, r: StreamingDataSourceV2Relation)
+ case PhysicalOperation(p, f, r: StreamingDataSourceV2ScanRelation)
if r.startOffset.isDefined && r.endOffset.isDefined =>
val microBatchStream = r.stream.asInstanceOf[MicroBatchStream]
@@ -157,7 +157,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
// Add a Project here to make sure we produce unsafe rows.
withProjectAndFilter(p, f, scanExec, !scanExec.supportsColumnar) :: Nil
- case PhysicalOperation(p, f, r: StreamingDataSourceV2Relation)
+ case PhysicalOperation(p, f, r: StreamingDataSourceV2ScanRelation)
if r.startOffset.isDefined && r.endOffset.isEmpty =>
val continuousStream = r.stream.asInstanceOf[ContinuousStream]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 1bd59e818be5..8c98ad5c47dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset =
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
import org.apache.spark.sql.execution.streaming.sources.{WriteToMicroBatchDataSource, WriteToMicroBatchDataSourceV1}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.Trigger
@@ -103,7 +103,7 @@ class MicroBatchExecution(
var nextSourceId = 0L
    val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]()
    val v2ToExecutionRelationMap = MutableMap[StreamingRelationV2, StreamingExecutionRelation]()
-    val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2Relation]()
+    val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2ScanRelation]()
    // We transform each distinct streaming relation into a StreamingExecutionRelation, keeping a
    // map as we go to ensure each identical relation gets the same StreamingExecutionRelation
    // object. For each microbatch, the StreamingExecutionRelation will be replaced with a logical
@@ -140,7 +140,9 @@ class MicroBatchExecution(
// TODO: operator pushdown.
val scan = table.newScanBuilder(options).build()
val stream = scan.toMicroBatchStream(metadataPath)
-          StreamingDataSourceV2Relation(output, scan, stream, catalog, identifier)
+ val relation = StreamingDataSourceV2Relation(
+ table, output, catalog, identifier, options, metadataPath)
+ StreamingDataSourceV2ScanRelation(relation, scan, output, stream)
})
} else if (v1.isEmpty) {
throw QueryExecutionErrors.microBatchUnsupportedByDataSourceError(
@@ -163,7 +165,7 @@ class MicroBatchExecution(
// v1 source
case s: StreamingExecutionRelation => s.source
// v2 source
- case r: StreamingDataSourceV2Relation => r.stream
+ case r: StreamingDataSourceV2ScanRelation => r.stream
}
    // Initializing TriggerExecutor relies on `sources`, hence calling this after initializing
@@ -706,7 +708,7 @@ class MicroBatchExecution(
}
// For v2 sources.
- case r: StreamingDataSourceV2Relation =>
+ case r: StreamingDataSourceV2ScanRelation =>
mutableNewData.get(r.stream).map {
case OffsetHolder(start, end) =>
r.copy(startOffset = Some(start), endOffset = Some(end))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index c01f156e3d70..ccbbf9a4d874 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, ReportsSinkMetrics, ReportsSourceMetrics, SparkDataStream}
import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.execution.datasources.v2.{MicroBatchScanExec, StreamingDataSourceV2Relation, StreamWriterCommitProgress}
+import org.apache.spark.sql.execution.datasources.v2.{MicroBatchScanExec, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress}
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryIdleEvent, QueryProgressEvent}
import org.apache.spark.util.Clock
@@ -329,7 +329,7 @@ trait ProgressReporter extends Logging {
val onlyDataSourceV2Sources = {
      // Check whether the streaming query's logical plan has only V2 micro-batch data sources
val allStreamingLeaves = logicalPlan.collect {
-        case s: StreamingDataSourceV2Relation => s.stream.isInstanceOf[MicroBatchStream]
+        case s: StreamingDataSourceV2ScanRelation => s.stream.isInstanceOf[MicroBatchStream]
case _: StreamingExecutionRelation => false
}
allStreamingLeaves.forall(_ == true)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index a2d9a6705f97..1de05931faf5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, Partitio
import org.apache.spark.sql.connector.write.{RequiresDistributionAndOrdering, Write}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.util.ArrayImplicits._
@@ -61,7 +61,7 @@ class ContinuousExecution(
  private val failure: AtomicReference[Throwable] = new AtomicReference[Throwable](null)
override val logicalPlan: WriteToContinuousDataSource = {
-    val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2Relation]()
+    val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2ScanRelation]()
var nextSourceId = 0
    import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
val _logicalPlan = analyzedPlan.transform {
@@ -79,12 +79,14 @@ class ContinuousExecution(
// TODO: operator pushdown.
val scan = table.newScanBuilder(options).build()
val stream = scan.toContinuousStream(metadataPath)
-          StreamingDataSourceV2Relation(output, scan, stream, catalog, identifier)
+ val relation = StreamingDataSourceV2Relation(
+ table, output, catalog, identifier, options, metadataPath)
+ StreamingDataSourceV2ScanRelation(relation, scan, output, stream)
})
}
sources = _logicalPlan.collect {
-      case r: StreamingDataSourceV2Relation => r.stream.asInstanceOf[ContinuousStream]
+      case r: StreamingDataSourceV2ScanRelation => r.stream.asInstanceOf[ContinuousStream]
}
    uniqueSources = sources.distinct.map(s => s -> ReadLimit.allAvailable()).toMap
@@ -197,7 +199,7 @@ class ContinuousExecution(
}
val withNewSources: LogicalPlan = logicalPlan transform {
- case relation: StreamingDataSourceV2Relation =>
+ case relation: StreamingDataSourceV2ScanRelation =>
val loggedOffset = offsets.offsets(0)
        val realOffset = loggedOffset.map(off => relation.stream.deserializeOffset(off.json))
val startOffset = realOffset.getOrElse(relation.stream.initialOffset)
@@ -227,7 +229,7 @@ class ContinuousExecution(
}
val stream = withNewSources.collect {
- case relation: StreamingDataSourceV2Relation =>
+ case relation: StreamingDataSourceV2ScanRelation =>
relation.stream.asInstanceOf[ContinuousStream]
}.head
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
index 1f5f3d76d104..69dc8c291c0b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2ScanRelation
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.functions._
@@ -43,7 +43,7 @@ class RateStreamProviderSuite extends StreamTest {
    override def addData(query: Option[StreamExecution]): (SparkDataStream, Offset) = {
assert(query.nonEmpty)
val rateSource = query.get.logicalPlan.collect {
- case r: StreamingDataSourceV2Relation
+ case r: StreamingDataSourceV2ScanRelation
if r.stream.isInstanceOf[RateStreamMicroBatchStream] =>
r.stream.asInstanceOf[RateStreamMicroBatchStream]
}.head
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
index 15aa0a80c207..bfeca5851102 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2ScanRelation
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest}
@@ -59,7 +59,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSparkSession {
"Cannot add data when there is no query for finding the active socket
source")
val sources = query.get.logicalPlan.collect {
- case r: StreamingDataSourceV2Relation
+ case r: StreamingDataSourceV2ScanRelation
if r.stream.isInstanceOf[TextSocketMicroBatchStream] =>
r.stream.asInstanceOf[TextSocketMicroBatchStream]
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index b0e54737d104..1b0b53357e5b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -511,7 +511,7 @@ class StreamSuite extends StreamTest {
val explainWithoutExtended = q.explainInternal(false)
// `extended = false` only displays the physical plan.
- assert("StreamingDataSourceV2Relation".r
+ assert("StreamingDataSourceV2ScanRelation".r
.findAllMatchIn(explainWithoutExtended).size === 0)
assert("BatchScan".r
.findAllMatchIn(explainWithoutExtended).size === 1)
@@ -521,7 +521,7 @@ class StreamSuite extends StreamTest {
val explainWithExtended = q.explainInternal(true)
      // `extended = true` displays 3 logical plans (Parsed/Analyzed/Optimized) and 1 physical
      // plan.
- assert("StreamingDataSourceV2Relation".r
+ assert("StreamingDataSourceV2ScanRelation".r
.findAllMatchIn(explainWithExtended).size === 3)
assert("BatchScan".r
.findAllMatchIn(explainWithExtended).size === 1)
@@ -566,7 +566,7 @@ class StreamSuite extends StreamTest {
val explainWithoutExtended = q.explainInternal(false)
// `extended = false` only displays the physical plan.
- assert("StreamingDataSourceV2Relation".r
+ assert("StreamingDataSourceV2ScanRelation".r
.findAllMatchIn(explainWithoutExtended).size === 0)
assert("ContinuousScan".r
.findAllMatchIn(explainWithoutExtended).size === 1)
@@ -574,7 +574,7 @@ class StreamSuite extends StreamTest {
val explainWithExtended = q.explainInternal(true)
      // `extended = true` displays 3 logical plans (Parsed/Analyzed/Optimized) and 1 physical
      // plan.
- assert("StreamingDataSourceV2Relation".r
+ assert("StreamingDataSourceV2ScanRelation".r
.findAllMatchIn(explainWithExtended).size === 3)
assert("ContinuousScan".r
.findAllMatchIn(explainWithExtended).size === 1)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 7ee66d18d481..f4816b04bbb5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.plans.physical.AllTuples
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, SparkDataStream}
-import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2ScanRelation
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, EpochCoordinatorRef, IncrementAndGetEpoch}
import org.apache.spark.sql.execution.streaming.sources.MemorySink
@@ -702,7 +702,7 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with
// v1 source
case r: StreamingExecutionRelation => r.source
// v2 source
- case r: StreamingDataSourceV2Relation => r.stream
+ case r: StreamingDataSourceV2ScanRelation => r.stream
          // We can add data to memory stream before starting it. Then the input plan has
          // not been processed by the streaming engine and contains `StreamingRelationV2`.
case r: StreamingRelationV2 if r.sourceName == "memory" =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index 754f55202254..53cbbe6e786f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -30,7 +30,7 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkException
import org.apache.spark.sql.{Dataset, Encoders}
-import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2ScanRelation
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.util.BlockingSource
@@ -467,7 +467,7 @@ class StreamingQueryManagerSuite extends StreamTest {
if (withError) {
logDebug(s"Terminating query ${queryToStop.name} with error")
      queryToStop.asInstanceOf[StreamingQueryWrapper].streamingQuery.logicalPlan.collect {
- case r: StreamingDataSourceV2Relation =>
+ case r: StreamingDataSourceV2ScanRelation =>
r.stream.asInstanceOf[MemoryStream[Int]].addData(0)
}
} else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index 5a4f386f1d1d..eecc9468649d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -433,7 +433,7 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
val explainWithExtended = sq.explainInternal(true)
    // `extended = true` displays 3 logical plans (Parsed/Analyzed/Optimized) and 1 physical
    // plan.
- assert("StreamingDataSourceV2Relation".r
+ assert("StreamingDataSourceV2ScanRelation".r
.findAllMatchIn(explainWithExtended).size === 3)
    // WriteToMicroBatchDataSource is used for both parsed and analyzed logical plan
assert("WriteToMicroBatchDataSource".r