Repository: spark Updated Branches: refs/heads/master 65a8bf603 -> 5014d6e25
[SPARK-22078][SQL] clarify exception behaviors for all data source v2 interfaces ## What changes were proposed in this pull request? clarify exception behaviors for all data source v2 interfaces. ## How was this patch tested? document change only Author: Wenchen Fan <[email protected]> Closes #19623 from cloud-fan/data-source-exception. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5014d6e2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5014d6e2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5014d6e2 Branch: refs/heads/master Commit: 5014d6e2568021e6958eddc9cfb4c512ed7424a2 Parents: 65a8bf6 Author: Wenchen Fan <[email protected]> Authored: Mon Nov 6 22:25:11 2017 +0100 Committer: Wenchen Fan <[email protected]> Committed: Mon Nov 6 22:25:11 2017 +0100 ---------------------------------------------------------------------- .../spark/sql/sources/v2/ReadSupport.java | 3 +++ .../sql/sources/v2/ReadSupportWithSchema.java | 3 +++ .../spark/sql/sources/v2/WriteSupport.java | 3 +++ .../spark/sql/sources/v2/reader/DataReader.java | 11 +++++++- .../sources/v2/reader/DataSourceV2Reader.java | 9 +++++++ .../spark/sql/sources/v2/reader/ReadTask.java | 12 ++++++++- .../reader/SupportsPushDownCatalystFilters.java | 1 - .../v2/reader/SupportsScanUnsafeRow.java | 1 - .../sources/v2/writer/DataSourceV2Writer.java | 28 +++++++++++++------- .../spark/sql/sources/v2/writer/DataWriter.java | 20 +++++++++----- .../sources/v2/writer/DataWriterFactory.java | 3 +++ .../v2/writer/SupportsWriteInternalRow.java | 3 --- 12 files changed, 74 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5014d6e2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java ---------------------------------------------------------------------- diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java index ee489ad..948e20b 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java @@ -30,6 +30,9 @@ public interface ReadSupport { /** * Creates a {@link DataSourceV2Reader} to scan the data from this data source. * + * If this method fails (by throwing an exception), the action would fail and no Spark job was + * submitted. + * * @param options the options for the returned data source reader, which is an immutable * case-insensitive string-to-string map. */ http://git-wip-us.apache.org/repos/asf/spark/blob/5014d6e2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java index 74e81a2..b69c6be 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java @@ -35,6 +35,9 @@ public interface ReadSupportWithSchema { /** * Create a {@link DataSourceV2Reader} to scan the data from this data source. * + * If this method fails (by throwing an exception), the action would fail and no Spark job was + * submitted. + * * @param schema the full schema of this data source reader. Full schema usually maps to the * physical schema of the underlying storage of this data source reader, e.g. 
* CSV files, JSON files, etc, while this reader may not read data with full http://git-wip-us.apache.org/repos/asf/spark/blob/5014d6e2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java index 8fdfdfd..1e3b644 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java @@ -35,6 +35,9 @@ public interface WriteSupport { * Creates an optional {@link DataSourceV2Writer} to save the data to this data source. Data * sources can return None if there is no writing needed to be done according to the save mode. * + * If this method fails (by throwing an exception), the action would fail and no Spark job was + * submitted. + * * @param jobId A unique string for the writing job. It's possible that there are many writing * jobs running at the same time, and the returned {@link DataSourceV2Writer} can * use this job id to distinguish itself from other jobs. 
http://git-wip-us.apache.org/repos/asf/spark/blob/5014d6e2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java index 52bb138..8f58c86 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java @@ -18,6 +18,7 @@ package org.apache.spark.sql.sources.v2.reader; import java.io.Closeable; +import java.io.IOException; import org.apache.spark.annotation.InterfaceStability; @@ -34,11 +35,19 @@ public interface DataReader<T> extends Closeable { /** * Proceed to next record, returns false if there is no more records. + * + * If this method fails (by throwing an exception), the corresponding Spark task would fail and + * get retried until hitting the maximum retry times. + * + * @throws IOException if failure happens during disk/network IO like reading files. */ - boolean next(); + boolean next() throws IOException; /** * Return the current record. This method should return same value until `next` is called. + * + * If this method fails (by throwing an exception), the corresponding Spark task would fail and + * get retried until hitting the maximum retry times. 
*/ T get(); } http://git-wip-us.apache.org/repos/asf/spark/blob/5014d6e2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java index 88c3219..95ee4a8 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java @@ -40,6 +40,9 @@ import org.apache.spark.sql.types.StructType; * 3. Special scans. E.g, columnar scan, unsafe row scan, etc. * Names of these interfaces start with `SupportsScan`. * + * If an exception was thrown when applying any of these query optimizations, the action would fail + * and no Spark job was submitted. + * * Spark first applies all operator push-down optimizations that this data source supports. Then * Spark collects information this data source reported for further optimizations. Finally Spark * issues the scan request and does the actual data reading. @@ -50,6 +53,9 @@ public interface DataSourceV2Reader { /** * Returns the actual schema of this data source reader, which may be different from the physical * schema of the underlying storage, as column pruning or other optimizations may happen. + * + * If this method fails (by throwing an exception), the action would fail and no Spark job was + * submitted. */ StructType readSchema(); @@ -61,6 +67,9 @@ public interface DataSourceV2Reader { * Note that, this may not be a full scan if the data source reader mixes in other optimization * interfaces like column pruning, filter push-down, etc. These optimizations are applied before * Spark issues the scan request. + * + * If this method fails (by throwing an exception), the action would fail and no Spark job was + * submitted.
*/ List<ReadTask<Row>> createReadTasks(); } http://git-wip-us.apache.org/repos/asf/spark/blob/5014d6e2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java index 44786db..fa161cd 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java @@ -36,7 +36,14 @@ public interface ReadTask<T> extends Serializable { /** * The preferred locations where this read task can run faster, but Spark does not guarantee that * this task will always run on these locations. The implementations should make sure that it can - * be run on any location. The location is a string representing the host name of an executor. + * be run on any location. The location is a string representing the host name. + * + * Note that if a host name cannot be recognized by Spark, it will be ignored as if it were not in + * the returned locations. By default this method returns an empty string array, which means this + * task has no location preference. + * + * If this method fails (by throwing an exception), the action would fail and no Spark job was + * submitted. */ default String[] preferredLocations() { return new String[0]; @@ -44,6 +51,9 @@ public interface ReadTask<T> extends Serializable { /** * Returns a data reader to do the actual reading work for this read task. + * + * If this method fails (by throwing an exception), the corresponding Spark task would fail and + * get retried until hitting the maximum retry times.
*/ DataReader<T> createDataReader(); } http://git-wip-us.apache.org/repos/asf/spark/blob/5014d6e2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java index efc4224..f76c687 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java @@ -17,7 +17,6 @@ package org.apache.spark.sql.sources.v2.reader; -import org.apache.spark.annotation.Experimental; import org.apache.spark.annotation.InterfaceStability; import org.apache.spark.sql.catalyst.expressions.Expression; http://git-wip-us.apache.org/repos/asf/spark/blob/5014d6e2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java index 6008fb5..b90ec88 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java @@ -19,7 +19,6 @@ package org.apache.spark.sql.sources.v2.reader; import java.util.List; -import org.apache.spark.annotation.Experimental; import org.apache.spark.annotation.InterfaceStability; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.expressions.UnsafeRow; 
http://git-wip-us.apache.org/repos/asf/spark/blob/5014d6e2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java index 37bb15f..fc37b9a 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java @@ -30,6 +30,9 @@ import org.apache.spark.sql.types.StructType; * It can mix in various writing optimization interfaces to speed up the data saving. The actual * writing logic is delegated to {@link DataWriter}. * + * If an exception was thrown when applying any of these writing optimizations, the action would fail + * and no Spark job was submitted. + * * The writing procedure is: * 1. Create a writer factory by {@link #createWriterFactory()}, serialize and send it to all the * partitions of the input data(RDD). @@ -50,28 +53,33 @@ public interface DataSourceV2Writer { /** * Creates a writer factory which will be serialized and sent to executors. + * + * If this method fails (by throwing an exception), the action would fail and no Spark job was + * submitted. */ DataWriterFactory<Row> createWriterFactory(); /** * Commits this writing job with a list of commit messages. The commit messages are collected from - * successful data writers and are produced by {@link DataWriter#commit()}. If this method - * fails(throw exception), this writing job is considered to be failed, and - * {@link #abort(WriterCommitMessage[])} will be called. The written data should only be visible - * to data source readers if this method succeeds. + * successful data writers and are produced by {@link DataWriter#commit()}.
+ * + * If this method fails (by throwing an exception), this writing job is considered to have been + * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination + * is undefined and {@link #abort(WriterCommitMessage[])} may not be able to deal with it. * * Note that, one partition may have multiple committed data writers because of speculative tasks. * Spark will pick the first successful one and get its commit message. Implementations should be - * aware of this and handle it correctly, e.g., have a mechanism to make sure only one data writer - * can commit successfully, or have a way to clean up the data of already-committed writers. + * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data + * writer can commit, or have a way to clean up the data of already-committed writers. */ void commit(WriterCommitMessage[] messages); /** - * Aborts this writing job because some data writers are failed to write the records and aborted, - * or the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} - * fails. If this method fails(throw exception), the underlying data source may have garbage that - * need to be cleaned manually, but these garbage should not be visible to data source readers. + * Aborts this writing job because some data writers failed and kept failing when retried, or + * the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails. + * + * If this method fails (by throwing an exception), the underlying data source may require manual + * cleanup.
* * Unless the abort is triggered by the failure of commit, the given messages should have some * null slots as there maybe only a few data writers that are committed before the abort http://git-wip-us.apache.org/repos/asf/spark/blob/5014d6e2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java index dc1aab3..04b03e6 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java @@ -17,6 +17,8 @@ package org.apache.spark.sql.sources.v2.writer; +import java.io.IOException; + import org.apache.spark.annotation.InterfaceStability; /** @@ -59,8 +61,10 @@ public interface DataWriter<T> { * * If this method fails (by throwing an exception), {@link #abort()} will be called and this * data writer is considered to have been failed. + * + * @throws IOException if failure happens during disk/network IO like writing files. */ - void write(T record); + void write(T record) throws IOException; /** * Commits this writer after all records are written successfully, returns a commit message which @@ -74,8 +78,10 @@ public interface DataWriter<T> { * * If this method fails (by throwing an exception), {@link #abort()} will be called and this * data writer is considered to have been failed. + * + * @throws IOException if failure happens during disk/network IO like writing files. */ - WriterCommitMessage commit(); + WriterCommitMessage commit() throws IOException; /** * Aborts this writer if it is failed. Implementations should clean up the data for already @@ -84,9 +90,11 @@ public interface DataWriter<T> { * This method will only be called if there is one record failed to write, or {@link #commit()} * failed. 
* - * If this method fails(throw exception), the underlying data source may have garbage that need - * to be cleaned by {@link DataSourceV2Writer#abort(WriterCommitMessage[])} or manually, but - * these garbage should not be visible to data source readers. + * If this method fails (by throwing an exception), the underlying data source may have garbage + * that needs to be cleaned by {@link DataSourceV2Writer#abort(WriterCommitMessage[])} or manually, + * but this garbage should not be visible to data source readers. + * + * @throws IOException if failure happens during disk/network IO like writing files. */ - void abort(); + void abort() throws IOException; } http://git-wip-us.apache.org/repos/asf/spark/blob/5014d6e2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java index fe56cc0..18ec792 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java @@ -35,6 +35,9 @@ public interface DataWriterFactory<T> extends Serializable { /** * Returns a data writer to do the actual writing work. * + * If this method fails (by throwing an exception), the action would fail and no Spark job was + * submitted. + * * @param partitionId A unique id of the RDD partition that the returned writer will process.
* Usually Spark processes many RDD partitions at the same time, * implementations should use the partition id to distinguish writers for http://git-wip-us.apache.org/repos/asf/spark/blob/5014d6e2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsWriteInternalRow.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsWriteInternalRow.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsWriteInternalRow.java index a8e9590..3e05188 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsWriteInternalRow.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsWriteInternalRow.java @@ -17,7 +17,6 @@ package org.apache.spark.sql.sources.v2.writer; -import org.apache.spark.annotation.Experimental; import org.apache.spark.annotation.InterfaceStability; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.InternalRow; @@ -29,8 +28,6 @@ import org.apache.spark.sql.catalyst.InternalRow; * changed in the future Spark versions. */ [email protected] -@Experimental @InterfaceStability.Unstable public interface SupportsWriteInternalRow extends DataSourceV2Writer { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
