This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 954ef96  [SPARK-25530][SQL] data source v2 API refactor (batch write)
954ef96 is described below

commit 954ef96c495268e3a22b7b661ce45c558b532c65
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Tue Jan 15 13:53:48 2019 -0800

    [SPARK-25530][SQL] data source v2 API refactor (batch write)
    
    ## What changes were proposed in this pull request?
    
    Adjust the batch write API to match the read API refactor after 
https://github.com/apache/spark/pull/23086
    
    The doc with high-level ideas:
    
https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing
    
    Basically it renames `BatchWriteSupportProvider` to `SupportsBatchWrite`, 
and make it extend `Table`. Renames `WriteSupport` to `Write`. It also cleans 
up some code as batch API is completed.
    
    This PR also removes the test from 
https://github.com/apache/spark/pull/22688 . Now data source must return a 
table for read/write.
    
    A few notes about future changes:
    1. We will create `SupportsStreamingWrite` later for streaming APIs
    2. We will create `SupportsBatchReplaceWhere`, `SupportsBatchAppend`, etc. 
for the new end-user write APIs. I think streaming APIs would remain to use 
`OutputMode`, and new end-user write APIs will apply to batch only, at least in 
the near future.
    3. We will remove `SaveMode` from data source API: 
https://issues.apache.org/jira/browse/SPARK-26356
    
    ## How was this patch tested?
    
    existing tests
    
    Closes #23208 from cloud-fan/refactor-batch.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: gatorsmile <gatorsm...@gmail.com>
---
 .../sql/sources/v2/BatchWriteSupportProvider.java  | 59 --------------
 .../apache/spark/sql/sources/v2/DataSourceV2.java  | 10 +--
 .../{SupportsRead.java => SupportsBatchWrite.java} | 23 +++---
 .../apache/spark/sql/sources/v2/SupportsRead.java  |  5 +-
 .../v2/{SupportsRead.java => SupportsWrite.java}   | 18 ++---
 .../{BatchWriteSupport.java => BatchWrite.java}    |  2 +-
 .../spark/sql/sources/v2/writer/DataWriter.java    | 12 +--
 .../sql/sources/v2/writer/DataWriterFactory.java   |  2 +-
 .../SupportsSaveMode.java}                         | 21 ++---
 .../spark/sql/sources/v2/writer/WriteBuilder.java  | 69 ++++++++++++++++
 .../sql/sources/v2/writer/WriterCommitMessage.java |  2 +-
 .../org/apache/spark/sql/DataFrameReader.scala     |  8 +-
 .../org/apache/spark/sql/DataFrameWriter.scala     | 50 ++++++------
 .../datasources/v2/DataSourceV2Relation.scala      | 89 ++++++---------------
 .../datasources/v2/DataSourceV2Strategy.scala      | 14 +++-
 .../datasources/v2/WriteToDataSourceV2Exec.scala   | 26 +++----
 .../execution/streaming/MicroBatchExecution.scala  |  4 +-
 ...atchWritSupport.scala => MicroBatchWrite.scala} |  7 +-
 .../streaming/sources/PackedRowWriterFactory.scala |  4 +-
 .../spark/sql/sources/v2/DataSourceV2Suite.scala   | 28 +++----
 .../sql/sources/v2/SimpleWritableDataSource.scala  | 91 +++++++++++++---------
 21 files changed, 262 insertions(+), 282 deletions(-)

diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java
deleted file mode 100644
index df439e2..0000000
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
- * provide data writing ability for batch processing.
- *
- * This interface is used to create {@link BatchWriteSupport} instances when 
end users run
- * {@code Dataset.write.format(...).option(...).save()}.
- */
-@Evolving
-public interface BatchWriteSupportProvider extends DataSourceV2 {
-
-  /**
-   * Creates an optional {@link BatchWriteSupport} instance to save the data 
to this data source,
-   * which is called by Spark at the beginning of each batch query.
-   *
-   * Data sources can return None if there is no writing needed to be done 
according to the save
-   * mode.
-   *
-   * @param queryId A unique string for the writing query. It's possible that 
there are many
-   *                writing queries running at the same time, and the returned
-   *                {@link BatchWriteSupport} can use this id to distinguish 
itself from others.
-   * @param schema the schema of the data to be written.
-   * @param mode the save mode which determines what to do when the data are 
already in this data
-   *             source, please refer to {@link SaveMode} for more details.
-   * @param options the options for the returned data source writer, which is 
an immutable
-   *                case-insensitive string-to-string map.
-   * @return a write support to write data to this data source.
-   */
-  Optional<BatchWriteSupport> createBatchWriteSupport(
-      String queryId,
-      StructType schema,
-      SaveMode mode,
-      DataSourceOptions options);
-}
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
index eae7a45..4aaa57d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
@@ -20,15 +20,7 @@ package org.apache.spark.sql.sources.v2;
 import org.apache.spark.annotation.Evolving;
 
 /**
- * The base interface for data source v2. Implementations must have a public, 
0-arg constructor.
- *
- * Note that this is an empty interface. Data source implementations must mix 
in interfaces such as
- * {@link BatchReadSupportProvider} or {@link BatchWriteSupportProvider}, 
which can provide
- * batch or streaming read/write support instances. Otherwise it's just a 
dummy data source which
- * is un-readable/writable.
- *
- * If Spark fails to execute any methods in the implementations of this 
interface (by throwing an
- * exception), the read action will fail and no Spark job will be submitted.
+ * TODO: remove it when we finish the API refactor for streaming side.
  */
 @Evolving
 public interface DataSourceV2 {}
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java
similarity index 59%
copy from 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java
copy to 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java
index e22738d..08caadd 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java
@@ -17,19 +17,16 @@
 
 package org.apache.spark.sql.sources.v2;
 
-import org.apache.spark.sql.sources.v2.reader.Scan;
-import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
 
 /**
- * An internal base interface of mix-in interfaces for readable {@link Table}. 
This adds
- * {@link #newScanBuilder(DataSourceOptions)} that is used to create a scan 
for batch, micro-batch,
- * or continuous processing.
+ * An empty mix-in interface for {@link Table}, to indicate this table 
supports batch write.
+ * <p>
+ * If a {@link Table} implements this interface, the
+ * {@link SupportsWrite#newWriteBuilder(DataSourceOptions)}  must return a 
{@link WriteBuilder}
+ * with {@link WriteBuilder#buildForBatch()} implemented.
+ * </p>
  */
-interface SupportsRead extends Table {
-
-  /**
-   * Returns a {@link ScanBuilder} which can be used to build a {@link Scan}. 
Spark will call this
-   * method to configure each scan.
-   */
-  ScanBuilder newScanBuilder(DataSourceOptions options);
-}
+@Evolving
+public interface SupportsBatchWrite extends SupportsWrite {}
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java
index e22738d..5031c71 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java
@@ -29,7 +29,10 @@ interface SupportsRead extends Table {
 
   /**
    * Returns a {@link ScanBuilder} which can be used to build a {@link Scan}. 
Spark will call this
-   * method to configure each scan.
+   * method to configure each data source scan.
+   *
+   * @param options The options for reading, which is an immutable 
case-insensitive
+   *                string-to-string map.
    */
   ScanBuilder newScanBuilder(DataSourceOptions options);
 }
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java
similarity index 62%
copy from 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java
copy to 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java
index e22738d..ecdfe20 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java
@@ -17,19 +17,19 @@
 
 package org.apache.spark.sql.sources.v2;
 
-import org.apache.spark.sql.sources.v2.reader.Scan;
-import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
+import org.apache.spark.sql.sources.v2.writer.BatchWrite;
+import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
 
 /**
- * An internal base interface of mix-in interfaces for readable {@link Table}. 
This adds
- * {@link #newScanBuilder(DataSourceOptions)} that is used to create a scan 
for batch, micro-batch,
- * or continuous processing.
+ * An internal base interface of mix-in interfaces for writable {@link Table}. 
This adds
+ * {@link #newWriteBuilder(DataSourceOptions)} that is used to create a write
+ * for batch or streaming.
  */
-interface SupportsRead extends Table {
+interface SupportsWrite extends Table {
 
   /**
-   * Returns a {@link ScanBuilder} which can be used to build a {@link Scan}. 
Spark will call this
-   * method to configure each scan.
+   * Returns a {@link WriteBuilder} which can be used to create {@link 
BatchWrite}. Spark will call
+   * this method to configure each data source write.
    */
-  ScanBuilder newScanBuilder(DataSourceOptions options);
+  WriteBuilder newWriteBuilder(DataSourceOptions options);
 }
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java
 b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWrite.java
similarity index 99%
rename from 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java
rename to 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWrite.java
index efe1ac4..9129775 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWrite.java
@@ -38,7 +38,7 @@ import org.apache.spark.annotation.Evolving;
  * Please refer to the documentation of commit/abort methods for detailed 
specifications.
  */
 @Evolving
-public interface BatchWriteSupport {
+public interface BatchWrite {
 
   /**
    * Creates a writer factory which will be serialized and sent to executors.
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
index d142ee5..11228ad 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
@@ -36,11 +36,11 @@ import org.apache.spark.annotation.Evolving;
  *
  * If this data writer succeeds(all records are successfully written and 
{@link #commit()}
  * succeeds), a {@link WriterCommitMessage} will be sent to the driver side 
and pass to
- * {@link BatchWriteSupport#commit(WriterCommitMessage[])} with commit 
messages from other data
+ * {@link BatchWrite#commit(WriterCommitMessage[])} with commit messages from 
other data
  * writers. If this data writer fails(one record fails to write or {@link 
#commit()} fails), an
  * exception will be sent to the driver side, and Spark may retry this writing 
task a few times.
  * In each retry, {@link DataWriterFactory#createWriter(int, long)} will 
receive a
- * different `taskId`. Spark will call {@link 
BatchWriteSupport#abort(WriterCommitMessage[])}
+ * different `taskId`. Spark will call {@link 
BatchWrite#abort(WriterCommitMessage[])}
  * when the configured number of retries is exhausted.
  *
  * Besides the retry mechanism, Spark may launch speculative tasks if the 
existing writing task
@@ -71,11 +71,11 @@ public interface DataWriter<T> {
   /**
    * Commits this writer after all records are written successfully, returns a 
commit message which
    * will be sent back to driver side and passed to
-   * {@link BatchWriteSupport#commit(WriterCommitMessage[])}.
+   * {@link BatchWrite#commit(WriterCommitMessage[])}.
    *
    * The written data should only be visible to data source readers after
-   * {@link BatchWriteSupport#commit(WriterCommitMessage[])} succeeds, which 
means this method
-   * should still "hide" the written data and ask the {@link 
BatchWriteSupport} at driver side to
+   * {@link BatchWrite#commit(WriterCommitMessage[])} succeeds, which means 
this method
+   * should still "hide" the written data and ask the {@link BatchWrite} at 
driver side to
    * do the final commit via {@link WriterCommitMessage}.
    *
    * If this method fails (by throwing an exception), {@link #abort()} will be 
called and this
@@ -93,7 +93,7 @@ public interface DataWriter<T> {
    * failed.
    *
    * If this method fails(by throwing an exception), the underlying data 
source may have garbage
-   * that need to be cleaned by {@link 
BatchWriteSupport#abort(WriterCommitMessage[])} or manually,
+   * that need to be cleaned by {@link 
BatchWrite#abort(WriterCommitMessage[])} or manually,
    * but these garbage should not be visible to data source readers.
    *
    * @throws IOException if failure happens during disk/network IO like 
writing files.
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
index 65105f4..bf2db90 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
@@ -24,7 +24,7 @@ import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.catalyst.InternalRow;
 
 /**
- * A factory of {@link DataWriter} returned by {@link 
BatchWriteSupport#createBatchWriterFactory()},
+ * A factory of {@link DataWriter} returned by {@link 
BatchWrite#createBatchWriterFactory()},
  * which is responsible for creating and initializing the actual data writer 
at executor side.
  *
  * Note that, the writer factory will be serialized and sent to executors, 
then the data writer
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsSaveMode.java
similarity index 56%
copy from 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java
copy to 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsSaveMode.java
index e22738d..c4295f2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsSaveMode.java
@@ -15,21 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.sources.v2;
+package org.apache.spark.sql.sources.v2.writer;
 
-import org.apache.spark.sql.sources.v2.reader.Scan;
-import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
+import org.apache.spark.sql.SaveMode;
 
-/**
- * An internal base interface of mix-in interfaces for readable {@link Table}. 
This adds
- * {@link #newScanBuilder(DataSourceOptions)} that is used to create a scan 
for batch, micro-batch,
- * or continuous processing.
- */
-interface SupportsRead extends Table {
-
-  /**
-   * Returns a {@link ScanBuilder} which can be used to build a {@link Scan}. 
Spark will call this
-   * method to configure each scan.
-   */
-  ScanBuilder newScanBuilder(DataSourceOptions options);
+// A temporary mixin trait for `WriteBuilder` to support `SaveMode`. Will be 
removed before
+// Spark 3.0 when all the new write operators are finished. See SPARK-26356 
for more details.
+public interface SupportsSaveMode extends WriteBuilder {
+  WriteBuilder mode(SaveMode mode);
 }
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java
new file mode 100644
index 0000000..e861c72
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.v2.SupportsBatchWrite;
+import org.apache.spark.sql.sources.v2.Table;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * An interface for building the {@link BatchWrite}. Implementations can mix 
in some interfaces to
+ * support different ways to write data to data sources.
+ *
+ * Unless modified by a mixin interface, the {@link BatchWrite} configured by 
this builder is to
+ * append data without affecting existing data.
+ */
+@Evolving
+public interface WriteBuilder {
+
+  /**
+   * Passes the `queryId` from Spark to data source. `queryId` is a unique 
string of the query. It's
+   * possible that there are many queries running at the same time, or a query 
is restarted and
+   * resumed. {@link BatchWrite} can use this id to identify the query.
+   *
+   * @return a new builder with the `queryId`. By default it returns `this`, 
which means the given
+   *         `queryId` is ignored. Please override this method to take the 
`queryId`.
+   */
+  default WriteBuilder withQueryId(String queryId) {
+    return this;
+  }
+
+  /**
+   * Passes the schema of the input data from Spark to data source.
+   *
+   * @return a new builder with the `schema`. By default it returns `this`, 
which means the given
+   *         `schema` is ignored. Please override this method to take the 
`schema`.
+   */
+  default WriteBuilder withInputDataSchema(StructType schema) {
+    return this;
+  }
+
+  /**
+   * Returns a {@link BatchWrite} to write data to batch source. By default 
this method throws
+   * exception, data sources must overwrite this method to provide an 
implementation, if the
+   * {@link Table} that creates this scan implements {@link 
SupportsBatchWrite}.
+   *
+   * Note that, the returned {@link BatchWrite} can be null if the 
implementation supports SaveMode,
+   * to indicate that no writing is needed. We can clean it up after removing
+   * {@link SupportsSaveMode}.
+   */
+  default BatchWrite buildForBatch() {
+    throw new UnsupportedOperationException("Batch scans are not supported");
+  }
+}
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
index 9216e34..6334c8f 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
@@ -24,7 +24,7 @@ import 
org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport;
 
 /**
  * A commit message returned by {@link DataWriter#commit()} and will be sent 
back to the driver side
- * as the input parameter of {@link 
BatchWriteSupport#commit(WriterCommitMessage[])} or
+ * as the input parameter of {@link BatchWrite#commit(WriterCommitMessage[])} 
or
  * {@link StreamingWriteSupport#commit(long, WriterCommitMessage[])}.
  *
  * This is an empty interface, data sources should define their own message 
class and use it when
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index ce8e4c8..af369a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -40,7 +40,7 @@ import 
org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
 import org.apache.spark.sql.sources.v2._
-import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
@@ -209,10 +209,8 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
         case _ => provider.getTable(dsOptions)
       }
       table match {
-        case s: SupportsBatchRead =>
-          Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
-            provider, s, finalOptions, userSpecifiedSchema = 
userSpecifiedSchema))
-
+        case _: SupportsBatchRead =>
+          Dataset.ofRows(sparkSession, DataSourceV2Relation.create(table, 
finalOptions))
         case _ => loadV1Source(paths: _*)
       }
     } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 981b3a8..228dcb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -32,6 +32,7 @@ import 
org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, Logi
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
DataSourceV2Utils, WriteToDataSourceV2}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -241,33 +242,38 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 
     assertNotBucketed("save")
 
-    val cls = DataSource.lookupDataSource(source, 
df.sparkSession.sessionState.conf)
-    if (classOf[DataSourceV2].isAssignableFrom(cls)) {
-      val source = 
cls.getConstructor().newInstance().asInstanceOf[DataSourceV2]
-      source match {
-        case provider: BatchWriteSupportProvider =>
-          val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
-            source,
-            df.sparkSession.sessionState.conf)
-          val options = sessionOptions ++ extraOptions
-
+    val session = df.sparkSession
+    val cls = DataSource.lookupDataSource(source, session.sessionState.conf)
+    if (classOf[TableProvider].isAssignableFrom(cls)) {
+      val provider = 
cls.getConstructor().newInstance().asInstanceOf[TableProvider]
+      val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+        provider, session.sessionState.conf)
+      val options = sessionOptions ++ extraOptions
+      val dsOptions = new DataSourceOptions(options.asJava)
+      provider.getTable(dsOptions) match {
+        case table: SupportsBatchWrite =>
           if (mode == SaveMode.Append) {
-            val relation = DataSourceV2Relation.createRelationForWrite(source, 
options)
+            val relation = DataSourceV2Relation.create(table, options)
             runCommand(df.sparkSession, "save") {
               AppendData.byName(relation, df.logicalPlan)
             }
-
           } else {
-            val writer = provider.createBatchWriteSupport(
-              UUID.randomUUID().toString,
-              df.logicalPlan.output.toStructType,
-              mode,
-              new DataSourceOptions(options.asJava))
-
-            if (writer.isPresent) {
-              runCommand(df.sparkSession, "save") {
-                WriteToDataSourceV2(writer.get, df.logicalPlan)
-              }
+            val writeBuilder = table.newWriteBuilder(dsOptions)
+              .withQueryId(UUID.randomUUID().toString)
+              .withInputDataSchema(df.logicalPlan.schema)
+            writeBuilder match {
+              case s: SupportsSaveMode =>
+                val write = s.mode(mode).buildForBatch()
+                // It can only return null with `SupportsSaveMode`. We can 
clean it up after
+                // removing `SupportsSaveMode`.
+                if (write != null) {
+                  runCommand(df.sparkSession, "save") {
+                    WriteToDataSourceV2(write, df.logicalPlan)
+                  }
+                }
+
+              case _ => throw new AnalysisException(
+                s"data source ${table.name} does not support SaveMode $mode")
             }
           }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 7bf2b8b..6321578 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -21,46 +21,51 @@ import java.util.UUID
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, 
NamedRelation}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport
+import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.sql.types.StructType
 
 /**
- * A logical plan representing a data source v2 scan.
+ * A logical plan representing a data source v2 table.
  *
- * @param source An instance of a [[DataSourceV2]] implementation.
- * @param options The options for this scan. Used to create fresh 
[[BatchWriteSupport]].
- * @param userSpecifiedSchema The user-specified schema for this scan.
+ * @param table   The table that this relation represents.
+ * @param options The options for this table operation. It's used to create 
fresh [[ScanBuilder]]
+ *                and [[WriteBuilder]].
  */
 case class DataSourceV2Relation(
-    // TODO: remove `source` when we finish API refactor for write.
-    source: TableProvider,
-    table: SupportsBatchRead,
+    table: Table,
     output: Seq[AttributeReference],
-    options: Map[String, String],
-    userSpecifiedSchema: Option[StructType] = None)
+    options: Map[String, String])
   extends LeafNode with MultiInstanceRelation with NamedRelation {
 
-  import DataSourceV2Relation._
-
   override def name: String = table.name()
 
   override def simpleString(maxFields: Int): String = {
     s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"
   }
 
-  def newWriteSupport(): BatchWriteSupport = 
source.createWriteSupport(options, schema)
+  def newScanBuilder(): ScanBuilder = table match {
+    case s: SupportsBatchRead =>
+      val dsOptions = new DataSourceOptions(options.asJava)
+      s.newScanBuilder(dsOptions)
+    case _ => throw new AnalysisException(s"Table is not readable: 
${table.name()}")
+  }
+
 
-  def newScanBuilder(): ScanBuilder = {
-    val dsOptions = new DataSourceOptions(options.asJava)
-    table.newScanBuilder(dsOptions)
+
+  def newWriteBuilder(schema: StructType): WriteBuilder = table match {
+    case s: SupportsBatchWrite =>
+      val dsOptions = new DataSourceOptions(options.asJava)
+      s.newWriteBuilder(dsOptions)
+        .withQueryId(UUID.randomUUID().toString)
+        .withInputDataSchema(schema)
+    case _ => throw new AnalysisException(s"Table is not writable: 
${table.name()}")
   }
 
   override def computeStats(): Statistics = {
@@ -126,52 +131,8 @@ case class StreamingDataSourceV2Relation(
 }
 
 object DataSourceV2Relation {
-  private implicit class SourceHelpers(source: DataSourceV2) {
-    def asWriteSupportProvider: BatchWriteSupportProvider = {
-      source match {
-        case provider: BatchWriteSupportProvider =>
-          provider
-        case _ =>
-          throw new AnalysisException(s"Data source is not writable: $name")
-      }
-    }
-
-    def name: String = {
-      source match {
-        case registered: DataSourceRegister =>
-          registered.shortName()
-        case _ =>
-          source.getClass.getSimpleName
-      }
-    }
-
-    def createWriteSupport(
-        options: Map[String, String],
-        schema: StructType): BatchWriteSupport = {
-      asWriteSupportProvider.createBatchWriteSupport(
-        UUID.randomUUID().toString,
-        schema,
-        SaveMode.Append,
-        new DataSourceOptions(options.asJava)).get
-    }
-  }
-
-  def create(
-      provider: TableProvider,
-      table: SupportsBatchRead,
-      options: Map[String, String],
-      userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
+  def create(table: Table, options: Map[String, String]): DataSourceV2Relation 
= {
     val output = table.schema().toAttributes
-    DataSourceV2Relation(provider, table, output, options, userSpecifiedSchema)
-  }
-
-  // TODO: remove this when we finish API refactor for write.
-  def createRelationForWrite(
-      source: DataSourceV2,
-      options: Map[String, String]): DataSourceV2Relation = {
-    val provider = source.asInstanceOf[TableProvider]
-    val dsOptions = new DataSourceOptions(options.asJava)
-    val table = provider.getTable(dsOptions)
-    create(provider, table.asInstanceOf[SupportsBatchRead], options)
+    DataSourceV2Relation(table, output, options)
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 2e26fce..79540b0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import scala.collection.mutable
 
-import org.apache.spark.sql.{sources, Strategy}
+import org.apache.spark.sql.{sources, AnalysisException, SaveMode, Strategy}
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet, Expression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, 
Repartition}
@@ -28,6 +28,7 @@ import 
org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import 
org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, 
WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport
+import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode
 
 object DataSourceV2Strategy extends Strategy {
 
@@ -110,7 +111,7 @@ object DataSourceV2Strategy extends Strategy {
       val (scan, output) = pruneColumns(scanBuilder, relation, project ++ 
postScanFilters)
       logInfo(
         s"""
-           |Pushing operators to ${relation.source.getClass}
+           |Pushing operators to ${relation.name}
            |Pushed Filters: ${pushedFilters.mkString(", ")}
            |Post-Scan Filters: ${postScanFilters.mkString(",")}
            |Output: ${output.mkString(", ")}
@@ -136,7 +137,14 @@ object DataSourceV2Strategy extends Strategy {
       WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
 
     case AppendData(r: DataSourceV2Relation, query, _) =>
-      WriteToDataSourceV2Exec(r.newWriteSupport(), planLater(query)) :: Nil
+      val writeBuilder = r.newWriteBuilder(query.schema)
+      writeBuilder match {
+        case s: SupportsSaveMode =>
+          val write = s.mode(SaveMode.Append).buildForBatch()
+          assert(write != null)
+          WriteToDataSourceV2Exec(write, planLater(query)) :: Nil
+        case _ => throw new AnalysisException(s"data source ${r.name} does not 
support SaveMode")
+      }
 
     case WriteToContinuousDataSource(writer, query) =>
       WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index d7e20ee..406fb8c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -35,7 +35,7 @@ import org.apache.spark.util.{LongAccumulator, Utils}
  * specific logical plans, like 
[[org.apache.spark.sql.catalyst.plans.logical.AppendData]].
  */
 @deprecated("Use specific logical plans like AppendData instead", "2.4.0")
-case class WriteToDataSourceV2(writeSupport: BatchWriteSupport, query: 
LogicalPlan)
+case class WriteToDataSourceV2(batchWrite: BatchWrite, query: LogicalPlan)
   extends LogicalPlan {
   override def children: Seq[LogicalPlan] = Seq(query)
   override def output: Seq[Attribute] = Nil
@@ -44,7 +44,7 @@ case class WriteToDataSourceV2(writeSupport: 
BatchWriteSupport, query: LogicalPl
 /**
  * The physical plan for writing data into data source v2.
  */
-case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: 
SparkPlan)
+case class WriteToDataSourceV2Exec(batchWrite: BatchWrite, query: SparkPlan)
   extends UnaryExecNode {
 
   var commitProgress: Option[StreamWriterCommitProgress] = None
@@ -53,13 +53,13 @@ case class WriteToDataSourceV2Exec(writeSupport: 
BatchWriteSupport, query: Spark
   override def output: Seq[Attribute] = Nil
 
   override protected def doExecute(): RDD[InternalRow] = {
-    val writerFactory = writeSupport.createBatchWriterFactory()
-    val useCommitCoordinator = writeSupport.useCommitCoordinator
+    val writerFactory = batchWrite.createBatchWriterFactory()
+    val useCommitCoordinator = batchWrite.useCommitCoordinator
     val rdd = query.execute()
     val messages = new Array[WriterCommitMessage](rdd.partitions.length)
     val totalNumRowsAccumulator = new LongAccumulator()
 
-    logInfo(s"Start processing data source write support: $writeSupport. " +
+    logInfo(s"Start processing data source write support: $batchWrite. " +
       s"The input RDD has ${messages.length} partitions.")
 
     try {
@@ -72,26 +72,26 @@ case class WriteToDataSourceV2Exec(writeSupport: 
BatchWriteSupport, query: Spark
           val commitMessage = result.writerCommitMessage
           messages(index) = commitMessage
           totalNumRowsAccumulator.add(result.numRows)
-          writeSupport.onDataWriterCommit(commitMessage)
+          batchWrite.onDataWriterCommit(commitMessage)
         }
       )
 
-      logInfo(s"Data source write support $writeSupport is committing.")
-      writeSupport.commit(messages)
-      logInfo(s"Data source write support $writeSupport committed.")
+      logInfo(s"Data source write support $batchWrite is committing.")
+      batchWrite.commit(messages)
+      logInfo(s"Data source write support $batchWrite committed.")
       commitProgress = 
Some(StreamWriterCommitProgress(totalNumRowsAccumulator.value))
     } catch {
       case cause: Throwable =>
-        logError(s"Data source write support $writeSupport is aborting.")
+        logError(s"Data source write support $batchWrite is aborting.")
         try {
-          writeSupport.abort(messages)
+          batchWrite.abort(messages)
         } catch {
           case t: Throwable =>
-            logError(s"Data source write support $writeSupport failed to 
abort.")
+            logError(s"Data source write support $batchWrite failed to abort.")
             cause.addSuppressed(t)
             throw new SparkException("Writing job failed.", cause)
         }
-        logError(s"Data source write support $writeSupport aborted.")
+        logError(s"Data source write support $batchWrite aborted.")
         cause match {
           // Only wrap non fatal exceptions.
           case NonFatal(e) => throw new SparkException("Writing job aborted.", 
e)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 38ecb0d..db1bf32 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -27,7 +27,7 @@ import 
org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan,
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.SQLExecution
 import 
org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, 
StreamWriterCommitProgress, WriteToDataSourceV2, WriteToDataSourceV2Exec}
-import 
org.apache.spark.sql.execution.streaming.sources.{MicroBatchWritSupport, 
RateControlMicroBatchReadSupport}
+import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWrite, 
RateControlMicroBatchReadSupport}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.v2._
 import 
org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset 
=> OffsetV2}
@@ -515,7 +515,7 @@ class MicroBatchExecution(
           newAttributePlan.schema,
           outputMode,
           new DataSourceOptions(extraOptions.asJava))
-        WriteToDataSourceV2(new MicroBatchWritSupport(currentBatchId, writer), 
newAttributePlan)
+        WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, writer), 
newAttributePlan)
       case _ => throw new IllegalArgumentException(s"unknown sink type for 
$sink")
     }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWritSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala
similarity index 84%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWritSupport.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala
index 9f88416..143235e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWritSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala
@@ -18,16 +18,15 @@
 package org.apache.spark.sql.execution.streaming.sources
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.sources.v2.writer.{BatchWriteSupport, DataWriter, 
DataWriterFactory, WriterCommitMessage}
+import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriter, 
DataWriterFactory, WriterCommitMessage}
 import 
org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, 
StreamingWriteSupport}
 
 /**
- * A [[BatchWriteSupport]] used to hook V2 stream writers into a microbatch 
plan. It implements
+ * A [[BatchWrite]] used to hook V2 stream writers into a microbatch plan. It 
implements
  * the non-streaming interface, forwarding the epoch ID determined at 
construction to a wrapped
  * streaming write support.
  */
-class MicroBatchWritSupport(eppchId: Long, val writeSupport: 
StreamingWriteSupport)
-  extends BatchWriteSupport {
+class MicroBatchWrite(eppchId: Long, val writeSupport: StreamingWriteSupport) 
extends BatchWrite {
 
   override def commit(messages: Array[WriterCommitMessage]): Unit = {
     writeSupport.commit(eppchId, messages)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
index ac3c71c..fd4cb44 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
@@ -21,12 +21,12 @@ import scala.collection.mutable
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.sources.v2.writer.{BatchWriteSupport, DataWriter, 
DataWriterFactory, WriterCommitMessage}
+import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriter, 
DataWriterFactory, WriterCommitMessage}
 import 
org.apache.spark.sql.sources.v2.writer.streaming.StreamingDataWriterFactory
 
 /**
  * A simple [[DataWriterFactory]] whose tasks just pack rows into the commit 
message for delivery
- * to a [[BatchWriteSupport]] on the driver.
+ * to a [[BatchWrite]] on the driver.
  *
  * Note that, because it sends all rows to the driver, this factory will 
generally be unsuitable
  * for production-quality sinks. It's intended for use in tests.
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index d282193..c60ea4a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -329,8 +329,8 @@ class DataSourceV2Suite extends QueryTest with 
SharedSQLContext {
         .format(classOf[DataSourceV2WithSessionConfig].getName).load()
       val options = df.queryExecution.optimizedPlan.collectFirst {
         case d: DataSourceV2Relation => d.options
-      }
-      assert(options.get.get(optionName) == Some("false"))
+      }.get
+      assert(options.get(optionName).get == "false")
     }
   }
 
@@ -356,13 +356,11 @@ class DataSourceV2Suite extends QueryTest with 
SharedSQLContext {
       val cls = classOf[SimpleWriteOnlyDataSource]
       val path = file.getCanonicalPath
       val df = spark.range(5).select('id as 'i, -'id as 'j)
-      try {
-        df.write.format(cls.getName).option("path", path).mode("error").save()
-        df.write.format(cls.getName).option("path", 
path).mode("overwrite").save()
-        df.write.format(cls.getName).option("path", path).mode("ignore").save()
-      } catch {
-        case e: SchemaReadAttemptException => fail("Schema read was 
attempted.", e)
-      }
+      // non-append mode should not throw exception, as they don't access 
schema.
+      df.write.format(cls.getName).option("path", path).mode("error").save()
+      df.write.format(cls.getName).option("path", 
path).mode("overwrite").save()
+      df.write.format(cls.getName).option("path", path).mode("ignore").save()
+      // append mode will access schema and should throw exception.
       intercept[SchemaReadAttemptException] {
         df.write.format(cls.getName).option("path", path).mode("append").save()
       }
@@ -680,10 +678,12 @@ object SpecificReaderFactory extends 
PartitionReaderFactory {
 class SchemaReadAttemptException(m: String) extends RuntimeException(m)
 
 class SimpleWriteOnlyDataSource extends SimpleWritableDataSource {
-  override def writeSchema(): StructType = {
-    // This is a bit hacky since this source implements read support but throws
-    // during schema retrieval. Might have to rewrite but it's done
-    // such so for minimised changes.
-    throw new SchemaReadAttemptException("read is not supported")
+
+  override def getTable(options: DataSourceOptions): Table = {
+    new MyTable(options) {
+      override def schema(): StructType = {
+        throw new SchemaReadAttemptException("schema should not be read.")
+      }
+    }
   }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index 82bb4fa..6e4f2bb 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.sources.v2
 
 import java.io.{BufferedReader, InputStreamReader, IOException}
-import java.util.Optional
 
 import scala.collection.JavaConverters._
 
@@ -35,15 +34,13 @@ import org.apache.spark.util.SerializableConfiguration
 
 /**
  * A HDFS based transactional writable data source.
- * Each task writes data to 
`target/_temporary/queryId/$jobId-$partitionId-$attemptNumber`.
- * Each job moves files from `target/_temporary/queryId/` to `target`.
+ * Each task writes data to 
`target/_temporary/uniqueId/$jobId-$partitionId-$attemptNumber`.
+ * Each job moves files from `target/_temporary/uniqueId/` to `target`.
  */
 class SimpleWritableDataSource extends DataSourceV2
-  with TableProvider
-  with BatchWriteSupportProvider
-  with SessionConfigSupport {
+  with TableProvider with SessionConfigSupport {
 
-  protected def writeSchema(): StructType = new StructType().add("i", 
"long").add("j", "long")
+  private val tableSchema = new StructType().add("i", "long").add("j", "long")
 
   override def keyPrefix: String = "simpleWritableDataSource"
 
@@ -68,22 +65,50 @@ class SimpleWritableDataSource extends DataSourceV2
       new CSVReaderFactory(serializableConf)
     }
 
-    override def readSchema(): StructType = writeSchema
+    override def readSchema(): StructType = tableSchema
   }
 
-  override def getTable(options: DataSourceOptions): Table = {
-    val path = new Path(options.get("path").get())
-    val conf = SparkContext.getActive.get.hadoopConfiguration
-    new SimpleBatchTable {
-      override def newScanBuilder(options: DataSourceOptions): ScanBuilder = {
-        new MyScanBuilder(path.toUri.toString, conf)
+  class MyWriteBuilder(path: String) extends WriteBuilder with 
SupportsSaveMode {
+    private var queryId: String = _
+    private var mode: SaveMode = _
+
+    override def withQueryId(queryId: String): WriteBuilder = {
+      this.queryId = queryId
+      this
+    }
+
+    override def mode(mode: SaveMode): WriteBuilder = {
+      this.mode = mode
+      this
+    }
+
+    override def buildForBatch(): BatchWrite = {
+      assert(mode != null)
+
+      val hadoopPath = new Path(path)
+      val hadoopConf = SparkContext.getActive.get.hadoopConfiguration
+      val fs = hadoopPath.getFileSystem(hadoopConf)
+
+      if (mode == SaveMode.ErrorIfExists) {
+        if (fs.exists(hadoopPath)) {
+          throw new RuntimeException("data already exists.")
+        }
+      }
+      if (mode == SaveMode.Ignore) {
+        if (fs.exists(hadoopPath)) {
+          return null
+        }
+      }
+      if (mode == SaveMode.Overwrite) {
+        fs.delete(hadoopPath, true)
       }
 
-      override def schema(): StructType = writeSchema
+      val pathStr = hadoopPath.toUri.toString
+      new MyBatchWrite(queryId, pathStr, hadoopConf)
     }
   }
 
-  class WritSupport(queryId: String, path: String, conf: Configuration) 
extends BatchWriteSupport {
+  class MyBatchWrite(queryId: String, path: String, conf: Configuration) 
extends BatchWrite {
     override def createBatchWriterFactory(): DataWriterFactory = {
       SimpleCounter.resetCounter
       new CSVDataWriterFactory(path, queryId, new 
SerializableConfiguration(conf))
@@ -116,33 +141,23 @@ class SimpleWritableDataSource extends DataSourceV2
     }
   }
 
-  override def createBatchWriteSupport(
-      queryId: String,
-      schema: StructType,
-      mode: SaveMode,
-      options: DataSourceOptions): Optional[BatchWriteSupport] = {
-    assert(!SparkContext.getActive.get.conf.getBoolean("spark.speculation", 
false))
+  class MyTable(options: DataSourceOptions) extends SimpleBatchTable with 
SupportsBatchWrite {
+    private val path = options.get("path").get()
+    private val conf = SparkContext.getActive.get.hadoopConfiguration
 
-    val path = new Path(options.get("path").get())
-    val conf = SparkContext.getActive.get.hadoopConfiguration
-    val fs = path.getFileSystem(conf)
+    override def schema(): StructType = tableSchema
 
-    if (mode == SaveMode.ErrorIfExists) {
-      if (fs.exists(path)) {
-        throw new RuntimeException("data already exists.")
-      }
+    override def newScanBuilder(options: DataSourceOptions): ScanBuilder = {
+      new MyScanBuilder(new Path(path).toUri.toString, conf)
     }
-    if (mode == SaveMode.Ignore) {
-      if (fs.exists(path)) {
-        return Optional.empty()
-      }
-    }
-    if (mode == SaveMode.Overwrite) {
-      fs.delete(path, true)
+
+    override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
+      new MyWriteBuilder(path)
     }
+  }
 
-    val pathStr = path.toUri.toString
-    Optional.of(new WritSupport(queryId, pathStr, conf))
+  override def getTable(options: DataSourceOptions): Table = {
+    new MyTable(options)
   }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to