http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java deleted file mode 100644 index 9a3ad2e..0000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.reader.streaming; - -import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.execution.streaming.BaseStreamingSource; -import org.apache.spark.sql.sources.v2.reader.InputPartition; -import org.apache.spark.sql.sources.v2.reader.ScanConfig; -import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder; - -/** - * An interface that defines how to load the data from data source for continuous streaming - * processing. - * - * The execution engine will get an instance of this interface from a data source provider - * (e.g. {@link org.apache.spark.sql.sources.v2.ContinuousReadSupportProvider}) at the start of a - * streaming query, then call {@link #newScanConfigBuilder(Offset)} and create an instance of - * {@link ScanConfig} for the duration of the streaming query or until - * {@link #needsReconfiguration(ScanConfig)} is true. The {@link ScanConfig} will be used to create - * input partitions and reader factory to scan data with a Spark job for its duration. At the end - * {@link #stop()} will be called when the streaming execution is completed. Note that a single - * query may have multiple executions due to restart or failure recovery. - */ -@InterfaceStability.Evolving -public interface ContinuousReadSupport extends StreamingReadSupport, BaseStreamingSource { - - /** - * Returns a builder of {@link ScanConfig}. Spark will call this method and create a - * {@link ScanConfig} for each data scanning job. - * - * The builder can take some query specific information to do operators pushdown, store streaming - * offsets, etc., and keep these information in the created {@link ScanConfig}. - * - * This is the first step of the data scan. All other methods in {@link ContinuousReadSupport} - * needs to take {@link ScanConfig} as an input. - */ - ScanConfigBuilder newScanConfigBuilder(Offset start); - - /** - * Returns a factory, which produces one {@link ContinuousPartitionReader} for one - * {@link InputPartition}. 
- */ - ContinuousPartitionReaderFactory createContinuousReaderFactory(ScanConfig config); - - /** - * Merge partitioned offsets coming from {@link ContinuousPartitionReader} instances - * for each partition to a single global offset. - */ - Offset mergeOffsets(PartitionOffset[] offsets); - - /** - * The execution engine will call this method in every epoch to determine if new input - * partitions need to be generated, which may be required if for example the underlying - * source system has had partitions added or removed. - * - * If true, the query will be shut down and restarted with a new {@link ContinuousReadSupport} - * instance. - */ - default boolean needsReconfiguration(ScanConfig config) { - return false; - } -}
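As a rough illustration (not part of the patch), the offset-merging contract above — which the restored ContinuousReader below keeps via its own mergeOffsets(PartitionOffset[]) method — might be satisfied as sketched here. The Example* class names are hypothetical; the sketch assumes only Offset's abstract json() method and PartitionOffset being a plain Serializable marker, as in the v2 streaming reader API.

    // Illustrative sketch only: ExamplePartitionOffset/ExampleGlobalOffset are hypothetical
    // names, not part of this patch.
    import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
    import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;

    /** Per-partition progress reported by each partition reader. */
    class ExamplePartitionOffset implements PartitionOffset {
      final int partition;
      final long position;
      ExamplePartitionOffset(int partition, long position) {
        this.partition = partition;
        this.position = position;
      }
    }

    /** Global offset recording every partition's position, serialized as a JSON array. */
    class ExampleGlobalOffset extends Offset {
      private final long[] positions;
      ExampleGlobalOffset(long[] positions) { this.positions = positions; }

      /** One way mergeOffsets(PartitionOffset[]) could fold per-partition positions together. */
      static ExampleGlobalOffset merge(PartitionOffset[] offsets) {
        long[] positions = new long[offsets.length];
        for (PartitionOffset o : offsets) {
          ExamplePartitionOffset p = (ExamplePartitionOffset) o;
          positions[p.partition] = p.position;
        }
        return new ExampleGlobalOffset(positions);
      }

      @Override
      public String json() {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < positions.length; i++) {
          if (i > 0) sb.append(',');
          sb.append(positions[i]);
        }
        return sb.append(']').toString();
      }
    }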
http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java new file mode 100644 index 0000000..6e960be --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.streaming.BaseStreamingSource; +import org.apache.spark.sql.sources.v2.reader.DataSourceReader; + +import java.util.Optional; + +/** + * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this + * interface to allow reading in a continuous processing mode stream. + * + * Implementations must ensure each partition reader is a {@link ContinuousInputPartitionReader}. + * + * Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with + * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely. + */ +@InterfaceStability.Evolving +public interface ContinuousReader extends BaseStreamingSource, DataSourceReader { + /** + * Merge partitioned offsets coming from {@link ContinuousInputPartitionReader} instances + * for each partition to a single global offset. + */ + Offset mergeOffsets(PartitionOffset[] offsets); + + /** + * Deserialize a JSON string into an Offset of the implementation-defined offset type. + * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader + */ + Offset deserializeOffset(String json); + + /** + * Set the desired start offset for partitions created from this reader. The scan will + * start from the first record after the provided offset, or from an implementation-defined + * inferred starting point if no offset is provided. + */ + void setStartOffset(Optional<Offset> start); + + /** + * Return the specified or inferred start offset for this reader. + * + * @throws IllegalStateException if setStartOffset has not been called + */ + Offset getStartOffset(); + + /** + * The execution engine will call this method in every epoch to determine if new input + * partitions need to be generated, which may be required if for example the underlying + * source system has had partitions added or removed. + * + * If true, the query will be shut down and restarted with a new reader. 
+ */ + default boolean needsReconfiguration() { + return false; + } + + /** + * Informs the source that Spark has completed processing all data for offsets less than or + * equal to `end` and will only request offsets greater than `end` in the future. + */ + void commit(Offset end); +} http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java deleted file mode 100644 index edb0db1..0000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.reader.streaming; - -import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.execution.streaming.BaseStreamingSource; -import org.apache.spark.sql.sources.v2.reader.*; - -/** - * An interface that defines how to scan the data from data source for micro-batch streaming - * processing. - * - * The execution engine will get an instance of this interface from a data source provider - * (e.g. {@link org.apache.spark.sql.sources.v2.MicroBatchReadSupportProvider}) at the start of a - * streaming query, then call {@link #newScanConfigBuilder(Offset, Offset)} and create an instance - * of {@link ScanConfig} for each micro-batch. The {@link ScanConfig} will be used to create input - * partitions and reader factory to scan a micro-batch with a Spark job. At the end {@link #stop()} - * will be called when the streaming execution is completed. Note that a single query may have - * multiple executions due to restart or failure recovery. - */ -@InterfaceStability.Evolving -public interface MicroBatchReadSupport extends StreamingReadSupport, BaseStreamingSource { - - /** - * Returns a builder of {@link ScanConfig}. Spark will call this method and create a - * {@link ScanConfig} for each data scanning job. - * - * The builder can take some query specific information to do operators pushdown, store streaming - * offsets, etc., and keep these information in the created {@link ScanConfig}. - * - * This is the first step of the data scan. All other methods in {@link MicroBatchReadSupport} - * needs to take {@link ScanConfig} as an input. - */ - ScanConfigBuilder newScanConfigBuilder(Offset start, Offset end); - - /** - * Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}. 
- */ - PartitionReaderFactory createReaderFactory(ScanConfig config); - - /** - * Returns the most recent offset available. - */ - Offset latestOffset(); -} http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java new file mode 100644 index 0000000..0159c73 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.reader.DataSourceReader; +import org.apache.spark.sql.execution.streaming.BaseStreamingSource; + +import java.util.Optional; + +/** + * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this + * interface to indicate they allow micro-batch streaming reads. + * + * Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with + * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely. + */ +@InterfaceStability.Evolving +public interface MicroBatchReader extends DataSourceReader, BaseStreamingSource { + /** + * Set the desired offset range for input partitions created from this reader. Partition readers + * will generate only data within (`start`, `end`]; that is, from the first record after `start` + * to the record with offset `end`. + * + * @param start The initial offset to scan from. If not specified, scan from an + * implementation-specified start point, such as the earliest available record. + * @param end The last offset to include in the scan. If not specified, scan up to an + * implementation-defined endpoint, such as the last available offset + * or the start offset plus a target batch size. + */ + void setOffsetRange(Optional<Offset> start, Optional<Offset> end); + + /** + * Returns the specified (if explicitly set through setOffsetRange) or inferred start offset + * for this reader. + * + * @throws IllegalStateException if setOffsetRange has not been called + */ + Offset getStartOffset(); + + /** + * Return the specified (if explicitly set through setOffsetRange) or inferred end offset + * for this reader. 
+ * + * @throws IllegalStateException if setOffsetRange has not been called + */ + Offset getEndOffset(); + + /** + * Deserialize a JSON string into an Offset of the implementation-defined offset type. + * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader + */ + Offset deserializeOffset(String json); + + /** + * Informs the source that Spark has completed processing all data for offsets less than or + * equal to `end` and will only request offsets greater than `end` in the future. + */ + void commit(Offset end); +} http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java index 6cf2773..e41c035 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java @@ -20,8 +20,8 @@ package org.apache.spark.sql.sources.v2.reader.streaming; import org.apache.spark.annotation.InterfaceStability; /** - * An abstract representation of progress through a {@link MicroBatchReadSupport} or - * {@link ContinuousReadSupport}. + * An abstract representation of progress through a {@link MicroBatchReader} or + * {@link ContinuousReader}. * During execution, offsets provided by the data source implementation will be logged and used as * restart checkpoints. Each source should provide an offset implementation which the source can use * to reconstruct a position in the stream up to which data has been seen/processed. http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java deleted file mode 100644 index 84872d1..0000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.reader.streaming; - -import org.apache.spark.sql.sources.v2.reader.ReadSupport; - -/** - * A base interface for streaming read support. This is package private and is invisible to data - * sources. 
Data sources should implement concrete streaming read support interfaces: - * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}. - */ -interface StreamingReadSupport extends ReadSupport { - - /** - * Returns the initial offset for a streaming query to start reading from. Note that the - * streaming data source should not assume that it will start reading from its initial offset: - * if Spark is restarting an existing query, it will restart from the check-pointed offset rather - * than the initial one. - */ - Offset initialOffset(); - - /** - * Deserialize a JSON string into an Offset of the implementation-defined offset type. - * - * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader - */ - Offset deserializeOffset(String json); - - /** - * Informs the source that Spark has completed processing all data for offsets less than or - * equal to `end` and will only request offsets greater than `end` in the future. - */ - void commit(Offset end); -} http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java deleted file mode 100644 index 0ec9e05..0000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.writer; - -import org.apache.spark.annotation.InterfaceStability; - -/** - * An interface that defines how to write the data to data source for batch processing. - * - * The writing procedure is: - * 1. Create a writer factory by {@link #createBatchWriterFactory()}, serialize and send it to all - * the partitions of the input data(RDD). - * 2. For each partition, create the data writer, and write the data of the partition with this - * writer. If all the data are written successfully, call {@link DataWriter#commit()}. If - * exception happens during the writing, call {@link DataWriter#abort()}. - * 3. If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If - * some writers are aborted, or the job failed with an unknown reason, call - * {@link #abort(WriterCommitMessage[])}. - * - * While Spark will retry failed writing tasks, Spark won't retry failed writing jobs. Users should - * do it manually in their Spark applications if they want to retry. - * - * Please refer to the documentation of commit/abort methods for detailed specifications. 
- */ -@InterfaceStability.Evolving -public interface BatchWriteSupport { - - /** - * Creates a writer factory which will be serialized and sent to executors. - * - * If this method fails (by throwing an exception), the action will fail and no Spark job will be - * submitted. - */ - DataWriterFactory createBatchWriterFactory(); - - /** - * Returns whether Spark should use the commit coordinator to ensure that at most one task for - * each partition commits. - * - * @return true if commit coordinator should be used, false otherwise. - */ - default boolean useCommitCoordinator() { - return true; - } - - /** - * Handles a commit message on receiving from a successful data writer. - * - * If this method fails (by throwing an exception), this writing job is considered to to have been - * failed, and {@link #abort(WriterCommitMessage[])} would be called. - */ - default void onDataWriterCommit(WriterCommitMessage message) {} - - /** - * Commits this writing job with a list of commit messages. The commit messages are collected from - * successful data writers and are produced by {@link DataWriter#commit()}. - * - * If this method fails (by throwing an exception), this writing job is considered to to have been - * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination - * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it. - * - * Note that speculative execution may cause multiple tasks to run for a partition. By default, - * Spark uses the commit coordinator to allow at most one task to commit. Implementations can - * disable this behavior by overriding {@link #useCommitCoordinator()}. If disabled, multiple - * tasks may have committed successfully and one successful commit message per task will be - * passed to this commit method. The remaining commit messages are ignored by Spark. - */ - void commit(WriterCommitMessage[] messages); - - /** - * Aborts this writing job because some data writers are failed and keep failing when retry, - * or the Spark job fails with some unknown reasons, - * or {@link #onDataWriterCommit(WriterCommitMessage)} fails, - * or {@link #commit(WriterCommitMessage[])} fails. - * - * If this method fails (by throwing an exception), the underlying data source may require manual - * cleanup. - * - * Unless the abort is triggered by the failure of commit, the given messages should have some - * null slots as there maybe only a few data writers that are committed before the abort - * happens, or some data writers were committed but their commit messages haven't reached the - * driver when the abort is triggered. So this is just a "best effort" for data sources to - * clean up the data left by data writers. - */ - void abort(WriterCommitMessage[] messages); -} http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java new file mode 100644 index 0000000..385fc29 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.sources.v2.DataSourceOptions; +import org.apache.spark.sql.sources.v2.StreamWriteSupport; +import org.apache.spark.sql.sources.v2.WriteSupport; +import org.apache.spark.sql.streaming.OutputMode; +import org.apache.spark.sql.types.StructType; + +/** + * A data source writer that is returned by + * {@link WriteSupport#createWriter(String, StructType, SaveMode, DataSourceOptions)}/ + * {@link StreamWriteSupport#createStreamWriter( + * String, StructType, OutputMode, DataSourceOptions)}. + * It can mix in various writing optimization interfaces to speed up the data saving. The actual + * writing logic is delegated to {@link DataWriter}. + * + * If an exception was throw when applying any of these writing optimizations, the action will fail + * and no Spark job will be submitted. + * + * The writing procedure is: + * 1. Create a writer factory by {@link #createWriterFactory()}, serialize and send it to all the + * partitions of the input data(RDD). + * 2. For each partition, create the data writer, and write the data of the partition with this + * writer. If all the data are written successfully, call {@link DataWriter#commit()}. If + * exception happens during the writing, call {@link DataWriter#abort()}. + * 3. If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If + * some writers are aborted, or the job failed with an unknown reason, call + * {@link #abort(WriterCommitMessage[])}. + * + * While Spark will retry failed writing tasks, Spark won't retry failed writing jobs. Users should + * do it manually in their Spark applications if they want to retry. + * + * Please refer to the documentation of commit/abort methods for detailed specifications. + */ +@InterfaceStability.Evolving +public interface DataSourceWriter { + + /** + * Creates a writer factory which will be serialized and sent to executors. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. + */ + DataWriterFactory<InternalRow> createWriterFactory(); + + /** + * Returns whether Spark should use the commit coordinator to ensure that at most one task for + * each partition commits. + * + * @return true if commit coordinator should be used, false otherwise. + */ + default boolean useCommitCoordinator() { + return true; + } + + /** + * Handles a commit message on receiving from a successful data writer. + * + * If this method fails (by throwing an exception), this writing job is considered to to have been + * failed, and {@link #abort(WriterCommitMessage[])} would be called. 
+ */ + default void onDataWriterCommit(WriterCommitMessage message) {} + + /** + * Commits this writing job with a list of commit messages. The commit messages are collected from + * successful data writers and are produced by {@link DataWriter#commit()}. + * + * If this method fails (by throwing an exception), this writing job is considered to to have been + * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination + * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it. + * + * Note that speculative execution may cause multiple tasks to run for a partition. By default, + * Spark uses the commit coordinator to allow at most one task to commit. Implementations can + * disable this behavior by overriding {@link #useCommitCoordinator()}. If disabled, multiple + * tasks may have committed successfully and one successful commit message per task will be + * passed to this commit method. The remaining commit messages are ignored by Spark. + */ + void commit(WriterCommitMessage[] messages); + + /** + * Aborts this writing job because some data writers are failed and keep failing when retry, + * or the Spark job fails with some unknown reasons, + * or {@link #onDataWriterCommit(WriterCommitMessage)} fails, + * or {@link #commit(WriterCommitMessage[])} fails. + * + * If this method fails (by throwing an exception), the underlying data source may require manual + * cleanup. + * + * Unless the abort is triggered by the failure of commit, the given messages should have some + * null slots as there maybe only a few data writers that are committed before the abort + * happens, or some data writers were committed but their commit messages haven't reached the + * driver when the abort is triggered. So this is just a "best effort" for data sources to + * clean up the data left by data writers. + */ + void abort(WriterCommitMessage[] messages); +} http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java index 5fb0679..27dc5ea 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java @@ -22,7 +22,7 @@ import java.io.IOException; import org.apache.spark.annotation.InterfaceStability; /** - * A data writer returned by {@link DataWriterFactory#createWriter(int, long)} and is + * A data writer returned by {@link DataWriterFactory#createDataWriter(int, long, long)} and is * responsible for writing data for an input RDD partition. * * One Spark task has one exclusive data writer, so there is no thread-safe concern. @@ -36,11 +36,11 @@ import org.apache.spark.annotation.InterfaceStability; * * If this data writer succeeds(all records are successfully written and {@link #commit()} * succeeds), a {@link WriterCommitMessage} will be sent to the driver side and pass to - * {@link BatchWriteSupport#commit(WriterCommitMessage[])} with commit messages from other data + * {@link DataSourceWriter#commit(WriterCommitMessage[])} with commit messages from other data * writers. 
If this data writer fails(one record fails to write or {@link #commit()} fails), an * exception will be sent to the driver side, and Spark may retry this writing task a few times. - * In each retry, {@link DataWriterFactory#createWriter(int, long)} will receive a - * different `taskId`. Spark will call {@link BatchWriteSupport#abort(WriterCommitMessage[])} + * In each retry, {@link DataWriterFactory#createDataWriter(int, long, long)} will receive a + * different `taskId`. Spark will call {@link DataSourceWriter#abort(WriterCommitMessage[])} * when the configured number of retries is exhausted. * * Besides the retry mechanism, Spark may launch speculative tasks if the existing writing task @@ -71,11 +71,11 @@ public interface DataWriter<T> { /** * Commits this writer after all records are written successfully, returns a commit message which * will be sent back to driver side and passed to - * {@link BatchWriteSupport#commit(WriterCommitMessage[])}. + * {@link DataSourceWriter#commit(WriterCommitMessage[])}. * * The written data should only be visible to data source readers after - * {@link BatchWriteSupport#commit(WriterCommitMessage[])} succeeds, which means this method - * should still "hide" the written data and ask the {@link BatchWriteSupport} at driver side to + * {@link DataSourceWriter#commit(WriterCommitMessage[])} succeeds, which means this method + * should still "hide" the written data and ask the {@link DataSourceWriter} at driver side to * do the final commit via {@link WriterCommitMessage}. * * If this method fails (by throwing an exception), {@link #abort()} will be called and this @@ -93,7 +93,7 @@ public interface DataWriter<T> { * failed. * * If this method fails(by throwing an exception), the underlying data source may have garbage - * that need to be cleaned by {@link BatchWriteSupport#abort(WriterCommitMessage[])} or manually, + * that need to be cleaned by {@link DataSourceWriter#abort(WriterCommitMessage[])} or manually, * but these garbage should not be visible to data source readers. * * @throws IOException if failure happens during disk/network IO like writing files. http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java index 19a36dd..3d337b6 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java @@ -19,20 +19,18 @@ package org.apache.spark.sql.sources.v2.writer; import java.io.Serializable; -import org.apache.spark.TaskContext; import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.catalyst.InternalRow; /** - * A factory of {@link DataWriter} returned by {@link BatchWriteSupport#createBatchWriterFactory()}, + * A factory of {@link DataWriter} returned by {@link DataSourceWriter#createWriterFactory()}, * which is responsible for creating and initializing the actual data writer at executor side. * * Note that, the writer factory will be serialized and sent to executors, then the data writer - * will be created on executors and do the actual writing. So this interface must be + * will be created on executors and do the actual writing. 
So {@link DataWriterFactory} must be * serializable and {@link DataWriter} doesn't need to be. */ @InterfaceStability.Evolving -public interface DataWriterFactory extends Serializable { +public interface DataWriterFactory<T> extends Serializable { /** * Returns a data writer to do the actual writing work. Note that, Spark will reuse the same data @@ -40,16 +38,19 @@ public interface DataWriterFactory extends Serializable { * are responsible for defensive copies if necessary, e.g. copy the data before buffer it in a * list. * - * If this method fails (by throwing an exception), the corresponding Spark write task would fail - * and get retried until hitting the maximum retry times. + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. * * @param partitionId A unique id of the RDD partition that the returned writer will process. * Usually Spark processes many RDD partitions at the same time, * implementations should use the partition id to distinguish writers for * different partitions. - * @param taskId The task id returned by {@link TaskContext#taskAttemptId()}. Spark may run - * multiple tasks for the same partition (due to speculation or task failures, - * for example). + * @param taskId A unique identifier for a task that is performing the write of the partition + * data. Spark may run multiple tasks for the same partition (due to speculation + * or task failures, for example). + * @param epochId A monotonically increasing id for streaming queries that are split in to + * discrete periods of execution. For non-streaming queries, + * this ID will always be 0. */ - DataWriter<InternalRow> createWriter(int partitionId, long taskId); + DataWriter<T> createDataWriter(int partitionId, long taskId, long epochId); } http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java index 123335c..9e38836 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java @@ -19,16 +19,15 @@ package org.apache.spark.sql.sources.v2.writer; import java.io.Serializable; -import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport; import org.apache.spark.annotation.InterfaceStability; /** * A commit message returned by {@link DataWriter#commit()} and will be sent back to the driver side - * as the input parameter of {@link BatchWriteSupport#commit(WriterCommitMessage[])} or - * {@link StreamingWriteSupport#commit(long, WriterCommitMessage[])}. + * as the input parameter of {@link DataSourceWriter#commit(WriterCommitMessage[])}. * - * This is an empty interface, data sources should define their own message class and use it when - * generating messages at executor side and handling the messages at driver side. + * This is an empty interface, data sources should define their own message class and use it in + * their {@link DataWriter#commit()} and {@link DataSourceWriter#commit(WriterCommitMessage[])} + * implementations. 
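As a rough, hypothetical sketch (not part of the patch) of the executor-side contract shown above — DataWriterFactory<T>.createDataWriter(partitionId, taskId, epochId) producing a DataWriter whose commit() returns a WriterCommitMessage — the RowCount* names below are illustrative only; the writer does nothing but count rows.

    // Hypothetical example types, not from this commit.
    import java.io.IOException;
    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.sources.v2.writer.DataWriter;
    import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
    import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

    /** Commit message carrying how many rows one writer handled. */
    class RowCountCommitMessage implements WriterCommitMessage {
      final int partitionId;
      final long rowCount;
      RowCountCommitMessage(int partitionId, long rowCount) {
        this.partitionId = partitionId;
        this.rowCount = rowCount;
      }
    }

    /** Serializable factory; the returned writers live on executors. */
    class RowCountingWriterFactory implements DataWriterFactory<InternalRow> {
      @Override
      public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
        // One writer per (partition, task attempt, epoch); it only counts rows here.
        return new DataWriter<InternalRow>() {
          private long rows = 0;

          @Override
          public void write(InternalRow record) throws IOException {
            rows++; // a real writer would copy and persist the row
          }

          @Override
          public WriterCommitMessage commit() throws IOException {
            return new RowCountCommitMessage(partitionId, rows);
          }

          @Override
          public void abort() throws IOException {
            rows = 0; // nothing durable was written, so nothing to clean up
          }
        };
      }
    }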
*/ @InterfaceStability.Evolving public interface WriterCommitMessage extends Serializable {} http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java new file mode 100644 index 0000000..a316b2a --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; +import org.apache.spark.sql.sources.v2.writer.DataWriter; +import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; + +/** + * A {@link DataSourceWriter} for use with structured streaming. + * + * Streaming queries are divided into intervals of data called epochs, with a monotonically + * increasing numeric ID. This writer handles commits and aborts for each successive epoch. + */ +@InterfaceStability.Evolving +public interface StreamWriter extends DataSourceWriter { + /** + * Commits this writing job for the specified epoch with a list of commit messages. The commit + * messages are collected from successful data writers and are produced by + * {@link DataWriter#commit()}. + * + * If this method fails (by throwing an exception), this writing job is considered to have been + * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}. + * + * The execution engine may call commit() multiple times for the same epoch in some circumstances. + * To support exactly-once data semantics, implementations must ensure that multiple commits for + * the same epoch are idempotent. + */ + void commit(long epochId, WriterCommitMessage[] messages); + + /** + * Aborts this writing job because some data writers are failed and keep failing when retried, or + * the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails. + * + * If this method fails (by throwing an exception), the underlying data source may require manual + * cleanup. + * + * Unless the abort is triggered by the failure of commit, the given messages will have some + * null slots, as there may be only a few data writers that were committed before the abort + * happens, or some data writers were committed but their commit messages haven't reached the + * driver when the abort is triggered. 
So this is just a "best effort" for data sources to + * clean up the data left by data writers. + */ + void abort(long epochId, WriterCommitMessage[] messages); + + default void commit(WriterCommitMessage[] messages) { + throw new UnsupportedOperationException( + "Commit without epoch should not be called with StreamWriter"); + } + + default void abort(WriterCommitMessage[] messages) { + throw new UnsupportedOperationException( + "Abort without epoch should not be called with StreamWriter"); + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java deleted file mode 100644 index a4da24f..0000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.writer.streaming; - -import java.io.Serializable; - -import org.apache.spark.TaskContext; -import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.sources.v2.writer.DataWriter; - -/** - * A factory of {@link DataWriter} returned by - * {@link StreamingWriteSupport#createStreamingWriterFactory()}, which is responsible for creating - * and initializing the actual data writer at executor side. - * - * Note that, the writer factory will be serialized and sent to executors, then the data writer - * will be created on executors and do the actual writing. So this interface must be - * serializable and {@link DataWriter} doesn't need to be. - */ -@InterfaceStability.Evolving -public interface StreamingDataWriterFactory extends Serializable { - - /** - * Returns a data writer to do the actual writing work. Note that, Spark will reuse the same data - * object instance when sending data to the data writer, for better performance. Data writers - * are responsible for defensive copies if necessary, e.g. copy the data before buffer it in a - * list. - * - * If this method fails (by throwing an exception), the corresponding Spark write task would fail - * and get retried until hitting the maximum retry times. - * - * @param partitionId A unique id of the RDD partition that the returned writer will process. 
- * Usually Spark processes many RDD partitions at the same time, - * implementations should use the partition id to distinguish writers for - * different partitions. - * @param taskId The task id returned by {@link TaskContext#taskAttemptId()}. Spark may run - * multiple tasks for the same partition (due to speculation or task failures, - * for example). - * @param epochId A monotonically increasing id for streaming queries that are split in to - * discrete periods of execution. - */ - DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId); -} http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteSupport.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteSupport.java deleted file mode 100644 index 3fdfac5..0000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteSupport.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.writer.streaming; - -import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.sources.v2.writer.DataWriter; -import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; - -/** - * An interface that defines how to write the data to data source for streaming processing. - * - * Streaming queries are divided into intervals of data called epochs, with a monotonically - * increasing numeric ID. This writer handles commits and aborts for each successive epoch. - */ -@InterfaceStability.Evolving -public interface StreamingWriteSupport { - - /** - * Creates a writer factory which will be serialized and sent to executors. - * - * If this method fails (by throwing an exception), the action will fail and no Spark job will be - * submitted. - */ - StreamingDataWriterFactory createStreamingWriterFactory(); - - /** - * Commits this writing job for the specified epoch with a list of commit messages. The commit - * messages are collected from successful data writers and are produced by - * {@link DataWriter#commit()}. - * - * If this method fails (by throwing an exception), this writing job is considered to have been - * failed, and the execution engine will attempt to call - * {@link #abort(long, WriterCommitMessage[])}. - * - * The execution engine may call `commit` multiple times for the same epoch in some circumstances. - * To support exactly-once data semantics, implementations must ensure that multiple commits for - * the same epoch are idempotent. 
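The same epoch-idempotency requirement applies to the StreamWriter restored earlier in this patch. A rough, purely illustrative sketch of one way to honour it follows; a real sink would persist the last committed epoch rather than keep it in an in-memory field.

    // Hypothetical sketch, not from the patch.
    import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
    import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;

    abstract class IdempotentStreamWriter implements StreamWriter {
      private volatile long lastCommittedEpoch = -1L;

      @Override
      public void commit(long epochId, WriterCommitMessage[] messages) {
        if (epochId <= lastCommittedEpoch) {
          return; // this epoch was already committed; repeated calls become no-ops
        }
        doCommit(epochId, messages);
        lastCommittedEpoch = epochId;
      }

      /** Sink-specific commit, invoked at most once per epoch by the wrapper above. */
      protected abstract void doCommit(long epochId, WriterCommitMessage[] messages);
    }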
- */ - void commit(long epochId, WriterCommitMessage[] messages); - - /** - * Aborts this writing job because some data writers are failed and keep failing when retried, or - * the Spark job fails with some unknown reasons, or {@link #commit(long, WriterCommitMessage[])} - * fails. - * - * If this method fails (by throwing an exception), the underlying data source may require manual - * cleanup. - * - * Unless the abort is triggered by the failure of commit, the given messages will have some - * null slots, as there may be only a few data writers that were committed before the abort - * happens, or some data writers were committed but their commit messages haven't reached the - * driver when the abort is triggered. So this is just a "best effort" for data sources to - * clean up the data left by data writers. - */ - void abort(long epochId, WriterCommitMessage[] messages); -} http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index e6c2cba..371ec70 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.datasources.jdbc._ import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils -import org.apache.spark.sql.sources.v2.{BatchReadSupportProvider, DataSourceOptions, DataSourceV2} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport} import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.unsafe.types.UTF8String @@ -194,7 +194,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf) if (classOf[DataSourceV2].isAssignableFrom(cls)) { val ds = cls.newInstance().asInstanceOf[DataSourceV2] - if (ds.isInstanceOf[BatchReadSupportProvider]) { + if (ds.isInstanceOf[ReadSupport]) { val sessionOptions = DataSourceV2Utils.extractSessionConfigs( ds = ds, conf = sparkSession.sessionState.conf) val pathsOption = { http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index dfb8c47..4aeddfd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -240,7 +240,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { if (classOf[DataSourceV2].isAssignableFrom(cls)) { val source = cls.newInstance().asInstanceOf[DataSourceV2] source match { - case provider: BatchWriteSupportProvider => + case ws: WriteSupport => val options = extraOptions ++ DataSourceV2Utils.extractSessionConfigs(source, df.sparkSession.sessionState.conf) @@ -251,10 +251,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { } } else { - val writer = 
provider.createBatchWriteSupport( - UUID.randomUUID().toString, - df.logicalPlan.output.toStructType, - mode, + val writer = ws.createWriter( + UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode, new DataSourceOptions(options.asJava)) if (writer.isPresent) { http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala index f62f734..7828298 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala @@ -17,22 +17,19 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark._ +import scala.reflect.ClassTag + +import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReader, PartitionReaderFactory} +import org.apache.spark.sql.sources.v2.reader.InputPartition -class DataSourceRDDPartition(val index: Int, val inputPartition: InputPartition) +class DataSourceRDDPartition[T : ClassTag](val index: Int, val inputPartition: InputPartition[T]) extends Partition with Serializable -// TODO: we should have 2 RDDs: an RDD[InternalRow] for row-based scan, an `RDD[ColumnarBatch]` for -// columnar scan. -class DataSourceRDD( +class DataSourceRDD[T: ClassTag]( sc: SparkContext, - @transient private val inputPartitions: Seq[InputPartition], - partitionReaderFactory: PartitionReaderFactory, - columnarReads: Boolean) - extends RDD[InternalRow](sc, Nil) { + @transient private val inputPartitions: Seq[InputPartition[T]]) + extends RDD[T](sc, Nil) { override protected def getPartitions: Array[Partition] = { inputPartitions.zipWithIndex.map { @@ -40,21 +37,11 @@ class DataSourceRDD( }.toArray } - private def castPartition(split: Partition): DataSourceRDDPartition = split match { - case p: DataSourceRDDPartition => p - case _ => throw new SparkException(s"[BUG] Not a DataSourceRDDPartition: $split") - } - - override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { - val inputPartition = castPartition(split).inputPartition - val reader: PartitionReader[_] = if (columnarReads) { - partitionReaderFactory.createColumnarReader(inputPartition) - } else { - partitionReaderFactory.createReader(inputPartition) - } - + override def compute(split: Partition, context: TaskContext): Iterator[T] = { + val reader = split.asInstanceOf[DataSourceRDDPartition[T]].inputPartition + .createPartitionReader() context.addTaskCompletionListener[Unit](_ => reader.close()) - val iter = new Iterator[Any] { + val iter = new Iterator[T] { private[this] var valuePrepared = false override def hasNext: Boolean = { @@ -64,7 +51,7 @@ class DataSourceRDD( valuePrepared } - override def next(): Any = { + override def next(): T = { if (!hasNext) { throw new java.util.NoSuchElementException("End of stream") } @@ -72,11 +59,10 @@ class DataSourceRDD( reader.get() } } - // TODO: SPARK-25083 remove the type erasure hack in data source scan - new InterruptibleIterator(context, iter.asInstanceOf[Iterator[InternalRow]]) + new 
InterruptibleIterator(context, iter) } override def getPreferredLocations(split: Partition): Seq[String] = { - castPartition(split).inputPartition.preferredLocations() + split.asInstanceOf[DataSourceRDDPartition[T]].inputPartition.preferredLocations() } } http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index f7e2959..abc5fb9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -27,21 +27,21 @@ import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelat import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.sources.DataSourceRegister -import org.apache.spark.sql.sources.v2.{BatchReadSupportProvider, BatchWriteSupportProvider, DataSourceOptions, DataSourceV2} -import org.apache.spark.sql.sources.v2.reader.{BatchReadSupport, ReadSupport, ScanConfigBuilder, SupportsReportStatistics} -import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter import org.apache.spark.sql.types.StructType /** * A logical plan representing a data source v2 scan. * * @param source An instance of a [[DataSourceV2]] implementation. - * @param options The options for this scan. Used to create fresh [[BatchWriteSupport]]. - * @param userSpecifiedSchema The user-specified schema for this scan. + * @param options The options for this scan. Used to create fresh [[DataSourceReader]]. + * @param userSpecifiedSchema The user-specified schema for this scan. Used to create fresh + * [[DataSourceReader]]. 
*/ case class DataSourceV2Relation( source: DataSourceV2, - readSupport: BatchReadSupport, output: Seq[AttributeReference], options: Map[String, String], tableIdent: Option[TableIdentifier] = None, @@ -58,12 +58,13 @@ case class DataSourceV2Relation( override def simpleString: String = "RelationV2 " + metadataString - def newWriteSupport(): BatchWriteSupport = source.createWriteSupport(options, schema) + def newReader(): DataSourceReader = source.createReader(options, userSpecifiedSchema) - override def computeStats(): Statistics = readSupport match { + def newWriter(): DataSourceWriter = source.createWriter(options, schema) + + override def computeStats(): Statistics = newReader match { case r: SupportsReportStatistics => - val statistics = r.estimateStatistics(readSupport.newScanConfigBuilder().build()) - Statistics(sizeInBytes = statistics.sizeInBytes().orElse(conf.defaultSizeInBytes)) + Statistics(sizeInBytes = r.estimateStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes)) case _ => Statistics(sizeInBytes = conf.defaultSizeInBytes) } @@ -84,8 +85,7 @@ case class StreamingDataSourceV2Relation( output: Seq[AttributeReference], source: DataSourceV2, options: Map[String, String], - readSupport: ReadSupport, - scanConfigBuilder: ScanConfigBuilder) + reader: DataSourceReader) extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat { override def isStreaming: Boolean = true @@ -99,8 +99,7 @@ case class StreamingDataSourceV2Relation( // TODO: unify the equal/hashCode implementation for all data source v2 query plans. override def equals(other: Any): Boolean = other match { case other: StreamingDataSourceV2Relation => - output == other.output && readSupport.getClass == other.readSupport.getClass && - options == other.options + output == other.output && reader.getClass == other.reader.getClass && options == other.options case _ => false } @@ -108,10 +107,9 @@ case class StreamingDataSourceV2Relation( Seq(output, source, options).hashCode() } - override def computeStats(): Statistics = readSupport match { + override def computeStats(): Statistics = reader match { case r: SupportsReportStatistics => - val statistics = r.estimateStatistics(scanConfigBuilder.build()) - Statistics(sizeInBytes = statistics.sizeInBytes().orElse(conf.defaultSizeInBytes)) + Statistics(sizeInBytes = r.estimateStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes)) case _ => Statistics(sizeInBytes = conf.defaultSizeInBytes) } @@ -119,19 +117,19 @@ case class StreamingDataSourceV2Relation( object DataSourceV2Relation { private implicit class SourceHelpers(source: DataSourceV2) { - def asReadSupportProvider: BatchReadSupportProvider = { + def asReadSupport: ReadSupport = { source match { - case provider: BatchReadSupportProvider => - provider + case support: ReadSupport => + support case _ => throw new AnalysisException(s"Data source is not readable: $name") } } - def asWriteSupportProvider: BatchWriteSupportProvider = { + def asWriteSupport: WriteSupport = { source match { - case provider: BatchWriteSupportProvider => - provider + case support: WriteSupport => + support case _ => throw new AnalysisException(s"Data source is not writable: $name") } @@ -146,26 +144,23 @@ object DataSourceV2Relation { } } - def createReadSupport( + def createReader( options: Map[String, String], - userSpecifiedSchema: Option[StructType]): BatchReadSupport = { + userSpecifiedSchema: Option[StructType]): DataSourceReader = { val v2Options = new DataSourceOptions(options.asJava) userSpecifiedSchema match { case Some(s) => 
- asReadSupportProvider.createBatchReadSupport(s, v2Options) + asReadSupport.createReader(s, v2Options) case _ => - asReadSupportProvider.createBatchReadSupport(v2Options) + asReadSupport.createReader(v2Options) } } - def createWriteSupport( + def createWriter( options: Map[String, String], - schema: StructType): BatchWriteSupport = { - asWriteSupportProvider.createBatchWriteSupport( - UUID.randomUUID().toString, - schema, - SaveMode.Append, - new DataSourceOptions(options.asJava)).get + schema: StructType): DataSourceWriter = { + val v2Options = new DataSourceOptions(options.asJava) + asWriteSupport.createWriter(UUID.randomUUID.toString, schema, SaveMode.Append, v2Options).get } } @@ -174,16 +169,15 @@ object DataSourceV2Relation { options: Map[String, String], tableIdent: Option[TableIdentifier] = None, userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = { - val readSupport = source.createReadSupport(options, userSpecifiedSchema) - val output = readSupport.fullSchema().toAttributes + val reader = source.createReader(options, userSpecifiedSchema) val ident = tableIdent.orElse(tableFromOptions(options)) DataSourceV2Relation( - source, readSupport, output, options, ident, userSpecifiedSchema) + source, reader.readSchema().toAttributes, options, ident, userSpecifiedSchema) } private def tableFromOptions(options: Map[String, String]): Option[TableIdentifier] = { options - .get(DataSourceOptions.TABLE_KEY) - .map(TableIdentifier(_, options.get(DataSourceOptions.DATABASE_KEY))) + .get(DataSourceOptions.TABLE_KEY) + .map(TableIdentifier(_, options.get(DataSourceOptions.DATABASE_KEY))) } } http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala index 04a9773..c8494f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources.v2 +import scala.collection.JavaConverters._ + import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -26,7 +28,8 @@ import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeSta import org.apache.spark.sql.execution.streaming.continuous._ import org.apache.spark.sql.sources.v2.DataSourceV2 import org.apache.spark.sql.sources.v2.reader._ -import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory, ContinuousReadSupport, MicroBatchReadSupport} +import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader +import org.apache.spark.sql.vectorized.ColumnarBatch /** * Physical plan node for scanning data from a data source. 
@@ -36,8 +39,7 @@ case class DataSourceV2ScanExec( @transient source: DataSourceV2, @transient options: Map[String, String], @transient pushedFilters: Seq[Expression], - @transient readSupport: ReadSupport, - @transient scanConfig: ScanConfig) + @transient reader: DataSourceReader) extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan { override def simpleString: String = "ScanV2 " + metadataString @@ -45,8 +47,7 @@ case class DataSourceV2ScanExec( // TODO: unify the equal/hashCode implementation for all data source v2 query plans. override def equals(other: Any): Boolean = other match { case other: DataSourceV2ScanExec => - output == other.output && readSupport.getClass == other.readSupport.getClass && - options == other.options + output == other.output && reader.getClass == other.reader.getClass && options == other.options case _ => false } @@ -54,39 +55,36 @@ case class DataSourceV2ScanExec( Seq(output, source, options).hashCode() } - override def outputPartitioning: physical.Partitioning = readSupport match { - case _ if partitions.length == 1 => + override def outputPartitioning: physical.Partitioning = reader match { + case r: SupportsScanColumnarBatch if r.enableBatchRead() && batchPartitions.size == 1 => + SinglePartition + + case r: SupportsScanColumnarBatch if !r.enableBatchRead() && partitions.size == 1 => + SinglePartition + + case r if !r.isInstanceOf[SupportsScanColumnarBatch] && partitions.size == 1 => SinglePartition case s: SupportsReportPartitioning => new DataSourcePartitioning( - s.outputPartitioning(scanConfig), AttributeMap(output.map(a => a -> a.name))) + s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name))) case _ => super.outputPartitioning } - private lazy val partitions: Seq[InputPartition] = readSupport.planInputPartitions(scanConfig) - - private lazy val readerFactory = readSupport match { - case r: BatchReadSupport => r.createReaderFactory(scanConfig) - case r: MicroBatchReadSupport => r.createReaderFactory(scanConfig) - case r: ContinuousReadSupport => r.createContinuousReaderFactory(scanConfig) - case _ => throw new IllegalStateException("unknown read support: " + readSupport) + private lazy val partitions: Seq[InputPartition[InternalRow]] = { + reader.planInputPartitions().asScala } - // TODO: clean this up when we have dedicated scan plan for continuous streaming. 
- override val supportsBatch: Boolean = { - require(partitions.forall(readerFactory.supportColumnarReads) || - !partitions.exists(readerFactory.supportColumnarReads), - "Cannot mix row-based and columnar input partitions.") - - partitions.exists(readerFactory.supportColumnarReads) + private lazy val batchPartitions: Seq[InputPartition[ColumnarBatch]] = reader match { + case r: SupportsScanColumnarBatch if r.enableBatchRead() => + assert(!reader.isInstanceOf[ContinuousReader], + "continuous stream reader does not support columnar read yet.") + r.planBatchInputPartitions().asScala } - private lazy val inputRDD: RDD[InternalRow] = readSupport match { - case _: ContinuousReadSupport => - assert(!supportsBatch, - "continuous stream reader does not support columnar read yet.") + private lazy val inputRDD: RDD[InternalRow] = reader match { + case _: ContinuousReader => EpochCoordinatorRef.get( sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), sparkContext.env) @@ -95,17 +93,22 @@ case class DataSourceV2ScanExec( sparkContext, sqlContext.conf.continuousStreamingExecutorQueueSize, sqlContext.conf.continuousStreamingExecutorPollIntervalMs, - partitions, - schema, - readerFactory.asInstanceOf[ContinuousPartitionReaderFactory]) + partitions).asInstanceOf[RDD[InternalRow]] + + case r: SupportsScanColumnarBatch if r.enableBatchRead() => + new DataSourceRDD(sparkContext, batchPartitions).asInstanceOf[RDD[InternalRow]] case _ => - new DataSourceRDD( - sparkContext, partitions, readerFactory.asInstanceOf[PartitionReaderFactory], supportsBatch) + new DataSourceRDD(sparkContext, partitions).asInstanceOf[RDD[InternalRow]] } override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD) + override val supportsBatch: Boolean = reader match { + case r: SupportsScanColumnarBatch if r.enableBatchRead() => true + case _ => false + } + override protected def needsUnsafeRowConversion: Boolean = false override protected def doExecute(): RDD[InternalRow] = { http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 9a3109e..9d97d3b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -26,8 +26,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, Rep import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec} -import org.apache.spark.sql.sources.v2.reader._ -import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownFilters, SupportsPushDownRequiredColumns} +import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader object DataSourceV2Strategy extends Strategy { @@ -37,9 +37,9 @@ object DataSourceV2Strategy extends Strategy { * @return pushed filter and post-scan filters. 
*/ private def pushFilters( - configBuilder: ScanConfigBuilder, + reader: DataSourceReader, filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = { - configBuilder match { + reader match { case r: SupportsPushDownFilters => // A map from translated data source filters to original catalyst filter expressions. val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, Expression] @@ -71,43 +71,41 @@ object DataSourceV2Strategy extends Strategy { /** * Applies column pruning to the data source, w.r.t. the references of the given expressions. * - * @return the created `ScanConfig`(since column pruning is the last step of operator pushdown), - * and new output attributes after column pruning. + * @return new output attributes after column pruning. */ // TODO: nested column pruning. private def pruneColumns( - configBuilder: ScanConfigBuilder, + reader: DataSourceReader, relation: DataSourceV2Relation, - exprs: Seq[Expression]): (ScanConfig, Seq[AttributeReference]) = { - configBuilder match { + exprs: Seq[Expression]): Seq[AttributeReference] = { + reader match { case r: SupportsPushDownRequiredColumns => val requiredColumns = AttributeSet(exprs.flatMap(_.references)) val neededOutput = relation.output.filter(requiredColumns.contains) if (neededOutput != relation.output) { r.pruneColumns(neededOutput.toStructType) - val config = r.build() val nameToAttr = relation.output.map(_.name).zip(relation.output).toMap - config -> config.readSchema().toAttributes.map { + r.readSchema().toAttributes.map { // We have to keep the attribute id during transformation. a => a.withExprId(nameToAttr(a.name).exprId) } } else { - r.build() -> relation.output + relation.output } - case _ => configBuilder.build() -> relation.output + case _ => relation.output } } override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case PhysicalOperation(project, filters, relation: DataSourceV2Relation) => - val configBuilder = relation.readSupport.newScanConfigBuilder() + val reader = relation.newReader() // `pushedFilters` will be pushed down and evaluated in the underlying data sources. // `postScanFilters` need to be evaluated after the scan. // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter. - val (pushedFilters, postScanFilters) = pushFilters(configBuilder, filters) - val (config, output) = pruneColumns(configBuilder, relation, project ++ postScanFilters) + val (pushedFilters, postScanFilters) = pushFilters(reader, filters) + val output = pruneColumns(reader, relation, project ++ postScanFilters) logInfo( s""" |Pushing operators to ${relation.source.getClass} @@ -117,12 +115,7 @@ object DataSourceV2Strategy extends Strategy { """.stripMargin) val scan = DataSourceV2ScanExec( - output, - relation.source, - relation.options, - pushedFilters, - relation.readSupport, - config) + output, relation.source, relation.options, pushedFilters, reader) val filterCondition = postScanFilters.reduceLeftOption(And) val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan) @@ -131,26 +124,22 @@ object DataSourceV2Strategy extends Strategy { ProjectExec(project, withFilter) :: Nil case r: StreamingDataSourceV2Relation => - // TODO: support operator pushdown for streaming data sources. 
- val scanConfig = r.scanConfigBuilder.build() // ensure there is a projection, which will produce unsafe rows required by some operators ProjectExec(r.output, - DataSourceV2ScanExec( - r.output, r.source, r.options, r.pushedFilters, r.readSupport, scanConfig)) :: Nil + DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader)) :: Nil case WriteToDataSourceV2(writer, query) => WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil case AppendData(r: DataSourceV2Relation, query, _) => - WriteToDataSourceV2Exec(r.newWriteSupport(), planLater(query)) :: Nil + WriteToDataSourceV2Exec(r.newWriter(), planLater(query)) :: Nil case WriteToContinuousDataSource(writer, query) => WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil case Repartition(1, false, child) => - val isContinuous = child.find { - case s: StreamingDataSourceV2Relation => s.readSupport.isInstanceOf[ContinuousReadSupport] - case _ => false + val isContinuous = child.collectFirst { + case StreamingDataSourceV2Relation(_, _, _, r: ContinuousReader) => r }.isDefined if (isContinuous) { http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala index e9cc399..5267f5f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala @@ -21,7 +21,6 @@ import java.util.regex.Pattern import org.apache.spark.internal.Logging import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.sources.v2.{DataSourceV2, SessionConfigSupport} private[sql] object DataSourceV2Utils extends Logging { @@ -56,12 +55,4 @@ private[sql] object DataSourceV2Utils extends Logging { case _ => Map.empty } - - def failForUserSpecifiedSchema[T](ds: DataSourceV2): T = { - val name = ds match { - case register: DataSourceRegister => register.shortName() - case _ => ds.getClass.getName - } - throw new UnsupportedOperationException(name + " source does not support user-specified schema") - } } http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index c3f7b69..59ebb9b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -23,11 +23,15 @@ import org.apache.spark.{SparkEnv, SparkException, TaskContext} import org.apache.spark.executor.CommitDeniedException import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD +import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} import 
org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.streaming.MicroBatchExecution import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils /** @@ -35,8 +39,7 @@ import org.apache.spark.util.Utils * specific logical plans, like [[org.apache.spark.sql.catalyst.plans.logical.AppendData]]. */ @deprecated("Use specific logical plans like AppendData instead", "2.4.0") -case class WriteToDataSourceV2(writeSupport: BatchWriteSupport, query: LogicalPlan) - extends LogicalPlan { +case class WriteToDataSourceV2(writer: DataSourceWriter, query: LogicalPlan) extends LogicalPlan { override def children: Seq[LogicalPlan] = Seq(query) override def output: Seq[Attribute] = Nil } @@ -44,48 +47,46 @@ case class WriteToDataSourceV2(writeSupport: BatchWriteSupport, query: LogicalPl /** * The physical plan for writing data into data source v2. */ -case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: SparkPlan) - extends SparkPlan { - +case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) extends SparkPlan { override def children: Seq[SparkPlan] = Seq(query) override def output: Seq[Attribute] = Nil override protected def doExecute(): RDD[InternalRow] = { - val writerFactory = writeSupport.createBatchWriterFactory() - val useCommitCoordinator = writeSupport.useCommitCoordinator + val writeTask = writer.createWriterFactory() + val useCommitCoordinator = writer.useCommitCoordinator val rdd = query.execute() val messages = new Array[WriterCommitMessage](rdd.partitions.length) - logInfo(s"Start processing data source write support: $writeSupport. " + + logInfo(s"Start processing data source writer: $writer. " + s"The input RDD has ${messages.length} partitions.") try { sparkContext.runJob( rdd, (context: TaskContext, iter: Iterator[InternalRow]) => - DataWritingSparkTask.run(writerFactory, context, iter, useCommitCoordinator), + DataWritingSparkTask.run(writeTask, context, iter, useCommitCoordinator), rdd.partitions.indices, (index, message: WriterCommitMessage) => { messages(index) = message - writeSupport.onDataWriterCommit(message) + writer.onDataWriterCommit(message) } ) - logInfo(s"Data source write support $writeSupport is committing.") - writeSupport.commit(messages) - logInfo(s"Data source write support $writeSupport committed.") + logInfo(s"Data source writer $writer is committing.") + writer.commit(messages) + logInfo(s"Data source writer $writer committed.") } catch { case cause: Throwable => - logError(s"Data source write support $writeSupport is aborting.") + logError(s"Data source writer $writer is aborting.") try { - writeSupport.abort(messages) + writer.abort(messages) } catch { case t: Throwable => - logError(s"Data source write support $writeSupport failed to abort.") + logError(s"Data source writer $writer failed to abort.") cause.addSuppressed(t) throw new SparkException("Writing job failed.", cause) } - logError(s"Data source write support $writeSupport aborted.") + logError(s"Data source writer $writer aborted.") cause match { // Only wrap non fatal exceptions. 
case NonFatal(e) => throw new SparkException("Writing job aborted.", e) @@ -99,7 +100,7 @@ case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: Spark object DataWritingSparkTask extends Logging { def run( - writerFactory: DataWriterFactory, + writeTask: DataWriterFactory[InternalRow], context: TaskContext, iter: Iterator[InternalRow], useCommitCoordinator: Boolean): WriterCommitMessage = { @@ -108,7 +109,8 @@ object DataWritingSparkTask extends Logging { val partId = context.partitionId() val taskId = context.taskAttemptId() val attemptId = context.attemptNumber() - val dataWriter = writerFactory.createWriter(partId, taskId) + val epochId = Option(context.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY)).getOrElse("0") + val dataWriter = writeTask.createDataWriter(partId, taskId, epochId.toLong) // write the data and commit this writer. Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
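
Editor's note, not part of the patch: the sketch below illustrates the DataSourceReader-based read path that this change restores. A DataSourceV2 implementing ReadSupport returns a DataSourceReader; the reader plans InputPartition[InternalRow]s; and each partition creates its own InputPartitionReader, which is exactly what the new DataSourceRDD.compute above calls. The class names (ExampleSource, ExamplePartition) and the fixed numeric ranges are invented for illustration only.

import java.util.{ArrayList, List => JList}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical source used only to show the interfaces touched by this patch.
class ExampleSource extends DataSourceV2 with ReadSupport {
  override def createReader(options: DataSourceOptions): DataSourceReader = new DataSourceReader {
    // Schema reported to DataSourceV2Relation.create via readSchema().toAttributes.
    override def readSchema(): StructType = StructType(Seq(StructField("value", LongType)))

    // One InputPartition per Spark partition; consumed by DataSourceV2ScanExec.partitions.
    override def planInputPartitions(): JList[InputPartition[InternalRow]] = {
      val partitions = new ArrayList[InputPartition[InternalRow]]()
      partitions.add(new ExamplePartition(0L, 5L))
      partitions.add(new ExamplePartition(5L, 10L))
      partitions
    }
  }
}

// Serializable partition descriptor; createPartitionReader() runs on the executor
// (see DataSourceRDD.compute in this patch).
class ExamplePartition(start: Long, end: Long) extends InputPartition[InternalRow] {
  override def createPartitionReader(): InputPartitionReader[InternalRow] =
    new InputPartitionReader[InternalRow] {
      private var current = start - 1
      override def next(): Boolean = { current += 1; current < end }
      override def get(): InternalRow = InternalRow(current)
      override def close(): Unit = ()
    }
}

Assuming such a source were referenced by class name, it could be read with spark.read.format(classOf[ExampleSource].getName).load(), at which point DataSourceV2Relation.create and DataSourceV2Strategy above plan the scan; the corresponding write path goes through WriteSupport.createWriter and the DataSourceWriter executed by WriteToDataSourceV2Exec, also shown in this patch.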