This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new f85ed9a  [SPARK-26785][SQL] data source v2 API refactor: streaming write
f85ed9a is described below

commit f85ed9a3e55083b0de0e20a37775efa92d248a4f
Author:     Wenchen Fan <wenc...@databricks.com>
AuthorDate: Mon Feb 18 16:17:24 2019 -0800

    [SPARK-26785][SQL] data source v2 API refactor: streaming write

    ## What changes were proposed in this pull request?

    Continue the API refactor for streaming write, according to the [doc](https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing).

    The major changes:
    1. rename `StreamingWriteSupport` to `StreamingWrite`
    2. add `WriteBuilder.buildForStreaming`
    3. update existing sinks, to move the creation of `StreamingWrite` to `Table`

    ## How was this patch tested?

    existing tests

    Closes #23702 from cloud-fan/stream-write.

    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: gatorsmile <gatorsm...@gmail.com>
---
 .../spark/sql/kafka010/KafkaSourceProvider.scala   | 42 ++++++-----
 ...riteSupport.scala => KafkaStreamingWrite.scala} |  8 +-
 .../spark/sql/sources/v2/SessionConfigSupport.java |  4 +-
 .../sources/v2/StreamingWriteSupportProvider.java  | 54 -------------
 .../spark/sql/sources/v2/SupportsBatchWrite.java   |  2 +-
 ...BatchWrite.java => SupportsStreamingWrite.java} |  9 ++-
 .../apache/spark/sql/sources/v2/TableProvider.java |  3 +-
 .../spark/sql/sources/v2/writer/WriteBuilder.java  |  9 ++-
 .../sql/sources/v2/writer/WriterCommitMessage.java |  4 +-
 .../streaming/StreamingDataWriterFactory.java      |  2 +-
 ...eamingWriteSupport.java => StreamingWrite.java} | 21 +++++-
 .../streaming/SupportsOutputMode.java}             | 17 +++--
 .../org/apache/spark/sql/DataFrameReader.scala     |  2 +-
 .../datasources/noop/NoopDataSource.scala          | 26 +++---
 .../datasources/v2/DataSourceV2StringFormat.scala  | 88 ----------------------
 .../datasources/v2/DataSourceV2Utils.scala         | 43 +++++------
 .../execution/streaming/MicroBatchExecution.scala  | 20 +++--
 .../execution/streaming/StreamingRelation.scala    |  6 +-
 .../spark/sql/execution/streaming/console.scala    | 43 ++++++++---
 .../streaming/continuous/ContinuousExecution.scala | 25 +++---
 .../streaming/continuous/EpochCoordinator.scala    |  6 +-
 .../continuous/WriteToContinuousDataSource.scala   |  6 +-
 .../WriteToContinuousDataSourceExec.scala          | 13 ++--
 ...onsoleWriteSupport.scala => ConsoleWrite.scala} |  6 +-
 ...portProvider.scala => ForeachWriterTable.scala} | 76 +++++++++++--------
 .../streaming/sources/MicroBatchWrite.scala        |  4 +-
 .../streaming/sources/RateStreamProvider.scala     |  3 +-
 .../sources/TextSocketSourceProvider.scala         |  3 +-
 .../sql/execution/streaming/sources/memoryV2.scala | 42 +++++++----
 .../spark/sql/streaming/DataStreamReader.scala     |  2 +-
 .../spark/sql/streaming/DataStreamWriter.scala     | 50 ++++++------
 .../sql/streaming/StreamingQueryManager.scala      |  4 +-
 ...org.apache.spark.sql.sources.DataSourceRegister |  2 +-
 .../execution/streaming/MemorySinkV2Suite.scala    |  6 +-
 .../sql/sources/v2/DataSourceV2UtilsSuite.scala    |  4 +-
 .../sql/sources/v2/SimpleWritableDataSource.scala  |  3 +-
 .../ContinuousQueuedDataReaderSuite.scala          |  4 +-
 .../continuous/EpochCoordinatorSuite.scala         |  6 +-
 .../sources/StreamingDataSourceV2Suite.scala       | 70 ++++++++++-------
 39 files changed, 345 insertions(+), 393 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 9238899b..6994517 100644
---
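For orientation, here is a minimal sketch of a custom sink written against the refactored interfaces, modeled on the `ConsoleTable` and `NoopDataSource` changes in the diff below. The `Echo*` names are hypothetical and the sketch is not part of this commit; it only illustrates the three changes listed above.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

// The provider only hands out a Table; the streaming write path now hangs off the Table.
class EchoSinkProvider extends TableProvider with DataSourceRegister {
  override def shortName(): String = "echo"
  override def getTable(options: DataSourceOptions): Table = EchoTable
}

// SupportsStreamingWrite marks the table as a streaming sink.
object EchoTable extends Table with SupportsStreamingWrite {
  override def name(): String = "echo-table"
  override def schema(): StructType = new StructType()

  override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
    new WriteBuilder with SupportsOutputMode {
      private var inputSchema: StructType = _

      override def withInputDataSchema(schema: StructType): WriteBuilder = {
        this.inputSchema = schema
        this
      }

      // Accepted but ignored here, like the built-in sinks touched by this commit.
      override def outputMode(mode: OutputMode): WriteBuilder = this

      // The new entry point introduced by this refactor.
      override def buildForStreaming(): StreamingWrite = EchoStreamingWrite
    }
  }
}

object EchoStreamingWrite extends StreamingWrite {
  override def createStreamingWriterFactory(): StreamingDataWriterFactory = EchoWriterFactory
  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
}

object EchoWriterFactory extends StreamingDataWriterFactory {
  override def createWriter(
      partitionId: Int,
      taskId: Long,
      epochId: Long): DataWriter[InternalRow] = EchoWriter
}

// A do-nothing writer; a real sink would push each row to external storage.
object EchoWriter extends DataWriter[InternalRow] {
  override def write(record: InternalRow): Unit = {}
  override def commit(): WriterCommitMessage = EchoCommitMessage
  override def abort(): Unit = {}
}

case object EchoCommitMessage extends WriterCommitMessage
```

The engine side of the handshake is visible in the `MicroBatchExecution` and `ContinuousExecution` hunks below: instead of calling `createStreamingWriteSupport(...)` on the provider, Spark now calls `newWriteBuilder(options).withQueryId(runId).withInputDataSchema(schema)` on the table and then `outputMode(...)` / `buildForStreaming()` on the builder.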
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -33,7 +33,8 @@ import org.apache.spark.sql.sources._ import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder} import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream} -import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport +import org.apache.spark.sql.sources.v2.writer.WriteBuilder +import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWrite, SupportsOutputMode} import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType @@ -47,7 +48,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister with StreamSinkProvider with RelationProvider with CreatableRelationProvider - with StreamingWriteSupportProvider with TableProvider with Logging { import KafkaSourceProvider._ @@ -180,20 +180,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister } } - override def createStreamingWriteSupport( - queryId: String, - schema: StructType, - mode: OutputMode, - options: DataSourceOptions): StreamingWriteSupport = { - import scala.collection.JavaConverters._ - - val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim) - // We convert the options argument from V2 -> Java map -> scala mutable -> scala immutable. - val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap) - - new KafkaStreamingWriteSupport(topic, producerParams, schema) - } - private def strategy(caseInsensitiveParams: Map[String, String]) = caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match { case ("assign", value) => @@ -365,7 +351,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister } class KafkaTable(strategy: => ConsumerStrategy) extends Table - with SupportsMicroBatchRead with SupportsContinuousRead { + with SupportsMicroBatchRead with SupportsContinuousRead with SupportsStreamingWrite { override def name(): String = s"Kafka $strategy" @@ -374,6 +360,28 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new ScanBuilder { override def build(): Scan = new KafkaScan(options) } + + override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = { + new WriteBuilder with SupportsOutputMode { + private var inputSchema: StructType = _ + + override def withInputDataSchema(schema: StructType): WriteBuilder = { + this.inputSchema = schema + this + } + + override def outputMode(mode: OutputMode): WriteBuilder = this + + override def buildForStreaming(): StreamingWrite = { + import scala.collection.JavaConverters._ + + assert(inputSchema != null) + val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim) + val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap) + new KafkaStreamingWrite(topic, producerParams, inputSchema) + } + } + } } class KafkaScan(options: DataSourceOptions) extends Scan { diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala similarity index 95% rename from external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala rename to 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala index 0d831c3..e3101e1 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery import org.apache.spark.sql.sources.v2.writer._ -import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport} +import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite} import org.apache.spark.sql.types.StructType /** @@ -33,18 +33,18 @@ import org.apache.spark.sql.types.StructType case object KafkaWriterCommitMessage extends WriterCommitMessage /** - * A [[StreamingWriteSupport]] for Kafka writing. Responsible for generating the writer factory. + * A [[StreamingWrite]] for Kafka writing. Responsible for generating the writer factory. * * @param topic The topic this writer is responsible for. If None, topic will be inferred from * a `topic` field in the incoming data. * @param producerParams Parameters for Kafka producers in each task. * @param schema The schema of the input data. */ -class KafkaStreamingWriteSupport( +class KafkaStreamingWrite( topic: Option[String], producerParams: ju.Map[String, Object], schema: StructType) - extends StreamingWriteSupport { + extends StreamingWrite { validateQuery(schema.toAttributes, producerParams, topic) diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java index c00abd9..d27fbfd 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java @@ -20,12 +20,12 @@ package org.apache.spark.sql.sources.v2; import org.apache.spark.annotation.Evolving; /** - * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to + * A mix-in interface for {@link TableProvider}. Data sources can implement this interface to * propagate session configs with the specified key-prefix to all data source operations in this * session. */ @Evolving -public interface SessionConfigSupport extends DataSourceV2 { +public interface SessionConfigSupport extends TableProvider { /** * Key prefix of the session configs to propagate, which is usually the data source name. Spark diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java deleted file mode 100644 index 8ac9c51..0000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2; - -import org.apache.spark.annotation.Evolving; -import org.apache.spark.sql.execution.streaming.BaseStreamingSink; -import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport; -import org.apache.spark.sql.streaming.OutputMode; -import org.apache.spark.sql.types.StructType; - -/** - * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to - * provide data writing ability for structured streaming. - * - * This interface is used to create {@link StreamingWriteSupport} instances when end users run - * {@code Dataset.writeStream.format(...).option(...).start()}. - */ -@Evolving -public interface StreamingWriteSupportProvider extends DataSourceV2, BaseStreamingSink { - - /** - * Creates a {@link StreamingWriteSupport} instance to save the data to this data source, which is - * called by Spark at the beginning of each streaming query. - * - * @param queryId A unique string for the writing query. It's possible that there are many - * writing queries running at the same time, and the returned - * {@link StreamingWriteSupport} can use this id to distinguish itself from others. - * @param schema the schema of the data to be written. - * @param mode the output mode which determines what successive epoch output means to this - * sink, please refer to {@link OutputMode} for more details. - * @param options the options for the returned data source writer, which is an immutable - * case-insensitive string-to-string map. - */ - StreamingWriteSupport createStreamingWriteSupport( - String queryId, - StructType schema, - OutputMode mode, - DataSourceOptions options); -} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java index 08caadd..b2cd97a 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java @@ -24,7 +24,7 @@ import org.apache.spark.sql.sources.v2.writer.WriteBuilder; * An empty mix-in interface for {@link Table}, to indicate this table supports batch write. * <p> * If a {@link Table} implements this interface, the - * {@link SupportsWrite#newWriteBuilder(DataSourceOptions)} must return a {@link WriteBuilder} + * {@link SupportsWrite#newWriteBuilder(DataSourceOptions)} must return a {@link WriteBuilder} * with {@link WriteBuilder#buildForBatch()} implemented. 
* </p> */ diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java similarity index 76% copy from sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java copy to sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java index 08caadd..1050d35 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java @@ -18,15 +18,16 @@ package org.apache.spark.sql.sources.v2; import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.execution.streaming.BaseStreamingSink; import org.apache.spark.sql.sources.v2.writer.WriteBuilder; /** - * An empty mix-in interface for {@link Table}, to indicate this table supports batch write. + * An empty mix-in interface for {@link Table}, to indicate this table supports streaming write. * <p> * If a {@link Table} implements this interface, the - * {@link SupportsWrite#newWriteBuilder(DataSourceOptions)} must return a {@link WriteBuilder} - * with {@link WriteBuilder#buildForBatch()} implemented. + * {@link SupportsWrite#newWriteBuilder(DataSourceOptions)} must return a {@link WriteBuilder} + * with {@link WriteBuilder#buildForStreaming()} implemented. * </p> */ @Evolving -public interface SupportsBatchWrite extends SupportsWrite {} +public interface SupportsStreamingWrite extends SupportsWrite, BaseStreamingSink { } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java index 855d5ef..a9b83b6 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java @@ -29,8 +29,7 @@ import org.apache.spark.sql.types.StructType; * </p> */ @Evolving -// TODO: do not extend `DataSourceV2`, after we finish the API refactor completely. -public interface TableProvider extends DataSourceV2 { +public interface TableProvider { /** * Return a {@link Table} instance to do read/write with user-specified options. diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java index e861c72..07529fe 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java @@ -20,6 +20,7 @@ package org.apache.spark.sql.sources.v2.writer; import org.apache.spark.annotation.Evolving; import org.apache.spark.sql.sources.v2.SupportsBatchWrite; import org.apache.spark.sql.sources.v2.Table; +import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite; import org.apache.spark.sql.types.StructType; /** @@ -64,6 +65,12 @@ public interface WriteBuilder { * {@link SupportsSaveMode}. 
*/ default BatchWrite buildForBatch() { - throw new UnsupportedOperationException("Batch scans are not supported"); + throw new UnsupportedOperationException(getClass().getName() + + " does not support batch write"); + } + + default StreamingWrite buildForStreaming() { + throw new UnsupportedOperationException(getClass().getName() + + " does not support streaming write"); } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java index 6334c8f..23e8580 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java @@ -20,12 +20,12 @@ package org.apache.spark.sql.sources.v2.writer; import java.io.Serializable; import org.apache.spark.annotation.Evolving; -import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport; +import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite; /** * A commit message returned by {@link DataWriter#commit()} and will be sent back to the driver side * as the input parameter of {@link BatchWrite#commit(WriterCommitMessage[])} or - * {@link StreamingWriteSupport#commit(long, WriterCommitMessage[])}. + * {@link StreamingWrite#commit(long, WriterCommitMessage[])}. * * This is an empty interface, data sources should define their own message class and use it when * generating messages at executor side and handling the messages at driver side. diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java index 7d3d21c..af2f03c 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java @@ -26,7 +26,7 @@ import org.apache.spark.sql.sources.v2.writer.DataWriter; /** * A factory of {@link DataWriter} returned by - * {@link StreamingWriteSupport#createStreamingWriterFactory()}, which is responsible for creating + * {@link StreamingWrite#createStreamingWriterFactory()}, which is responsible for creating * and initializing the actual data writer at executor side. * * Note that, the writer factory will be serialized and sent to executors, then the data writer diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWrite.java similarity index 73% rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteSupport.java rename to sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWrite.java index 84cfbf2..5617f1c 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWrite.java @@ -22,13 +22,26 @@ import org.apache.spark.sql.sources.v2.writer.DataWriter; import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; /** - * An interface that defines how to write the data to data source for streaming processing. + * An interface that defines how to write the data to data source in streaming queries. 
* - * Streaming queries are divided into intervals of data called epochs, with a monotonically - * increasing numeric ID. This writer handles commits and aborts for each successive epoch. + * The writing procedure is: + * 1. Create a writer factory by {@link #createStreamingWriterFactory()}, serialize and send it to + * all the partitions of the input data(RDD). + * 2. For each epoch in each partition, create the data writer, and write the data of the epoch in + * the partition with this writer. If all the data are written successfully, call + * {@link DataWriter#commit()}. If exception happens during the writing, call + * {@link DataWriter#abort()}. + * 3. If writers in all partitions of one epoch are successfully committed, call + * {@link #commit(long, WriterCommitMessage[])}. If some writers are aborted, or the job failed + * with an unknown reason, call {@link #abort(long, WriterCommitMessage[])}. + * + * While Spark will retry failed writing tasks, Spark won't retry failed writing jobs. Users should + * do it manually in their Spark applications if they want to retry. + * + * Please refer to the documentation of commit/abort methods for detailed specifications. */ @Evolving -public interface StreamingWriteSupport { +public interface StreamingWrite { /** * Creates a writer factory which will be serialized and sent to executors. diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsOutputMode.java similarity index 67% rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java rename to sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsOutputMode.java index 43bdcca..832dcfa 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsOutputMode.java @@ -15,12 +15,15 @@ * limitations under the License. */ -package org.apache.spark.sql.sources.v2; +package org.apache.spark.sql.sources.v2.writer.streaming; -import org.apache.spark.annotation.Evolving; +import org.apache.spark.annotation.Unstable; +import org.apache.spark.sql.sources.v2.writer.WriteBuilder; +import org.apache.spark.sql.streaming.OutputMode; -/** - * TODO: remove it when we finish the API refactor for streaming write side. 
- */ -@Evolving -public interface DataSourceV2 {} +// TODO: remove it when we have `SupportsTruncate` +@Unstable +public interface SupportsOutputMode extends WriteBuilder { + + WriteBuilder outputMode(OutputMode mode); +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 713c9a9..e757785 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -205,7 +205,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { if (classOf[TableProvider].isAssignableFrom(cls)) { val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider] val sessionOptions = DataSourceV2Utils.extractSessionConfigs( - ds = provider, conf = sparkSession.sessionState.conf) + source = provider, conf = sparkSession.sessionState.conf) val pathsOption = { val objectMapper = new ObjectMapper() DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala index 452ebbb..8f2072c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.writer._ -import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport} +import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode} import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType @@ -30,30 +30,23 @@ import org.apache.spark.sql.types.StructType * This is no-op datasource. It does not do anything besides consuming its input. * This can be useful for benchmarking or to cache data without any additional overhead. 
*/ -class NoopDataSource - extends DataSourceV2 - with TableProvider - with DataSourceRegister - with StreamingWriteSupportProvider { - +class NoopDataSource extends TableProvider with DataSourceRegister { override def shortName(): String = "noop" override def getTable(options: DataSourceOptions): Table = NoopTable - override def createStreamingWriteSupport( - queryId: String, - schema: StructType, - mode: OutputMode, - options: DataSourceOptions): StreamingWriteSupport = NoopStreamingWriteSupport } -private[noop] object NoopTable extends Table with SupportsBatchWrite { +private[noop] object NoopTable extends Table with SupportsBatchWrite with SupportsStreamingWrite { override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = NoopWriteBuilder override def name(): String = "noop-table" override def schema(): StructType = new StructType() } -private[noop] object NoopWriteBuilder extends WriteBuilder with SupportsSaveMode { - override def buildForBatch(): BatchWrite = NoopBatchWrite +private[noop] object NoopWriteBuilder extends WriteBuilder + with SupportsSaveMode with SupportsOutputMode { override def mode(mode: SaveMode): WriteBuilder = this + override def outputMode(mode: OutputMode): WriteBuilder = this + override def buildForBatch(): BatchWrite = NoopBatchWrite + override def buildForStreaming(): StreamingWrite = NoopStreamingWrite } private[noop] object NoopBatchWrite extends BatchWrite { @@ -72,7 +65,7 @@ private[noop] object NoopWriter extends DataWriter[InternalRow] { override def abort(): Unit = {} } -private[noop] object NoopStreamingWriteSupport extends StreamingWriteSupport { +private[noop] object NoopStreamingWrite extends StreamingWrite { override def createStreamingWriterFactory(): StreamingDataWriterFactory = NoopStreamingDataWriterFactory override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} @@ -85,4 +78,3 @@ private[noop] object NoopStreamingDataWriterFactory extends StreamingDataWriterF taskId: Long, epochId: Long): DataWriter[InternalRow] = NoopWriter } - diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala deleted file mode 100644 index f11703c..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution.datasources.v2 - -import org.apache.commons.lang3.StringUtils - -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} -import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.sources.DataSourceRegister -import org.apache.spark.sql.sources.v2.DataSourceV2 -import org.apache.spark.util.Utils - -/** - * A trait that can be used by data source v2 related query plans(both logical and physical), to - * provide a string format of the data source information for explain. - */ -trait DataSourceV2StringFormat { - - /** - * The instance of this data source implementation. Note that we only consider its class in - * equals/hashCode, not the instance itself. - */ - def source: DataSourceV2 - - /** - * The output of the data source reader, w.r.t. column pruning. - */ - def output: Seq[Attribute] - - /** - * The options for this data source reader. - */ - def options: Map[String, String] - - /** - * The filters which have been pushed to the data source. - */ - def pushedFilters: Seq[Expression] - - private def sourceName: String = source match { - case registered: DataSourceRegister => registered.shortName() - // source.getClass.getSimpleName can cause Malformed class name error, - // call safer `Utils.getSimpleName` instead - case _ => Utils.getSimpleName(source.getClass) - } - - def metadataString(maxFields: Int): String = { - val entries = scala.collection.mutable.ArrayBuffer.empty[(String, String)] - - if (pushedFilters.nonEmpty) { - entries += "Filters" -> pushedFilters.mkString("[", ", ", "]") - } - - // TODO: we should only display some standard options like path, table, etc. - if (options.nonEmpty) { - entries += "Options" -> Utils.redact(options).map { - case (k, v) => s"$k=$v" - }.mkString("[", ",", "]") - } - - val outputStr = truncatedString(output, "[", ", ", "]", maxFields) - - val entriesStr = if (entries.nonEmpty) { - truncatedString(entries.map { - case (key, value) => key + ": " + StringUtils.abbreviate(value, 100) - }, " (", ", ", ")", maxFields) - } else { - "" - } - - s"$sourceName$outputStr$entriesStr" - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala index e9cc399..30897d8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala @@ -21,8 +21,7 @@ import java.util.regex.Pattern import org.apache.spark.internal.Logging import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.DataSourceRegister -import org.apache.spark.sql.sources.v2.{DataSourceV2, SessionConfigSupport} +import org.apache.spark.sql.sources.v2.{SessionConfigSupport, TableProvider} private[sql] object DataSourceV2Utils extends Logging { @@ -34,34 +33,28 @@ private[sql] object DataSourceV2Utils extends Logging { * `spark.datasource.$keyPrefix`. A session config `spark.datasource.$keyPrefix.xxx -> yyy` will * be transformed into `xxx -> yyy`. * - * @param ds a [[DataSourceV2]] object + * @param source a [[TableProvider]] object * @param conf the session conf * @return an immutable map that contains all the extracted and transformed k/v pairs. 
*/ - def extractSessionConfigs(ds: DataSourceV2, conf: SQLConf): Map[String, String] = ds match { - case cs: SessionConfigSupport => - val keyPrefix = cs.keyPrefix() - require(keyPrefix != null, "The data source config key prefix can't be null.") - - val pattern = Pattern.compile(s"^spark\\.datasource\\.$keyPrefix\\.(.+)") - - conf.getAllConfs.flatMap { case (key, value) => - val m = pattern.matcher(key) - if (m.matches() && m.groupCount() > 0) { - Seq((m.group(1), value)) - } else { - Seq.empty + def extractSessionConfigs(source: TableProvider, conf: SQLConf): Map[String, String] = { + source match { + case cs: SessionConfigSupport => + val keyPrefix = cs.keyPrefix() + require(keyPrefix != null, "The data source config key prefix can't be null.") + + val pattern = Pattern.compile(s"^spark\\.datasource\\.$keyPrefix\\.(.+)") + + conf.getAllConfs.flatMap { case (key, value) => + val m = pattern.matcher(key) + if (m.matches() && m.groupCount() > 0) { + Seq((m.group(1), value)) + } else { + Seq.empty + } } - } - - case _ => Map.empty - } - def failForUserSpecifiedSchema[T](ds: DataSourceV2): T = { - val name = ds match { - case register: DataSourceRegister => register.shortName() - case _ => ds.getClass.getName + case _ => Map.empty } - throw new UnsupportedOperationException(name + " source does not support user-specified schema") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 2c33975..cca2790 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWrite, RateCo import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2} +import org.apache.spark.sql.sources.v2.writer.streaming.SupportsOutputMode import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} import org.apache.spark.util.Clock @@ -513,13 +514,16 @@ class MicroBatchExecution( val triggerLogicalPlan = sink match { case _: Sink => newAttributePlan - case s: StreamingWriteSupportProvider => - val writer = s.createStreamingWriteSupport( - s"$runId", - newAttributePlan.schema, - outputMode, - new DataSourceOptions(extraOptions.asJava)) - WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, writer), newAttributePlan) + case s: SupportsStreamingWrite => + // TODO: we should translate OutputMode to concrete write actions like truncate, but + // the truncate action is being developed in SPARK-26666. 
+ val writeBuilder = s.newWriteBuilder(new DataSourceOptions(extraOptions.asJava)) + .withQueryId(runId.toString) + .withInputDataSchema(newAttributePlan.schema) + val streamingWrite = writeBuilder.asInstanceOf[SupportsOutputMode] + .outputMode(outputMode) + .buildForStreaming() + WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, streamingWrite), newAttributePlan) case _ => throw new IllegalArgumentException(s"unknown sink type for $sink") } @@ -549,7 +553,7 @@ class MicroBatchExecution( SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) { sink match { case s: Sink => s.addBatch(currentBatchId, nextBatch) - case _: StreamingWriteSupportProvider => + case _: SupportsStreamingWrite => // This doesn't accumulate any data - it just forces execution of the microbatch writer. nextBatch.collect() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala index 83d38dc..1b7aa54 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.execution.LeafExecNode import org.apache.spark.sql.execution.datasources.DataSource -import org.apache.spark.sql.sources.v2.{DataSourceV2, Table} +import org.apache.spark.sql.sources.v2.{Table, TableProvider} object StreamingRelation { def apply(dataSource: DataSource): StreamingRelation = { @@ -86,13 +86,13 @@ case class StreamingExecutionRelation( // know at read time whether the query is continuous or not, so we need to be able to // swap a V1 relation back in. /** - * Used to link a [[DataSourceV2]] into a streaming + * Used to link a [[TableProvider]] into a streaming * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]]. This is only used for creating * a streaming [[org.apache.spark.sql.DataFrame]] from [[org.apache.spark.sql.DataFrameReader]], * and should be converted before passing to [[StreamExecution]]. 
*/ case class StreamingRelationV2( - dataSource: DataSourceV2, + source: TableProvider, sourceName: String, table: Table, extraOptions: Map[String, String], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala index 9c5c16f..348bc76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala @@ -18,10 +18,11 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.sql._ -import org.apache.spark.sql.execution.streaming.sources.ConsoleWriteSupport +import org.apache.spark.sql.execution.streaming.sources.ConsoleWrite import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister} -import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, StreamingWriteSupportProvider} -import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport +import org.apache.spark.sql.sources.v2._ +import org.apache.spark.sql.sources.v2.writer.WriteBuilder +import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWrite, SupportsOutputMode} import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType @@ -30,17 +31,12 @@ case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame) override def schema: StructType = data.schema } -class ConsoleSinkProvider extends DataSourceV2 - with StreamingWriteSupportProvider +class ConsoleSinkProvider extends TableProvider with DataSourceRegister with CreatableRelationProvider { - override def createStreamingWriteSupport( - queryId: String, - schema: StructType, - mode: OutputMode, - options: DataSourceOptions): StreamingWriteSupport = { - new ConsoleWriteSupport(schema, options) + override def getTable(options: DataSourceOptions): Table = { + ConsoleTable } def createRelation( @@ -60,3 +56,28 @@ class ConsoleSinkProvider extends DataSourceV2 def shortName(): String = "console" } + +object ConsoleTable extends Table with SupportsStreamingWrite { + + override def name(): String = "console" + + override def schema(): StructType = StructType(Nil) + + override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = { + new WriteBuilder with SupportsOutputMode { + private var inputSchema: StructType = _ + + override def withInputDataSchema(schema: StructType): WriteBuilder = { + this.inputSchema = schema + this + } + + override def outputMode(mode: OutputMode): WriteBuilder = this + + override def buildForStreaming(): StreamingWrite = { + assert(inputSchema != null) + new ConsoleWrite(inputSchema, options) + } + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index b22795d..20101c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -32,8 +32,9 @@ import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _} import org.apache.spark.sql.sources.v2 -import org.apache.spark.sql.sources.v2.{DataSourceOptions, 
StreamingWriteSupportProvider, SupportsContinuousRead} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsContinuousRead, SupportsStreamingWrite} import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset} +import org.apache.spark.sql.sources.v2.writer.streaming.SupportsOutputMode import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} import org.apache.spark.util.Clock @@ -42,7 +43,7 @@ class ContinuousExecution( name: String, checkpointRoot: String, analyzedPlan: LogicalPlan, - sink: StreamingWriteSupportProvider, + sink: SupportsStreamingWrite, trigger: Trigger, triggerClock: Clock, outputMode: OutputMode, @@ -174,12 +175,15 @@ class ContinuousExecution( "CurrentTimestamp and CurrentDate not yet supported for continuous processing") } - val writer = sink.createStreamingWriteSupport( - s"$runId", - withNewSources.schema, - outputMode, - new DataSourceOptions(extraOptions.asJava)) - val planWithSink = WriteToContinuousDataSource(writer, withNewSources) + // TODO: we should translate OutputMode to concrete write actions like truncate, but + // the truncate action is being developed in SPARK-26666. + val writeBuilder = sink.newWriteBuilder(new DataSourceOptions(extraOptions.asJava)) + .withQueryId(runId.toString) + .withInputDataSchema(withNewSources.schema) + val streamingWrite = writeBuilder.asInstanceOf[SupportsOutputMode] + .outputMode(outputMode) + .buildForStreaming() + val planWithSink = WriteToContinuousDataSource(streamingWrite, withNewSources) reportTimeTaken("queryPlanning") { lastExecution = new IncrementalExecution( @@ -214,9 +218,8 @@ class ContinuousExecution( trigger.asInstanceOf[ContinuousTrigger].intervalMs.toString) // Use the parent Spark session for the endpoint since it's where this query ID is registered. - val epochEndpoint = - EpochCoordinatorRef.create( - writer, stream, this, epochCoordinatorId, currentBatchId, sparkSession, SparkEnv.get) + val epochEndpoint = EpochCoordinatorRef.create( + streamingWrite, stream, this, epochCoordinatorId, currentBatchId, sparkSession, SparkEnv.get) val epochUpdateThread = new Thread(new Runnable { override def run: Unit = { try { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala index d1bda79..a998422 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala @@ -25,7 +25,7 @@ import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeR import org.apache.spark.sql.SparkSession import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset} import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage -import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport +import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite import org.apache.spark.util.RpcUtils private[continuous] sealed trait EpochCoordinatorMessage extends Serializable @@ -82,7 +82,7 @@ private[sql] object EpochCoordinatorRef extends Logging { * Create a reference to a new [[EpochCoordinator]]. 
*/ def create( - writeSupport: StreamingWriteSupport, + writeSupport: StreamingWrite, stream: ContinuousStream, query: ContinuousExecution, epochCoordinatorId: String, @@ -115,7 +115,7 @@ private[sql] object EpochCoordinatorRef extends Logging { * have both committed and reported an end offset for a given epoch. */ private[continuous] class EpochCoordinator( - writeSupport: StreamingWriteSupport, + writeSupport: StreamingWrite, stream: ContinuousStream, query: ContinuousExecution, startEpoch: Long, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala index 7ad21cc..54f484c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala @@ -19,13 +19,13 @@ package org.apache.spark.sql.execution.streaming.continuous import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport +import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite /** * The logical plan for writing data in a continuous stream. */ -case class WriteToContinuousDataSource( - writeSupport: StreamingWriteSupport, query: LogicalPlan) extends LogicalPlan { +case class WriteToContinuousDataSource(write: StreamingWrite, query: LogicalPlan) + extends LogicalPlan { override def children: Seq[LogicalPlan] = Seq(query) override def output: Seq[Attribute] = Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala index 2178466..2f3af6a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala @@ -26,21 +26,22 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.streaming.StreamExecution -import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport +import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite /** - * The physical plan for writing data into a continuous processing [[StreamingWriteSupport]]. + * The physical plan for writing data into a continuous processing [[StreamingWrite]]. */ -case class WriteToContinuousDataSourceExec(writeSupport: StreamingWriteSupport, query: SparkPlan) - extends UnaryExecNode with Logging { +case class WriteToContinuousDataSourceExec(write: StreamingWrite, query: SparkPlan) + extends UnaryExecNode with Logging { + override def child: SparkPlan = query override def output: Seq[Attribute] = Nil override protected def doExecute(): RDD[InternalRow] = { - val writerFactory = writeSupport.createStreamingWriterFactory() + val writerFactory = write.createStreamingWriterFactory() val rdd = new ContinuousWriteRDD(query.execute(), writerFactory) - logInfo(s"Start processing data source write support: $writeSupport. 
" + + logInfo(s"Start processing data source write support: $write. " + s"The input RDD has ${rdd.partitions.length} partitions.") EpochCoordinatorRef.get( sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWrite.scala similarity index 94% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriteSupport.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWrite.scala index 833e62f..f2ff30b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWrite.scala @@ -22,12 +22,12 @@ import org.apache.spark.sql.{Dataset, SparkSession} import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage -import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport} +import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite} import org.apache.spark.sql.types.StructType /** Common methods used to create writes for the the console sink */ -class ConsoleWriteSupport(schema: StructType, options: DataSourceOptions) - extends StreamingWriteSupport with Logging { +class ConsoleWrite(schema: StructType, options: DataSourceOptions) + extends StreamingWrite with Logging { // Number of rows to display, by default 20 rows protected val numRowsToShow = options.getInt("numRows", 20) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriteSupportProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala similarity index 66% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriteSupportProvider.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala index 4218fd5..6fbb59c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriteSupportProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala @@ -22,63 +22,73 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.execution.python.PythonForeachWriter -import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamingWriteSupportProvider} -import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage} -import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite, Table} +import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriteBuilder, WriterCommitMessage} +import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode} import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType /** - * A [[org.apache.spark.sql.sources.v2.DataSourceV2]] for 
forwarding data into the specified - * [[ForeachWriter]]. + * A write-only table for forwarding data into the specified [[ForeachWriter]]. * * @param writer The [[ForeachWriter]] to process all data. * @param converter An object to convert internal rows to target type T. Either it can be * a [[ExpressionEncoder]] or a direct converter function. * @tparam T The expected type of the sink. */ -case class ForeachWriteSupportProvider[T]( +case class ForeachWriterTable[T]( writer: ForeachWriter[T], converter: Either[ExpressionEncoder[T], InternalRow => T]) - extends StreamingWriteSupportProvider { - - override def createStreamingWriteSupport( - queryId: String, - schema: StructType, - mode: OutputMode, - options: DataSourceOptions): StreamingWriteSupport = { - new StreamingWriteSupport { - override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} - override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} - - override def createStreamingWriterFactory(): StreamingDataWriterFactory = { - val rowConverter: InternalRow => T = converter match { - case Left(enc) => - val boundEnc = enc.resolveAndBind( - schema.toAttributes, - SparkSession.getActiveSession.get.sessionState.analyzer) - boundEnc.fromRow - case Right(func) => - func - } - ForeachWriterFactory(writer, rowConverter) + extends Table with SupportsStreamingWrite { + + override def name(): String = "ForeachSink" + + override def schema(): StructType = StructType(Nil) + + override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = { + new WriteBuilder with SupportsOutputMode { + private var inputSchema: StructType = _ + + override def withInputDataSchema(schema: StructType): WriteBuilder = { + this.inputSchema = schema + this } - override def toString: String = "ForeachSink" + override def outputMode(mode: OutputMode): WriteBuilder = this + + override def buildForStreaming(): StreamingWrite = { + new StreamingWrite { + override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + + override def createStreamingWriterFactory(): StreamingDataWriterFactory = { + val rowConverter: InternalRow => T = converter match { + case Left(enc) => + val boundEnc = enc.resolveAndBind( + inputSchema.toAttributes, + SparkSession.getActiveSession.get.sessionState.analyzer) + boundEnc.fromRow + case Right(func) => + func + } + ForeachWriterFactory(writer, rowConverter) + } + } + } } } } -object ForeachWriteSupportProvider { +object ForeachWriterTable { def apply[T]( writer: ForeachWriter[T], - encoder: ExpressionEncoder[T]): ForeachWriteSupportProvider[_] = { + encoder: ExpressionEncoder[T]): ForeachWriterTable[_] = { writer match { case pythonWriter: PythonForeachWriter => - new ForeachWriteSupportProvider[UnsafeRow]( + new ForeachWriterTable[UnsafeRow]( pythonWriter, Right((x: InternalRow) => x.asInstanceOf[UnsafeRow])) case _ => - new ForeachWriteSupportProvider[T](writer, Left(encoder)) + new ForeachWriterTable[T](writer, Left(encoder)) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala index 143235e..f395189 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala @@ -19,14 +19,14 @@ 
 package org.apache.spark.sql.execution.streaming.sources

 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriter, DataWriterFactory, WriterCommitMessage}
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}

 /**
  * A [[BatchWrite]] used to hook V2 stream writers into a microbatch plan. It implements
  * the non-streaming interface, forwarding the epoch ID determined at construction to a wrapped
  * streaming write support.
  */
-class MicroBatchWrite(eppchId: Long, val writeSupport: StreamingWriteSupport) extends BatchWrite {
+class MicroBatchWrite(eppchId: Long, val writeSupport: StreamingWrite) extends BatchWrite {

   override def commit(messages: Array[WriterCommitMessage]): Unit = {
     writeSupport.commit(eppchId, messages)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
index 075c6b9..3a00825 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
@@ -40,8 +40,7 @@ import org.apache.spark.sql.types._
  * generated rows. The source will try its best to reach `rowsPerSecond`, but the query may
  * be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed.
  */
-class RateStreamProvider extends DataSourceV2
-  with TableProvider with DataSourceRegister {
+class RateStreamProvider extends TableProvider with DataSourceRegister {
   import RateStreamProvider._

   override def getTable(options: DataSourceOptions): Table = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
index c3b24a8..8ac5bfc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
@@ -31,8 +31,7 @@ import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

-class TextSocketSourceProvider extends DataSourceV2
-  with TableProvider with DataSourceRegister with Logging {
+class TextSocketSourceProvider extends TableProvider with DataSourceRegister with Logging {

   private def checkParameters(params: DataSourceOptions): Unit = {
     logWarning("The socket source should not be used for production applications! " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
index c50dc7b..3fc2cbe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -32,9 +32,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
 import org.apache.spark.sql.execution.streaming.{MemorySinkBase, Sink}
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, StreamingWriteSupportProvider}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite}
 import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
@@ -42,15 +42,31 @@ import org.apache.spark.sql.types.StructType
  * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit
  * tests and does not provide durability.
  */
-class MemorySinkV2 extends DataSourceV2 with StreamingWriteSupportProvider
-  with MemorySinkBase with Logging {
-
-  override def createStreamingWriteSupport(
-      queryId: String,
-      schema: StructType,
-      mode: OutputMode,
-      options: DataSourceOptions): StreamingWriteSupport = {
-    new MemoryStreamingWriteSupport(this, mode, schema)
+class MemorySinkV2 extends SupportsStreamingWrite with MemorySinkBase with Logging {
+
+  override def name(): String = "MemorySinkV2"
+
+  override def schema(): StructType = StructType(Nil)
+
+  override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
+    new WriteBuilder with SupportsOutputMode {
+      private var mode: OutputMode = _
+      private var inputSchema: StructType = _
+
+      override def outputMode(mode: OutputMode): WriteBuilder = {
+        this.mode = mode
+        this
+      }
+
+      override def withInputDataSchema(schema: StructType): WriteBuilder = {
+        this.inputSchema = schema
+        this
+      }
+
+      override def buildForStreaming(): StreamingWrite = {
+        new MemoryStreamingWrite(MemorySinkV2.this, mode, inputSchema)
+      }
+    }
   }

   private case class AddedData(batchId: Long, data: Array[Row])
@@ -122,9 +138,9 @@ class MemorySinkV2 extends DataSourceV2 with StreamingWriteSupportProvider
 case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row]) extends WriterCommitMessage {}

-class MemoryStreamingWriteSupport(
+class MemoryStreamingWrite(
     val sink: MemorySinkV2, outputMode: OutputMode, schema: StructType)
-  extends StreamingWriteSupport {
+  extends StreamingWrite {

   override def createStreamingWriterFactory: MemoryWriterFactory = {
     MemoryWriterFactory(outputMode, schema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 8666818..ef21caa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -173,7 +173,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
     ds match {
       case provider: TableProvider =>
         val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
-          ds = provider, conf = sparkSession.sessionState.conf)
+          source = provider, conf = sparkSession.sessionState.conf)
         val options = sessionOptions ++ extraOptions
         val dsOptions = new DataSourceOptions(options.asJava)
         val table = userSpecifiedSchema match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index ea596ba..9841994 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.execution.streaming.sources._
-import org.apache.spark.sql.sources.v2.StreamingWriteSupportProvider
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite, TableProvider}

 /**
  * Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems,
@@ -278,7 +278,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       query
     } else if (source == "foreach") {
       assertNotPartitioned("foreach")
-      val sink = ForeachWriteSupportProvider[T](foreachWriter, ds.exprEnc)
+      val sink = ForeachWriterTable[T](foreachWriter, ds.exprEnc)
       df.sparkSession.sessionState.streamingQueryManager.startQuery(
         extraOptions.get("queryName"),
         extraOptions.get("checkpointLocation"),
@@ -304,30 +304,29 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         useTempCheckpointLocation = true,
         trigger = trigger)
     } else {
-      val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
+      val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
       val disabledSources = df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
-      var options = extraOptions.toMap
-      val sink = ds.getConstructor().newInstance() match {
-        case w: StreamingWriteSupportProvider
-            if !disabledSources.contains(w.getClass.getCanonicalName) =>
-          val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
-            w, df.sparkSession.sessionState.conf)
-          options = sessionOptions ++ extraOptions
-          w
-        case _ =>
-          val ds = DataSource(
-            df.sparkSession,
-            className = source,
-            options = options,
-            partitionColumns = normalizedParCols.getOrElse(Nil))
-          ds.createSink(outputMode)
+      val useV1Source = disabledSources.contains(cls.getCanonicalName)
+
+      val sink = if (classOf[TableProvider].isAssignableFrom(cls) && !useV1Source) {
+        val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
+        val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+          source = provider, conf = df.sparkSession.sessionState.conf)
+        val options = sessionOptions ++ extraOptions
+        val dsOptions = new DataSourceOptions(options.asJava)
+        provider.getTable(dsOptions) match {
+          case s: SupportsStreamingWrite => s
+          case _ => createV1Sink()
+        }
+      } else {
+        createV1Sink()
       }

       df.sparkSession.sessionState.streamingQueryManager.startQuery(
-        options.get("queryName"),
-        options.get("checkpointLocation"),
+        extraOptions.get("queryName"),
+        extraOptions.get("checkpointLocation"),
         df,
-        options,
+        extraOptions.toMap,
         sink,
         outputMode,
         useTempCheckpointLocation = source == "console" || source == "noop",
@@ -336,6 +335,15 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
     }
   }

+  private def createV1Sink(): BaseStreamingSink = {
+    val ds = DataSource(
+      df.sparkSession,
+      className = source,
+      options = extraOptions.toMap,
+      partitionColumns = normalizedParCols.getOrElse(Nil))
+    ds.createSink(outputMode)
+  }
+
   /**
    * Sets the output of the streaming query to be processed using the provided writer object.
    * object. See [[org.apache.spark.sql.ForeachWriter]] for more details on the lifecycle and
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 0bd8a92..a7fa800 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution,
 import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.STREAMING_QUERY_LISTENERS
-import org.apache.spark.sql.sources.v2.StreamingWriteSupportProvider
+import org.apache.spark.sql.sources.v2.SupportsStreamingWrite
 import org.apache.spark.util.{Clock, SystemClock, Utils}

 /**
@@ -261,7 +261,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
     }

     (sink, trigger) match {
-      case (v2Sink: StreamingWriteSupportProvider, trigger: ContinuousTrigger) =>
+      case (v2Sink: SupportsStreamingWrite, trigger: ContinuousTrigger) =>
         if (operationCheckEnabled) {
           UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
         }
diff --git a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index a36b0cf..914af58 100644
--- a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -9,6 +9,6 @@ org.apache.spark.sql.streaming.sources.FakeReadMicroBatchOnly
 org.apache.spark.sql.streaming.sources.FakeReadContinuousOnly
 org.apache.spark.sql.streaming.sources.FakeReadBothModes
 org.apache.spark.sql.streaming.sources.FakeReadNeitherMode
-org.apache.spark.sql.streaming.sources.FakeWriteSupportProvider
+org.apache.spark.sql.streaming.sources.FakeWriteOnly
 org.apache.spark.sql.streaming.sources.FakeNoWrite
 org.apache.spark.sql.streaming.sources.FakeWriteSupportProviderV1Fallback
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
index 6185736..e804377 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
@@ -43,9 +43,9 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
   test("streaming writer") {
     val sink = new MemorySinkV2
-    val writeSupport = new MemoryStreamingWriteSupport(
+    val write = new MemoryStreamingWrite(
       sink, OutputMode.Append(), new StructType().add("i", "int"))
-    writeSupport.commit(0,
+    write.commit(0,
       Array(
        MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))),
        MemoryWriterCommitMessage(1, Seq(Row(3), Row(4))),
@@ -53,7 +53,7 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
       ))
     assert(sink.latestBatchId.contains(0))
     assert(sink.latestBatchData.map(_.getInt(0)).sorted == Seq(1, 2, 3, 4, 6, 7))
-    writeSupport.commit(19,
+    write.commit(19,
       Array(
         MemoryWriterCommitMessage(3, Seq(Row(11), Row(22))),
         MemoryWriterCommitMessage(0, Seq(Row(33)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2UtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2UtilsSuite.scala
index f903c17..0b1e3b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2UtilsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2UtilsSuite.scala
@@ -33,8 +33,8 @@ class DataSourceV2UtilsSuite extends SparkFunSuite {
     conf.setConfString(s"spark.sql.$keyPrefix.config.name", "false")
     conf.setConfString("spark.datasource.another.config.name", "123")
     conf.setConfString(s"spark.datasource.$keyPrefix.", "123")
-    val cs = classOf[DataSourceV2WithSessionConfig].getConstructor().newInstance()
-    val confs = DataSourceV2Utils.extractSessionConfigs(cs.asInstanceOf[DataSourceV2], conf)
+    val source = new DataSourceV2WithSessionConfig
+    val confs = DataSourceV2Utils.extractSessionConfigs(source, conf)
     assert(confs.size == 2)
     assert(confs.keySet.filter(_.startsWith("spark.datasource")).size == 0)
     assert(confs.keySet.filter(_.startsWith("not.exist.prefix")).size == 0)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index daca65f..c56a545 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -38,8 +38,7 @@ import org.apache.spark.util.SerializableConfiguration
 * Each task writes data to `target/_temporary/uniqueId/$jobId-$partitionId-$attemptNumber`.
 * Each job moves files from `target/_temporary/uniqueId/` to `target`.
 */
-class SimpleWritableDataSource extends DataSourceV2
-  with TableProvider with SessionConfigSupport {
+class SimpleWritableDataSource extends TableProvider with SessionConfigSupport {

   private val tableSchema = new StructType().add("i", "long").add("j", "long")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
index d3d210c..bad2259 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReader, ContinuousStream, PartitionOffset}
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.sql.types.{DataType, IntegerType, StructType}
@@ -43,7 +43,7 @@ class ContinuousQueuedDataReaderSuite extends StreamTest with MockitoSugar {
   override def beforeEach(): Unit = {
     super.beforeEach()
     epochEndpoint = EpochCoordinatorRef.create(
-      mock[StreamingWriteSupport],
+      mock[StreamingWrite],
       mock[ContinuousStream],
       mock[ContinuousExecution],
       coordinatorId,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
index a0b56ec..f74285f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.LocalSparkSession
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
 import org.apache.spark.sql.test.TestSparkSession

 class EpochCoordinatorSuite
@@ -40,13 +40,13 @@ class EpochCoordinatorSuite

   private var epochCoordinator: RpcEndpointRef = _
-  private var writeSupport: StreamingWriteSupport = _
+  private var writeSupport: StreamingWrite = _
   private var query: ContinuousExecution = _
   private var orderVerifier: InOrder = _

   override def beforeEach(): Unit = {
     val stream = mock[ContinuousStream]
-    writeSupport = mock[StreamingWriteSupport]
+    writeSupport = mock[StreamingWrite]
     query = mock[ContinuousExecution]
     orderVerifier = inOrder(writeSupport, query)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index 62f1666..c841793 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming._
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.writer.WriteBuilder
 import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, StreamTest, Trigger}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
@@ -71,13 +71,10 @@ trait FakeContinuousReadTable extends Table with SupportsContinuousRead {
   override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new FakeScanBuilder
 }

-trait FakeStreamingWriteSupportProvider extends StreamingWriteSupportProvider {
-  override def createStreamingWriteSupport(
-      queryId: String,
-      schema: StructType,
-      mode: OutputMode,
-      options: DataSourceOptions): StreamingWriteSupport = {
-    LastWriteOptions.options = options
+trait FakeStreamingWriteTable extends Table with SupportsStreamingWrite {
+  override def name(): String = "fake"
+  override def schema(): StructType = StructType(Seq())
+  override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
     throw new IllegalStateException("fake sink - cannot actually write")
   }
 }
@@ -129,20 +126,33 @@ class FakeReadNeitherMode extends DataSourceRegister with TableProvider {
   }
 }

-class FakeWriteSupportProvider
+class FakeWriteOnly
     extends DataSourceRegister
-    with FakeStreamingWriteSupportProvider
+    with TableProvider
     with SessionConfigSupport {

   override def shortName(): String = "fake-write-microbatch-continuous"

   override def keyPrefix: String = shortName()
+
+  override def getTable(options: DataSourceOptions): Table = {
+    LastWriteOptions.options = options
+    new Table with FakeStreamingWriteTable {
+      override def name(): String = "fake"
+      override def schema(): StructType = StructType(Nil)
+    }
+  }
 }

-class FakeNoWrite extends DataSourceRegister {
+class FakeNoWrite extends DataSourceRegister with TableProvider {
   override def shortName(): String = "fake-write-neither-mode"
+  override def getTable(options: DataSourceOptions): Table = {
+    new Table {
+      override def name(): String = "fake"
+      override def schema(): StructType = StructType(Nil)
+    }
+  }
 }

-
 case class FakeWriteV1FallbackException() extends Exception

 class FakeSink extends Sink {
@@ -150,17 +160,24 @@ class FakeSink extends Sink {
 }

 class FakeWriteSupportProviderV1Fallback extends DataSourceRegister
-  with FakeStreamingWriteSupportProvider with StreamSinkProvider {
+  with TableProvider with StreamSinkProvider {

   override def createSink(
-    sqlContext: SQLContext,
-    parameters: Map[String, String],
-    partitionColumns: Seq[String],
-    outputMode: OutputMode): Sink = {
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      partitionColumns: Seq[String],
+      outputMode: OutputMode): Sink = {
     new FakeSink()
   }

   override def shortName(): String = "fake-write-v1-fallback"
+
+  override def getTable(options: DataSourceOptions): Table = {
+    new Table with FakeStreamingWriteTable {
+      override def name(): String = "fake"
+      override def schema(): StructType = StructType(Nil)
+    }
+  }
 }

 object LastReadOptions {
@@ -260,7 +277,7 @@ class StreamingDataSourceV2Suite extends StreamTest {
     testPositiveCaseWithQuery(
       "fake-read-microbatch-continuous", "fake-write-v1-fallback", Trigger.Once()) { v2Query =>
       assert(v2Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
-        .isInstanceOf[FakeWriteSupportProviderV1Fallback])
+        .isInstanceOf[Table])
     }

     // Ensure we create a V1 sink with the config. Note the config is a comma separated
@@ -319,19 +336,20 @@ class StreamingDataSourceV2Suite extends StreamTest {
     for ((read, write, trigger) <- cases) {
       testQuietly(s"stream with read format $read, write format $write, trigger $trigger") {
-        val table = DataSource.lookupDataSource(read, spark.sqlContext.conf).getConstructor()
+        val sourceTable = DataSource.lookupDataSource(read, spark.sqlContext.conf).getConstructor()
+          .newInstance().asInstanceOf[TableProvider].getTable(DataSourceOptions.empty())
+
+        val sinkTable = DataSource.lookupDataSource(write, spark.sqlContext.conf).getConstructor()
           .newInstance().asInstanceOf[TableProvider].getTable(DataSourceOptions.empty())
-        val writeSource = DataSource.lookupDataSource(write, spark.sqlContext.conf).
-          getConstructor().newInstance()
-        (table, writeSource, trigger) match {
+        (sourceTable, sinkTable, trigger) match {
           // Valid microbatch queries.
-          case (_: SupportsMicroBatchRead, _: StreamingWriteSupportProvider, t)
+          case (_: SupportsMicroBatchRead, _: SupportsStreamingWrite, t)
             if !t.isInstanceOf[ContinuousTrigger] =>
             testPositiveCase(read, write, trigger)

           // Valid continuous queries.
-          case (_: SupportsContinuousRead, _: StreamingWriteSupportProvider,
+          case (_: SupportsContinuousRead, _: SupportsStreamingWrite,
             _: ContinuousTrigger) =>
             testPositiveCase(read, write, trigger)
@@ -342,12 +360,12 @@ class StreamingDataSourceV2Suite extends StreamTest {
             s"Data source $read does not support streamed reading")

           // Invalid - can't write
-          case (_, w, _) if !w.isInstanceOf[StreamingWriteSupportProvider] =>
+          case (_, w, _) if !w.isInstanceOf[SupportsStreamingWrite] =>
             testNegativeCase(read, write, trigger,
               s"Data source $write does not support streamed writing")

           // Invalid - trigger is continuous but reader is not
-          case (r, _: StreamingWriteSupportProvider, _: ContinuousTrigger)
+          case (r, _: SupportsStreamingWrite, _: ContinuousTrigger)
             if !r.isInstanceOf[SupportsContinuousRead] =>
             testNegativeCase(read, write, trigger,
               s"Data source $read does not support continuous processing")

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
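
For readers porting their own sinks to the refactored API, the pattern that runs through the patch above is: the sink is now a Table that mixes in SupportsStreamingWrite, and the StreamingWrite is produced by WriteBuilder.buildForStreaming instead of by a separate StreamingWriteSupportProvider. The Scala sketch below is illustrative only and is not part of this commit: the sink itself (CountingSink) and its row-counting behaviour are made up, and the exact StreamingDataWriterFactory.createWriter signature is assumed from the v2 interfaces as they stand at this commit, so treat it as a sketch rather than a reference implementation.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite}
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

// Hypothetical sink: counts rows per epoch and discards them.
class CountingSink extends SupportsStreamingWrite {

  override def name(): String = "counting-sink"

  // Like MemorySinkV2 and ForeachWriterTable above, the table reports an empty
  // schema; the actual input schema arrives via withInputDataSchema.
  override def schema(): StructType = StructType(Nil)

  override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
    new WriteBuilder with SupportsOutputMode {
      private var inputSchema: StructType = _
      private var mode: OutputMode = _

      override def withInputDataSchema(schema: StructType): WriteBuilder = {
        this.inputSchema = schema
        this
      }

      override def outputMode(mode: OutputMode): WriteBuilder = {
        this.mode = mode
        this
      }

      // The hook introduced by this commit: build the StreamingWrite here.
      override def buildForStreaming(): StreamingWrite = new StreamingWrite {

        override def createStreamingWriterFactory(): StreamingDataWriterFactory = {
          new StreamingDataWriterFactory {
            // Signature assumed from the v2 streaming writer API of this vintage.
            override def createWriter(
                partitionId: Int, taskId: Long, epochId: Long): DataWriter[InternalRow] = {
              new DataWriter[InternalRow] {
                private var count = 0L
                override def write(record: InternalRow): Unit = count += 1
                override def commit(): WriterCommitMessage = new WriterCommitMessage {}
                override def abort(): Unit = ()
              }
            }
          }
        }

        override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit =
          println(s"epoch $epochId committed with ${messages.length} partition messages")

        override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = ()
      }
    }
  }
}

Because the table is created up front by TableProvider.getTable (or, as in the MemorySinkV2 and foreach cases, constructed directly), DataStreamWriter can pattern-match on SupportsStreamingWrite and fall back to a V1 sink otherwise, which is exactly the branch added to DataStreamWriter.startQuery in the patch.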