[SPARK-23260][SPARK-23262][SQL] several data source v2 naming cleanup

## What changes were proposed in this pull request?
All other classes in the reader/writer package don't have `V2` in their names, and the streaming reader/writer don't have `V2` either. It's more consistent to remove `V2` from `DataSourceV2Reader` and `DataSourceV2Writer`. Also rename `DataSourceV2Options` to remove the `V2`; we should only have `V2` in the root interface: `DataSourceV2`.

This PR also fixes some places where a mix-in interface doesn't extend the interface it is meant to mix into.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <[email protected]>

Closes #20427 from cloud-fan/ds-v2.

(cherry picked from commit 0a9ac0248b6514a1e83ff7e4c522424f01b8b78d)
Signed-off-by: Wenchen Fan <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3e623b1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3e623b1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3e623b1

Branch: refs/heads/branch-2.3
Commit: d3e623b19231e6d59793b86afa01f169fb2dedb2
Parents: 107d4e2
Author: Wenchen Fan <[email protected]>
Authored: Tue Jan 30 19:43:17 2018 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Tue Jan 30 19:43:42 2018 +0800

----------------------------------------------------------------------
 .../sql/kafka010/KafkaContinuousReader.scala | 2 +-
 .../sql/kafka010/KafkaSourceProvider.scala | 6 +-
 .../spark/sql/sources/v2/DataSourceOptions.java | 100 +++++++++++++++++++
 .../sql/sources/v2/DataSourceV2Options.java | 100 -------------------
 .../spark/sql/sources/v2/ReadSupport.java | 8 +-
 .../sql/sources/v2/ReadSupportWithSchema.java | 8 +-
 .../sql/sources/v2/SessionConfigSupport.java | 2 +-
 .../spark/sql/sources/v2/WriteSupport.java | 12 +--
 .../sources/v2/reader/DataReaderFactory.java | 2 +-
 .../sql/sources/v2/reader/DataSourceReader.java | 80 +++++++++++++++
 .../sources/v2/reader/DataSourceV2Reader.java | 79 ---------------
 .../reader/SupportsPushDownCatalystFilters.java | 4 +-
 .../v2/reader/SupportsPushDownFilters.java | 4 +-
 .../reader/SupportsPushDownRequiredColumns.java | 6 +-
 .../v2/reader/SupportsReportPartitioning.java | 4 +-
 .../v2/reader/SupportsReportStatistics.java | 4 +-
 .../v2/reader/SupportsScanColumnarBatch.java | 6 +-
 .../v2/reader/SupportsScanUnsafeRow.java | 6 +-
 .../v2/streaming/ContinuousReadSupport.java | 4 +-
 .../v2/streaming/MicroBatchReadSupport.java | 4 +-
 .../v2/streaming/StreamWriteSupport.java | 10 +-
 .../v2/streaming/reader/ContinuousReader.java | 6 +-
 .../v2/streaming/reader/MicroBatchReader.java | 6 +-
 .../v2/streaming/writer/StreamWriter.java | 6 +-
 .../sources/v2/writer/DataSourceV2Writer.java | 94 -----------------
 .../sql/sources/v2/writer/DataSourceWriter.java | 94 +++++++++++++++++
 .../spark/sql/sources/v2/writer/DataWriter.java | 12 +--
 .../sources/v2/writer/DataWriterFactory.java | 2 +-
 .../v2/writer/SupportsWriteInternalRow.java | 4 +-
 .../sources/v2/writer/WriterCommitMessage.java | 4 +-
 .../org/apache/spark/sql/DataFrameReader.scala | 2 +-
 .../org/apache/spark/sql/DataFrameWriter.scala | 2 +-
 .../datasources/v2/DataSourceReaderHolder.scala | 2 +-
 .../datasources/v2/DataSourceV2Relation.scala | 6 +-
 .../datasources/v2/DataSourceV2ScanExec.scala | 2 +-
 .../datasources/v2/WriteToDataSourceV2.scala | 4 +-
 .../streaming/MicroBatchExecution.scala | 6 +-
 .../streaming/RateSourceProvider.scala | 2 +-
 .../spark/sql/execution/streaming/console.scala | 4 +-
 .../continuous/ContinuousExecution.scala | 6 +-
 .../continuous/ContinuousRateStreamSource.scala | 7 +-
 .../streaming/sources/ConsoleWriter.scala | 4 +-
 .../streaming/sources/MicroBatchWriter.scala | 8 +-
 .../sources/PackedRowWriterFactory.scala | 4 +-
 .../streaming/sources/RateStreamSourceV2.scala | 6 +-
 .../execution/streaming/sources/memoryV2.scala | 6 +-
 .../spark/sql/streaming/DataStreamReader.scala | 4 +-
 .../sources/v2/JavaAdvancedDataSourceV2.java | 6 +-
 .../sql/sources/v2/JavaBatchDataSourceV2.java | 6 +-
 .../v2/JavaPartitionAwareDataSource.java | 6 +-
 .../v2/JavaSchemaRequiredDataSource.java | 8 +-
 .../sql/sources/v2/JavaSimpleDataSourceV2.java | 8 +-
 .../sources/v2/JavaUnsafeRowDataSourceV2.java | 6 +-
 .../execution/streaming/RateSourceV2Suite.scala | 18 ++--
 .../sql/sources/v2/DataSourceOptionsSuite.scala | 82 +++++++++++++++
 .../sources/v2/DataSourceV2OptionsSuite.scala | 82 ---------------
 .../sql/sources/v2/DataSourceV2Suite.scala | 24 ++---
 .../sources/v2/SimpleWritableDataSource.scala | 12 +--
 .../sources/StreamingDataSourceV2Suite.scala | 8 +-
 59 files changed, 510 insertions(+), 510 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
index 9125cf5..8c73342 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -41,7 +41,7 @@ import org.apache.spark.unsafe.types.UTF8String
  * @param offsetReader a reader used to get kafka offsets. Note that the actual data will be
  * read by per-task consumers generated later.
  * @param kafkaParams String params for per-task Kafka consumers.
- * @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceV2Options]] params which
+ * @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceOptions]] params which
  * are not Kafka consumer params.
  * @param metadataPath Path to a directory this reader can use for writing metadata.
  * @param initialOffsets The Kafka offsets to start reading data at.
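The `sourceOptions` parameter now refers to `DataSourceOptions`, which the diff further down defines as an immutable string-to-string map with case-insensitive keys. The following is a minimal stand-alone sketch of those lookup semantics so they can be tried without Spark on the classpath. `CaseInsensitiveOptions` and `OptionsDemo` are hypothetical names, not Spark classes; the stand-in only mirrors the `get`/`getBoolean`/`getInt` logic shown in the diff, and the keys `numPartitions` and `verbose` are arbitrary examples.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

class OptionsDemo {
  public static void main(String[] args) {
    Map<String, String> raw = new HashMap<>();
    raw.put("numPartitions", "4"); // arbitrary example keys, not real Spark options
    raw.put("verbose", "yes");
    CaseInsensitiveOptions opts = new CaseInsensitiveOptions(raw);

    System.out.println(opts.getInt("NUMPARTITIONS", 1));       // 4: key lookup ignores case
    System.out.println(opts.getInt("missing", 1));             // 1: absent key -> default
    System.out.println(opts.getBoolean("verbose", true));      // false: "yes" is not "true"
    System.out.println(opts.get("numpartitions").isPresent()); // true
  }
}

// Hypothetical stand-in that mirrors the lookup logic of DataSourceOptions in this diff;
// it is not the Spark class.
final class CaseInsensitiveOptions {
  private final Map<String, String> keyLowerCasedMap = new HashMap<>();

  CaseInsensitiveOptions(Map<String, String> originalMap) {
    // Keys are lower-cased once, with Locale.ROOT, so lookups do not depend on the JVM locale.
    for (Map.Entry<String, String> entry : originalMap.entrySet()) {
      keyLowerCasedMap.put(normalize(entry.getKey()), entry.getValue());
    }
  }

  private static String normalize(String key) {
    return key.toLowerCase(Locale.ROOT);
  }

  Optional<String> get(String key) {
    return Optional.ofNullable(keyLowerCasedMap.get(normalize(key)));
  }

  boolean getBoolean(String key, boolean defaultValue) {
    String k = normalize(key);
    // Boolean.parseBoolean returns false for anything other than "true" (ignoring case).
    return keyLowerCasedMap.containsKey(k)
        ? Boolean.parseBoolean(keyLowerCasedMap.get(k)) : defaultValue;
  }

  int getInt(String key, int defaultValue) {
    String k = normalize(key);
    // Integer.parseInt throws NumberFormatException on a malformed value.
    return keyLowerCasedMap.containsKey(k)
        ? Integer.parseInt(keyLowerCasedMap.get(k)) : defaultValue;
  }
}
```

One consequence of delegating to `Boolean.parseBoolean` is that a present but non-`true` value such as `yes` yields `false` rather than the caller's default, whereas `getInt` fails loudly with a `NumberFormatException` on a malformed value.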
http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index 2deb7fa..85e96b6 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext} import org.apache.spark.sql.execution.streaming.{Sink, Source} import org.apache.spark.sql.sources._ -import org.apache.spark.sql.sources.v2.DataSourceV2Options +import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, StreamWriteSupport} import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter import org.apache.spark.sql.streaming.OutputMode @@ -109,7 +109,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister override def createContinuousReader( schema: Optional[StructType], metadataPath: String, - options: DataSourceV2Options): KafkaContinuousReader = { + options: DataSourceOptions): KafkaContinuousReader = { val parameters = options.asMap().asScala.toMap validateStreamOptions(parameters) // Each running query should use its own group id. 
Otherwise, the query may be only assigned @@ -227,7 +227,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister queryId: String, schema: StructType, mode: OutputMode, - options: DataSourceV2Options): StreamWriter = { + options: DataSourceOptions): StreamWriter = { import scala.collection.JavaConverters._ val spark = SparkSession.getActiveSession.get http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java new file mode 100644 index 0000000..c320535 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.sources.v2; + +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; + +import org.apache.spark.annotation.InterfaceStability; + +/** + * An immutable string-to-string map in which keys are case-insensitive. This is used to represent + * data source options. + */ +@InterfaceStability.Evolving +public class DataSourceOptions { + private final Map<String, String> keyLowerCasedMap; + + private String toLowerCase(String key) { + return key.toLowerCase(Locale.ROOT); + } + + public static DataSourceOptions empty() { + return new DataSourceOptions(new HashMap<>()); + } + + public DataSourceOptions(Map<String, String> originalMap) { + keyLowerCasedMap = new HashMap<>(originalMap.size()); + for (Map.Entry<String, String> entry : originalMap.entrySet()) { + keyLowerCasedMap.put(toLowerCase(entry.getKey()), entry.getValue()); + } + } + + public Map<String, String> asMap() { + return new HashMap<>(keyLowerCasedMap); + } + + /** + * Returns the option value to which the specified key is mapped, case-insensitively. + */ + public Optional<String> get(String key) { + return Optional.ofNullable(keyLowerCasedMap.get(toLowerCase(key))); + } + + /** + * Returns the boolean value to which the specified key is mapped, + * or defaultValue if there is no mapping for the key. The key match is case-insensitive + */ + public boolean getBoolean(String key, boolean defaultValue) { + String lcaseKey = toLowerCase(key); + return keyLowerCasedMap.containsKey(lcaseKey) ? + Boolean.parseBoolean(keyLowerCasedMap.get(lcaseKey)) : defaultValue; + } + + /** + * Returns the integer value to which the specified key is mapped, + * or defaultValue if there is no mapping for the key. The key match is case-insensitive + */ + public int getInt(String key, int defaultValue) { + String lcaseKey = toLowerCase(key); + return keyLowerCasedMap.containsKey(lcaseKey) ?
+ Integer.parseInt(keyLowerCasedMap.get(lcaseKey)) : defaultValue; + } + + /** + * Returns the long value to which the specified key is mapped, + * or defaultValue if there is no mapping for the key. The key match is case-insensitive + */ + public long getLong(String key, long defaultValue) { + String lcaseKey = toLowerCase(key); + return keyLowerCasedMap.containsKey(lcaseKey) ? + Long.parseLong(keyLowerCasedMap.get(lcaseKey)) : defaultValue; + } + + /** + * Returns the double value to which the specified key is mapped, + * or defaultValue if there is no mapping for the key. The key match is case-insensitive + */ + public double getDouble(String key, double defaultValue) { + String lcaseKey = toLowerCase(key); + return keyLowerCasedMap.containsKey(lcaseKey) ? + Double.parseDouble(keyLowerCasedMap.get(lcaseKey)) : defaultValue; + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java deleted file mode 100644 index ddc2acc..0000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2; - -import java.util.HashMap; -import java.util.Locale; -import java.util.Map; -import java.util.Optional; - -import org.apache.spark.annotation.InterfaceStability; - -/** - * An immutable string-to-string map in which keys are case-insensitive. This is used to represent - * data source options. - */ [email protected] -public class DataSourceV2Options { - private final Map<String, String> keyLowerCasedMap; - - private String toLowerCase(String key) { - return key.toLowerCase(Locale.ROOT); - } - - public static DataSourceV2Options empty() { - return new DataSourceV2Options(new HashMap<>()); - } - - public DataSourceV2Options(Map<String, String> originalMap) { - keyLowerCasedMap = new HashMap<>(originalMap.size()); - for (Map.Entry<String, String> entry : originalMap.entrySet()) { - keyLowerCasedMap.put(toLowerCase(entry.getKey()), entry.getValue()); - } - } - - public Map<String, String> asMap() { - return new HashMap<>(keyLowerCasedMap); - } - - /** - * Returns the option value to which the specified key is mapped, case-insensitively. - */ - public Optional<String> get(String key) { - return Optional.ofNullable(keyLowerCasedMap.get(toLowerCase(key))); - } - - /** - * Returns the boolean value to which the specified key is mapped, - * or defaultValue if there is no mapping for the key. The key match is case-insensitive - */ - public boolean getBoolean(String key, boolean defaultValue) { - String lcaseKey = toLowerCase(key); - return keyLowerCasedMap.containsKey(lcaseKey) ? 
- Boolean.parseBoolean(keyLowerCasedMap.get(lcaseKey)) : defaultValue; - } - - /** - * Returns the integer value to which the specified key is mapped, - * or defaultValue if there is no mapping for the key. The key match is case-insensitive - */ - public int getInt(String key, int defaultValue) { - String lcaseKey = toLowerCase(key); - return keyLowerCasedMap.containsKey(lcaseKey) ? - Integer.parseInt(keyLowerCasedMap.get(lcaseKey)) : defaultValue; - } - - /** - * Returns the long value to which the specified key is mapped, - * or defaultValue if there is no mapping for the key. The key match is case-insensitive - */ - public long getLong(String key, long defaultValue) { - String lcaseKey = toLowerCase(key); - return keyLowerCasedMap.containsKey(lcaseKey) ? - Long.parseLong(keyLowerCasedMap.get(lcaseKey)) : defaultValue; - } - - /** - * Returns the double value to which the specified key is mapped, - * or defaultValue if there is no mapping for the key. The key match is case-insensitive - */ - public double getDouble(String key, double defaultValue) { - String lcaseKey = toLowerCase(key); - return keyLowerCasedMap.containsKey(lcaseKey) ? 
- Double.parseDouble(keyLowerCasedMap.get(lcaseKey)) : defaultValue; - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java index 948e20b..0ea4dc6 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java @@ -18,17 +18,17 @@ package org.apache.spark.sql.sources.v2; import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; +import org.apache.spark.sql.sources.v2.reader.DataSourceReader; /** * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to * provide data reading ability and scan the data from the data source. */ @InterfaceStability.Evolving -public interface ReadSupport { +public interface ReadSupport extends DataSourceV2 { /** - * Creates a {@link DataSourceV2Reader} to scan the data from this data source. + * Creates a {@link DataSourceReader} to scan the data from this data source. * * If this method fails (by throwing an exception), the action would fail and no Spark job was * submitted. @@ -36,5 +36,5 @@ public interface ReadSupport { * @param options the options for the returned data source reader, which is an immutable * case-insensitive string-to-string map. 
*/ - DataSourceV2Reader createReader(DataSourceV2Options options); + DataSourceReader createReader(DataSourceOptions options); } http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java index b69c6be..3801402 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java @@ -18,7 +18,7 @@ package org.apache.spark.sql.sources.v2; import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; +import org.apache.spark.sql.sources.v2.reader.DataSourceReader; import org.apache.spark.sql.types.StructType; /** @@ -30,10 +30,10 @@ import org.apache.spark.sql.types.StructType; * supports both schema inference and user-specified schema. */ @InterfaceStability.Evolving -public interface ReadSupportWithSchema { +public interface ReadSupportWithSchema extends DataSourceV2 { /** - * Create a {@link DataSourceV2Reader} to scan the data from this data source. + * Create a {@link DataSourceReader} to scan the data from this data source. * * If this method fails (by throwing an exception), the action would fail and no Spark job was * submitted. @@ -45,5 +45,5 @@ public interface ReadSupportWithSchema { * @param options the options for the returned data source reader, which is an immutable * case-insensitive string-to-string map. 
*/ - DataSourceV2Reader createReader(StructType schema, DataSourceV2Options options); + DataSourceReader createReader(StructType schema, DataSourceOptions options); } http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java index 3cb020d..9d66805 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java @@ -25,7 +25,7 @@ import org.apache.spark.annotation.InterfaceStability; * session. */ @InterfaceStability.Evolving -public interface SessionConfigSupport { +public interface SessionConfigSupport extends DataSourceV2 { /** * Key prefix of the session configs to propagate. 
Spark will extract all session configs that http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java index 1e3b644..cab5645 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java @@ -21,7 +21,7 @@ import java.util.Optional; import org.apache.spark.annotation.InterfaceStability; import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer; +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; import org.apache.spark.sql.types.StructType; /** @@ -29,17 +29,17 @@ import org.apache.spark.sql.types.StructType; * provide data writing ability and save the data to the data source. */ @InterfaceStability.Evolving -public interface WriteSupport { +public interface WriteSupport extends DataSourceV2 { /** - * Creates an optional {@link DataSourceV2Writer} to save the data to this data source. Data + * Creates an optional {@link DataSourceWriter} to save the data to this data source. Data * sources can return None if there is no writing needed to be done according to the save mode. * * If this method fails (by throwing an exception), the action would fail and no Spark job was * submitted. * * @param jobId A unique string for the writing job. It's possible that there are many writing - * jobs running at the same time, and the returned {@link DataSourceV2Writer} can + * jobs running at the same time, and the returned {@link DataSourceWriter} can * use this job id to distinguish itself from other jobs. * @param schema the schema of the data to be written. 
* @param mode the save mode which determines what to do when the data are already in this data @@ -47,6 +47,6 @@ public interface WriteSupport { * @param options the options for the returned data source writer, which is an immutable * case-insensitive string-to-string map. */ - Optional<DataSourceV2Writer> createWriter( - String jobId, StructType schema, SaveMode mode, DataSourceV2Options options); + Optional<DataSourceWriter> createWriter( + String jobId, StructType schema, SaveMode mode, DataSourceOptions options); } http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReaderFactory.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReaderFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReaderFactory.java index 077b95b..32e98e8 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReaderFactory.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReaderFactory.java @@ -22,7 +22,7 @@ import java.io.Serializable; import org.apache.spark.annotation.InterfaceStability; /** - * A reader factory returned by {@link DataSourceV2Reader#createDataReaderFactories()} and is + * A reader factory returned by {@link DataSourceReader#createDataReaderFactories()} and is * responsible for creating the actual data reader. The relationship between * {@link DataReaderFactory} and {@link DataReader} * is similar to the relationship between {@link Iterable} and {@link java.util.Iterator}. 
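The `Iterable`/`Iterator` analogy above, together with the rule stated further down for `DataSourceReader#createDataReaderFactories()` (one factory per RDD partition), can be illustrated without Spark. The sketch below uses hypothetical stand-in interfaces `SketchReaderFactory` and `SketchReader` that mirror the shape of Spark 2.3's `DataReaderFactory` (serializable, `createDataReader()`) and `DataReader` (`Closeable`, `next()`/`get()`); `RangeSource`, `RangeFactory`, and `FactorySketch` are illustrative names only, and the sequential loop in `readAll` stands in for work Spark would distribute across executors.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

class FactorySketch {
  public static void main(String[] args) throws IOException {
    // Three factories describe a scan with three partitions.
    List<SketchReaderFactory<Integer>> factories = RangeSource.planFactories(0, 10, 3);
    System.out.println(factories.size());      // 3
    System.out.println(readAll(factories));    // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  }

  // Drains every factory in order; in Spark each factory would run in its own task.
  static List<Integer> readAll(List<SketchReaderFactory<Integer>> factories) throws IOException {
    List<Integer> out = new ArrayList<>();
    for (SketchReaderFactory<Integer> factory : factories) {
      // Like Iterable.iterator(): each call yields a fresh, independent reader.
      try (SketchReader<Integer> reader = factory.createDataReader()) {
        while (reader.next()) {
          out.add(reader.get());
        }
      }
    }
    return out;
  }
}

// Hypothetical stand-in shaped like DataReaderFactory: Serializable because factories are
// shipped to executors, while readers are created where the data is read.
interface SketchReaderFactory<T> extends Serializable {
  SketchReader<T> createDataReader();
}

// Hypothetical stand-in shaped like DataReader: advance with next(), then read with get().
interface SketchReader<T> extends Closeable {
  boolean next() throws IOException;
  T get();
}

final class RangeSource {
  // Splits [start, end) into numPartitions contiguous ranges, one factory per range.
  static List<SketchReaderFactory<Integer>> planFactories(int start, int end, int numPartitions) {
    List<SketchReaderFactory<Integer>> factories = new ArrayList<>();
    int total = end - start;
    for (int p = 0; p < numPartitions; p++) {
      int lo = start + (int) ((long) total * p / numPartitions);
      int hi = start + (int) ((long) total * (p + 1) / numPartitions);
      factories.add(new RangeFactory(lo, hi));
    }
    return factories;
  }
}

final class RangeFactory implements SketchReaderFactory<Integer> {
  private final int lo;
  private final int hi;

  RangeFactory(int lo, int hi) {
    this.lo = lo;
    this.hi = hi;
  }

  @Override
  public SketchReader<Integer> createDataReader() {
    return new SketchReader<Integer>() {
      private int current = lo - 1; // positioned before the first row

      @Override public boolean next() { current++; return current < hi; }
      @Override public Integer get() { return current; }
      @Override public void close() { /* nothing to release in this sketch */ }
    };
  }
}
```

Keeping the factory stateless and serializable, and putting all cursor state in the reader, is what lets the same factory be asked for a new reader (for example on a task retry) without replaying or skipping rows.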
http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java new file mode 100644 index 0000000..a470bcc --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import java.util.List; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.sources.v2.DataSourceOptions; +import org.apache.spark.sql.sources.v2.ReadSupport; +import org.apache.spark.sql.sources.v2.ReadSupportWithSchema; +import org.apache.spark.sql.types.StructType; + +/** + * A data source reader that is returned by + * {@link ReadSupport#createReader(DataSourceOptions)} or + * {@link ReadSupportWithSchema#createReader(StructType, DataSourceOptions)}. 
+ * It can mix in various query optimization interfaces to speed up the data scan. The actual scan + * logic is delegated to {@link DataReaderFactory}s that are returned by + * {@link #createDataReaderFactories()}. + * + * There are mainly 3 kinds of query optimizations: + * 1. Operators push-down. E.g., filter push-down, required columns push-down(aka column + * pruning), etc. Names of these interfaces start with `SupportsPushDown`. + * 2. Information Reporting. E.g., statistics reporting, ordering reporting, etc. + * Names of these interfaces start with `SupportsReporting`. + * 3. Special scans. E.g, columnar scan, unsafe row scan, etc. + * Names of these interfaces start with `SupportsScan`. Note that a reader should only + * implement at most one of the special scans, if more than one special scans are implemented, + * only one of them would be respected, according to the priority list from high to low: + * {@link SupportsScanColumnarBatch}, {@link SupportsScanUnsafeRow}. + * + * If an exception was throw when applying any of these query optimizations, the action would fail + * and no Spark job was submitted. + * + * Spark first applies all operator push-down optimizations that this data source supports. Then + * Spark collects information this data source reported for further optimizations. Finally Spark + * issues the scan request and does the actual data reading. + */ +@InterfaceStability.Evolving +public interface DataSourceReader { + + /** + * Returns the actual schema of this data source reader, which may be different from the physical + * schema of the underlying storage, as column pruning or other optimizations may happen. + * + * If this method fails (by throwing an exception), the action would fail and no Spark job was + * submitted. + */ + StructType readSchema(); + + /** + * Returns a list of reader factories. Each factory is responsible for creating a data reader to + * output data for one RDD partition.
That means the number of factories returned here is same as + * the number of RDD partitions this scan outputs. + * + * Note that, this may not be a full scan if the data source reader mixes in other optimization + * interfaces like column pruning, filter push-down, etc. These optimizations are applied before + * Spark issues the scan request. + * + * If this method fails (by throwing an exception), the action would fail and no Spark job was + * submitted. + */ + List<DataReaderFactory<Row>> createDataReaderFactories(); +} http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java deleted file mode 100644 index 0180cd9..0000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.sources.v2.reader; - -import java.util.List; - -import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.types.StructType; - -/** - * A data source reader that is returned by - * {@link org.apache.spark.sql.sources.v2.ReadSupport#createReader( - * org.apache.spark.sql.sources.v2.DataSourceV2Options)} or - * {@link org.apache.spark.sql.sources.v2.ReadSupportWithSchema#createReader( - * StructType, org.apache.spark.sql.sources.v2.DataSourceV2Options)}. - * It can mix in various query optimization interfaces to speed up the data scan. The actual scan - * logic is delegated to {@link DataReaderFactory}s that are returned by - * {@link #createDataReaderFactories()}. - * - * There are mainly 3 kinds of query optimizations: - * 1. Operators push-down. E.g., filter push-down, required columns push-down(aka column - * pruning), etc. Names of these interfaces start with `SupportsPushDown`. - * 2. Information Reporting. E.g., statistics reporting, ordering reporting, etc. - * Names of these interfaces start with `SupportsReporting`. - * 3. Special scans. E.g, columnar scan, unsafe row scan, etc. - * Names of these interfaces start with `SupportsScan`. Note that a reader should only - * implement at most one of the special scans, if more than one special scans are implemented, - * only one of them would be respected, according to the priority list from high to low: - * {@link SupportsScanColumnarBatch}, {@link SupportsScanUnsafeRow}. - * - * If an exception was throw when applying any of these query optimizations, the action would fail - * and no Spark job was submitted. - * - * Spark first applies all operator push-down optimizations that this data source supports. Then - * Spark collects information this data source reported for further optimizations. Finally Spark - * issues the scan request and does the actual data reading. 
- */ [email protected] -public interface DataSourceV2Reader { - - /** - * Returns the actual schema of this data source reader, which may be different from the physical - * schema of the underlying storage, as column pruning or other optimizations may happen. - * - * If this method fails (by throwing an exception), the action would fail and no Spark job was - * submitted. - */ - StructType readSchema(); - - /** - * Returns a list of reader factories. Each factory is responsible for creating a data reader to - * output data for one RDD partition. That means the number of factories returned here is same as - * the number of RDD partitions this scan outputs. - * - * Note that, this may not be a full scan if the data source reader mixes in other optimization - * interfaces like column pruning, filter push-down, etc. These optimizations are applied before - * Spark issues the scan request. - * - * If this method fails (by throwing an exception), the action would fail and no Spark job was - * submitted. - */ - List<DataReaderFactory<Row>> createDataReaderFactories(); -} http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java index f76c687..9822410 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java @@ -21,7 +21,7 @@ import org.apache.spark.annotation.InterfaceStability; import org.apache.spark.sql.catalyst.expressions.Expression; /** - * A mix-in interface for {@link DataSourceV2Reader}. 
Data source readers can implement this + * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this * interface to push down arbitrary expressions as predicates to the data source. * This is an experimental and unstable interface as {@link Expression} is not public and may get * changed in the future Spark versions. @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression; * process this interface. */ @InterfaceStability.Unstable -public interface SupportsPushDownCatalystFilters { +public interface SupportsPushDownCatalystFilters extends DataSourceReader { /** * Pushes down filters, and returns unsupported filters. http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java index 6b0c9d4..f35c711 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java @@ -21,7 +21,7 @@ import org.apache.spark.annotation.InterfaceStability; import org.apache.spark.sql.sources.Filter; /** - * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this + * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this * interface to push down filters to the data source and reduce the size of the data to be read. * * Note that, if data source readers implement both this interface and @@ -29,7 +29,7 @@ import org.apache.spark.sql.sources.Filter; * {@link SupportsPushDownCatalystFilters}. 
*/ @InterfaceStability.Evolving -public interface SupportsPushDownFilters { +public interface SupportsPushDownFilters extends DataSourceReader { /** * Pushes down filters, and returns unsupported filters. http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java index fe0ac8e..427b4d0 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java @@ -21,12 +21,12 @@ import org.apache.spark.annotation.InterfaceStability; import org.apache.spark.sql.types.StructType; /** - * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this + * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this * interface to push down required columns to the data source and only read these columns during * scan to reduce the size of the data to be read. */ @InterfaceStability.Evolving -public interface SupportsPushDownRequiredColumns { +public interface SupportsPushDownRequiredColumns extends DataSourceReader { /** * Applies column pruning w.r.t. the given requiredSchema. @@ -35,7 +35,7 @@ public interface SupportsPushDownRequiredColumns { * also OK to do the pruning partially, e.g., a data source may not be able to prune nested * fields, and only prune top-level columns. * - * Note that, data source readers should update {@link DataSourceV2Reader#readSchema()} after + * Note that, data source readers should update {@link DataSourceReader#readSchema()} after * applying column pruning. 
*/ void pruneColumns(StructType requiredSchema); http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java index f786472..a2383a9 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java @@ -20,11 +20,11 @@ package org.apache.spark.sql.sources.v2.reader; import org.apache.spark.annotation.InterfaceStability; /** - * A mix in interface for {@link DataSourceV2Reader}. Data source readers can implement this + * A mix in interface for {@link DataSourceReader}. Data source readers can implement this * interface to report data partitioning and try to avoid shuffle at Spark side. */ @InterfaceStability.Evolving -public interface SupportsReportPartitioning { +public interface SupportsReportPartitioning extends DataSourceReader { /** * Returns the output data partitioning that this reader guarantees. 
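These hunks turn each mix-in (`SupportsPushDownFilters`, `SupportsPushDownRequiredColumns`, `SupportsReportPartitioning`, and so on) from a free-standing interface into one that `extends DataSourceReader`. The sketch below is a minimal illustration of what that change buys. It uses simplified stand-ins rather than Spark's real types: `SimpleReader`, `SupportsColumnPruning`, `FixedColumnsReader` and `MixInDemo` are hypothetical names, and the schema is reduced to a list of column names. Because the mix-in is now a subtype of the reader, two things follow. A concrete class that implements the mix-in must also implement `readSchema()`. A reference typed as the mix-in can also call reader methods without a second cast.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for DataSourceReader; the schema is simplified to column names.
interface SimpleReader {
    List<String> readSchema();
}

// Hypothetical stand-in for a column-pruning mix-in. Like the patched
// SupportsPushDownRequiredColumns, it extends the reader interface it mixes into.
interface SupportsColumnPruning extends SimpleReader {
    void pruneColumns(List<String> requiredColumns);
}

// Hypothetical reader over a fixed set of columns. Because it implements the mix-in,
// the compiler also requires it to implement readSchema().
class FixedColumnsReader implements SupportsColumnPruning {
    private final List<String> columns = new ArrayList<>(Arrays.asList("id", "name", "ts"));

    @Override
    public List<String> readSchema() {
        // Reflects any pruning already applied, as the javadoc asks of readSchema().
        return new ArrayList<>(columns);
    }

    @Override
    public void pruneColumns(List<String> requiredColumns) {
        columns.retainAll(requiredColumns);
    }
}

final class MixInDemo {
    // Planner-style dispatch: detect the optional capability with instanceof, apply it,
    // then read the schema. The mix-in reference needs no second cast to call readSchema().
    static List<String> planSchema(SimpleReader reader, List<String> required) {
        if (reader instanceof SupportsColumnPruning) {
            SupportsColumnPruning pruning = (SupportsColumnPruning) reader;
            pruning.pruneColumns(required);
            return pruning.readSchema();
        }
        return reader.readSchema();
    }

    public static void main(String[] args) {
        System.out.println(planSchema(new FixedColumnsReader(), Arrays.asList("id", "ts")));
    }
}
```

`planSchema(new FixedColumnsReader(), Arrays.asList("id", "ts"))` returns `[id, ts]`. A reader that does not implement the mix-in comes back with its schema unchanged.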
http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java index c019d2f..11bb13f 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java @@ -20,11 +20,11 @@ package org.apache.spark.sql.sources.v2.reader; import org.apache.spark.annotation.InterfaceStability; /** - * A mix in interface for {@link DataSourceV2Reader}. Data source readers can implement this + * A mix in interface for {@link DataSourceReader}. Data source readers can implement this * interface to report statistics to Spark. */ @InterfaceStability.Evolving -public interface SupportsReportStatistics { +public interface SupportsReportStatistics extends DataSourceReader { /** * Returns the basic statistics of this data source. 
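The `DataSourceReader` javadoc sets the order of optimizations: push-downs first, then information reporting such as statistics, then the scan. Below is a minimal sketch of the reporting step. It assumes a planner that asks for a size estimate and treats the size as unknown when the reader does not mix in statistics support. `StatsReader`, `Stats`, `SupportsStats`, `estimateStatistics`, `SmallTableReader` and `StatsPlanner` are hypothetical stand-ins. The size-threshold broadcast check only illustrates how an estimate might be consumed; it is not Spark's actual planning rule.

```java
import java.util.OptionalLong;

// Hypothetical stand-in for DataSourceReader.
interface StatsReader {
    String readSchema();
}

// Hypothetical statistics holder; either estimate may be unknown, hence OptionalLong.
final class Stats {
    final OptionalLong sizeInBytes;
    final OptionalLong numRows;

    Stats(OptionalLong sizeInBytes, OptionalLong numRows) {
        this.sizeInBytes = sizeInBytes;
        this.numRows = numRows;
    }
}

// Hypothetical stand-in for SupportsReportStatistics; it extends the reader it mixes into.
interface SupportsStats extends StatsReader {
    Stats estimateStatistics();
}

// Hypothetical small source that knows its size up front.
class SmallTableReader implements SupportsStats {
    @Override
    public String readSchema() {
        return "id INT";
    }

    @Override
    public Stats estimateStatistics() {
        return new Stats(OptionalLong.of(1024L), OptionalLong.of(10L));
    }
}

final class StatsPlanner {
    // Assumed fallback when nothing is reported: treat the source as too large to broadcast.
    static final long UNKNOWN_SIZE = Long.MAX_VALUE;

    static long estimatedSize(StatsReader reader) {
        if (reader instanceof SupportsStats) {
            return ((SupportsStats) reader).estimateStatistics().sizeInBytes.orElse(UNKNOWN_SIZE);
        }
        return UNKNOWN_SIZE;
    }

    // Example consumer of the estimate: a size-based decision with a caller-chosen threshold.
    static boolean canBroadcast(StatsReader reader, long thresholdBytes) {
        return estimatedSize(reader) <= thresholdBytes;
    }
}
```

Using `OptionalLong` keeps "unknown" distinct from a real value of zero. The planner therefore has to choose its fallback explicitly and cannot silently trust a default.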
http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java index 67da555..2e5cfa7 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java @@ -24,11 +24,11 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.vectorized.ColumnarBatch; /** - * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this + * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this * interface to output {@link ColumnarBatch} and make the scan faster. */ @InterfaceStability.Evolving -public interface SupportsScanColumnarBatch extends DataSourceV2Reader { +public interface SupportsScanColumnarBatch extends DataSourceReader { @Override default List<DataReaderFactory<Row>> createDataReaderFactories() { throw new IllegalStateException( @@ -36,7 +36,7 @@ public interface SupportsScanColumnarBatch extends DataSourceV2Reader { } /** - * Similar to {@link DataSourceV2Reader#createDataReaderFactories()}, but returns columnar data + * Similar to {@link DataSourceReader#createDataReaderFactories()}, but returns columnar data * in batches. 
*/ List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories(); http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java index 156af69..9cd749e 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java @@ -24,13 +24,13 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.expressions.UnsafeRow; /** - * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this + * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this * interface to output {@link UnsafeRow} directly and avoid the row copy at Spark side. * This is an experimental and unstable interface, as {@link UnsafeRow} is not public and may get * changed in the future Spark versions. */ @InterfaceStability.Unstable -public interface SupportsScanUnsafeRow extends DataSourceV2Reader { +public interface SupportsScanUnsafeRow extends DataSourceReader { @Override default List<DataReaderFactory<Row>> createDataReaderFactories() { @@ -39,7 +39,7 @@ public interface SupportsScanUnsafeRow extends DataSourceV2Reader { } /** - * Similar to {@link DataSourceV2Reader#createDataReaderFactories()}, + * Similar to {@link DataSourceReader#createDataReaderFactories()}, * but returns data in unsafe row format. 
*/ List<DataReaderFactory<UnsafeRow>> createUnsafeRowReaderFactories(); http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java index 9a93a80..f79424e 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java @@ -21,7 +21,7 @@ import java.util.Optional; import org.apache.spark.annotation.InterfaceStability; import org.apache.spark.sql.sources.v2.DataSourceV2; -import org.apache.spark.sql.sources.v2.DataSourceV2Options; +import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader; import org.apache.spark.sql.types.StructType; @@ -44,5 +44,5 @@ public interface ContinuousReadSupport extends DataSourceV2 { ContinuousReader createContinuousReader( Optional<StructType> schema, String checkpointLocation, - DataSourceV2Options options); + DataSourceOptions options); } http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java index 3b357c0..22660e4 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java 
@@ -20,8 +20,8 @@ package org.apache.spark.sql.sources.v2.streaming; import java.util.Optional; import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.apache.spark.sql.sources.v2.DataSourceV2; -import org.apache.spark.sql.sources.v2.DataSourceV2Options; import org.apache.spark.sql.sources.v2.streaming.reader.MicroBatchReader; import org.apache.spark.sql.types.StructType; @@ -50,5 +50,5 @@ public interface MicroBatchReadSupport extends DataSourceV2 { MicroBatchReader createMicroBatchReader( Optional<StructType> schema, String checkpointLocation, - DataSourceV2Options options); + DataSourceOptions options); } http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java index 6cd219c..7c5f304 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java @@ -19,10 +19,10 @@ package org.apache.spark.sql.sources.v2.streaming; import org.apache.spark.annotation.InterfaceStability; import org.apache.spark.sql.execution.streaming.BaseStreamingSink; +import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.apache.spark.sql.sources.v2.DataSourceV2; -import org.apache.spark.sql.sources.v2.DataSourceV2Options; import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter; -import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer; +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; import org.apache.spark.sql.streaming.OutputMode; import org.apache.spark.sql.types.StructType; @@ -31,7 +31,7 @@ 
import org.apache.spark.sql.types.StructType; * provide data writing ability for structured streaming. */ @InterfaceStability.Evolving -public interface StreamWriteSupport extends BaseStreamingSink { +public interface StreamWriteSupport extends DataSourceV2, BaseStreamingSink { /** * Creates an optional {@link StreamWriter} to save the data to this data source. Data @@ -39,7 +39,7 @@ public interface StreamWriteSupport extends BaseStreamingSink { * * @param queryId A unique string for the writing query. It's possible that there are many * writing queries running at the same time, and the returned - * {@link DataSourceV2Writer} can use this id to distinguish itself from others. + * {@link DataSourceWriter} can use this id to distinguish itself from others. * @param schema the schema of the data to be written. * @param mode the output mode which determines what successive epoch output means to this * sink, please refer to {@link OutputMode} for more details. @@ -50,5 +50,5 @@ public interface StreamWriteSupport extends BaseStreamingSink { String queryId, StructType schema, OutputMode mode, - DataSourceV2Options options); + DataSourceOptions options); } http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java index 3ac979c..6e5177e 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java @@ -19,12 +19,12 @@ package org.apache.spark.sql.sources.v2.streaming.reader; import org.apache.spark.annotation.InterfaceStability; import 
org.apache.spark.sql.execution.streaming.BaseStreamingSource; -import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; +import org.apache.spark.sql.sources.v2.reader.DataSourceReader; import java.util.Optional; /** - * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this + * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this * interface to allow reading in a continuous processing mode stream. * * Implementations must ensure each reader factory output is a {@link ContinuousDataReader}. @@ -33,7 +33,7 @@ import java.util.Optional; * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely. */ @InterfaceStability.Evolving -public interface ContinuousReader extends BaseStreamingSource, DataSourceV2Reader { +public interface ContinuousReader extends BaseStreamingSource, DataSourceReader { /** * Merge partitioned offsets coming from {@link ContinuousDataReader} instances for each * partition to a single global offset. 
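`ContinuousReader` asks implementations to merge the offsets reported by each partition's `ContinuousDataReader` into a single global offset. Below is a minimal sketch of that merge. It assumes a Kafka-like source where each partition reports a numeric position. `PartOffset`, `GlobalOffset` and `OffsetMerger` are hypothetical. They stand in for Spark's own offset classes, which are not reproduced here.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical per-partition offset reported by one continuous data reader.
final class PartOffset {
    final int partition;
    final long position;

    PartOffset(int partition, long position) {
        this.partition = partition;
        this.position = position;
    }
}

// Hypothetical global offset: one position per partition. A sorted map keeps the string
// form deterministic, which matters if the offset is written to a checkpoint log.
final class GlobalOffset {
    final Map<Integer, Long> positions;

    GlobalOffset(Map<Integer, Long> positions) {
        this.positions = Collections.unmodifiableMap(new TreeMap<>(positions));
    }

    @Override
    public String toString() {
        return positions.toString();
    }
}

final class OffsetMerger {
    // Fold per-partition offsets into one global offset. Assumption: if a partition shows up
    // more than once, the furthest position wins.
    static GlobalOffset merge(PartOffset[] offsets) {
        Map<Integer, Long> merged = new TreeMap<>();
        for (PartOffset o : offsets) {
            merged.merge(o.partition, o.position, Long::max);
        }
        return new GlobalOffset(merged);
    }

    public static void main(String[] args) {
        PartOffset[] reported = { new PartOffset(1, 7L), new PartOffset(0, 42L) };
        System.out.println(merge(reported));
    }
}
```

Merging `(1, 7)` and `(0, 42)` yields `{0=42, 1=7}`. Ordering by partition id is a design choice: the same set of partition offsets then always serializes the same way.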
http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java index 68887e5..fcec446 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java @@ -18,20 +18,20 @@ package org.apache.spark.sql.sources.v2.streaming.reader; import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; +import org.apache.spark.sql.sources.v2.reader.DataSourceReader; import org.apache.spark.sql.execution.streaming.BaseStreamingSource; import java.util.Optional; /** - * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this + * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this * interface to indicate they allow micro-batch streaming reads. * * Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely. */ @InterfaceStability.Evolving -public interface MicroBatchReader extends DataSourceV2Reader, BaseStreamingSource { +public interface MicroBatchReader extends DataSourceReader, BaseStreamingSource { /** * Set the desired offset range for reader factories created from this reader. 
Reader factories * will generate only data within (`start`, `end`]; that is, from the first record after `start` http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java index 3156c88..915ee6c 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java @@ -18,19 +18,19 @@ package org.apache.spark.sql.sources.v2.streaming.writer; import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer; +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; import org.apache.spark.sql.sources.v2.writer.DataWriter; import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; /** - * A {@link DataSourceV2Writer} for use with structured streaming. This writer handles commits and + * A {@link DataSourceWriter} for use with structured streaming. This writer handles commits and * aborts relative to an epoch ID determined by the execution engine. * * {@link DataWriter} implementations generated by a StreamWriter may be reused for multiple epochs, * and so must reset any internal state after a successful commit. */ @InterfaceStability.Evolving -public interface StreamWriter extends DataSourceV2Writer { +public interface StreamWriter extends DataSourceWriter { /** * Commits this writing job for the specified epoch with a list of commit messages. 
The commit * messages are collected from successful data writers and are produced by http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java deleted file mode 100644 index 8048f50..0000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.sources.v2.writer; - -import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.sources.v2.DataSourceV2Options; -import org.apache.spark.sql.sources.v2.WriteSupport; -import org.apache.spark.sql.streaming.OutputMode; -import org.apache.spark.sql.types.StructType; - -/** - * A data source writer that is returned by - * {@link WriteSupport#createWriter(String, StructType, SaveMode, DataSourceV2Options)}/ - * {@link org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport#createStreamWriter( - * String, StructType, OutputMode, DataSourceV2Options)}. - * It can mix in various writing optimization interfaces to speed up the data saving. The actual - * writing logic is delegated to {@link DataWriter}. - * - * If an exception was throw when applying any of these writing optimizations, the action would fail - * and no Spark job was submitted. - * - * The writing procedure is: - * 1. Create a writer factory by {@link #createWriterFactory()}, serialize and send it to all the - * partitions of the input data(RDD). - * 2. For each partition, create the data writer, and write the data of the partition with this - * writer. If all the data are written successfully, call {@link DataWriter#commit()}. If - * exception happens during the writing, call {@link DataWriter#abort()}. - * 3. If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If - * some writers are aborted, or the job failed with an unknown reason, call - * {@link #abort(WriterCommitMessage[])}. - * - * While Spark will retry failed writing tasks, Spark won't retry failed writing jobs. Users should - * do it manually in their Spark applications if they want to retry. - * - * Please refer to the documentation of commit/abort methods for detailed specifications. 
- */ [email protected] -public interface DataSourceV2Writer { - - /** - * Creates a writer factory which will be serialized and sent to executors. - * - * If this method fails (by throwing an exception), the action would fail and no Spark job was - * submitted. - */ - DataWriterFactory<Row> createWriterFactory(); - - /** - * Commits this writing job with a list of commit messages. The commit messages are collected from - * successful data writers and are produced by {@link DataWriter#commit()}. - * - * If this method fails (by throwing an exception), this writing job is considered to to have been - * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination - * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it. - * - * Note that, one partition may have multiple committed data writers because of speculative tasks. - * Spark will pick the first successful one and get its commit message. Implementations should be - * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data - * writer can commit, or have a way to clean up the data of already-committed writers. - */ - void commit(WriterCommitMessage[] messages); - - /** - * Aborts this writing job because some data writers are failed and keep failing when retry, or - * the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails. - * - * If this method fails (by throwing an exception), the underlying data source may require manual - * cleanup. - * - * Unless the abort is triggered by the failure of commit, the given messages should have some - * null slots as there maybe only a few data writers that are committed before the abort - * happens, or some data writers were committed but their commit messages haven't reached the - * driver when the abort is triggered. So this is just a "best effort" for data sources to - * clean up the data left by data writers. 
- */ - void abort(WriterCommitMessage[] messages); -} http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java new file mode 100644 index 0000000..d89d27d --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.sources.v2.DataSourceOptions; +import org.apache.spark.sql.sources.v2.WriteSupport; +import org.apache.spark.sql.streaming.OutputMode; +import org.apache.spark.sql.types.StructType; + +/** + * A data source writer that is returned by + * {@link WriteSupport#createWriter(String, StructType, SaveMode, DataSourceOptions)}/ + * {@link org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport#createStreamWriter( + * String, StructType, OutputMode, DataSourceOptions)}. + * It can mix in various writing optimization interfaces to speed up the data saving. The actual + * writing logic is delegated to {@link DataWriter}. + * + * If an exception was throw when applying any of these writing optimizations, the action would fail + * and no Spark job was submitted. + * + * The writing procedure is: + * 1. Create a writer factory by {@link #createWriterFactory()}, serialize and send it to all the + * partitions of the input data(RDD). + * 2. For each partition, create the data writer, and write the data of the partition with this + * writer. If all the data are written successfully, call {@link DataWriter#commit()}. If + * exception happens during the writing, call {@link DataWriter#abort()}. + * 3. If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If + * some writers are aborted, or the job failed with an unknown reason, call + * {@link #abort(WriterCommitMessage[])}. + * + * While Spark will retry failed writing tasks, Spark won't retry failed writing jobs. Users should + * do it manually in their Spark applications if they want to retry. + * + * Please refer to the documentation of commit/abort methods for detailed specifications. 
+ */ [email protected] +public interface DataSourceWriter { + + /** + * Creates a writer factory which will be serialized and sent to executors. + * + * If this method fails (by throwing an exception), the action would fail and no Spark job was + * submitted. + */ + DataWriterFactory<Row> createWriterFactory(); + + /** + * Commits this writing job with a list of commit messages. The commit messages are collected from + * successful data writers and are produced by {@link DataWriter#commit()}. + * + * If this method fails (by throwing an exception), this writing job is considered to to have been + * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination + * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it. + * + * Note that, one partition may have multiple committed data writers because of speculative tasks. + * Spark will pick the first successful one and get its commit message. Implementations should be + * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data + * writer can commit, or have a way to clean up the data of already-committed writers. + */ + void commit(WriterCommitMessage[] messages); + + /** + * Aborts this writing job because some data writers are failed and keep failing when retry, or + * the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails. + * + * If this method fails (by throwing an exception), the underlying data source may require manual + * cleanup. + * + * Unless the abort is triggered by the failure of commit, the given messages should have some + * null slots as there maybe only a few data writers that are committed before the abort + * happens, or some data writers were committed but their commit messages haven't reached the + * driver when the abort is triggered. So this is just a "best effort" for data sources to + * clean up the data left by data writers. 
+ */
+ void abort(WriterCommitMessage[] messages);
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
index 04b03e6..53941a8 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
@@ -33,11 +33,11 @@ import org.apache.spark.annotation.InterfaceStability;
 *
 * If this data writer succeeds(all records are successfully written and {@link #commit()}
 * succeeds), a {@link WriterCommitMessage} will be sent to the driver side and pass to
- * {@link DataSourceV2Writer#commit(WriterCommitMessage[])} with commit messages from other data
+ * {@link DataSourceWriter#commit(WriterCommitMessage[])} with commit messages from other data
 * writers. If this data writer fails(one record fails to write or {@link #commit()} fails), an
 * exception will be sent to the driver side, and Spark will retry this writing task for some times,
 * each time {@link DataWriterFactory#createDataWriter(int, int)} gets a different `attemptNumber`,
- * and finally call {@link DataSourceV2Writer#abort(WriterCommitMessage[])} if all retry fail.
+ * and finally call {@link DataSourceWriter#abort(WriterCommitMessage[])} if all retry fail.
 *
 * Besides the retry mechanism, Spark may launch speculative tasks if the existing writing task
 * takes too long to finish. Different from retried tasks, which are launched one by one after the
@@ -69,11 +69,11 @@ public interface DataWriter<T> {
 /**
 * Commits this writer after all records are written successfully, returns a commit message which
 * will be sent back to driver side and passed to
- * {@link DataSourceV2Writer#commit(WriterCommitMessage[])}.
+ * {@link DataSourceWriter#commit(WriterCommitMessage[])}.
 *
 * The written data should only be visible to data source readers after
- * {@link DataSourceV2Writer#commit(WriterCommitMessage[])} succeeds, which means this method
- * should still "hide" the written data and ask the {@link DataSourceV2Writer} at driver side to
+ * {@link DataSourceWriter#commit(WriterCommitMessage[])} succeeds, which means this method
+ * should still "hide" the written data and ask the {@link DataSourceWriter} at driver side to
 * do the final commit via {@link WriterCommitMessage}.
 *
 * If this method fails (by throwing an exception), {@link #abort()} will be called and this
@@ -91,7 +91,7 @@ public interface DataWriter<T> {
 * failed.
 *
 * If this method fails(by throwing an exception), the underlying data source may have garbage
- * that need to be cleaned by {@link DataSourceV2Writer#abort(WriterCommitMessage[])} or manually,
+ * that need to be cleaned by {@link DataSourceWriter#abort(WriterCommitMessage[])} or manually,
 * but these garbage should not be visible to data source readers.
 *
 * @throws IOException if failure happens during disk/network IO like writing files.
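The write path described in the DataSourceWriter and DataWriter Javadoc is a two-phase commit: each task stages its output and returns a WriterCommitMessage, and the driver then either publishes everything through commit or does best-effort cleanup through abort, where some message slots may be null. The sketch below models only that control flow in plain Java. CommitProtocolSketch, StagedFiles, Destination and runJob are hypothetical names, not Spark APIs; tasks run one after another here, and retries and speculative attempts are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins: these mirror only the shape of DataSourceWriter,
// DataWriter and WriterCommitMessage; they are not the Spark interfaces.
public class CommitProtocolSketch {

  /** Plays the role of WriterCommitMessage: each source defines its own message type. */
  interface CommitMessage {}

  /** Hypothetical message: the files one partition staged but did not publish. */
  static final class StagedFiles implements CommitMessage {
    final int partitionId;
    final List<String> files;

    StagedFiles(int partitionId, List<String> files) {
      this.partitionId = partitionId;
      this.files = files;
    }
  }

  /** Hypothetical destination: only files moved into `visible` can be read back. */
  static final class Destination {
    final List<String> visible = new ArrayList<>();
    final List<String> cleanedUp = new ArrayList<>();
  }

  /** Driver-side commit: publish the output of every committed task. */
  static void commitJob(Destination dest, CommitMessage[] messages) {
    for (CommitMessage m : messages) {
      dest.visible.addAll(((StagedFiles) m).files);
    }
  }

  /** Driver-side abort: best-effort cleanup that skips null slots. */
  static void abortJob(Destination dest, CommitMessage[] messages) {
    for (CommitMessage m : messages) {
      if (m != null) { // null: this partition never delivered a commit message
        dest.cleanedUp.addAll(((StagedFiles) m).files);
      }
    }
  }

  /**
   * Runs one write job sequentially. Entry p is what partition p staged, or null if
   * that task kept failing (where Spark would have called DataWriter#abort()).
   */
  static Destination runJob(List<List<String>> taskOutcomes) {
    Destination dest = new Destination();
    CommitMessage[] messages = new CommitMessage[taskOutcomes.size()];
    boolean allCommitted = true;
    for (int p = 0; p < taskOutcomes.size(); p++) {
      List<String> staged = taskOutcomes.get(p);
      if (staged == null) {
        allCommitted = false;
      } else {
        messages[p] = new StagedFiles(p, staged); // task-level commit result
      }
    }
    if (allCommitted) {
      commitJob(dest, messages);
    } else {
      abortJob(dest, messages);
    }
    return dest;
  }

  public static void main(String[] args) {
    Destination ok = runJob(Arrays.asList(Arrays.asList("part-0"), Arrays.asList("part-1")));
    System.out.println(ok.visible); // [part-0, part-1]

    Destination failed = runJob(Arrays.asList(Arrays.asList("part-0"), null));
    System.out.println(failed.visible + " " + failed.cleanedUp); // [] [part-0]
  }
}
```

Keeping the staged files invisible until the driver-side commit is what lets a failed job be cleaned up without readers ever seeing partial output, which is the guarantee the DataWriter#commit() Javadoc asks for.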
http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
index 18ec792..ea95442 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
@@ -22,7 +22,7 @@ import java.io.Serializable;
 import org.apache.spark.annotation.InterfaceStability;

 /**
- * A factory of {@link DataWriter} returned by {@link DataSourceV2Writer#createWriterFactory()},
+ * A factory of {@link DataWriter} returned by {@link DataSourceWriter#createWriterFactory()},
 * which is responsible for creating and initializing the actual data writer at executor side.
 *
 * Note that, the writer factory will be serialized and sent to executors, then the data writer
http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsWriteInternalRow.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsWriteInternalRow.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsWriteInternalRow.java
index 3e05188..d2cf7e0 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsWriteInternalRow.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsWriteInternalRow.java
@@ -22,14 +22,14 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;

 /**
- * A mix-in interface for {@link DataSourceV2Writer}. Data source writers can implement this
+ * A mix-in interface for {@link DataSourceWriter}. Data source writers can implement this
 * interface to write {@link InternalRow} directly and avoid the row conversion at Spark side.
 * This is an experimental and unstable interface, as {@link InternalRow} is not public and may get
 * changed in the future Spark versions.
 */
 @InterfaceStability.Unstable
-public interface SupportsWriteInternalRow extends DataSourceV2Writer {
+public interface SupportsWriteInternalRow extends DataSourceWriter {

 @Override
 default DataWriterFactory<Row> createWriterFactory() {
http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
index 082d6b5..9e38836 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
@@ -23,10 +23,10 @@ import org.apache.spark.annotation.InterfaceStability;

 /**
 * A commit message returned by {@link DataWriter#commit()} and will be sent back to the driver side
- * as the input parameter of {@link DataSourceV2Writer#commit(WriterCommitMessage[])}.
+ * as the input parameter of {@link DataSourceWriter#commit(WriterCommitMessage[])}.
 *
 * This is an empty interface, data sources should define their own message class and use it in
- * their {@link DataWriter#commit()} and {@link DataSourceV2Writer#commit(WriterCommitMessage[])}
+ * their {@link DataWriter#commit()} and {@link DataSourceWriter#commit(WriterCommitMessage[])}
 * implementations.
 */
 @InterfaceStability.Evolving
http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index b714a46..46b5f54 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -186,7 +186,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
 val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
 if (classOf[DataSourceV2].isAssignableFrom(cls)) {
 val ds = cls.newInstance()
- val options = new DataSourceV2Options((extraOptions ++
+ val options = new DataSourceOptions((extraOptions ++
 DataSourceV2Utils.extractSessionConfigs(
 ds = ds.asInstanceOf[DataSourceV2],
 conf = sparkSession.sessionState.conf)).asJava)
http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 5c02eae..ed7a910 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -243,7 +243,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 val ds = cls.newInstance()
 ds match {
 case ws: WriteSupport =>
- val options = new DataSourceV2Options((extraOptions ++
+ val options = new DataSourceOptions((extraOptions ++
 DataSourceV2Utils.extractSessionConfigs(
 ds = ds.asInstanceOf[DataSourceV2],
 conf = df.sparkSession.sessionState.conf)).asJava)
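Both DataFrameReader and DataFrameWriter build the options as `new DataSourceOptions((extraOptions ++ DataSourceV2Utils.extractSessionConfigs(...)).asJava)`. With Scala's `++`, an entry from the right-hand map replaces a left-hand entry with the same key, so an extracted session config overrides a user option of the same name. The Java sketch below mimics that expression. CaseInsensitiveOptions and merge are hypothetical stand-ins, and the lower-casing of keys is an assumption based on DataSourceOptions being documented as a case-insensitive map (its body is not part of this excerpt).

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-ins for the Scala expression
// `new DataSourceOptions((extraOptions ++ sessionConfigs).asJava)`.
public class OptionsSketch {

  /** Assumed behaviour: keys are lower-cased once, so lookups ignore case. */
  static final class CaseInsensitiveOptions {
    private final Map<String, String> keyLowerCasedMap = new HashMap<>();

    CaseInsensitiveOptions(Map<String, String> originalMap) {
      for (Map.Entry<String, String> e : originalMap.entrySet()) {
        keyLowerCasedMap.put(e.getKey().toLowerCase(Locale.ROOT), e.getValue());
      }
    }

    Optional<String> get(String key) {
      return Optional.ofNullable(keyLowerCasedMap.get(key.toLowerCase(Locale.ROOT)));
    }
  }

  /** Same precedence as Scala's `left ++ right`: the right-hand entry wins on a key clash. */
  static Map<String, String> merge(Map<String, String> extraOptions,
                                   Map<String, String> sessionConfigs) {
    Map<String, String> merged = new HashMap<>(extraOptions);
    merged.putAll(sessionConfigs);
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> userOptions = new HashMap<>();
    userOptions.put("Path", "/tmp/in");
    userOptions.put("batchSize", "10");
    Map<String, String> sessionConfigs = new HashMap<>();
    sessionConfigs.put("batchSize", "100"); // hypothetical extracted session config

    CaseInsensitiveOptions options =
        new CaseInsensitiveOptions(merge(userOptions, sessionConfigs));
    System.out.println(options.get("path").orElse("<none>"));      // /tmp/in
    System.out.println(options.get("BATCHSIZE").orElse("<none>")); // 100
  }
}
```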
http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala
index 6093df2..6460c97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala
@@ -35,7 +35,7 @@ trait DataSourceReaderHolder {
 /**
 * The held data source reader.
 */
- def reader: DataSourceV2Reader
+ def reader: DataSourceReader

 /**
 * The metadata of this data source reader that can be used for equality test.
http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index cba20dd..3d4c649 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.sources.v2.reader._

 case class DataSourceV2Relation(
 fullOutput: Seq[AttributeReference],
- reader: DataSourceV2Reader) extends LeafNode with DataSourceReaderHolder {
+ reader: DataSourceReader) extends LeafNode with DataSourceReaderHolder {

 override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation]
@@ -41,12 +41,12 @@ case class DataSourceV2Relation(
 */
 class StreamingDataSourceV2Relation(
 fullOutput: Seq[AttributeReference],
- reader: DataSourceV2Reader) extends DataSourceV2Relation(fullOutput, reader) {
+ reader: DataSourceReader) extends DataSourceV2Relation(fullOutput, reader) {
 override def isStreaming: Boolean = true
 }

 object DataSourceV2Relation {
- def apply(reader: DataSourceV2Reader): DataSourceV2Relation = {
+ def apply(reader: DataSourceReader): DataSourceV2Relation = {
 new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
 }
 }
http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index 3f808fb..ee08582 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.types.StructType
 */
 case class DataSourceV2ScanExec(
 fullOutput: Seq[AttributeReference],
- @transient reader: DataSourceV2Reader)
+ @transient reader: DataSourceReader)
 extends LeafExecNode with DataSourceReaderHolder with ColumnarBatchScan {

 override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2ScanExec]
http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
index cd6b3e9..c544adb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
@@ -35,7 +35,7 @@ import org.apache.spark.util.Utils
 /**
 * The logical plan for writing data into data source v2.
 */
-case class WriteToDataSourceV2(writer: DataSourceV2Writer, query: LogicalPlan) extends LogicalPlan {
+case class WriteToDataSourceV2(writer: DataSourceWriter, query: LogicalPlan) extends LogicalPlan {
 override def children: Seq[LogicalPlan] = Seq(query)
 override def output: Seq[Attribute] = Nil
 }
@@ -43,7 +43,7 @@ case class WriteToDataSourceV2(writer: DataSourceV2Writer, query: LogicalPlan) e
 /**
 * The physical plan for writing data into data source v2.
 */
-case class WriteToDataSourceV2Exec(writer: DataSourceV2Writer, query: SparkPlan) extends SparkPlan {
+case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) extends SparkPlan {
 override def children: Seq[SparkPlan] = Seq(query)
 override def output: Seq[Attribute] = Nil
http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 9759752..93572f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
 import org.apache.spark.sql.execution.streaming.sources.{InternalRowMicroBatchWriter, MicroBatchWriter}
-import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.streaming.{MicroBatchReadSupport, StreamWriteSupport}
 import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset => OffsetV2}
 import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow
@@ -89,7 +89,7 @@ class MicroBatchExecution(
 val reader = source.createMicroBatchReader(
 Optional.empty(), // user specified schema
 metadataPath,
- new DataSourceV2Options(options.asJava))
+ new DataSourceOptions(options.asJava))
 nextSourceId += 1
 StreamingExecutionRelation(reader, output)(sparkSession)
 })
@@ -447,7 +447,7 @@ class MicroBatchExecution(
 s"$runId",
 newAttributePlan.schema,
 outputMode,
- new DataSourceV2Options(extraOptions.asJava))
+ new DataSourceOptions(extraOptions.asJava))
 if (writer.isInstanceOf[SupportsWriteInternalRow]) {
 WriteToDataSourceV2(
 new InternalRowMicroBatchWriter(currentBatchId, writer), newAttributePlan)
http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
index 66eb016..5e3fee6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
@@ -111,7 +111,7 @@ class RateSourceProvider extends StreamSourceProvider with DataSourceRegister
 override def createContinuousReader(
 schema: Optional[StructType],
 checkpointLocation: String,
- options: DataSourceV2Options): ContinuousReader = {
+ options: DataSourceOptions): ContinuousReader = {
 new RateStreamContinuousReader(options)
 }
http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index d5ac0bd..3f5bb48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming.sources.ConsoleWriter
 import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
-import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2}
 import org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport
 import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
 import org.apache.spark.sql.streaming.OutputMode
@@ -40,7 +40,7 @@ class ConsoleSinkProvider extends DataSourceV2
 queryId: String,
 schema: StructType,
 mode: OutputMode,
- options: DataSourceV2Options): StreamWriter = {
+ options: DataSourceOptions): StreamWriter = {
 new ConsoleWriter(schema, options)
 }
http://git-wip-us.apache.org/repos/asf/spark/blob/d3e623b1/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 60f880f..9402d7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, StreamingDataSourceV2Relation, WriteToDataSourceV2}
 import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
-import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, StreamWriteSupport}
 import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, PartitionOffset}
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
@@ -160,7 +160,7 @@ class ContinuousExecution(
 dataSource.createContinuousReader(
 java.util.Optional.empty[StructType](),
 metadataPath,
- new DataSourceV2Options(extraReaderOptions.asJava))
+ new DataSourceOptions(extraReaderOptions.asJava))
 }

 uniqueSources = continuousSources.distinct
@@ -198,7 +198,7 @@ class ContinuousExecution(
 s"$runId",
 triggerLogicalPlan.schema,
 outputMode,
- new DataSourceV2Options(extraOptions.asJava))
+ new DataSourceOptions(extraOptions.asJava))

 val withSink = WriteToDataSourceV2(writer, triggerLogicalPlan)
 val reader = withSink.collect {
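In the diff, SupportsWriteInternalRow extends DataSourceWriter and overrides createWriterFactory() with a default method, and MicroBatchExecution checks `writer.isInstanceOf[SupportsWriteInternalRow]` to pick the InternalRow path. The plain-Java sketch below illustrates that mix-in pattern with hypothetical stand-ins (Writer, SupportsInternalRows, pickFactory; factories are modelled as strings). The body of Spark's default method is not shown in this diff, so having it throw is an assumption of the sketch.

```java
// Hypothetical stand-ins: Writer ~ DataSourceWriter, SupportsInternalRows ~
// SupportsWriteInternalRow; factories are modelled as plain strings.
public class MixInSketch {

  interface Writer {
    String createWriterFactory();
  }

  /** The mix-in extends the interface it mixes into, so implementers are still Writers. */
  interface SupportsInternalRows extends Writer {
    // Assumed behaviour: the row-based entry point must not be used by these writers.
    @Override
    default String createWriterFactory() {
      throw new IllegalStateException(
          "createWriterFactory should not be called with SupportsInternalRows.");
    }

    String createInternalRowWriterFactory();
  }

  static final class PlainWriter implements Writer {
    @Override
    public String createWriterFactory() {
      return "row-factory";
    }
  }

  /** Only has to implement the InternalRow path; the default covers the other one. */
  static final class FastWriter implements SupportsInternalRows {
    @Override
    public String createInternalRowWriterFactory() {
      return "internal-row-factory";
    }
  }

  /** Caller-side dispatch, analogous to `writer.isInstanceOf[SupportsWriteInternalRow]`. */
  static String pickFactory(Writer writer) {
    if (writer instanceof SupportsInternalRows) {
      return ((SupportsInternalRows) writer).createInternalRowWriterFactory();
    }
    return writer.createWriterFactory();
  }

  public static void main(String[] args) {
    System.out.println(pickFactory(new PlainWriter())); // row-factory
    System.out.println(pickFactory(new FastWriter()));  // internal-row-factory
  }
}
```

Because FastWriter is a Writer through the mix-in, pickFactory accepts it without a second interface in its signature. If the mix-in did not extend the base interface, every implementation would have to list both, which is the kind of inconsistency the commit message says it fixes.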
