Repository: spark
Updated Branches:
  refs/heads/master 65a8bf603 -> 5014d6e25


[SPARK-22078][SQL] clarify exception behaviors for all data source v2 interfaces

## What changes were proposed in this pull request?

clarify exception behaviors for all data source v2 interfaces.

## How was this patch tested?

document change only

Author: Wenchen Fan <[email protected]>

Closes #19623 from cloud-fan/data-source-exception.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5014d6e2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5014d6e2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5014d6e2

Branch: refs/heads/master
Commit: 5014d6e2568021e6958eddc9cfb4c512ed7424a2
Parents: 65a8bf6
Author: Wenchen Fan <[email protected]>
Authored: Mon Nov 6 22:25:11 2017 +0100
Committer: Wenchen Fan <[email protected]>
Committed: Mon Nov 6 22:25:11 2017 +0100

----------------------------------------------------------------------
 .../spark/sql/sources/v2/ReadSupport.java       |  3 +++
 .../sql/sources/v2/ReadSupportWithSchema.java   |  3 +++
 .../spark/sql/sources/v2/WriteSupport.java      |  3 +++
 .../spark/sql/sources/v2/reader/DataReader.java | 11 +++++++-
 .../sources/v2/reader/DataSourceV2Reader.java   |  9 +++++++
 .../spark/sql/sources/v2/reader/ReadTask.java   | 12 ++++++++-
 .../reader/SupportsPushDownCatalystFilters.java |  1 -
 .../v2/reader/SupportsScanUnsafeRow.java        |  1 -
 .../sources/v2/writer/DataSourceV2Writer.java   | 28 +++++++++++++-------
 .../spark/sql/sources/v2/writer/DataWriter.java | 20 +++++++++-----
 .../sources/v2/writer/DataWriterFactory.java    |  3 +++
 .../v2/writer/SupportsWriteInternalRow.java     |  3 ---
 12 files changed, 74 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5014d6e2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java
index ee489ad..948e20b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java
@@ -30,6 +30,9 @@ public interface ReadSupport {
   /**
    * Creates a {@link DataSourceV2Reader} to scan the data from this data 
source.
    *
+   * If this method fails (by throwing an exception), the action would fail 
and no Spark job was
+   * submitted.
+   *
    * @param options the options for the returned data source reader, which is 
an immutable
    *                case-insensitive string-to-string map.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/5014d6e2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java
index 74e81a2..b69c6be 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java
@@ -35,6 +35,9 @@ public interface ReadSupportWithSchema {
   /**
    * Create a {@link DataSourceV2Reader} to scan the data from this data 
source.
    *
+   * If this method fails (by throwing an exception), the action would fail 
and no Spark job was
+   * submitted.
+   *
    * @param schema the full schema of this data source reader. Full schema 
usually maps to the
    *               physical schema of the underlying storage of this data 
source reader, e.g.
    *               CSV files, JSON files, etc, while this reader may not read 
data with full

http://git-wip-us.apache.org/repos/asf/spark/blob/5014d6e2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java
index 8fdfdfd..1e3b644 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java
@@ -35,6 +35,9 @@ public interface WriteSupport {
    * Creates an optional {@link DataSourceV2Writer} to save the data to this 
data source. Data
    * sources can return None if there is no writing needed to be done 
according to the save mode.
    *
+   * If this method fails (by throwing an exception), the action would fail 
and no Spark job was
+   * submitted.
+   *
    * @param jobId A unique string for the writing job. It's possible that 
there are many writing
    *              jobs running at the same time, and the returned {@link 
DataSourceV2Writer} can
    *              use this job id to distinguish itself from other jobs.

http://git-wip-us.apache.org/repos/asf/spark/blob/5014d6e2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java
index 52bb138..8f58c86 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.sources.v2.reader;
 
 import java.io.Closeable;
+import java.io.IOException;
 
 import org.apache.spark.annotation.InterfaceStability;
 
@@ -34,11 +35,19 @@ public interface DataReader<T> extends Closeable {
 
   /**
    * Proceed to next record, returns false if there is no more records.
+   *
+   * If this method fails (by throwing an exception), the corresponding Spark 
task would fail and
+   * get retried until hitting the maximum retry times.
+   *
+   * @throws IOException if failure happens during disk/network IO like 
reading files.
    */
-  boolean next();
+  boolean next() throws IOException;
 
   /**
    * Return the current record. This method should return same value until 
`next` is called.
+   *
+   * If this method fails (by throwing an exception), the corresponding Spark 
task would fail and
+   * get retried until hitting the maximum retry times.
    */
   T get();
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5014d6e2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java
index 88c3219..95ee4a8 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java
@@ -40,6 +40,9 @@ import org.apache.spark.sql.types.StructType;
  *   3. Special scans. E.g, columnar scan, unsafe row scan, etc.
  *      Names of these interfaces start with `SupportsScan`.
  *
+ * If an exception was throw when applying any of these query optimizations, 
the action would fail
+ * and no Spark job was submitted.
+ *
  * Spark first applies all operator push-down optimizations that this data 
source supports. Then
  * Spark collects information this data source reported for further 
optimizations. Finally Spark
  * issues the scan request and does the actual data reading.
@@ -50,6 +53,9 @@ public interface DataSourceV2Reader {
   /**
    * Returns the actual schema of this data source reader, which may be 
different from the physical
    * schema of the underlying storage, as column pruning or other 
optimizations may happen.
+   *
+   * If this method fails (by throwing an exception), the action would fail 
and no Spark job was
+   * submitted.
    */
   StructType readSchema();
 
@@ -61,6 +67,9 @@ public interface DataSourceV2Reader {
    * Note that, this may not be a full scan if the data source reader mixes in 
other optimization
    * interfaces like column pruning, filter push-down, etc. These 
optimizations are applied before
    * Spark issues the scan request.
+   *
+   * If this method fails (by throwing an exception), the action would fail 
and no Spark job was
+   * submitted.
    */
   List<ReadTask<Row>> createReadTasks();
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5014d6e2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java
index 44786db..fa161cd 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java
@@ -36,7 +36,14 @@ public interface ReadTask<T> extends Serializable {
   /**
    * The preferred locations where this read task can run faster, but Spark 
does not guarantee that
    * this task will always run on these locations. The implementations should 
make sure that it can
-   * be run on any location. The location is a string representing the host 
name of an executor.
+   * be run on any location. The location is a string representing the host 
name.
+   *
+   * Note that if a host name cannot be recognized by Spark, it will be 
ignored as it was not in
+   * the returned locations. By default this method returns empty string 
array, which means this
+   * task has no location preference.
+   *
+   * If this method fails (by throwing an exception), the action would fail 
and no Spark job was
+   * submitted.
    */
   default String[] preferredLocations() {
     return new String[0];
@@ -44,6 +51,9 @@ public interface ReadTask<T> extends Serializable {
 
   /**
    * Returns a data reader to do the actual reading work for this read task.
+   *
+   * If this method fails (by throwing an exception), the corresponding Spark 
task would fail and
+   * get retried until hitting the maximum retry times.
    */
   DataReader<T> createDataReader();
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5014d6e2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java
index efc4224..f76c687 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.sources.v2.reader;
 
-import org.apache.spark.annotation.Experimental;
 import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.catalyst.expressions.Expression;
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5014d6e2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
index 6008fb5..b90ec88 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
@@ -19,7 +19,6 @@ package org.apache.spark.sql.sources.v2.reader;
 
 import java.util.List;
 
-import org.apache.spark.annotation.Experimental;
 import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;

http://git-wip-us.apache.org/repos/asf/spark/blob/5014d6e2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
index 37bb15f..fc37b9a 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
@@ -30,6 +30,9 @@ import org.apache.spark.sql.types.StructType;
  * It can mix in various writing optimization interfaces to speed up the data 
saving. The actual
  * writing logic is delegated to {@link DataWriter}.
  *
+ * If an exception was throw when applying any of these writing optimizations, 
the action would fail
+ * and no Spark job was submitted.
+ *
  * The writing procedure is:
  *   1. Create a writer factory by {@link #createWriterFactory()}, serialize 
and send it to all the
  *      partitions of the input data(RDD).
@@ -50,28 +53,33 @@ public interface DataSourceV2Writer {
 
   /**
    * Creates a writer factory which will be serialized and sent to executors.
+   *
+   * If this method fails (by throwing an exception), the action would fail 
and no Spark job was
+   * submitted.
    */
   DataWriterFactory<Row> createWriterFactory();
 
   /**
    * Commits this writing job with a list of commit messages. The commit 
messages are collected from
-   * successful data writers and are produced by {@link DataWriter#commit()}. 
If this method
-   * fails(throw exception), this writing job is considered to be failed, and
-   * {@link #abort(WriterCommitMessage[])} will be called. The written data 
should only be visible
-   * to data source readers if this method succeeds.
+   * successful data writers and are produced by {@link DataWriter#commit()}.
+   *
+   * If this method fails (by throwing an exception), this writing job is 
considered to to have been
+   * failed, and {@link #abort(WriterCommitMessage[])} would be called. The 
state of the destination
+   * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able 
to deal with it.
    *
    * Note that, one partition may have multiple committed data writers because 
of speculative tasks.
    * Spark will pick the first successful one and get its commit message. 
Implementations should be
-   * aware of this and handle it correctly, e.g., have a mechanism to make 
sure only one data writer
-   * can commit successfully, or have a way to clean up the data of 
already-committed writers.
+   * aware of this and handle it correctly, e.g., have a coordinator to make 
sure only one data
+   * writer can commit, or have a way to clean up the data of 
already-committed writers.
    */
   void commit(WriterCommitMessage[] messages);
 
   /**
-   * Aborts this writing job because some data writers are failed to write the 
records and aborted,
-   * or the Spark job fails with some unknown reasons, or {@link 
#commit(WriterCommitMessage[])}
-   * fails. If this method fails(throw exception), the underlying data source 
may have garbage that
-   * need to be cleaned manually, but these garbage should not be visible to 
data source readers.
+   * Aborts this writing job because some data writers are failed and keep 
failing when retry, or
+   * the Spark job fails with some unknown reasons, or {@link 
#commit(WriterCommitMessage[])} fails.
+   *
+   * If this method fails (by throwing an exception), the underlying data 
source may require manual
+   * cleanup.
    *
    * Unless the abort is triggered by the failure of commit, the given 
messages should have some
    * null slots as there maybe only a few data writers that are committed 
before the abort

http://git-wip-us.apache.org/repos/asf/spark/blob/5014d6e2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
index dc1aab3..04b03e6 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.sources.v2.writer;
 
+import java.io.IOException;
+
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
@@ -59,8 +61,10 @@ public interface DataWriter<T> {
    *
    * If this method fails (by throwing an exception), {@link #abort()} will be 
called and this
    * data writer is considered to have been failed.
+   *
+   * @throws IOException if failure happens during disk/network IO like 
writing files.
    */
-  void write(T record);
+  void write(T record) throws IOException;
 
   /**
    * Commits this writer after all records are written successfully, returns a 
commit message which
@@ -74,8 +78,10 @@ public interface DataWriter<T> {
    *
    * If this method fails (by throwing an exception), {@link #abort()} will be 
called and this
    * data writer is considered to have been failed.
+   *
+   * @throws IOException if failure happens during disk/network IO like 
writing files.
    */
-  WriterCommitMessage commit();
+  WriterCommitMessage commit() throws IOException;
 
   /**
    * Aborts this writer if it is failed. Implementations should clean up the 
data for already
@@ -84,9 +90,11 @@ public interface DataWriter<T> {
    * This method will only be called if there is one record failed to write, 
or {@link #commit()}
    * failed.
    *
-   * If this method fails(throw exception), the underlying data source may 
have garbage that need
-   * to be cleaned by {@link DataSourceV2Writer#abort(WriterCommitMessage[])} 
or manually, but
-   * these garbage should not be visible to data source readers.
+   * If this method fails(by throwing an exception), the underlying data 
source may have garbage
+   * that need to be cleaned by {@link 
DataSourceV2Writer#abort(WriterCommitMessage[])} or manually,
+   * but these garbage should not be visible to data source readers.
+   *
+   * @throws IOException if failure happens during disk/network IO like 
writing files.
    */
-  void abort();
+  void abort() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5014d6e2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
index fe56cc0..18ec792 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
@@ -35,6 +35,9 @@ public interface DataWriterFactory<T> extends Serializable {
   /**
    * Returns a data writer to do the actual writing work.
    *
+   * If this method fails (by throwing an exception), the action would fail 
and no Spark job was
+   * submitted.
+   *
    * @param partitionId A unique id of the RDD partition that the returned 
writer will process.
    *                    Usually Spark processes many RDD partitions at the 
same time,
    *                    implementations should use the partition id to 
distinguish writers for

http://git-wip-us.apache.org/repos/asf/spark/blob/5014d6e2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsWriteInternalRow.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsWriteInternalRow.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsWriteInternalRow.java
index a8e9590..3e05188 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsWriteInternalRow.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsWriteInternalRow.java
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.sources.v2.writer;
 
-import org.apache.spark.annotation.Experimental;
 import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -29,8 +28,6 @@ import org.apache.spark.sql.catalyst.InternalRow;
  * changed in the future Spark versions.
  */
 
[email protected]
-@Experimental
 @InterfaceStability.Unstable
 public interface SupportsWriteInternalRow extends DataSourceV2Writer {
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to