This is an automated email from the ASF dual-hosted git repository.
lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 098a2c4 [SPARK-26520][SQL] data source v2 API refactor (micro-batch read)
098a2c4 is described below
commit 098a2c41fcc46ba079106b2c9eb31197c8cb3bc9
Author: Wenchen Fan <[email protected]>
AuthorDate: Mon Jan 21 14:29:12 2019 -0800
[SPARK-26520][SQL] data source v2 API refactor (micro-batch read)
## What changes were proposed in this pull request?
Following https://github.com/apache/spark/pull/23086, this PR does the API
refactor for micro-batch read, w.r.t. the
[doc](https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing)
The major changes:
1. rename `XXXMicroBatchReadSupport` to `XXXMicroBatchReadStream`
2. implement `TableProvider`, `Table`, `ScanBuilder` and `Scan` for streaming sources (a minimal sketch follows below)
3. at the beginning of micro-batch streaming execution, convert
`StreamingRelationV2` to `StreamingDataSourceV2Relation` directly, instead of
`StreamingExecutionRelation`.
Follow-up: support operator pushdown for streaming sources.
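To illustrate what changes 1 and 2 mean for a source author, here is a minimal, hypothetical sketch against the refactored API (the `Example*` names are illustrative and not part of this patch; the matching `ExampleMicroBatchStream` is sketched further below, next to the new `MicroBatchStream` interface):

```scala
import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsMicroBatchRead, Table, TableProvider}
import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream
import org.apache.spark.sql.types.StructType

// Hypothetical provider showing the new call chain:
// TableProvider -> Table (with SupportsMicroBatchRead) -> ScanBuilder -> Scan -> MicroBatchStream.
class ExampleStreamProvider extends TableProvider {
  override def getTable(options: DataSourceOptions): Table = new ExampleTable
}

class ExampleTable extends Table with SupportsMicroBatchRead {
  private val tableSchema = new StructType().add("value", "string")

  override def name(): String = "example"
  override def schema(): StructType = tableSchema

  override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new ScanBuilder {
    override def build(): Scan = new Scan {
      override def readSchema(): StructType = tableSchema
      // checkpointLocation plays the role of the old `metadataPath` argument.
      override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream =
        new ExampleMicroBatchStream(checkpointLocation)
    }
  }
}
```

The engine walks this chain once per query: `getTable` when the relation is resolved, then `newScanBuilder(...).build().toMicroBatchStream(checkpointLocation)` at the start of micro-batch execution (see the `MicroBatchExecution` changes below).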
## How was this patch tested?
existing tests
Closes #23430 from cloud-fan/micro-batch.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
---
...adSupport.scala => KafkaMicroBatchStream.scala} | 28 ++----
.../spark/sql/kafka010/KafkaSourceProvider.scala | 86 ++++++++++-------
.../sql/kafka010/KafkaContinuousSourceSuite.scala | 4 +-
.../spark/sql/kafka010/KafkaContinuousTest.scala | 4 +-
.../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 28 +++---
.../sources/v2/MicroBatchReadSupportProvider.java | 70 --------------
.../sql/sources/v2/SupportsMicroBatchRead.java} | 27 +++---
.../apache/spark/sql/sources/v2/reader/Scan.java | 18 ++++
.../v2/reader/streaming/MicroBatchReadSupport.java | 60 ------------
.../v2/reader/streaming/MicroBatchStream.java | 57 +++++++++++
.../sql/sources/v2/reader/streaming/Offset.java | 2 +-
...eamingReadSupport.java => SparkDataStream.java} | 16 ++--
.../v2/reader/streaming/StreamingReadSupport.java | 2 +-
...mingScanExec.scala => ContinuousScanExec.scala} | 12 +--
.../datasources/v2/DataSourceV2Relation.scala | 27 +++++-
.../datasources/v2/DataSourceV2Strategy.scala | 16 +++-
.../datasources/v2/MicroBatchScanExec.scala | 92 ++++++++++++++++++
.../execution/streaming/MicroBatchExecution.scala | 105 +++++++++++----------
.../sql/execution/streaming/ProgressReporter.scala | 18 ++--
.../execution/streaming/StreamingRelation.scala | 3 +-
.../streaming/continuous/ContinuousExecution.scala | 10 +-
.../spark/sql/execution/streaming/memory.scala | 64 ++++++++++---
.../streaming/sources/ContinuousMemoryStream.scala | 3 +-
...ort.scala => RateControlMicroBatchStream.scala} | 6 +-
...port.scala => RateStreamMicroBatchStream.scala} | 45 +++------
.../streaming/sources/RateStreamProvider.scala | 72 +++++++++-----
...cket.scala => TextSocketMicroBatchStream.scala} | 94 +++---------------
.../sources/TextSocketSourceProvider.scala | 104 ++++++++++++++++++++
.../spark/sql/streaming/DataStreamReader.scala | 98 +++++++++----------
.../sources/RateStreamProviderSuite.scala | 88 ++++++++---------
.../streaming/sources/TextSocketStreamSuite.scala | 33 +++----
.../apache/spark/sql/streaming/StreamSuite.scala | 21 ++---
.../apache/spark/sql/streaming/StreamTest.scala | 18 +++-
.../sql/streaming/StreamingQueryManagerSuite.scala | 9 +-
.../spark/sql/streaming/StreamingQuerySuite.scala | 6 +-
.../sql/streaming/continuous/ContinuousSuite.scala | 4 +-
.../sources/StreamingDataSourceV2Suite.scala | 73 ++++++++++----
37 files changed, 800 insertions(+), 623 deletions(-)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
similarity index 93%
rename from external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
rename to external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index 1c1d26a..3ae9bd3 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -30,17 +30,16 @@ import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset, SimpleStreamingScanConfig, SimpleStreamingScanConfigBuilder}
-import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchReadSupport
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
+import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchStream
import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset}
import org.apache.spark.util.UninterruptibleThread
/**
- * A [[MicroBatchReadSupport]] that reads data from Kafka.
+ * A [[MicroBatchStream]] that reads data from Kafka.
*
* The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
* a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
@@ -55,13 +54,13 @@ import org.apache.spark.util.UninterruptibleThread
* To avoid this issue, you should make sure stopping the query before stopping the Kafka brokers
* and not use wrong broker addresses.
*/
-private[kafka010] class KafkaMicroBatchReadSupport(
+private[kafka010] class KafkaMicroBatchStream(
kafkaOffsetReader: KafkaOffsetReader,
executorKafkaParams: ju.Map[String, Object],
options: DataSourceOptions,
metadataPath: String,
startingOffsets: KafkaOffsetRangeLimit,
- failOnDataLoss: Boolean) extends RateControlMicroBatchReadSupport with Logging {
+ failOnDataLoss: Boolean) extends RateControlMicroBatchStream with Logging {
private val pollTimeoutMs = options.getLong(
"kafkaConsumer.pollTimeoutMs",
@@ -94,16 +93,9 @@ private[kafka010] class KafkaMicroBatchReadSupport(
endPartitionOffsets
}
- override def fullSchema(): StructType = KafkaOffsetReader.kafkaSchema
-
- override def newScanConfigBuilder(start: Offset, end: Offset): ScanConfigBuilder = {
- new SimpleStreamingScanConfigBuilder(fullSchema(), start, Some(end))
- }
-
- override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
- val sc = config.asInstanceOf[SimpleStreamingScanConfig]
- val startPartitionOffsets = sc.start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
- val endPartitionOffsets = sc.end.get.asInstanceOf[KafkaSourceOffset].partitionToOffsets
+ override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = {
+ val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
+ val endPartitionOffsets = end.asInstanceOf[KafkaSourceOffset].partitionToOffsets
// Find the new partitions, and get their earliest offsets
val newPartitions =
endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
@@ -168,7 +160,7 @@ private[kafka010] class KafkaMicroBatchReadSupport(
}.toArray
}
- override def createReaderFactory(config: ScanConfig): PartitionReaderFactory = {
+ override def createReaderFactory(): PartitionReaderFactory = {
KafkaMicroBatchReaderFactory
}
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index b59f21a..58c90b8 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -31,6 +31,8 @@ import org.apache.spark.sql.{AnalysisException, DataFrame,
SaveMode, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@@ -47,7 +49,7 @@ private[kafka010] class KafkaSourceProvider extends
DataSourceRegister
with CreatableRelationProvider
with StreamingWriteSupportProvider
with ContinuousReadSupportProvider
- with MicroBatchReadSupportProvider
+ with TableProvider
with Logging {
import KafkaSourceProvider._
@@ -101,40 +103,8 @@ private[kafka010] class KafkaSourceProvider extends
DataSourceRegister
failOnDataLoss(caseInsensitiveParams))
}
- /**
- * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport]] to read
- * batches of Kafka data in a micro-batch streaming query.
- */
- override def createMicroBatchReadSupport(
- metadataPath: String,
- options: DataSourceOptions): KafkaMicroBatchReadSupport = {
-
- val parameters = options.asMap().asScala.toMap
- validateStreamOptions(parameters)
- // Each running query should use its own group id. Otherwise, the query may be only assigned
- // partial data since Kafka will assign partitions to multiple consumers having the same group
- // id. Hence, we should generate a unique id for each query.
- val uniqueGroupId = streamingUniqueGroupId(parameters, metadataPath)
-
- val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
- val specifiedKafkaParams = convertToSpecifiedParams(parameters)
-
- val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
- STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
-
- val kafkaOffsetReader = new KafkaOffsetReader(
- strategy(caseInsensitiveParams),
- kafkaParamsForDriver(specifiedKafkaParams),
- parameters,
- driverGroupIdPrefix = s"$uniqueGroupId-driver")
-
- new KafkaMicroBatchReadSupport(
- kafkaOffsetReader,
- kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
- options,
- metadataPath,
- startingStreamOffsets,
- failOnDataLoss(caseInsensitiveParams))
+ override def getTable(options: DataSourceOptions): KafkaTable = {
+ new KafkaTable(strategy(options.asMap().asScala.toMap))
}
/**
@@ -434,6 +404,52 @@ private[kafka010] class KafkaSourceProvider extends
DataSourceRegister
logWarning("maxOffsetsPerTrigger option ignored in batch queries")
}
}
+
+ class KafkaTable(strategy: => ConsumerStrategy) extends Table
+ with SupportsMicroBatchRead {
+
+ override def name(): String = s"Kafka $strategy"
+
+ override def schema(): StructType = KafkaOffsetReader.kafkaSchema
+
+ override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new ScanBuilder {
+ override def build(): Scan = new KafkaScan(options)
+ }
+ }
+
+ class KafkaScan(options: DataSourceOptions) extends Scan {
+
+ override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
+
+ override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
+ val parameters = options.asMap().asScala.toMap
+ validateStreamOptions(parameters)
+ // Each running query should use its own group id. Otherwise, the query may be only assigned
+ // partial data since Kafka will assign partitions to multiple consumers having the same group
+ // id. Hence, we should generate a unique id for each query.
+ val uniqueGroupId = streamingUniqueGroupId(parameters, checkpointLocation)
+
+ val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
+ val specifiedKafkaParams = convertToSpecifiedParams(parameters)
+
+ val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
+ caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
+
+ val kafkaOffsetReader = new KafkaOffsetReader(
+ strategy(parameters),
+ kafkaParamsForDriver(specifiedKafkaParams),
+ parameters,
+ driverGroupIdPrefix = s"$uniqueGroupId-driver")
+
+ new KafkaMicroBatchStream(
+ kafkaOffsetReader,
+ kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
+ options,
+ checkpointLocation,
+ startingStreamOffsets,
+ failOnDataLoss(caseInsensitiveParams))
+ }
+ }
}
private[kafka010] object KafkaSourceProvider extends Logging {
diff --git
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index 9ba066a..2f7fd7f 100644
---
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.spark.sql.Dataset
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2StreamingScanExec
+import org.apache.spark.sql.execution.datasources.v2.ContinuousScanExec
import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
import org.apache.spark.sql.streaming.Trigger
@@ -208,7 +208,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends
KafkaContinuousTest {
eventually(timeout(streamingTimeout)) {
assert(
query.lastExecution.executedPlan.collectFirst {
- case scan: DataSourceV2StreamingScanExec
+ case scan: ContinuousScanExec
if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
}.exists { config =>
diff --git
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
index 5549e82..fa3b623 100644
---
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
+++
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd,
SparkListenerTaskStart}
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2StreamingScanExec
+import org.apache.spark.sql.execution.datasources.v2.ContinuousScanExec
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
import org.apache.spark.sql.streaming.Trigger
@@ -47,7 +47,7 @@ trait KafkaContinuousTest extends KafkaSourceTest {
eventually(timeout(streamingTimeout)) {
assert(
query.lastExecution.executedPlan.collectFirst {
- case scan: DataSourceV2StreamingScanExec
+ case scan: ContinuousScanExec
if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
}.exists(_.knownPartitions.size == newCount),
diff --git
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index cb45384..90b5015 100644
---
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -35,7 +35,7 @@ import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._
import org.apache.spark.sql.{Dataset, ForeachWriter, SparkSession}
-import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.{OldStreamingDataSourceV2Relation, StreamingDataSourceV2Relation}
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
@@ -118,11 +118,13 @@ abstract class KafkaSourceTest extends StreamTest with
SharedSQLContext with Kaf
val sources: Seq[BaseStreamingSource] = {
query.get.logicalPlan.collect {
case StreamingExecutionRelation(source: KafkaSource, _) => source
- case StreamingExecutionRelation(source: KafkaMicroBatchReadSupport, _) => source
+ case r: StreamingDataSourceV2Relation
+ if r.stream.isInstanceOf[KafkaMicroBatchStream] =>
+ r.stream.asInstanceOf[KafkaMicroBatchStream]
} ++ (query.get.lastExecution match {
case null => Seq()
case e => e.logical.collect {
- case r: StreamingDataSourceV2Relation
+ case r: OldStreamingDataSourceV2Relation
if r.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
r.readSupport.asInstanceOf[KafkaContinuousReadSupport]
}
@@ -1062,9 +1064,10 @@ class KafkaMicroBatchV2SourceSuite extends
KafkaMicroBatchSourceSuiteBase {
testStream(kafka)(
makeSureGetOffsetCalled,
AssertOnQuery { query =>
- query.logicalPlan.collect {
- case StreamingExecutionRelation(_: KafkaMicroBatchReadSupport, _) => true
- }.nonEmpty
+ query.logicalPlan.find {
+ case r: StreamingDataSourceV2Relation => r.stream.isInstanceOf[KafkaMicroBatchStream]
+ case _ => false
+ }.isDefined
}
)
}
@@ -1088,13 +1091,12 @@ class KafkaMicroBatchV2SourceSuite extends
KafkaMicroBatchSourceSuiteBase {
"kafka.bootstrap.servers" -> testUtils.brokerAddress,
"subscribe" -> topic
) ++ Option(minPartitions).map { p => "minPartitions" -> p}
- val readSupport = provider.createMicroBatchReadSupport(
- dir.getAbsolutePath, new DataSourceOptions(options.asJava))
- val config = readSupport.newScanConfigBuilder(
+ val dsOptions = new DataSourceOptions(options.asJava)
+ val table = provider.getTable(dsOptions)
+ val stream = table.newScanBuilder(dsOptions).build().toMicroBatchStream(dir.getAbsolutePath)
+ val inputPartitions = stream.planInputPartitions(
KafkaSourceOffset(Map(tp -> 0L)),
- KafkaSourceOffset(Map(tp -> 100L))).build()
- val inputPartitions = readSupport.planInputPartitions(config)
- .map(_.asInstanceOf[KafkaMicroBatchInputPartition])
+ KafkaSourceOffset(Map(tp -> 100L))).map(_.asInstanceOf[KafkaMicroBatchInputPartition])
withClue(s"minPartitions = $minPartitions generated factories
$inputPartitions\n\t") {
assert(inputPartitions.size == numPartitionsGenerated)
inputPartitions.foreach { f => assert(f.reuseKafkaConsumer ==
reusesConsumers) }
@@ -1410,7 +1412,7 @@ abstract class KafkaSourceSuiteBase extends
KafkaSourceTest {
val reader = spark
.readStream
.format("kafka")
- .option("startingOffsets", s"latest")
+ .option("startingOffsets", "latest")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
.option("failOnDataLoss", failOnDataLoss.toString)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupportProvider.java
deleted file mode 100644
index c4d9ef8..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupportProvider.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
-import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement
this interface to
- * provide data reading ability for micro-batch stream processing.
- *
- * This interface is used to create {@link MicroBatchReadSupport} instances
when end users run
- * {@code SparkSession.readStream.format(...).option(...).load()} with a
micro-batch trigger.
- */
-@Evolving
-public interface MicroBatchReadSupportProvider extends DataSourceV2 {
-
- /**
- * Creates a {@link MicroBatchReadSupport} instance to scan the data from
this streaming data
- * source with a user specified schema, which is called by Spark at the
beginning of each
- * micro-batch streaming query.
- *
- * By default this method throws {@link UnsupportedOperationException},
implementations should
- * override this method to handle user specified schema.
- *
- * @param schema the user provided schema.
- * @param checkpointLocation a path to Hadoop FS scratch space that can be
used for failure
- * recovery. Readers for the same logical source
in the same query
- * will be given the same checkpointLocation.
- * @param options the options for the returned data source reader, which is
an immutable
- * case-insensitive string-to-string map.
- */
- default MicroBatchReadSupport createMicroBatchReadSupport(
- StructType schema,
- String checkpointLocation,
- DataSourceOptions options) {
- return DataSourceV2Utils.failForUserSpecifiedSchema(this);
- }
-
- /**
- * Creates a {@link MicroBatchReadSupport} instance to scan the data from
this streaming data
- * source, which is called by Spark at the beginning of each micro-batch
streaming query.
- *
- * @param checkpointLocation a path to Hadoop FS scratch space that can be
used for failure
- * recovery. Readers for the same logical source
in the same query
- * will be given the same checkpointLocation.
- * @param options the options for the returned data source reader, which is
an immutable
- * case-insensitive string-to-string map.
- */
- MicroBatchReadSupport createMicroBatchReadSupport(
- String checkpointLocation,
- DataSourceOptions options);
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchReadSupport.scala b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsMicroBatchRead.java
similarity index 55%
copy from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchReadSupport.scala
copy to sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsMicroBatchRead.java
index 90680ea..9408e32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchReadSupport.scala
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsMicroBatchRead.java
@@ -15,17 +15,20 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution.streaming.sources
+package org.apache.spark.sql.sources.v2;
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset}
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.v2.reader.Scan;
+import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
-// A special `MicroBatchReadSupport` that can get latestOffset with a start offset.
-trait RateControlMicroBatchReadSupport extends MicroBatchReadSupport {
-
- override def latestOffset(): Offset = {
- throw new IllegalAccessException(
- "latestOffset should not be called for RateControlMicroBatchReadSupport")
- }
-
- def latestOffset(start: Offset): Offset
-}
+/**
+ * An empty mix-in interface for {@link Table}, to indicate this table supports streaming scan with
+ * micro-batch mode.
+ * <p>
+ * If a {@link Table} implements this interface, the
+ * {@link SupportsRead#newScanBuilder(DataSourceOptions)} must return a {@link ScanBuilder} that
+ * builds {@link Scan} with {@link Scan#toMicroBatchStream(String)} implemented.
+ * </p>
+ */
+@Evolving
+public interface SupportsMicroBatchRead extends SupportsRead { }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java
index 4d84fb1..c60fb2b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java
@@ -18,8 +18,10 @@
package org.apache.spark.sql.sources.v2.reader;
import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.sources.v2.SupportsBatchRead;
+import org.apache.spark.sql.sources.v2.SupportsMicroBatchRead;
import org.apache.spark.sql.sources.v2.Table;
/**
@@ -65,4 +67,20 @@ public interface Scan {
default Batch toBatch() {
throw new UnsupportedOperationException("Batch scans are not supported");
}
+
+ /**
+ * Returns the physical representation of this scan for streaming query with micro-batch mode. By
+ * default this method throws exception, data sources must overwrite this method to provide an
+ * implementation, if the {@link Table} that creates this scan implements
+ * {@link SupportsMicroBatchRead}.
+ *
+ * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
+ * recovery. Data streams for the same logical source in the same query
+ * will be given the same checkpointLocation.
+ *
+ * @throws UnsupportedOperationException
+ */
+ default MicroBatchStream toMicroBatchStream(String checkpointLocation) {
+ throw new UnsupportedOperationException("Micro-batch scans are not supported");
+ }
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java
deleted file mode 100644
index f56066c..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader.streaming;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
-import org.apache.spark.sql.sources.v2.reader.*;
-
-/**
- * An interface that defines how to scan the data from data source for
micro-batch streaming
- * processing.
- *
- * The execution engine will get an instance of this interface from a data
source provider
- * (e.g. {@link
org.apache.spark.sql.sources.v2.MicroBatchReadSupportProvider}) at the start of
a
- * streaming query, then call {@link #newScanConfigBuilder(Offset, Offset)}
and create an instance
- * of {@link ScanConfig} for each micro-batch. The {@link ScanConfig} will be
used to create input
- * partitions and reader factory to scan a micro-batch with a Spark job. At
the end {@link #stop()}
- * will be called when the streaming execution is completed. Note that a
single query may have
- * multiple executions due to restart or failure recovery.
- */
-@Evolving
-public interface MicroBatchReadSupport extends StreamingReadSupport,
BaseStreamingSource {
-
- /**
- * Returns a builder of {@link ScanConfig}. Spark will call this method and
create a
- * {@link ScanConfig} for each data scanning job.
- *
- * The builder can take some query specific information to do operators
pushdown, store streaming
- * offsets, etc., and keep these information in the created {@link
ScanConfig}.
- *
- * This is the first step of the data scan. All other methods in {@link
MicroBatchReadSupport}
- * needs to take {@link ScanConfig} as an input.
- */
- ScanConfigBuilder newScanConfigBuilder(Offset start, Offset end);
-
- /**
- * Returns a factory, which produces one {@link PartitionReader} for one
{@link InputPartition}.
- */
- PartitionReaderFactory createReaderFactory(ScanConfig config);
-
- /**
- * Returns the most recent offset available.
- */
- Offset latestOffset();
-}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchStream.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchStream.java
new file mode 100644
index 0000000..2fb3957
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchStream.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.PartitionReader;
+import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory;
+import org.apache.spark.sql.sources.v2.reader.Scan;
+
+/**
+ * A {@link SparkDataStream} for streaming queries with micro-batch mode.
+ */
+@Evolving
+public interface MicroBatchStream extends SparkDataStream {
+
+ /**
+ * Returns the most recent offset available.
+ */
+ Offset latestOffset();
+
+ /**
+ * Returns a list of {@link InputPartition input partitions} given the start and end offsets. Each
+ * {@link InputPartition} represents a data split that can be processed by one Spark task. The
+ * number of input partitions returned here is the same as the number of RDD partitions this scan
+ * outputs.
+ * <p>
+ * If the {@link Scan} supports filter pushdown, this stream is likely configured with a filter
+ * and is responsible for creating splits for that filter, which is not a full scan.
+ * </p>
+ * <p>
+ * This method will be called multiple times, to launch one Spark job for each micro-batch in this
+ * data stream.
+ * </p>
+ */
+ InputPartition[] planInputPartitions(Offset start, Offset end);
+
+ /**
+ * Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}.
+ */
+ PartitionReaderFactory createReaderFactory();
+}
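To make the new `MicroBatchStream` contract concrete, here is a minimal, hypothetical Scala implementation (not part of this patch) that pairs with the `ExampleTable` sketch in the description above; `ExampleOffset`, `ExampleInputPartition` and `ExampleMicroBatchStream` are illustrative names, and a real source would persist its progress under `checkpointLocation` the way `KafkaMicroBatchStream` uses its metadata path:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset}
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical offset: the v2 Offset only requires a JSON representation.
case class ExampleOffset(value: Long) extends Offset {
  override def json: String = value.toString
}

case class ExampleInputPartition(start: Long, end: Long) extends InputPartition

// Each micro-batch covers the offset range (start, end] and is served by one partition.
class ExampleMicroBatchStream(checkpointLocation: String) extends MicroBatchStream {
  @volatile private var committed = 0L

  override def initialOffset(): Offset = ExampleOffset(0L)
  override def latestOffset(): Offset = ExampleOffset(committed + 10)
  override def deserializeOffset(json: String): Offset = ExampleOffset(json.toLong)
  override def commit(end: Offset): Unit = { committed = end.asInstanceOf[ExampleOffset].value }
  override def stop(): Unit = {}

  override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = {
    val s = start.asInstanceOf[ExampleOffset].value
    val e = end.asInstanceOf[ExampleOffset].value
    Array(ExampleInputPartition(s, e))
  }

  override def createReaderFactory(): PartitionReaderFactory = new PartitionReaderFactory {
    override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
      val p = partition.asInstanceOf[ExampleInputPartition]
      new PartitionReader[InternalRow] {
        private var current = p.start
        override def next(): Boolean = { current += 1; current <= p.end }
        override def get(): InternalRow = InternalRow(UTF8String.fromString(s"value-$current"))
        override def close(): Unit = {}
      }
    }
  }
}
```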
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
index 6104175..67bff0c 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
@@ -20,7 +20,7 @@ package org.apache.spark.sql.sources.v2.reader.streaming;
import org.apache.spark.annotation.Evolving;
/**
- * An abstract representation of progress through a {@link MicroBatchReadSupport} or
+ * An abstract representation of progress through a {@link MicroBatchStream} or
* {@link ContinuousReadSupport}.
* During execution, offsets provided by the data source implementation will be logged and used as
* restart checkpoints. Each source should provide an offset implementation which the source can use
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SparkDataStream.java
similarity index 78%
copy from sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
copy to sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SparkDataStream.java
index bd39fc8..8ea34be 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SparkDataStream.java
@@ -17,17 +17,17 @@
package org.apache.spark.sql.sources.v2.reader.streaming;
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.spark.sql.sources.v2.reader.ReadSupport;
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
/**
- * A base interface for streaming read support. Data sources should implement concrete streaming
- * read support interfaces: {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
- * This is exposed for a testing purpose.
+ * The base interface representing a readable data stream in a Spark streaming query. It's
+ * responsible to manage the offsets of the streaming source in the streaming query.
+ *
+ * Data sources should implement concrete data stream interfaces: {@link MicroBatchStream}.
*/
-@VisibleForTesting
-public interface StreamingReadSupport extends ReadSupport {
+@Evolving
+public interface SparkDataStream extends BaseStreamingSource {
/**
* Returns the initial offset for a streaming query to start reading from. Note that the
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
index bd39fc8..9a8c1bd 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
@@ -23,7 +23,7 @@ import org.apache.spark.sql.sources.v2.reader.ReadSupport;
/**
* A base interface for streaming read support. Data sources should implement concrete streaming
- * read support interfaces: {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
+ * read support interfaces: {@link ContinuousReadSupport}.
* This is exposed for a testing purpose.
*/
@VisibleForTesting
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StreamingScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala
similarity index 91%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StreamingScanExec.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala
index be75fe4..c735b0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StreamingScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala
@@ -26,14 +26,13 @@ import org.apache.spark.sql.execution.{ColumnarBatchScan,
LeafExecNode, WholeSta
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.sources.v2.DataSourceV2
import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory, ContinuousReadSupport, MicroBatchReadSupport}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory, ContinuousReadSupport}
/**
- * Physical plan node for scanning data from a data source.
+ * Physical plan node for scanning data from a streaming data source with continuous mode.
*/
-// TODO: micro-batch should be handled by `DataSourceV2ScanExec`, after we finish the API refactor
-// completely.
-case class DataSourceV2StreamingScanExec(
+// TODO: merge it and `MicroBatchScanExec`.
+case class ContinuousScanExec(
output: Seq[AttributeReference],
@transient source: DataSourceV2,
@transient options: Map[String, String],
@@ -46,7 +45,7 @@ case class DataSourceV2StreamingScanExec(
// TODO: unify the equal/hashCode implementation for all data source v2
query plans.
override def equals(other: Any): Boolean = other match {
- case other: DataSourceV2StreamingScanExec =>
+ case other: ContinuousScanExec =>
output == other.output && readSupport.getClass ==
other.readSupport.getClass &&
options == other.options
case _ => false
@@ -70,7 +69,6 @@ case class DataSourceV2StreamingScanExec(
private lazy val partitions: Seq[InputPartition] =
readSupport.planInputPartitions(scanConfig)
private lazy val readerFactory = readSupport match {
- case r: MicroBatchReadSupport => r.createReaderFactory(scanConfig)
case r: ContinuousReadSupport =>
r.createContinuousReaderFactory(scanConfig)
case _ => throw new IllegalStateException("unknown read support: " +
readSupport)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 6321578..63e97e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -23,11 +23,12 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset, SparkDataStream}
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.types.StructType
@@ -92,6 +93,28 @@ case class DataSourceV2Relation(
* after we figure out how to apply operator push-down for streaming data sources.
*/
case class StreamingDataSourceV2Relation(
+ output: Seq[Attribute],
+ scanDesc: String,
+ stream: SparkDataStream,
+ startOffset: Option[Offset] = None,
+ endOffset: Option[Offset] = None)
+ extends LeafNode with MultiInstanceRelation {
+
+ override def isStreaming: Boolean = true
+
+ override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))
+
+ override def computeStats(): Statistics = stream match {
+ case r: SupportsReportStatistics =>
+ val statistics = r.estimateStatistics()
+ Statistics(sizeInBytes = statistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
+ case _ =>
+ Statistics(sizeInBytes = conf.defaultSizeInBytes)
+ }
+}
+
+// TODO: remove it after finish API refactor for continuous streaming.
+case class OldStreamingDataSourceV2Relation(
output: Seq[AttributeReference],
source: DataSourceV2,
options: Map[String, String],
@@ -111,7 +134,7 @@ case class StreamingDataSourceV2Relation(
// TODO: unify the equal/hashCode implementation for all data source v2
query plans.
override def equals(other: Any): Boolean = other match {
- case other: StreamingDataSourceV2Relation =>
+ case other: OldStreamingDataSourceV2Relation =>
output == other.output && readSupport.getClass ==
other.readSupport.getClass &&
options == other.options
case _ => false
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 79540b0..b4c5471 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, MicroBatchStream}
import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode
object DataSourceV2Strategy extends Strategy {
@@ -125,12 +125,19 @@ object DataSourceV2Strategy extends Strategy {
// always add the projection, which will produce unsafe rows required by some operators
ProjectExec(project, withFilter) :: Nil
- case r: StreamingDataSourceV2Relation =>
+ case r: StreamingDataSourceV2Relation if r.startOffset.isDefined && r.endOffset.isDefined =>
+ val microBatchStream = r.stream.asInstanceOf[MicroBatchStream]
+ // ensure there is a projection, which will produce unsafe rows required by some operators
+ ProjectExec(r.output,
+ MicroBatchScanExec(
+ r.output, r.scanDesc, microBatchStream, r.startOffset.get, r.endOffset.get)) :: Nil
+
+ case r: OldStreamingDataSourceV2Relation =>
// TODO: support operator pushdown for streaming data sources.
val scanConfig = r.scanConfigBuilder.build()
// ensure there is a projection, which will produce unsafe rows required by some operators
ProjectExec(r.output,
- DataSourceV2StreamingScanExec(
+ ContinuousScanExec(
r.output, r.source, r.options, r.pushedFilters, r.readSupport, scanConfig)) :: Nil
case WriteToDataSourceV2(writer, query) =>
@@ -151,7 +158,8 @@ object DataSourceV2Strategy extends Strategy {
case Repartition(1, false, child) =>
val isContinuous = child.find {
- case s: StreamingDataSourceV2Relation => s.readSupport.isInstanceOf[ContinuousReadSupport]
+ case s: OldStreamingDataSourceV2Relation =>
+ s.readSupport.isInstanceOf[ContinuousReadSupport]
case _ => false
}.isDefined
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala
new file mode 100644
index 0000000..feea8bc
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.sql.catalyst.plans.physical
+import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
+import org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset}
+
+/**
+ * Physical plan node for scanning a micro-batch of data from a data source.
+ */
+case class MicroBatchScanExec(
+ output: Seq[Attribute],
+ scanDesc: String,
+ @transient stream: MicroBatchStream,
+ @transient start: Offset,
+ @transient end: Offset) extends LeafExecNode with ColumnarBatchScan {
+
+ override def simpleString(maxFields: Int): String = {
+ s"ScanV2${truncatedString(output, "[", ", ", "]", maxFields)} $scanDesc"
+ }
+
+ // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
+ override def equals(other: Any): Boolean = other match {
+ case other: MicroBatchScanExec => this.stream == other.stream
+ case _ => false
+ }
+
+ override def hashCode(): Int = stream.hashCode()
+
+ private lazy val partitions = stream.planInputPartitions(start, end)
+
+ private lazy val readerFactory = stream.createReaderFactory()
+
+ override def outputPartitioning: physical.Partitioning = stream match {
+ case _ if partitions.length == 1 =>
+ SinglePartition
+
+ case s: SupportsReportPartitioning =>
+ new DataSourcePartitioning(
+ s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
+
+ case _ => super.outputPartitioning
+ }
+
+ override def supportsBatch: Boolean = {
+ require(partitions.forall(readerFactory.supportColumnarReads) ||
+ !partitions.exists(readerFactory.supportColumnarReads),
+ "Cannot mix row-based and columnar input partitions.")
+
+ partitions.exists(readerFactory.supportColumnarReads)
+ }
+
+ private lazy val inputRDD: RDD[InternalRow] = {
+ new DataSourceRDD(sparkContext, partitions, readerFactory, supportsBatch)
+ }
+
+ override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ if (supportsBatch) {
+ WholeStageCodegenExec(this)(codegenStageId = 0).execute()
+ } else {
+ val numOutputRows = longMetric("numOutputRows")
+ inputRDD.map { r =>
+ numOutputRows += 1
+ r
+ }
+ }
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index db1bf32..64270e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -22,15 +22,15 @@ import scala.collection.mutable.{Map => MutableMap}
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2, WriteToDataSourceV2Exec}
-import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWrite, RateControlMicroBatchReadSupport}
+import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWrite, RateControlMicroBatchStream}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2._
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset => OffsetV2}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2}
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.util.Clock
@@ -51,9 +51,6 @@ class MicroBatchExecution(
@volatile protected var sources: Seq[BaseStreamingSource] = Seq.empty
- private val readSupportToDataSourceMap =
- MutableMap.empty[MicroBatchReadSupport, (DataSourceV2, Map[String, String])]
-
private val triggerExecutor = trigger match {
case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
case OneTimeTrigger => OneTimeExecutor()
@@ -69,6 +66,7 @@ class MicroBatchExecution(
var nextSourceId = 0L
val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]()
val v2ToExecutionRelationMap = MutableMap[StreamingRelationV2, StreamingExecutionRelation]()
+ val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2Relation]()
// We transform each distinct streaming relation into a StreamingExecutionRelation, keeping a
// map as we go to ensure each identical relation gets the same StreamingExecutionRelation
// object. For each microbatch, the StreamingExecutionRelation will be replaced with a logical
@@ -90,36 +88,39 @@ class MicroBatchExecution(
logInfo(s"Using Source [$source] from DataSourceV1 named
'$sourceName' [$dataSourceV1]")
StreamingExecutionRelation(source, output)(sparkSession)
})
- case s @ StreamingRelationV2(
- dataSourceV2: MicroBatchReadSupportProvider, sourceName, options, output, _) if
- !disabledSources.contains(dataSourceV2.getClass.getCanonicalName) =>
- v2ToExecutionRelationMap.getOrElseUpdate(s, {
+ case s @ StreamingRelationV2(ds, dsName, table: SupportsMicroBatchRead, options, output, _)
+ if !disabledSources.contains(ds.getClass.getCanonicalName) =>
+ v2ToRelationMap.getOrElseUpdate(s, {
// Materialize source to avoid creating it in every batch
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
- val readSupport = dataSourceV2.createMicroBatchReadSupport(
- metadataPath,
- new DataSourceOptions(options.asJava))
nextSourceId += 1
- readSupportToDataSourceMap(readSupport) = dataSourceV2 -> options
- logInfo(s"Using MicroBatchReadSupport [$readSupport] from " +
- s"DataSourceV2 named '$sourceName' [$dataSourceV2]")
- StreamingExecutionRelation(readSupport, output)(sparkSession)
+ logInfo(s"Reading table [$table] from DataSourceV2 named '$dsName'
[$ds]")
+ val dsOptions = new DataSourceOptions(options.asJava)
+ // TODO: operator pushdown.
+ val scan = table.newScanBuilder(dsOptions).build()
+ val stream = scan.toMicroBatchStream(metadataPath)
+ StreamingDataSourceV2Relation(output, scan.description(), stream)
})
- case s @ StreamingRelationV2(dataSourceV2, sourceName, _, output, v1Relation) =>
+ case s @ StreamingRelationV2(ds, dsName, _, _, output, v1Relation) =>
v2ToExecutionRelationMap.getOrElseUpdate(s, {
// Materialize source to avoid creating it in every batch
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
if (v1Relation.isEmpty) {
throw new UnsupportedOperationException(
- s"Data source $sourceName does not support microbatch
processing.")
+ s"Data source $dsName does not support microbatch processing.")
}
val source = v1Relation.get.dataSource.createSource(metadataPath)
nextSourceId += 1
- logInfo(s"Using Source [$source] from DataSourceV2 named
'$sourceName' [$dataSourceV2]")
+ logInfo(s"Using Source [$source] from DataSourceV2 named '$dsName'
[$ds]")
StreamingExecutionRelation(source, output)(sparkSession)
})
}
- sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
+ sources = _logicalPlan.collect {
+ // v1 source
+ case s: StreamingExecutionRelation => s.source
+ // v2 source
+ case r: StreamingDataSourceV2Relation => r.stream
+ }
uniqueSources = sources.distinct
_logicalPlan
}
@@ -350,7 +351,7 @@ class MicroBatchExecution(
reportTimeTaken("getOffset") {
(s, s.getOffset)
}
- case s: RateControlMicroBatchReadSupport =>
+ case s: RateControlMicroBatchStream =>
updateStatusMessage(s"Getting offsets from $s")
reportTimeTaken("latestOffset") {
val startOffset = availableOffsets
@@ -358,7 +359,7 @@ class MicroBatchExecution(
.getOrElse(s.initialOffset())
(s, Option(s.latestOffset(startOffset)))
}
- case s: MicroBatchReadSupport =>
+ case s: MicroBatchStream =>
updateStatusMessage(s"Getting offsets from $s")
reportTimeTaken("latestOffset") {
(s, Option(s.latestOffset()))
@@ -402,8 +403,8 @@ class MicroBatchExecution(
if (prevBatchOff.isDefined) {
prevBatchOff.get.toStreamProgress(sources).foreach {
case (src: Source, off) => src.commit(off)
- case (readSupport: MicroBatchReadSupport, off) =>
- readSupport.commit(readSupport.deserializeOffset(off.json))
+ case (stream: MicroBatchStream, off) =>
+ stream.commit(stream.deserializeOffset(off.json))
case (src, _) =>
throw new IllegalArgumentException(
s"Unknown source is found at constructNextBatch: $src")
@@ -448,39 +449,30 @@ class MicroBatchExecution(
logDebug(s"Retrieving data from $source: $current -> $available")
Some(source -> batch.logicalPlan)
- // TODO(cloud-fan): for data source v2, the new batch is just a new `ScanConfigBuilder`, but
- // to be compatible with streaming source v1, we return a logical plan as a new batch here.
- case (readSupport: MicroBatchReadSupport, available)
- if committedOffsets.get(readSupport).map(_ != available).getOrElse(true) =>
- val current = committedOffsets.get(readSupport).map {
- off => readSupport.deserializeOffset(off.json)
+ case (stream: MicroBatchStream, available)
+ if committedOffsets.get(stream).map(_ != available).getOrElse(true) =>
+ val current = committedOffsets.get(stream).map {
+ off => stream.deserializeOffset(off.json)
+ val current = committedOffsets.get(stream).map {
+ off => stream.deserializeOffset(off.json)
}
val endOffset: OffsetV2 = available match {
- case v1: SerializedOffset => readSupport.deserializeOffset(v1.json)
+ case v1: SerializedOffset => stream.deserializeOffset(v1.json)
case v2: OffsetV2 => v2
}
- val startOffset = current.getOrElse(readSupport.initialOffset)
- val scanConfigBuilder = readSupport.newScanConfigBuilder(startOffset, endOffset)
- logDebug(s"Retrieving data from $readSupport: $current -> $endOffset")
-
- val (source, options) = readSupport match {
- // `MemoryStream` is special. It's for test only and doesn't have a `DataSourceV2`
- // implementation. We provide a fake one here for explain.
- case _: MemoryStream[_] => MemoryStreamDataSource -> Map.empty[String, String]
- // Provide a fake value here just in case something went wrong, e.g. the reader gives
- // a wrong `equals` implementation.
- case _ => readSupportToDataSourceMap.getOrElse(readSupport, {
- FakeDataSourceV2 -> Map.empty[String, String]
- })
- }
- Some(readSupport -> StreamingDataSourceV2Relation(
- readSupport.fullSchema().toAttributes, source, options, readSupport, scanConfigBuilder))
+ val startOffset = current.getOrElse(stream.initialOffset)
+ logDebug(s"Retrieving data from $stream: $current -> $endOffset")
+
+ // To be compatible with the v1 source, the `newData` is represented as a logical plan,
+ // while the `newData` of v2 source is just the start and end offsets. Here we return a
+ // fake logical plan to carry the offsets.
+ Some(stream -> OffsetHolder(startOffset, endOffset))
+
case _ => None
}
}
// Replace sources in the logical plan with data that has arrived since the last batch.
val newBatchesPlan = logicalPlan transform {
+ // For v1 sources.
case StreamingExecutionRelation(source, output) =>
newData.get(source).map { dataPlan =>
val maxFields = SQLConf.get.maxToStringFields
@@ -495,6 +487,15 @@ class MicroBatchExecution(
}.getOrElse {
LocalRelation(output, isStreaming = true)
}
+
+ // For v2 sources.
+ case r: StreamingDataSourceV2Relation =>
+ newData.get(r.stream).map {
+ case OffsetHolder(start, end) =>
+ r.copy(startOffset = Some(start), endOffset = Some(end))
+ }.getOrElse {
+ LocalRelation(r.output, isStreaming = true)
+ }
}
// Rewire the plan to use the new attributes that were returned by the source.
@@ -580,6 +581,6 @@ object MicroBatchExecution {
val BATCH_ID_KEY = "streaming.sql.batchId"
}
-object MemoryStreamDataSource extends DataSourceV2
-
-object FakeDataSourceV2 extends DataSourceV2
+case class OffsetHolder(start: OffsetV2, end: OffsetV2) extends LeafNode {
+ override def output: Seq[Attribute] = Nil
+}
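
The `deserializeOffset`/`commit` pairing above is easier to see as a round trip. A rough
sketch follows (editorial, not code from the patch); `stream` is a placeholder for any of
the built-in MicroBatchStream implementations changed below, all of which use LongOffset:

    // Sketch only: the JSON round trip MicroBatchExecution performs on v2 offsets.
    val end = LongOffset(5)                           // e.g. what latestOffset() returned
    val logged = end.json                             // "5", written to the offset log
    val restored = stream.deserializeOffset(logged)   // LongOffset(5) again on recovery
    stream.commit(restored)                           // data up to this offset can be discarded
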
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index d1f3f74..2528351 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -28,8 +28,8 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark,
LogicalPlan}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2StreamingScanExec, StreamWriterCommitProgress}
-import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport
+import org.apache.spark.sql.execution.datasources.v2.{MicroBatchScanExec, StreamingDataSourceV2Relation, StreamWriterCommitProgress}
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
import org.apache.spark.util.Clock
@@ -247,10 +247,12 @@ trait ProgressReporter extends Logging {
}
val onlyDataSourceV2Sources = {
- // Check whether the streaming query's logical plan has only V2 data sources
- val allStreamingLeaves =
- logicalPlan.collect { case s: StreamingExecutionRelation => s }
- allStreamingLeaves.forall { _.source.isInstanceOf[MicroBatchReadSupport] }
+ // Check whether the streaming query's logical plan has only V2 micro-batch data sources
+ val allStreamingLeaves = logicalPlan.collect {
+ case s: StreamingDataSourceV2Relation => s.stream.isInstanceOf[MicroBatchStream]
+ case _: StreamingExecutionRelation => false
+ }
+ allStreamingLeaves.forall(_ == true)
}
if (onlyDataSourceV2Sources) {
@@ -258,9 +260,9 @@ trait ProgressReporter extends Logging {
// (can happen with self-unions or self-joins). This means the source is scanned multiple
// times in the query, we should count the numRows for each scan.
val sourceToInputRowsTuples = lastExecution.executedPlan.collect {
- case s: DataSourceV2StreamingScanExec if
s.readSupport.isInstanceOf[BaseStreamingSource] =>
+ case s: MicroBatchScanExec =>
val numRows =
s.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
- val source = s.readSupport.asInstanceOf[BaseStreamingSource]
+ val source = s.stream.asInstanceOf[BaseStreamingSource]
source -> numRows
}
logDebug("Source -> # input rows\n\t" +
sourceToInputRowsTuples.mkString("\n\t"))
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index 4b696df..535fa1c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan,
Statistics}
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceV2}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceV2, Table}
object StreamingRelation {
def apply(dataSource: DataSource): StreamingRelation = {
@@ -94,6 +94,7 @@ case class StreamingExecutionRelation(
case class StreamingRelationV2(
dataSource: DataSourceV2,
sourceName: String,
+ table: Table,
extraOptions: Map[String, String],
output: Seq[Attribute],
v1Relation: Option[StreamingRelation])(session: SparkSession)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 89033b7..c74fa14 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeMap, Curre
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2StreamingScanExec, StreamingDataSourceV2Relation}
+import org.apache.spark.sql.execution.datasources.v2.{ContinuousScanExec, OldStreamingDataSourceV2Relation}
import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation,
StreamingRelationV2, _}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2
@@ -64,12 +64,12 @@ class ContinuousExecution(
val toExecutionRelationMap = MutableMap[StreamingRelationV2,
ContinuousExecutionRelation]()
analyzedPlan.transform {
case r @ StreamingRelationV2(
- source: ContinuousReadSupportProvider, _, extraReaderOptions, output, _) =>
+ source: ContinuousReadSupportProvider, _, _, extraReaderOptions, output, _) =>
// TODO: shall we create `ContinuousReadSupport` here instead of each reconfiguration?
toExecutionRelationMap.getOrElseUpdate(r, {
ContinuousExecutionRelation(source, extraReaderOptions,
output)(sparkSession)
})
- case StreamingRelationV2(_, sourceName, _, _, _) =>
+ case StreamingRelationV2(_, sourceName, _, _, _, _) =>
throw new UnsupportedOperationException(
s"Data source $sourceName does not support continuous processing.")
}
@@ -177,7 +177,7 @@ class ContinuousExecution(
val realOffset = loggedOffset.map(off =>
readSupport.deserializeOffset(off.json))
val startOffset = realOffset.getOrElse(readSupport.initialOffset)
val scanConfigBuilder = readSupport.newScanConfigBuilder(startOffset)
- StreamingDataSourceV2Relation(newOutput, source, options, readSupport, scanConfigBuilder)
+ OldStreamingDataSourceV2Relation(newOutput, source, options, readSupport, scanConfigBuilder)
}
// Rewire the plan to use the new attributes that were returned by the source.
@@ -211,7 +211,7 @@ class ContinuousExecution(
}
val (readSupport, scanConfig) = lastExecution.executedPlan.collect {
- case scan: DataSourceV2StreamingScanExec
+ case scan: ContinuousScanExec
if scan.readSupport.isInstanceOf[ContinuousReadSupport] =>
scan.readSupport.asInstanceOf[ContinuousReadSupport] -> scan.scanConfig
}.head
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 13b75ae..5406679 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -33,8 +33,9 @@ import
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUti
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsMicroBatchRead, Table, TableProvider}
import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset => OffsetV2}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@@ -50,7 +51,7 @@ object MemoryStream {
* A base class for memory stream implementations. Supports adding data and
resetting.
*/
abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends
BaseStreamingSource {
- protected val encoder = encoderFor[A]
+ val encoder = encoderFor[A]
protected val attributes = encoder.schema.toAttributes
def toDS(): Dataset[A] = {
@@ -72,16 +73,56 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext:
SQLContext) extends Bas
def addData(data: TraversableOnce[A]): Offset
}
+// This class is used to indicate the memory stream data source. We don't actually use it, as
+// memory stream is for test only and we never look it up by name.
+object MemoryStreamTableProvider extends TableProvider {
+ override def getTable(options: DataSourceOptions): Table = {
+ throw new IllegalStateException("MemoryStreamTableProvider should not be
used.")
+ }
+}
+
+class MemoryStreamTable(val stream: MemoryStreamBase[_]) extends Table with
SupportsMicroBatchRead {
+
+ override def name(): String = "MemoryStreamDataSource"
+
+ override def schema(): StructType = stream.fullSchema()
+
+ override def newScanBuilder(options: DataSourceOptions): ScanBuilder = {
+ new MemoryStreamScanBuilder(stream)
+ }
+}
+
+class MemoryStreamScanBuilder(stream: MemoryStreamBase[_]) extends ScanBuilder
with Scan {
+
+ override def build(): Scan = this
+
+ override def description(): String = "MemoryStreamDataSource"
+
+ override def readSchema(): StructType = stream.fullSchema()
+
+ override def toMicroBatchStream(checkpointLocation: String):
MicroBatchStream = {
+ stream.asInstanceOf[MemoryStream[_]]
+ }
+}
+
/**
* A [[Source]] that produces value stored in memory as they are added by the user. This [[Source]]
* is intended for use in unit tests as it can only replay data when the object is still
* available.
*/
case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
- extends MemoryStreamBase[A](sqlContext) with MicroBatchReadSupport with
Logging {
+ extends MemoryStreamBase[A](sqlContext) with MicroBatchStream with Logging
{
+
+ protected val logicalPlan: LogicalPlan = {
+ StreamingRelationV2(
+ MemoryStreamTableProvider,
+ "memory",
+ new MemoryStreamTable(this),
+ Map.empty,
+ attributes,
+ None)(sqlContext.sparkSession)
+ }
- protected val logicalPlan: LogicalPlan =
- StreamingExecutionRelation(this, attributes)(sqlContext.sparkSession)
protected val output = logicalPlan.output
/**
@@ -130,14 +171,9 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext:
SQLContext)
if (currentOffset.offset == -1) null else currentOffset
}
- override def newScanConfigBuilder(start: OffsetV2, end: OffsetV2):
ScanConfigBuilder = {
- new SimpleStreamingScanConfigBuilder(fullSchema(), start, Some(end))
- }
-
- override def planInputPartitions(config: ScanConfig): Array[InputPartition]
= {
- val sc = config.asInstanceOf[SimpleStreamingScanConfig]
- val startOffset = sc.start.asInstanceOf[LongOffset]
- val endOffset = sc.end.get.asInstanceOf[LongOffset]
+ override def planInputPartitions(start: OffsetV2, end: OffsetV2):
Array[InputPartition] = {
+ val startOffset = start.asInstanceOf[LongOffset]
+ val endOffset = end.asInstanceOf[LongOffset]
synchronized {
// Compute the internal batch numbers to fetch: [startOrdinal,
endOrdinal)
val startOrdinal = startOffset.offset.toInt + 1
@@ -159,7 +195,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext:
SQLContext)
}
}
- override def createReaderFactory(config: ScanConfig): PartitionReaderFactory
= {
+ override def createReaderFactory(): PartitionReaderFactory = {
MemoryStreamReaderFactory
}
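
Taken together, the new pieces chain up as follows. This is a minimal sketch with
placeholder names (`provider`, `options`, `checkpointDir`), not code from the patch,
showing roughly the resolution performed for a table that mixes in SupportsMicroBatchRead:

    import org.apache.spark.sql.sources.v2.{DataSourceOptions, Table, TableProvider}
    import org.apache.spark.sql.sources.v2.reader.Scan
    import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream

    // Sketch only; the three inputs are assumed placeholders.
    def resolveMicroBatchStream(
        provider: TableProvider,
        options: DataSourceOptions,
        checkpointDir: String): MicroBatchStream = {
      val table: Table = provider.getTable(options)           // e.g. MemoryStreamTable above
      val scan: Scan = table.newScanBuilder(options).build()   // MemoryStreamScanBuilder returns itself
      scan.toMicroBatchStream(checkpointDir)                   // e.g. the wrapped MemoryStream
    }
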
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
index dbcc448..8c5c9ef 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
@@ -50,7 +50,8 @@ class ContinuousMemoryStream[A : Encoder](id: Int,
sqlContext: SQLContext, numPa
private implicit val formats = Serialization.formats(NoTypeHints)
protected val logicalPlan =
- StreamingRelationV2(this, "memory", Map(), attributes,
None)(sqlContext.sparkSession)
+ // TODO: don't pass null as table after finish API refactor for continuous stream.
+ StreamingRelationV2(this, "memory", null, Map(), attributes, None)(sqlContext.sparkSession)
// ContinuousReader implementation
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchReadSupport.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchStream.scala
similarity index 86%
rename from
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchReadSupport.scala
rename to
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchStream.scala
index 90680ea..6a66f52 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchReadSupport.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchStream.scala
@@ -17,10 +17,10 @@
package org.apache.spark.sql.execution.streaming.sources
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset}
-// A special `MicroBatchReadSupport` that can get latestOffset with a start offset.
-trait RateControlMicroBatchReadSupport extends MicroBatchReadSupport {
+// A special `MicroBatchStream` that can get latestOffset with a start offset.
+trait RateControlMicroBatchStream extends MicroBatchStream {
override def latestOffset(): Offset = {
throw new IllegalAccessException(
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReadSupport.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala
similarity index 82%
rename from
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReadSupport.scala
rename to
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala
index f536404..a8feed3 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReadSupport.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala
@@ -24,19 +24,23 @@ import java.util.concurrent.TimeUnit
import org.apache.commons.io.IOUtils
import org.apache.spark.internal.Logging
-import org.apache.spark.network.util.JavaUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset}
import org.apache.spark.util.{ManualClock, SystemClock}
-class RateStreamMicroBatchReadSupport(options: DataSourceOptions,
checkpointLocation: String)
- extends MicroBatchReadSupport with Logging {
+class RateStreamMicroBatchStream(
+ rowsPerSecond: Long,
+ // The default values here are used in tests.
+ rampUpTimeSeconds: Long = 0,
+ numPartitions: Int = 1,
+ options: DataSourceOptions,
+ checkpointLocation: String)
+ extends MicroBatchStream with Logging {
import RateStreamProvider._
private[sources] val clock = {
@@ -44,14 +48,6 @@ class RateStreamMicroBatchReadSupport(options:
DataSourceOptions, checkpointLoca
if (options.getBoolean("useManualClock", false)) new ManualClock else new
SystemClock
}
- private val rowsPerSecond =
- options.get(ROWS_PER_SECOND).orElse("1").toLong
-
- private val rampUpTimeSeconds =
- Option(options.get(RAMP_UP_TIME).orElse(null.asInstanceOf[String]))
- .map(JavaUtils.timeStringAsSec(_))
- .getOrElse(0L)
-
private val maxSeconds = Long.MaxValue / rowsPerSecond
if (rampUpTimeSeconds > maxSeconds) {
@@ -117,16 +113,10 @@ class RateStreamMicroBatchReadSupport(options:
DataSourceOptions, checkpointLoca
LongOffset(json.toLong)
}
- override def fullSchema(): StructType = SCHEMA
- override def newScanConfigBuilder(start: Offset, end: Offset):
ScanConfigBuilder = {
- new SimpleStreamingScanConfigBuilder(fullSchema(), start, Some(end))
- }
-
- override def planInputPartitions(config: ScanConfig): Array[InputPartition]
= {
- val sc = config.asInstanceOf[SimpleStreamingScanConfig]
- val startSeconds = sc.start.asInstanceOf[LongOffset].offset
- val endSeconds = sc.end.get.asInstanceOf[LongOffset].offset
+ override def planInputPartitions(start: Offset, end: Offset):
Array[InputPartition] = {
+ val startSeconds = start.asInstanceOf[LongOffset].offset
+ val endSeconds = end.asInstanceOf[LongOffset].offset
assert(startSeconds <= endSeconds, s"startSeconds($startSeconds) >
endSeconds($endSeconds)")
if (endSeconds > maxSeconds) {
throw new ArithmeticException("Integer overflow. Max offset with " +
@@ -148,21 +138,14 @@ class RateStreamMicroBatchReadSupport(options:
DataSourceOptions, checkpointLoca
val localStartTimeMs = creationTimeMs +
TimeUnit.SECONDS.toMillis(startSeconds)
val relativeMsPerValue =
TimeUnit.SECONDS.toMillis(endSeconds - startSeconds).toDouble /
(rangeEnd - rangeStart)
- val numPartitions = {
- val activeSession = SparkSession.getActiveSession
- require(activeSession.isDefined)
- Option(options.get(NUM_PARTITIONS).orElse(null.asInstanceOf[String]))
- .map(_.toInt)
- .getOrElse(activeSession.get.sparkContext.defaultParallelism)
- }
(0 until numPartitions).map { p =>
- new RateStreamMicroBatchInputPartition(
+ RateStreamMicroBatchInputPartition(
p, numPartitions, rangeStart, rangeEnd, localStartTimeMs,
relativeMsPerValue)
}.toArray
}
- override def createReaderFactory(config: ScanConfig): PartitionReaderFactory
= {
+ override def createReaderFactory(): PartitionReaderFactory = {
RateStreamMicroBatchReaderFactory
}
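
A rough sketch (not code from the patch) of constructing the stream directly with the new
signature, as the updated tests later in this patch do; option parsing now lives in
RateStreamProvider.getTable, so the stream receives plain values. `checkpointDir` is an
assumed placeholder path:

    // Sketch only: drive the stream by hand for one offset range.
    val stream = new RateStreamMicroBatchStream(
      rowsPerSecond = 20,
      numPartitions = 2,
      options = DataSourceOptions.empty(),
      checkpointLocation = checkpointDir)
    val partitions = stream.planInputPartitions(LongOffset(0L), LongOffset(1L))
    val readerFactory = stream.createReaderFactory()
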
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
index 6942dfb..8d334f0 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
@@ -18,10 +18,12 @@
package org.apache.spark.sql.execution.streaming.sources
import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReadSupport
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2._
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, MicroBatchReadSupport}
+import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, MicroBatchStream}
import org.apache.spark.sql.types._
/**
@@ -39,38 +41,31 @@ import org.apache.spark.sql.types._
* be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed.
*/
class RateStreamProvider extends DataSourceV2
- with MicroBatchReadSupportProvider with ContinuousReadSupportProvider with
DataSourceRegister {
+ with TableProvider with ContinuousReadSupportProvider with
DataSourceRegister {
import RateStreamProvider._
- override def createMicroBatchReadSupport(
- checkpointLocation: String,
- options: DataSourceOptions): MicroBatchReadSupport = {
- if (options.get(ROWS_PER_SECOND).isPresent) {
- val rowsPerSecond = options.get(ROWS_PER_SECOND).get().toLong
- if (rowsPerSecond <= 0) {
- throw new IllegalArgumentException(
- s"Invalid value '$rowsPerSecond'. The option 'rowsPerSecond' must be
positive")
- }
+ override def getTable(options: DataSourceOptions): Table = {
+ val rowsPerSecond = options.getLong(ROWS_PER_SECOND, 1)
+ if (rowsPerSecond <= 0) {
+ throw new IllegalArgumentException(
+ s"Invalid value '$rowsPerSecond'. The option 'rowsPerSecond' must be
positive")
}
- if (options.get(RAMP_UP_TIME).isPresent) {
- val rampUpTimeSeconds =
- JavaUtils.timeStringAsSec(options.get(RAMP_UP_TIME).get())
- if (rampUpTimeSeconds < 0) {
- throw new IllegalArgumentException(
- s"Invalid value '$rampUpTimeSeconds'. The option 'rampUpTime' must
not be negative")
- }
+ val rampUpTimeSeconds = Option(options.get(RAMP_UP_TIME).orElse(null))
+ .map(JavaUtils.timeStringAsSec)
+ .getOrElse(0L)
+ if (rampUpTimeSeconds < 0) {
+ throw new IllegalArgumentException(
+ s"Invalid value '$rampUpTimeSeconds'. The option 'rampUpTime' must not
be negative")
}
- if (options.get(NUM_PARTITIONS).isPresent) {
- val numPartitions = options.get(NUM_PARTITIONS).get().toInt
- if (numPartitions <= 0) {
- throw new IllegalArgumentException(
- s"Invalid value '$numPartitions'. The option 'numPartitions' must be
positive")
- }
+ val numPartitions = options.getInt(
+ NUM_PARTITIONS, SparkSession.active.sparkContext.defaultParallelism)
+ if (numPartitions <= 0) {
+ throw new IllegalArgumentException(
+ s"Invalid value '$numPartitions'. The option 'numPartitions' must be
positive")
}
-
- new RateStreamMicroBatchReadSupport(options, checkpointLocation)
+ new RateStreamTable(rowsPerSecond, rampUpTimeSeconds, numPartitions)
}
override def createContinuousReadSupport(
@@ -82,6 +77,31 @@ class RateStreamProvider extends DataSourceV2
override def shortName(): String = "rate"
}
+class RateStreamTable(
+ rowsPerSecond: Long,
+ rampUpTimeSeconds: Long,
+ numPartitions: Int)
+ extends Table with SupportsMicroBatchRead {
+
+ override def name(): String = {
+ s"RateStream(rowsPerSecond=$rowsPerSecond,
rampUpTimeSeconds=$rampUpTimeSeconds, " +
+ s"numPartitions=$numPartitions)"
+ }
+
+ override def schema(): StructType = RateStreamProvider.SCHEMA
+
+ override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new
ScanBuilder {
+ override def build(): Scan = new Scan {
+ override def readSchema(): StructType = RateStreamProvider.SCHEMA
+
+ override def toMicroBatchStream(checkpointLocation: String):
MicroBatchStream = {
+ new RateStreamMicroBatchStream(
+ rowsPerSecond, rampUpTimeSeconds, numPartitions, options,
checkpointLocation)
+ }
+ }
+ }
+}
+
object RateStreamProvider {
val SCHEMA =
StructType(StructField("timestamp", TimestampType) :: StructField("value",
LongType) :: Nil)
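
End-user usage of the rate source is not meant to change with this refactor; only the
internal path (provider -> RateStreamTable -> Scan -> RateStreamMicroBatchStream) does.
A quick sketch, assuming an active SparkSession named `spark`:

    // Standard rate-source query; options map onto ROWS_PER_SECOND / NUM_PARTITIONS above.
    val rates = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .option("numPartitions", "2")
      .load()   // schema is RateStreamProvider.SCHEMA: (timestamp TIMESTAMP, value LONG)
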
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
similarity index 62%
rename from
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
rename to
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
index b2a573e..ddf398b 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
@@ -19,44 +19,29 @@ package org.apache.spark.sql.execution.streaming.sources
import java.io.{BufferedReader, InputStreamReader, IOException}
import java.net.Socket
-import java.text.SimpleDateFormat
-import java.util.{Calendar, Locale}
+import java.util.Calendar
import java.util.concurrent.atomic.AtomicBoolean
import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable.ListBuffer
-import scala.util.{Failure, Success, Try}
import org.apache.spark.internal.Logging
-import org.apache.spark.sql._
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.streaming.{LongOffset, SimpleStreamingScanConfig, SimpleStreamingScanConfigBuilder}
-import org.apache.spark.sql.execution.streaming.continuous.TextSocketContinuousReadSupport
-import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions, DataSourceV2, MicroBatchReadSupportProvider}
-import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, MicroBatchReadSupport, Offset}
-import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.execution.streaming.LongOffset
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReader, PartitionReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset}
import org.apache.spark.unsafe.types.UTF8String
-object TextSocketReader {
- val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
- val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
- StructField("timestamp", TimestampType) :: Nil)
- val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
-}
-
/**
* A MicroBatchReadSupport that reads text lines through a TCP socket, designed only for tutorials
* and debugging. This MicroBatchReadSupport will *not* work in production applications due to
* multiple reasons, including no support for fault recovery.
*/
-class TextSocketMicroBatchReadSupport(options: DataSourceOptions)
- extends MicroBatchReadSupport with Logging {
-
- private val host: String = options.get("host").get()
- private val port: Int = options.get("port").get().toInt
+class TextSocketMicroBatchStream(host: String, port: Int, options:
DataSourceOptions)
+ extends MicroBatchStream with Logging {
@GuardedBy("this")
private var socket: Socket = null
@@ -99,7 +84,7 @@ class TextSocketMicroBatchReadSupport(options:
DataSourceOptions)
logWarning(s"Stream closed by $host:$port")
return
}
- TextSocketMicroBatchReadSupport.this.synchronized {
+ TextSocketMicroBatchStream.this.synchronized {
val newData = (
UTF8String.fromString(line),
DateTimeUtils.fromMillis(Calendar.getInstance().getTimeInMillis)
@@ -124,22 +109,9 @@ class TextSocketMicroBatchReadSupport(options:
DataSourceOptions)
LongOffset(json.toLong)
}
- override def fullSchema(): StructType = {
- if (options.getBoolean("includeTimestamp", false)) {
- TextSocketReader.SCHEMA_TIMESTAMP
- } else {
- TextSocketReader.SCHEMA_REGULAR
- }
- }
-
- override def newScanConfigBuilder(start: Offset, end: Offset):
ScanConfigBuilder = {
- new SimpleStreamingScanConfigBuilder(fullSchema(), start, Some(end))
- }
-
- override def planInputPartitions(config: ScanConfig): Array[InputPartition]
= {
- val sc = config.asInstanceOf[SimpleStreamingScanConfig]
- val startOrdinal = sc.start.asInstanceOf[LongOffset].offset.toInt + 1
- val endOrdinal = sc.end.get.asInstanceOf[LongOffset].offset.toInt + 1
+ override def planInputPartitions(start: Offset, end: Offset):
Array[InputPartition] = {
+ val startOrdinal = start.asInstanceOf[LongOffset].offset.toInt + 1
+ val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
// Internal buffer only holds the batches after lastOffsetCommitted
val rawList = synchronized {
@@ -164,7 +136,7 @@ class TextSocketMicroBatchReadSupport(options:
DataSourceOptions)
slices.map(TextSocketInputPartition)
}
- override def createReaderFactory(config: ScanConfig): PartitionReaderFactory
= {
+ override def createReaderFactory(): PartitionReaderFactory = {
new PartitionReaderFactory {
override def createReader(partition: InputPartition):
PartitionReader[InternalRow] = {
val slice = partition.asInstanceOf[TextSocketInputPartition].slice
@@ -220,43 +192,3 @@ class TextSocketMicroBatchReadSupport(options:
DataSourceOptions)
}
case class TextSocketInputPartition(slice: ListBuffer[(UTF8String, Long)])
extends InputPartition
-
-class TextSocketSourceProvider extends DataSourceV2
- with MicroBatchReadSupportProvider with ContinuousReadSupportProvider
- with DataSourceRegister with Logging {
-
- private def checkParameters(params: DataSourceOptions): Unit = {
- logWarning("The socket source should not be used for production
applications! " +
- "It does not support recovery.")
- if (!params.get("host").isPresent) {
- throw new AnalysisException("Set a host to read from with
option(\"host\", ...).")
- }
- if (!params.get("port").isPresent) {
- throw new AnalysisException("Set a port to read from with
option(\"port\", ...).")
- }
- Try {
- params.get("includeTimestamp").orElse("false").toBoolean
- } match {
- case Success(_) =>
- case Failure(_) =>
- throw new AnalysisException("includeTimestamp must be set to either
\"true\" or \"false\"")
- }
- }
-
- override def createMicroBatchReadSupport(
- checkpointLocation: String,
- options: DataSourceOptions): MicroBatchReadSupport = {
- checkParameters(options)
- new TextSocketMicroBatchReadSupport(options)
- }
-
- override def createContinuousReadSupport(
- checkpointLocation: String,
- options: DataSourceOptions): ContinuousReadSupport = {
- checkParameters(options)
- new TextSocketContinuousReadSupport(options)
- }
-
- /** String that represents the format that this data source provider uses. */
- override def shortName(): String = "socket"
-}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
new file mode 100644
index 0000000..3500778
--- /dev/null
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import java.text.SimpleDateFormat
+import java.util.Locale
+
+import scala.util.{Failure, Success, Try}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming.continuous.TextSocketContinuousReadSupport
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, MicroBatchStream}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
+
+class TextSocketSourceProvider extends DataSourceV2
+ with TableProvider with ContinuousReadSupportProvider
+ with DataSourceRegister with Logging {
+
+ private def checkParameters(params: DataSourceOptions): Unit = {
+ logWarning("The socket source should not be used for production
applications! " +
+ "It does not support recovery.")
+ if (!params.get("host").isPresent) {
+ throw new AnalysisException("Set a host to read from with
option(\"host\", ...).")
+ }
+ if (!params.get("port").isPresent) {
+ throw new AnalysisException("Set a port to read from with
option(\"port\", ...).")
+ }
+ Try {
+ params.get("includeTimestamp").orElse("false").toBoolean
+ } match {
+ case Success(_) =>
+ case Failure(_) =>
+ throw new AnalysisException("includeTimestamp must be set to either
\"true\" or \"false\"")
+ }
+ }
+
+ override def getTable(options: DataSourceOptions): Table = {
+ checkParameters(options)
+ new TextSocketTable(
+ options.get("host").get,
+ options.getInt("port", -1),
+ options.getBoolean("includeTimestamp", false))
+ }
+
+ override def createContinuousReadSupport(
+ checkpointLocation: String,
+ options: DataSourceOptions): ContinuousReadSupport = {
+ checkParameters(options)
+ new TextSocketContinuousReadSupport(options)
+ }
+
+ /** String that represents the format that this data source provider uses. */
+ override def shortName(): String = "socket"
+}
+
+class TextSocketTable(host: String, port: Int, includeTimestamp: Boolean)
+ extends Table with SupportsMicroBatchRead {
+
+ override def name(): String = s"Socket[$host:$port]"
+
+ override def schema(): StructType = {
+ if (includeTimestamp) {
+ TextSocketReader.SCHEMA_TIMESTAMP
+ } else {
+ TextSocketReader.SCHEMA_REGULAR
+ }
+ }
+
+ override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new
ScanBuilder {
+ override def build(): Scan = new Scan {
+ override def readSchema(): StructType = schema()
+
+ override def toMicroBatchStream(checkpointLocation: String):
MicroBatchStream = {
+ new TextSocketMicroBatchStream(host, port, options)
+ }
+ }
+ }
+}
+
+object TextSocketReader {
+ val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
+ val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
+ StructField("timestamp", TimestampType) :: Nil)
+ val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
+}
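
As with the rate source, the way the socket source is used from a query stays the same
(debugging only, no fault recovery). A quick sketch, assuming an active SparkSession
`spark` and a listener on localhost:9999:

    // Standard socket-source query; host/port/includeTimestamp are the options validated above.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .option("includeTimestamp", "true")   // selects TextSocketReader.SCHEMA_TIMESTAMP
      .load()
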
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 98589da..417dd55 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -29,8 +29,8 @@ import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.execution.streaming.{StreamingRelation,
StreamingRelationV2}
import org.apache.spark.sql.sources.StreamSourceProvider
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions, MicroBatchReadSupportProvider}
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, MicroBatchReadSupport}
+import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
@@ -173,60 +173,54 @@ final class DataStreamReader private[sql](sparkSession:
SparkSession) extends Lo
case _ => None
}
ds match {
- case s: MicroBatchReadSupportProvider =>
+ case provider: TableProvider =>
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
- ds = s, conf = sparkSession.sessionState.conf)
+ ds = provider, conf = sparkSession.sessionState.conf)
val options = sessionOptions ++ extraOptions
- val dataSourceOptions = new DataSourceOptions(options.asJava)
- var tempReadSupport: MicroBatchReadSupport = null
- val schema = try {
- val tmpCheckpointPath = Utils.createTempDir(namePrefix =
s"tempCP").getCanonicalPath
- tempReadSupport = if (userSpecifiedSchema.isDefined) {
- s.createMicroBatchReadSupport(
- userSpecifiedSchema.get, tmpCheckpointPath, dataSourceOptions)
- } else {
- s.createMicroBatchReadSupport(tmpCheckpointPath, dataSourceOptions)
- }
- tempReadSupport.fullSchema()
- } finally {
- // Stop tempReader to avoid side-effect thing
- if (tempReadSupport != null) {
- tempReadSupport.stop()
- tempReadSupport = null
- }
+ val dsOptions = new DataSourceOptions(options.asJava)
+ val table = userSpecifiedSchema match {
+ case Some(schema) => provider.getTable(dsOptions, schema)
+ case _ => provider.getTable(dsOptions)
}
- Dataset.ofRows(
- sparkSession,
- StreamingRelationV2(
- s, source, options,
- schema.toAttributes, v1Relation)(sparkSession))
- case s: ContinuousReadSupportProvider =>
- val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
- ds = s, conf = sparkSession.sessionState.conf)
- val options = sessionOptions ++ extraOptions
- val dataSourceOptions = new DataSourceOptions(options.asJava)
- var tempReadSupport: ContinuousReadSupport = null
- val schema = try {
- val tmpCheckpointPath = Utils.createTempDir(namePrefix =
s"tempCP").getCanonicalPath
- tempReadSupport = if (userSpecifiedSchema.isDefined) {
- s.createContinuousReadSupport(
- userSpecifiedSchema.get, tmpCheckpointPath, dataSourceOptions)
- } else {
- s.createContinuousReadSupport(tmpCheckpointPath, dataSourceOptions)
- }
- tempReadSupport.fullSchema()
- } finally {
- // Stop tempReader to avoid side-effect thing
- if (tempReadSupport != null) {
- tempReadSupport.stop()
- tempReadSupport = null
- }
+ table match {
+ case s: SupportsMicroBatchRead =>
+ Dataset.ofRows(
+ sparkSession,
+ StreamingRelationV2(
+ provider, source, s, options,
+ table.schema.toAttributes, v1Relation)(sparkSession))
+
+ case _ if ds.isInstanceOf[ContinuousReadSupportProvider] =>
+ val provider = ds.asInstanceOf[ContinuousReadSupportProvider]
+ var tempReadSupport: ContinuousReadSupport = null
+ val schema = try {
+ val tmpCheckpointPath = Utils.createTempDir(namePrefix =
s"tempCP").getCanonicalPath
+ tempReadSupport = if (userSpecifiedSchema.isDefined) {
+ provider.createContinuousReadSupport(
+ userSpecifiedSchema.get, tmpCheckpointPath, dsOptions)
+ } else {
+ provider.createContinuousReadSupport(tmpCheckpointPath,
dsOptions)
+ }
+ tempReadSupport.fullSchema()
+ } finally {
+ // Stop tempReader to avoid side-effect thing
+ if (tempReadSupport != null) {
+ tempReadSupport.stop()
+ tempReadSupport = null
+ }
+ }
+ Dataset.ofRows(
+ sparkSession,
+ // TODO: do not pass null as table after finish the API refactor for continuous
+ // stream.
+ StreamingRelationV2(
+ provider, source, table = null, options,
+ schema.toAttributes, v1Relation)(sparkSession))
+
+ // fallback to v1
+ case _ => Dataset.ofRows(sparkSession,
StreamingRelation(v1DataSource))
}
- Dataset.ofRows(
- sparkSession,
- StreamingRelationV2(
- s, source, options,
- schema.toAttributes, v1Relation)(sparkSession))
+
case _ =>
// Code path for data source v1.
Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource))
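
To summarize what DataStreamReader now expects from a v2 micro-batch source, a
hypothetical minimal provider could look like the sketch below. All names here are
illustrative and not part of the patch; the MicroBatchStream implementation itself is
omitted:

    import org.apache.spark.sql.sources.v2._
    import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
    import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream
    import org.apache.spark.sql.types.StructType

    // Hypothetical provider: TableProvider plus a Table with SupportsMicroBatchRead is
    // all DataStreamReader needs to build a StreamingRelationV2 for the micro-batch path.
    class MySourceProvider extends DataSourceV2 with TableProvider {
      override def getTable(options: DataSourceOptions): Table = new MyTable
    }

    class MyTable extends Table with SupportsMicroBatchRead {
      override def name(): String = "MySource"
      override def schema(): StructType = new StructType().add("value", "long")
      override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new ScanBuilder {
        override def build(): Scan = new Scan {
          override def readSchema(): StructType = schema()
          override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream =
            throw new UnsupportedOperationException("stream implementation omitted in this sketch")
        }
      }
    }
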
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
index be3efed..d40a1fd 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
@@ -25,15 +25,16 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.functions._
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions, MicroBatchReadSupportProvider}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions}
import org.apache.spark.sql.sources.v2.reader.streaming.Offset
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.util.ManualClock
-class RateSourceSuite extends StreamTest {
+class RateStreamProviderSuite extends StreamTest {
import testImplicits._
@@ -41,7 +42,9 @@ class RateSourceSuite extends StreamTest {
override def addData(query: Option[StreamExecution]):
(BaseStreamingSource, Offset) = {
assert(query.nonEmpty)
val rateSource = query.get.logicalPlan.collect {
- case StreamingExecutionRelation(source:
RateStreamMicroBatchReadSupport, _) => source
+ case r: StreamingDataSourceV2Relation
+ if r.stream.isInstanceOf[RateStreamMicroBatchStream] =>
+ r.stream.asInstanceOf[RateStreamMicroBatchStream]
}.head
rateSource.clock.asInstanceOf[ManualClock].advance(TimeUnit.SECONDS.toMillis(seconds))
@@ -51,28 +54,16 @@ class RateSourceSuite extends StreamTest {
}
}
- test("microbatch in registry") {
- withTempDir { temp =>
- DataSource.lookupDataSource("rate", spark.sqlContext.conf).
- getConstructor().newInstance() match {
- case ds: MicroBatchReadSupportProvider =>
- val readSupport = ds.createMicroBatchReadSupport(
- temp.getCanonicalPath, DataSourceOptions.empty())
- assert(readSupport.isInstanceOf[RateStreamMicroBatchReadSupport])
- case _ =>
- throw new IllegalStateException("Could not find read support for
rate")
- }
- }
+ test("RateStreamProvider in registry") {
+ val ds = DataSource.lookupDataSource("rate",
spark.sqlContext.conf).newInstance()
+ assert(ds.isInstanceOf[RateStreamProvider], "Could not find rate source")
}
test("compatible with old path in registry") {
-
DataSource.lookupDataSource("org.apache.spark.sql.execution.streaming.RateSourceProvider",
- spark.sqlContext.conf).getConstructor().newInstance() match {
- case ds: MicroBatchReadSupportProvider =>
- assert(ds.isInstanceOf[RateStreamProvider])
- case _ =>
- throw new IllegalStateException("Could not find read support for rate")
- }
+ val ds = DataSource.lookupDataSource(
+ "org.apache.spark.sql.execution.streaming.RateSourceProvider",
+ spark.sqlContext.conf).newInstance()
+ assert(ds.isInstanceOf[RateStreamProvider], "Could not find rate source")
}
test("microbatch - basic") {
@@ -142,17 +133,17 @@ class RateSourceSuite extends StreamTest {
test("microbatch - infer offsets") {
withTempDir { temp =>
- val readSupport = new RateStreamMicroBatchReadSupport(
- new DataSourceOptions(
- Map("numPartitions" -> "1", "rowsPerSecond" -> "100",
"useManualClock" -> "true").asJava),
- temp.getCanonicalPath)
- readSupport.clock.asInstanceOf[ManualClock].advance(100000)
- val startOffset = readSupport.initialOffset()
+ val stream = new RateStreamMicroBatchStream(
+ rowsPerSecond = 100,
+ options = new DataSourceOptions(Map("useManualClock" ->
"true").asJava),
+ checkpointLocation = temp.getCanonicalPath)
+ stream.clock.asInstanceOf[ManualClock].advance(100000)
+ val startOffset = stream.initialOffset()
startOffset match {
case r: LongOffset => assert(r.offset === 0L)
case _ => throw new IllegalStateException("unexpected offset type")
}
- readSupport.latestOffset() match {
+ stream.latestOffset() match {
case r: LongOffset => assert(r.offset >= 100)
case _ => throw new IllegalStateException("unexpected offset type")
}
@@ -161,16 +152,14 @@ class RateSourceSuite extends StreamTest {
test("microbatch - predetermined batch size") {
withTempDir { temp =>
- val readSupport = new RateStreamMicroBatchReadSupport(
- new DataSourceOptions(Map("numPartitions" -> "1", "rowsPerSecond" ->
"20").asJava),
- temp.getCanonicalPath)
- val startOffset = LongOffset(0L)
- val endOffset = LongOffset(1L)
- val config = readSupport.newScanConfigBuilder(startOffset,
endOffset).build()
- val tasks = readSupport.planInputPartitions(config)
- val readerFactory = readSupport.createReaderFactory(config)
- assert(tasks.size == 1)
- val dataReader = readerFactory.createReader(tasks(0))
+ val stream = new RateStreamMicroBatchStream(
+ rowsPerSecond = 20,
+ options = DataSourceOptions.empty(),
+ checkpointLocation = temp.getCanonicalPath)
+ val partitions = stream.planInputPartitions(LongOffset(0L),
LongOffset(1L))
+ val readerFactory = stream.createReaderFactory()
+ assert(partitions.size == 1)
+ val dataReader = readerFactory.createReader(partitions(0))
val data = ArrayBuffer[InternalRow]()
while (dataReader.next()) {
data.append(dataReader.get())
@@ -181,17 +170,16 @@ class RateSourceSuite extends StreamTest {
test("microbatch - data read") {
withTempDir { temp =>
- val readSupport = new RateStreamMicroBatchReadSupport(
- new DataSourceOptions(Map("numPartitions" -> "11", "rowsPerSecond" ->
"33").asJava),
- temp.getCanonicalPath)
- val startOffset = LongOffset(0L)
- val endOffset = LongOffset(1L)
- val config = readSupport.newScanConfigBuilder(startOffset,
endOffset).build()
- val tasks = readSupport.planInputPartitions(config)
- val readerFactory = readSupport.createReaderFactory(config)
- assert(tasks.size == 11)
-
- val readData = tasks
+ val stream = new RateStreamMicroBatchStream(
+ rowsPerSecond = 33,
+ numPartitions = 11,
+ options = DataSourceOptions.empty(),
+ checkpointLocation = temp.getCanonicalPath)
+ val partitions = stream.planInputPartitions(LongOffset(0L),
LongOffset(1L))
+ val readerFactory = stream.createReaderFactory()
+ assert(partitions.size == 11)
+
+ val readData = partitions
.map(readerFactory.createReader)
.flatMap { reader =>
val buf = scala.collection.mutable.ListBuffer[InternalRow]()
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
index 7db31f1..cf069d5 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
@@ -30,10 +30,11 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupportProvider}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader.streaming.Offset
import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest}
import org.apache.spark.sql.test.SharedSQLContext
@@ -59,7 +60,9 @@ class TextSocketStreamSuite extends StreamTest with
SharedSQLContext with Before
"Cannot add data when there is no query for finding the active socket
source")
val sources = query.get.logicalPlan.collect {
- case StreamingExecutionRelation(source:
TextSocketMicroBatchReadSupport, _) => source
+ case r: StreamingDataSourceV2Relation
+ if r.stream.isInstanceOf[TextSocketMicroBatchStream] =>
+ r.stream.asInstanceOf[TextSocketMicroBatchStream]
}
if (sources.isEmpty) {
throw new Exception(
@@ -83,13 +86,10 @@ class TextSocketStreamSuite extends StreamTest with
SharedSQLContext with Before
}
test("backward compatibility with old path") {
-
DataSource.lookupDataSource("org.apache.spark.sql.execution.streaming.TextSocketSourceProvider",
- spark.sqlContext.conf).getConstructor().newInstance() match {
- case ds: MicroBatchReadSupportProvider =>
- assert(ds.isInstanceOf[TextSocketSourceProvider])
- case _ =>
- throw new IllegalStateException("Could not find socket source")
- }
+ val ds = DataSource.lookupDataSource(
+ "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider",
+ spark.sqlContext.conf).newInstance()
+ assert(ds.isInstanceOf[TextSocketSourceProvider], "Could not find socket
source")
}
test("basic usage") {
@@ -175,16 +175,13 @@ class TextSocketStreamSuite extends StreamTest with
SharedSQLContext with Before
test("params not given") {
val provider = new TextSocketSourceProvider
intercept[AnalysisException] {
- provider.createMicroBatchReadSupport(
- "", new DataSourceOptions(Map.empty[String, String].asJava))
+ provider.getTable(new DataSourceOptions(Map.empty[String,
String].asJava))
}
intercept[AnalysisException] {
- provider.createMicroBatchReadSupport(
- "", new DataSourceOptions(Map("host" -> "localhost").asJava))
+ provider.getTable(new DataSourceOptions(Map("host" ->
"localhost").asJava))
}
intercept[AnalysisException] {
- provider.createMicroBatchReadSupport(
- "", new DataSourceOptions(Map("port" -> "1234").asJava))
+ provider.getTable(new DataSourceOptions(Map("port" -> "1234").asJava))
}
}
@@ -192,8 +189,7 @@ class TextSocketStreamSuite extends StreamTest with
SharedSQLContext with Before
val provider = new TextSocketSourceProvider
val params = Map("host" -> "localhost", "port" -> "1234",
"includeTimestamp" -> "fasle")
intercept[AnalysisException] {
- val a = new DataSourceOptions(params.asJava)
- provider.createMicroBatchReadSupport("", a)
+ provider.getTable(new DataSourceOptions(params.asJava))
}
}
@@ -204,8 +200,7 @@ class TextSocketStreamSuite extends StreamTest with
SharedSQLContext with Before
StructField("area", StringType) :: Nil)
val params = Map("host" -> "localhost", "port" -> "1234")
val exception = intercept[UnsupportedOperationException] {
- provider.createMicroBatchReadSupport(
- userSpecifiedSchema, "", new DataSourceOptions(params.asJava))
+ provider.getTable(new DataSourceOptions(params.asJava),
userSpecifiedSchema)
}
assert(exception.getMessage.contains(
"socket source does not support user-specified schema"))
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 55fdcee..72321c4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -36,7 +36,6 @@ import org.apache.spark.sql.catalyst.plans.logical.Range
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream
import org.apache.spark.sql.execution.streaming.state.{StateStore,
StateStoreConf, StateStoreId, StateStoreProvider}
import org.apache.spark.sql.functions._
@@ -96,18 +95,16 @@ class StreamSuite extends StreamTest {
val streamingRelation = spark.readStream.format("rate").load().logicalPlan
collect {
case s: StreamingRelationV2 => s
}
- assert(streamingRelation.nonEmpty, "cannot find
StreamingExecutionRelation")
+ assert(streamingRelation.nonEmpty, "cannot find StreamingRelationV2")
assert(
streamingRelation.head.computeStats.sizeInBytes ==
spark.sessionState.conf.defaultSizeInBytes)
}
test("StreamingExecutionRelation.computeStats") {
- val streamingExecutionRelation = MemoryStream[Int].toDF.logicalPlan
collect {
- case s: StreamingExecutionRelation => s
- }
- assert(streamingExecutionRelation.nonEmpty, "cannot find
StreamingExecutionRelation")
- assert(streamingExecutionRelation.head.computeStats.sizeInBytes
- == spark.sessionState.conf.defaultSizeInBytes)
+ val memoryStream = MemoryStream[Int]
+ val executionRelation = StreamingExecutionRelation(
+ memoryStream,
memoryStream.encoder.schema.toAttributes)(memoryStream.sqlContext.sparkSession)
+ assert(executionRelation.computeStats.sizeInBytes ==
spark.sessionState.conf.defaultSizeInBytes)
}
test("explain join with a normal source") {
@@ -495,9 +492,9 @@ class StreamSuite extends StreamTest {
val explainWithoutExtended = q.explainInternal(false)
// `extended = false` only displays the physical plan.
- assert("Streaming RelationV2 MemoryStreamDataSource".r
+ assert("StreamingDataSourceV2Relation".r
.findAllMatchIn(explainWithoutExtended).size === 0)
- assert("ScanV2 MemoryStreamDataSource".r
+ assert("ScanV2".r
.findAllMatchIn(explainWithoutExtended).size === 1)
// Use "StateStoreRestore" to verify that it does output a streaming
physical plan
assert(explainWithoutExtended.contains("StateStoreRestore"))
@@ -505,9 +502,9 @@ class StreamSuite extends StreamTest {
val explainWithExtended = q.explainInternal(true)
// `extended = true` displays 3 logical plans
(Parsed/Optimized/Optimized) and 1 physical
// plan.
- assert("Streaming RelationV2 MemoryStreamDataSource".r
+ assert("StreamingDataSourceV2Relation".r
.findAllMatchIn(explainWithExtended).size === 3)
- assert("ScanV2 MemoryStreamDataSource".r
+ assert("ScanV2".r
.findAllMatchIn(explainWithExtended).size === 1)
// Use "StateStoreRestore" to verify that it does output a streaming
physical plan
assert(explainWithExtended.contains("StateStoreRestore"))
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index d878c34..b4bd6f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -39,12 +39,12 @@ import org.apache.spark.sql.catalyst.encoders.{encoderFor,
ExpressionEncoder, Ro
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.AllTuples
import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.{OldStreamingDataSourceV2Relation, StreamingDataSourceV2Relation}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, EpochCoordinatorRef, IncrementAndGetEpoch}
import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2
import org.apache.spark.sql.execution.streaming.state.StateStore
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -688,8 +688,20 @@ trait StreamTest extends QueryTest with SharedSQLContext
with TimeLimits with Be
def findSourceIndex(plan: LogicalPlan): Option[Int] = {
plan
.collect {
+ // v1 source
case r: StreamingExecutionRelation => r.source
- case r: StreamingDataSourceV2Relation => r.readSupport
+ // v2 source
+ case r: StreamingDataSourceV2Relation => r.stream
+ case r: OldStreamingDataSourceV2Relation => r.readSupport
+ // We can add data to memory stream before starting it. Then the input plan has
+ // not been processed by the streaming engine and contains `StreamingRelationV2`.
+ case r: StreamingRelationV2 if r.sourceName == "memory" =>
+ // TODO: remove this null hack after finish API refactor for continuous stream.
+ if (r.table == null) {
+ r.dataSource.asInstanceOf[ContinuousReadSupport]
+ } else {
+ r.table.asInstanceOf[MemoryStreamTable].stream
+ }
}
.zipWithIndex
.find(_._1 == source)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index 46eec73..13b8866 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -24,15 +24,14 @@ import scala.util.Random
import scala.util.control.NonFatal
import org.scalatest.BeforeAndAfter
-import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.Span
import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, Dataset}
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.util.BlockingSource
import org.apache.spark.util.Utils
@@ -304,8 +303,8 @@ class StreamingQueryManagerSuite extends StreamTest with
BeforeAndAfter {
if (withError) {
logDebug(s"Terminating query ${queryToStop.name} with error")
queryToStop.asInstanceOf[StreamingQueryWrapper].streamingQuery.logicalPlan.collect
{
- case StreamingExecutionRelation(source, _) =>
- source.asInstanceOf[MemoryStream[Int]].addData(0)
+ case r: StreamingDataSourceV2Relation =>
+ r.stream.asInstanceOf[MemoryStream[Int]].addData(0)
}
} else {
logDebug(s"Stopping query ${queryToStop.name}")
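
With the refactor, a started query's logical plan carries StreamingDataSourceV2Relation nodes, so tests reach the underlying memory source through the relation's stream field instead of StreamingExecutionRelation's source. A hedged sketch of that lookup pulled into a helper (the helper name is illustrative, not part of the patch, and assumes the suite's existing imports):

    // Sketch: collect the v2 streams referenced by a running query's plan.
    def v2Streams(query: StreamExecution) = query.logicalPlan.collect {
      case r: StreamingDataSourceV2Relation => r.stream
    }
    // A test can then drive a particular source, as the error-injection branch
    // above does: v2Streams(query).head.asInstanceOf[MemoryStream[Int]].addData(0)
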
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 29b8164..62fde98 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -220,10 +220,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
}
// getBatch should take 100 ms the first time it is called
- override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
+ override def planInputPartitions(start: OffsetV2, end: OffsetV2): Array[InputPartition] = {
synchronized {
clock.waitTillTime(1150)
- super.planInputPartitions(config)
+ super.planInputPartitions(start, end)
}
}
}
@@ -906,7 +906,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
assert(df.logicalPlan.toJSON.contains("StreamingRelationV2"))
testStream(df)(
- AssertOnQuery(_.logicalPlan.toJSON.contains("StreamingExecutionRelation"))
+ AssertOnQuery(_.logicalPlan.toJSON.contains("StreamingDataSourceV2Relation"))
)
testStream(df, useV2Sink = true)(
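
The assertion change above follows from converting StreamingRelationV2 directly to StreamingDataSourceV2Relation at the start of micro-batch execution. If matching on the JSON rendering of the plan ever proves brittle, an equivalent structural check is possible; a sketch only, assuming StreamingDataSourceV2Relation is imported in the suite:

    testStream(df)(
      AssertOnQuery { q =>
        q.logicalPlan.collect { case r: StreamingDataSourceV2Relation => r }.nonEmpty
      }
    )
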
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index 756092f..f85cae9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.streaming.continuous
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
import org.apache.spark.sql._
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2StreamingScanExec
+import org.apache.spark.sql.execution.datasources.v2.ContinuousScanExec
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream
@@ -41,7 +41,7 @@ class ContinuousSuiteBase extends StreamTest {
case s: ContinuousExecution =>
assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure query is initialized")
val reader = s.lastExecution.executedPlan.collectFirst {
- case DataSourceV2StreamingScanExec(_, _, _, _, r: RateStreamContinuousReadSupport, _) => r
+ case ContinuousScanExec(_, _, _, _, r: RateStreamContinuousReadSupport, _) => r
}.get
val deltaMs = numTriggers * 1000 + 300
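
The change above is a mechanical rename: ContinuousScanExec keeps the constructor shape of DataSourceV2StreamingScanExec, so the positional extractor still works. A sketch of the same lookup factored into a helper (the helper name is illustrative, not part of the patch):

    // Sketch: pull the continuous rate-stream reader out of an executed plan.
    def rateReader(s: ContinuousExecution): RateStreamContinuousReadSupport =
      s.lastExecution.executedPlan.collectFirst {
        case ContinuousScanExec(_, _, _, _, r: RateStreamContinuousReadSupport, _) => r
      }.get
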
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index 31fce46..d98cc41 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -24,26 +24,35 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.sources.v2._
-import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReaderFactory, ScanConfig, ScanConfigBuilder}
+import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming._
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, StreamTest, Trigger}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
-case class FakeReadSupport() extends MicroBatchReadSupport with ContinuousReadSupport {
+class FakeDataStream extends MicroBatchStream {
override def deserializeOffset(json: String): Offset = RateStreamOffset(Map())
override def commit(end: Offset): Unit = {}
override def stop(): Unit = {}
- override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
- override def fullSchema(): StructType = StructType(Seq())
- override def newScanConfigBuilder(start: Offset, end: Offset): ScanConfigBuilder = null
override def initialOffset(): Offset = RateStreamOffset(Map())
override def latestOffset(): Offset = RateStreamOffset(Map())
- override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = null
- override def createReaderFactory(config: ScanConfig): PartitionReaderFactory = {
+ override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = {
+ throw new IllegalStateException("fake source - cannot actually read")
+ }
+ override def createReaderFactory(): PartitionReaderFactory = {
throw new IllegalStateException("fake source - cannot actually read")
}
+}
+
+case class FakeReadSupport() extends ContinuousReadSupport {
+ override def deserializeOffset(json: String): Offset = RateStreamOffset(Map())
+ override def commit(end: Offset): Unit = {}
+ override def stop(): Unit = {}
+ override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
+ override def fullSchema(): StructType = StructType(Seq())
+ override def initialOffset(): Offset = RateStreamOffset(Map())
+ override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = null
override def createContinuousReaderFactory(
config: ScanConfig): ContinuousPartitionReaderFactory = {
throw new IllegalStateException("fake source - cannot actually read")
@@ -53,13 +62,16 @@ case class FakeReadSupport() extends MicroBatchReadSupport with ContinuousReadSu
}
}
-trait FakeMicroBatchReadSupportProvider extends MicroBatchReadSupportProvider {
- override def createMicroBatchReadSupport(
- checkpointLocation: String,
- options: DataSourceOptions): MicroBatchReadSupport = {
- LastReadOptions.options = options
- FakeReadSupport()
- }
+class FakeScanBuilder extends ScanBuilder with Scan {
+ override def build(): Scan = this
+ override def readSchema(): StructType = StructType(Seq())
+ override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = new FakeDataStream
+}
+
+class FakeMicroBatchReadTable extends Table with SupportsMicroBatchRead {
+ override def name(): String = "fake"
+ override def schema(): StructType = StructType(Seq())
+ override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new FakeScanBuilder
}
trait FakeContinuousReadSupportProvider extends ContinuousReadSupportProvider {
@@ -84,25 +96,38 @@ trait FakeStreamingWriteSupportProvider extends StreamingWriteSupportProvider {
class FakeReadMicroBatchOnly
extends DataSourceRegister
- with FakeMicroBatchReadSupportProvider
+ with TableProvider
with SessionConfigSupport {
override def shortName(): String = "fake-read-microbatch-only"
override def keyPrefix: String = shortName()
+
+ override def getTable(options: DataSourceOptions): Table = {
+ LastReadOptions.options = options
+ new FakeMicroBatchReadTable {}
+ }
}
class FakeReadContinuousOnly
extends DataSourceRegister
+ with TableProvider
with FakeContinuousReadSupportProvider
with SessionConfigSupport {
override def shortName(): String = "fake-read-continuous-only"
override def keyPrefix: String = shortName()
+
+ override def getTable(options: DataSourceOptions): Table = new Table {
+ override def schema(): StructType = StructType(Seq())
+ override def name(): String = "fake"
+ }
}
class FakeReadBothModes extends DataSourceRegister
- with FakeMicroBatchReadSupportProvider with FakeContinuousReadSupportProvider {
+ with TableProvider with FakeContinuousReadSupportProvider {
override def shortName(): String = "fake-read-microbatch-continuous"
+
+ override def getTable(options: DataSourceOptions): Table = new FakeMicroBatchReadTable {}
}
class FakeReadNeitherMode extends DataSourceRegister {
@@ -303,10 +328,18 @@ class StreamingDataSourceV2Suite extends StreamTest {
getConstructor().newInstance()
val writeSource = DataSource.lookupDataSource(write, spark.sqlContext.conf).
getConstructor().newInstance()
+
+ def isMicroBatch(ds: Any): Boolean = ds match {
+ case provider: TableProvider =>
+ val table = provider.getTable(DataSourceOptions.empty())
+ table.isInstanceOf[SupportsMicroBatchRead]
+ case _ => false
+ }
+
(readSource, writeSource, trigger) match {
// Valid microbatch queries.
- case (_: MicroBatchReadSupportProvider, _: StreamingWriteSupportProvider, t)
- if !t.isInstanceOf[ContinuousTrigger] =>
+ case (_: TableProvider, _: StreamingWriteSupportProvider, t)
+ if isMicroBatch(readSource) && !t.isInstanceOf[ContinuousTrigger] =>
testPositiveCase(read, write, trigger)
// Valid continuous queries.
@@ -316,7 +349,7 @@ class StreamingDataSourceV2Suite extends StreamTest {
// Invalid - can't read at all
case (r, _, _)
- if !r.isInstanceOf[MicroBatchReadSupportProvider]
+ if !r.isInstanceOf[TableProvider]
&& !r.isInstanceOf[ContinuousReadSupportProvider] =>
testNegativeCase(read, write, trigger,
s"Data source $read does not support streamed reading")
@@ -334,7 +367,7 @@ class StreamingDataSourceV2Suite extends StreamTest {
// Invalid - trigger is microbatch but reader is not
case (r, _, t)
- if !r.isInstanceOf[MicroBatchReadSupportProvider] &&
+ if !isMicroBatch(r) &&
!t.isInstanceOf[ContinuousTrigger] =>
testPostCreationNegativeCase(read, write, trigger,
s"Data source $read does not support microbatch processing")
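
Taken together, the fakes in this suite trace the resolution path of the refactored micro-batch API: DataSourceRegister supplies the short name, TableProvider.getTable returns a Table, SupportsMicroBatchRead marks it as streamable, Table.newScanBuilder yields a Scan, and Scan.toMicroBatchStream produces the MicroBatchStream that MicroBatchExecution drives. A condensed, hedged sketch of that chain in one place; every class name below is illustrative (not shipped code), and the offset type is a stand-in:

    import org.apache.spark.sql.sources.DataSourceRegister
    import org.apache.spark.sql.sources.v2._
    import org.apache.spark.sql.sources.v2.reader._
    import org.apache.spark.sql.sources.v2.reader.streaming._
    import org.apache.spark.sql.types.StructType

    // Illustrative offset: a single counter serialized as its string form.
    case class NoopOffset(value: Long = 0L) extends Offset {
      override def json(): String = value.toString
    }

    // A do-nothing micro-batch stream wired through the new API surface.
    class NoopMicroBatchStream extends MicroBatchStream {
      override def initialOffset(): Offset = NoopOffset()
      override def latestOffset(): Offset = NoopOffset()
      override def deserializeOffset(json: String): Offset = NoopOffset(json.toLong)
      override def commit(end: Offset): Unit = {}
      override def stop(): Unit = {}
      override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] =
        Array.empty[InputPartition]
      override def createReaderFactory(): PartitionReaderFactory =
        throw new IllegalStateException("noop source - nothing to read")
    }

    class NoopScanBuilder extends ScanBuilder with Scan {
      override def build(): Scan = this
      override def readSchema(): StructType = StructType(Seq())
      override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream =
        new NoopMicroBatchStream
    }

    class NoopTable extends Table with SupportsMicroBatchRead {
      override def name(): String = "noop"
      override def schema(): StructType = StructType(Seq())
      override def newScanBuilder(options: DataSourceOptions): ScanBuilder =
        new NoopScanBuilder
    }

    class NoopSourceProvider extends DataSourceRegister with TableProvider {
      override def shortName(): String = "noop-microbatch"
      override def getTable(options: DataSourceOptions): Table = new NoopTable
    }

To be reachable from spark.readStream.format("noop-microbatch"), such a provider would be registered through META-INF/services/org.apache.spark.sql.sources.DataSourceRegister, the same lookup mechanism the fake sources in this suite rely on.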