This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d4e2dac3465c [SPARK-49425][CONNECT][SQL] Create a shared DataFrameWriter
d4e2dac3465c is described below

commit d4e2dac3465cae7a2975de50892382de5af9a9b5
Author: Herman van Hovell <[email protected]>
AuthorDate: Tue Sep 3 00:45:08 2024 -0400

    [SPARK-49425][CONNECT][SQL] Create a shared DataFrameWriter
    
    ### What changes were proposed in this pull request?
    This PR adds a shared interface for DataFrameWriter.
    
    ### Why are the changes needed?
    We are creating a shared interface for the Classic and Connect Scala SQL APIs. This class is part of that effort.
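    
    A minimal, illustrative usage sketch (assuming a local SparkSession and a writable
    output path): caller code is unchanged by this refactoring, because both backends
    now implement the shared writer behind `Dataset.write`.
    
        val spark = SparkSession.builder().master("local[*]").getOrCreate()
        spark.range(10).toDF("id")
          .write                     // returns the shared DataFrameWriter[Row]
          .format("parquet")
          .mode("overwrite")
          .save("/tmp/spark-49425-demo")
    
    On Spark Connect the same chain goes through DataFrameWriterImpl, which builds a
    WriteOperation command for the server.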
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #47922 from hvanhovell/SPARK-49425.
    
    Authored-by: Herman van Hovell <[email protected]>
    Signed-off-by: Herman van Hovell <[email protected]>
---
 .../org/apache/spark/sql/DataFrameWriter.scala     | 531 --------------------
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  11 +-
 .../spark/sql/internal/DataFrameWriterImpl.scala   | 157 ++++++
 .../org/apache/spark/sql/ClientDatasetSuite.scala  |  45 +-
 project/MimaExcludes.scala                         |   3 +
 .../org/apache/spark/sql/DataFrameWriter.scala     | 533 +++++++++++++++++++++
 .../scala/org/apache/spark/sql/api/Dataset.scala   |  10 +-
 .../spark/sql/errors/CompilationErrors.scala       |  49 ++
 .../spark/sql/errors/DataTypeErrorsBase.scala      |   4 +
 .../spark/sql/errors/QueryCompilationErrors.scala  |  49 --
 .../apache/spark/sql/errors/QueryErrorsBase.scala  |   4 -
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  11 +-
 .../DataFrameWriterImpl.scala}                     | 442 ++---------------
 .../org/apache/spark/sql/DataFrameSuite.scala      |   2 +-
 14 files changed, 829 insertions(+), 1022 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
deleted file mode 100644
index 6f64f0204602..000000000000
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ /dev/null
@@ -1,531 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.util.{Locale, Properties}
-
-import scala.jdk.CollectionConverters._
-
-import org.apache.spark.annotation.Stable
-import org.apache.spark.connect.proto
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-
-/**
- * Interface used to write a [[Dataset]] to external storage systems (e.g. file systems, key-value
- * stores, etc). Use `Dataset.write` to access this.
- *
- * @since 3.4.0
- */
-@Stable
-final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
-
-  /**
-   * Specifies the behavior when data or table already exists. Options include: <ul>
-   * <li>`SaveMode.Overwrite`: overwrite the existing data.</li> <li>`SaveMode.Append`: append the
-   * data.</li> <li>`SaveMode.Ignore`: ignore the operation (i.e. no-op).</li>
-   * <li>`SaveMode.ErrorIfExists`: throw an exception at runtime.</li> </ul> <p> The default
-   * option is `ErrorIfExists`.
-   *
-   * @since 3.4.0
-   */
-  def mode(saveMode: SaveMode): DataFrameWriter[T] = {
-    this.mode = saveMode
-    this
-  }
-
-  /**
-   * Specifies the behavior when data or table already exists. Options include: <ul>
-   * <li>`overwrite`: overwrite the existing data.</li> <li>`append`: append the data.</li>
-   * <li>`ignore`: ignore the operation (i.e. no-op).</li> <li>`error` or `errorifexists`: default
-   * option, throw an exception at runtime.</li> </ul>
-   *
-   * @since 3.4.0
-   */
-  def mode(saveMode: String): DataFrameWriter[T] = {
-    saveMode.toLowerCase(Locale.ROOT) match {
-      case "overwrite" => mode(SaveMode.Overwrite)
-      case "append" => mode(SaveMode.Append)
-      case "ignore" => mode(SaveMode.Ignore)
-      case "error" | "errorifexists" | "default" => 
mode(SaveMode.ErrorIfExists)
-      case _ =>
-        throw new IllegalArgumentException(s"Unknown save mode: $saveMode. 
Accepted " +
-          "save modes are 'overwrite', 'append', 'ignore', 'error', 
'errorifexists', 'default'.")
-    }
-  }
-
-  /**
-   * Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
-   *
-   * @since 3.4.0
-   */
-  def format(source: String): DataFrameWriter[T] = {
-    this.source = Some(source)
-    this
-  }
-
-  /**
-   * Adds an output option for the underlying data source.
-   *
-   * All options are maintained in a case-insensitive way in terms of key names. If a new option
-   * has the same key case-insensitively, it will override the existing option.
-   *
-   * @since 3.4.0
-   */
-  def option(key: String, value: String): DataFrameWriter[T] = {
-    this.extraOptions = this.extraOptions + (key -> value)
-    this
-  }
-
-  /**
-   * Adds an output option for the underlying data source.
-   *
-   * All options are maintained in a case-insensitive way in terms of key names. If a new option
-   * has the same key case-insensitively, it will override the existing option.
-   *
-   * @since 3.4.0
-   */
-  def option(key: String, value: Boolean): DataFrameWriter[T] = option(key, value.toString)
-
-  /**
-   * Adds an output option for the underlying data source.
-   *
-   * All options are maintained in a case-insensitive way in terms of key names. If a new option
-   * has the same key case-insensitively, it will override the existing option.
-   *
-   * @since 3.4.0
-   */
-  def option(key: String, value: Long): DataFrameWriter[T] = option(key, value.toString)
-
-  /**
-   * Adds an output option for the underlying data source.
-   *
-   * All options are maintained in a case-insensitive way in terms of key names. If a new option
-   * has the same key case-insensitively, it will override the existing option.
-   *
-   * @since 3.4.0
-   */
-  def option(key: String, value: Double): DataFrameWriter[T] = option(key, value.toString)
-
-  /**
-   * (Scala-specific) Adds output options for the underlying data source.
-   *
-   * All options are maintained in a case-insensitive way in terms of key names. If a new option
-   * has the same key case-insensitively, it will override the existing option.
-   *
-   * @since 3.4.0
-   */
-  def options(options: scala.collection.Map[String, String]): DataFrameWriter[T] = {
-    this.extraOptions ++= options
-    this
-  }
-
-  /**
-   * Adds output options for the underlying data source.
-   *
-   * All options are maintained in a case-insensitive way in terms of key names. If a new option
-   * has the same key case-insensitively, it will override the existing option.
-   *
-   * @since 3.4.0
-   */
-  def options(options: java.util.Map[String, String]): DataFrameWriter[T] = {
-    this.options(options.asScala)
-    this
-  }
-
-  /**
-   * Partitions the output by the given columns on the file system. If specified, the output is
-   * laid out on the file system similar to Hive's partitioning scheme. As an example, when we
-   * partition a dataset by year and then month, the directory layout would look like: <ul>
-   * <li>year=2016/month=01/</li> <li>year=2016/month=02/</li> </ul>
-   *
-   * Partitioning is one of the most widely used techniques to optimize physical data layout. It
-   * provides a coarse-grained index for skipping unnecessary data reads when queries have
-   * predicates on the partitioned columns. In order for partitioning to work well, the number of
-   * distinct values in each column should typically be less than tens of thousands.
-   *
-   * This is applicable for all file-based data sources (e.g. Parquet, JSON) starting with Spark
-   * 2.1.0.
-   *
-   * @since 3.4.0
-   */
-  @scala.annotation.varargs
-  def partitionBy(colNames: String*): DataFrameWriter[T] = {
-    this.partitioningColumns = Option(colNames)
-    this
-  }
-
-  /**
-   * Buckets the output by the given columns. If specified, the output is laid out on the file
-   * system similar to Hive's bucketing scheme, but with a different bucket hash function and is
-   * not compatible with Hive's bucketing.
-   *
-   * This is applicable for all file-based data sources (e.g. Parquet, JSON) starting with Spark
-   * 2.1.0.
-   *
-   * @since 3.4.0
-   */
-  @scala.annotation.varargs
-  def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T] = {
-    require(numBuckets > 0, "The numBuckets should be > 0.")
-    this.numBuckets = Option(numBuckets)
-    this.bucketColumnNames = Option(colName +: colNames)
-    this
-  }
-
-  /**
-   * Sorts the output in each bucket by the given columns.
-   *
-   * This is applicable for all file-based data sources (e.g. Parquet, JSON) starting with Spark
-   * 2.1.0.
-   *
-   * @since 3.4.0
-   */
-  @scala.annotation.varargs
-  def sortBy(colName: String, colNames: String*): DataFrameWriter[T] = {
-    this.sortColumnNames = Option(colName +: colNames)
-    this
-  }
-
-  /**
-   * Clusters the output by the given columns on the storage. The rows with matching values in the
-   * specified clustering columns will be consolidated within the same group.
-   *
-   * For instance, if you cluster a dataset by date, the data sharing the same date will be stored
-   * together in a file. This arrangement improves query efficiency when you apply selective
-   * filters to these clustering columns, thanks to data skipping.
-   *
-   * @since 4.0.0
-   */
-  @scala.annotation.varargs
-  def clusterBy(colName: String, colNames: String*): DataFrameWriter[T] = {
-    this.clusteringColumns = Option(colName +: colNames)
-    this
-  }
-
-  /**
-   * Saves the content of the `DataFrame` at the specified path.
-   *
-   * @since 3.4.0
-   */
-  def save(path: String): Unit = {
-    saveInternal(Some(path))
-  }
-
-  /**
-   * Saves the content of the `DataFrame` as the specified table.
-   *
-   * @since 3.4.0
-   */
-  def save(): Unit = saveInternal(None)
-
-  private def saveInternal(path: Option[String]): Unit = {
-    executeWriteOperation(builder => path.foreach(builder.setPath))
-  }
-
-  private def executeWriteOperation(f: proto.WriteOperation.Builder => Unit): Unit = {
-    val builder = proto.WriteOperation.newBuilder()
-
-    builder.setInput(ds.plan.getRoot)
-
-    // Set path or table
-    f(builder)
-
-    // Cannot both be set
-    require(!(builder.hasPath && builder.hasTable))
-
-    builder.setMode(mode match {
-      case SaveMode.Append => proto.WriteOperation.SaveMode.SAVE_MODE_APPEND
-      case SaveMode.Overwrite => proto.WriteOperation.SaveMode.SAVE_MODE_OVERWRITE
-      case SaveMode.Ignore => proto.WriteOperation.SaveMode.SAVE_MODE_IGNORE
-      case SaveMode.ErrorIfExists => proto.WriteOperation.SaveMode.SAVE_MODE_ERROR_IF_EXISTS
-    })
-
-    source.foreach(builder.setSource)
-    sortColumnNames.foreach(names => builder.addAllSortColumnNames(names.asJava))
-    partitioningColumns.foreach(cols => builder.addAllPartitioningColumns(cols.asJava))
-    clusteringColumns.foreach(cols => builder.addAllClusteringColumns(cols.asJava))
-
-    numBuckets.foreach(n => {
-      val bucketBuilder = proto.WriteOperation.BucketBy.newBuilder()
-      bucketBuilder.setNumBuckets(n)
-      bucketColumnNames.foreach(names => bucketBuilder.addAllBucketColumnNames(names.asJava))
-      builder.setBucketBy(bucketBuilder)
-    })
-
-    extraOptions.foreach { case (k, v) =>
-      builder.putOptions(k, v)
-    }
-
-    ds.sparkSession.execute(proto.Command.newBuilder().setWriteOperation(builder).build())
-  }
-
-  /**
-   * Inserts the content of the `DataFrame` to the specified table. It requires that the schema of
-   * the `DataFrame` is the same as the schema of the table.
-   *
-   * @note
-   *   Unlike `saveAsTable`, `insertInto` ignores the column names and just uses position-based
-   *   resolution. For example:
-   *
-   * @note
-   *   SaveMode.ErrorIfExists and SaveMode.Ignore behave as SaveMode.Append in `insertInto` as
-   *   `insertInto` is not a table creating operation.
-   *
-   * {{{
-   *    scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1")
-   *    scala> Seq((3, 4)).toDF("j", "i").write.insertInto("t1")
-   *    scala> Seq((5, 6)).toDF("a", "b").write.insertInto("t1")
-   *    scala> sql("select * from t1").show
-   *    +---+---+
-   *    |  i|  j|
-   *    +---+---+
-   *    |  5|  6|
-   *    |  3|  4|
-   *    |  1|  2|
-   *    +---+---+
-   * }}}
-   *
-   * Because it inserts data to an existing table, format or options will be ignored.
-   *
-   * @since 3.4.0
-   */
-  def insertInto(tableName: String): Unit = {
-    executeWriteOperation(builder => {
-      builder.setTable(
-        proto.WriteOperation.SaveTable
-          .newBuilder()
-          .setTableName(tableName)
-          .setSaveMethod(
-            proto.WriteOperation.SaveTable.TableSaveMethod.TABLE_SAVE_METHOD_INSERT_INTO))
-    })
-  }
-
-  /**
-   * Saves the content of the `DataFrame` as the specified table.
-   *
-   * In the case the table already exists, behavior of this function depends on the save mode,
-   * specified by the `mode` function (default to throwing an exception). When `mode` is
-   * `Overwrite`, the schema of the `DataFrame` does not need to be the same as that of the
-   * existing table.
-   *
-   * When `mode` is `Append`, if there is an existing table, we will use the format and options of
-   * the existing table. The column order in the schema of the `DataFrame` doesn't need to be same
-   * as that of the existing table. Unlike `insertInto`, `saveAsTable` will use the column names
-   * to find the correct column positions. For example:
-   *
-   * {{{
-   *    scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1")
-   *    scala> Seq((3, 4)).toDF("j", "i").write.mode("append").saveAsTable("t1")
-   *    scala> sql("select * from t1").show
-   *    +---+---+
-   *    |  i|  j|
-   *    +---+---+
-   *    |  1|  2|
-   *    |  4|  3|
-   *    +---+---+
-   * }}}
-   *
-   * In this method, save mode is used to determine the behavior if the data source table exists
-   * in Spark catalog. We will always overwrite the underlying data of data source (e.g. a table
-   * in JDBC data source) if the table doesn't exist in Spark catalog, and will always append to
-   * the underlying data of data source if the table already exists.
-   *
-   * When the DataFrame is created from a non-partitioned `HadoopFsRelation` with a single input
-   * path, and the data source provider can be mapped to an existing Hive builtin SerDe (i.e. ORC
-   * and Parquet), the table is persisted in a Hive compatible format, which means other systems
-   * like Hive will be able to read this table. Otherwise, the table is persisted in a Spark SQL
-   * specific format.
-   *
-   * @since 3.4.0
-   */
-  def saveAsTable(tableName: String): Unit = {
-    executeWriteOperation(builder => {
-      builder.setTable(
-        proto.WriteOperation.SaveTable
-          .newBuilder()
-          .setTableName(tableName)
-          .setSaveMethod(
-            proto.WriteOperation.SaveTable.TableSaveMethod.TABLE_SAVE_METHOD_SAVE_AS_TABLE))
-    })
-  }
-
-  /**
-   * Saves the content of the `DataFrame` to an external database table via JDBC. In the case the
-   * table already exists in the external database, behavior of this function depends on the save
-   * mode, specified by the `mode` function (default to throwing an exception).
-   *
-   * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
-   * your external database systems.
-   *
-   * JDBC-specific option and parameter documentation for storing tables via JDBC in <a
-   * href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option";>
-   * Data Source Option</a> in the version you use.
-   *
-   * @param table
-   *   Name of the table in the external database.
-   * @param connectionProperties
-   *   JDBC database connection arguments, a list of arbitrary string tag/value. Normally at least
-   *   a "user" and "password" property should be included. "batchsize" can be used to control the
-   *   number of rows per insert. "isolationLevel" can be one of "NONE", "READ_COMMITTED",
-   *   "READ_UNCOMMITTED", "REPEATABLE_READ", or "SERIALIZABLE", corresponding to standard
-   *   transaction isolation levels defined by JDBC's Connection object, with default of
-   *   "READ_UNCOMMITTED".
-   * @since 3.4.0
-   */
-  def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
-    // connectionProperties should override settings in extraOptions.
-    this.extraOptions ++= connectionProperties.asScala
-    // explicit url and dbtable should override all
-    this.extraOptions ++= Seq("url" -> url, "dbtable" -> table)
-    format("jdbc").save()
-  }
-
-  /**
-   * Saves the content of the `DataFrame` in JSON format (<a href="http://jsonlines.org/";> JSON
-   * Lines text format or newline-delimited JSON</a>) at the specified path. This is equivalent
-   * to:
-   * {{{
-   *   format("json").save(path)
-   * }}}
-   *
-   * You can find the JSON-specific options for writing JSON files in <a
-   * href="https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option";>
-   * Data Source Option</a> in the version you use.
-   *
-   * @since 3.4.0
-   */
-  def json(path: String): Unit = {
-    format("json").save(path)
-  }
-
-  /**
-   * Saves the content of the `DataFrame` in Parquet format at the specified path. This is
-   * equivalent to:
-   * {{{
-   *   format("parquet").save(path)
-   * }}}
-   *
-   * Parquet-specific option(s) for writing Parquet files can be found in <a href=
-   * "https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option";> Data
-   * Source Option</a> in the version you use.
-   *
-   * @since 3.4.0
-   */
-  def parquet(path: String): Unit = {
-    format("parquet").save(path)
-  }
-
-  /**
-   * Saves the content of the `DataFrame` in ORC format at the specified path. This is equivalent
-   * to:
-   * {{{
-   *   format("orc").save(path)
-   * }}}
-   *
-   * ORC-specific option(s) for writing ORC files can be found in <a href=
-   * "https://spark.apache.org/docs/latest/sql-data-sources-orc.html#data-source-option";> Data
-   * Source Option</a> in the version you use.
-   *
-   * @since 3.4.0
-   */
-  def orc(path: String): Unit = {
-    format("orc").save(path)
-  }
-
-  /**
-   * Saves the content of the `DataFrame` in a text file at the specified path. The DataFrame must
-   * have only one column that is of string type. Each row becomes a new line in the output file.
-   * For example:
-   * {{{
-   *   // Scala:
-   *   df.write.text("/path/to/output")
-   *
-   *   // Java:
-   *   df.write().text("/path/to/output")
-   * }}}
-   * The text files will be encoded as UTF-8.
-   *
-   * You can find the text-specific options for writing text files in <a
-   * href="https://spark.apache.org/docs/latest/sql-data-sources-text.html#data-source-option";>
-   * Data Source Option</a> in the version you use.
-   *
-   * @since 3.4.0
-   */
-  def text(path: String): Unit = {
-    format("text").save(path)
-  }
-
-  /**
-   * Saves the content of the `DataFrame` in CSV format at the specified path. This is equivalent
-   * to:
-   * {{{
-   *   format("csv").save(path)
-   * }}}
-   *
-   * You can find the CSV-specific options for writing CSV files in <a
-   * href="https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option";>
-   * Data Source Option</a> in the version you use.
-   *
-   * @since 3.4.0
-   */
-  def csv(path: String): Unit = {
-    format("csv").save(path)
-  }
-
-  /**
-   * Saves the content of the `DataFrame` in XML format at the specified path. This is equivalent
-   * to:
-   * {{{
-   *   format("xml").save(path)
-   * }}}
-   *
-   * Note that writing a XML file from `DataFrame` having a field `ArrayType` with its element as
-   * `ArrayType` would have an additional nested field for the element.
-   *
-   * Namely, roundtrip in writing and reading can end up in different schema structure.
-   *
-   * You can find the XML-specific options for writing XML files in <a
-   * href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option";>
-   * Data Source Option</a> in the version you use.
-   *
-   * @since 4.0.0
-   */
-  def xml(path: String): Unit = {
-    format("xml").save(path)
-  }
-
-  ///////////////////////////////////////////////////////////////////////////////////////
-  // Builder pattern config options
-  ///////////////////////////////////////////////////////////////////////////////////////
-
-  private var source: Option[String] = None
-
-  private var mode: SaveMode = SaveMode.ErrorIfExists
-
-  private var extraOptions = CaseInsensitiveMap[String](Map.empty)
-
-  private var partitioningColumns: Option[Seq[String]] = None
-
-  private var bucketColumnNames: Option[Seq[String]] = None
-
-  private var numBuckets: Option[Int] = None
-
-  private var sortColumnNames: Option[Seq[String]] = None
-
-  private var clusteringColumns: Option[Seq[String]] = None
-}
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 3b10978b7c8b..d18a76b06a48 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, StorageLevel
 import org.apache.spark.sql.errors.DataTypeErrors.toSQLId
 import org.apache.spark.sql.expressions.SparkUserDefinedFunction
 import org.apache.spark.sql.functions.{struct, to_json}
-import org.apache.spark.sql.internal.{ColumnNodeToProtoConverter, UnresolvedAttribute, UnresolvedRegex}
+import org.apache.spark.sql.internal.{ColumnNodeToProtoConverter, DataFrameWriterImpl, UnresolvedAttribute, UnresolvedRegex}
 import org.apache.spark.sql.streaming.DataStreamWriter
 import org.apache.spark.sql.types.{Metadata, StructType}
 import org.apache.spark.storage.StorageLevel
@@ -1060,14 +1060,9 @@ class Dataset[T] private[sql] (
       .asScala
       .toArray
 
-  /**
-   * Interface for saving the content of the non-streaming Dataset out into external storage.
-   *
-   * @group basic
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   def write: DataFrameWriter[T] = {
-    new DataFrameWriter[T](this)
+    new DataFrameWriterImpl[T](this)
   }
 
   /**
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
new file mode 100644
index 000000000000..58fbfea48afe
--- /dev/null
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.annotation.Stable
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.{DataFrameWriter, Dataset, SaveMode}
+
+/**
+ * Interface used to write a [[Dataset]] to external storage systems (e.g. file systems, key-value
+ * stores, etc). Use `Dataset.write` to access this.
+ *
+ * @since 3.4.0
+ */
+@Stable
+final class DataFrameWriterImpl[T] private[sql] (ds: Dataset[T]) extends DataFrameWriter[T] {
+
+  /** @inheritdoc */
+  override def mode(saveMode: SaveMode): this.type = super.mode(saveMode)
+
+  /** @inheritdoc */
+  override def mode(saveMode: String): this.type = super.mode(saveMode)
+
+  /** @inheritdoc */
+  override def format(source: String): this.type = super.format(source)
+
+  /** @inheritdoc */
+  override def option(key: String, value: String): this.type = super.option(key, value)
+
+  /** @inheritdoc */
+  override def option(key: String, value: Boolean): this.type = super.option(key, value)
+
+  /** @inheritdoc */
+  override def option(key: String, value: Long): this.type = super.option(key, value)
+
+  /** @inheritdoc */
+  override def option(key: String, value: Double): this.type = super.option(key, value)
+
+  /** @inheritdoc */
+  override def options(options: scala.collection.Map[String, String]): this.type =
+    super.options(options)
+
+  /** @inheritdoc */
+  override def options(options: java.util.Map[String, String]): this.type =
+    super.options(options)
+
+  /** @inheritdoc */
+  @scala.annotation.varargs
+  override def partitionBy(colNames: String*): this.type = super.partitionBy(colNames: _*)
+
+  /** @inheritdoc */
+  @scala.annotation.varargs
+  override def bucketBy(numBuckets: Int, colName: String, colNames: String*): this.type =
+    super.bucketBy(numBuckets, colName, colNames: _*)
+
+  /** @inheritdoc */
+  @scala.annotation.varargs
+  override def sortBy(colName: String, colNames: String*): this.type =
+    super.sortBy(colName, colNames: _*)
+
+  /** @inheritdoc */
+  @scala.annotation.varargs
+  override def clusterBy(colName: String, colNames: String*): this.type =
+    super.clusterBy(colName, colNames: _*)
+
+  /** @inheritdoc */
+  def save(path: String): Unit = {
+    saveInternal(Some(path))
+  }
+
+  /** @inheritdoc */
+  def save(): Unit = saveInternal(None)
+
+  private def saveInternal(path: Option[String]): Unit = {
+    executeWriteOperation(builder => path.foreach(builder.setPath))
+  }
+
+  private def executeWriteOperation(f: proto.WriteOperation.Builder => Unit): Unit = {
+    val builder = proto.WriteOperation.newBuilder()
+
+    builder.setInput(ds.plan.getRoot)
+
+    // Set path or table
+    f(builder)
+
+    // Cannot both be set
+    require(!(builder.hasPath && builder.hasTable))
+
+    builder.setMode(mode match {
+      case SaveMode.Append => proto.WriteOperation.SaveMode.SAVE_MODE_APPEND
+      case SaveMode.Overwrite => proto.WriteOperation.SaveMode.SAVE_MODE_OVERWRITE
+      case SaveMode.Ignore => proto.WriteOperation.SaveMode.SAVE_MODE_IGNORE
+      case SaveMode.ErrorIfExists => proto.WriteOperation.SaveMode.SAVE_MODE_ERROR_IF_EXISTS
+    })
+
+    if (source.nonEmpty) {
+      builder.setSource(source)
+    }
+    sortColumnNames.foreach(names => builder.addAllSortColumnNames(names.asJava))
+    partitioningColumns.foreach(cols => builder.addAllPartitioningColumns(cols.asJava))
+    clusteringColumns.foreach(cols => builder.addAllClusteringColumns(cols.asJava))
+
+    numBuckets.foreach(n => {
+      val bucketBuilder = proto.WriteOperation.BucketBy.newBuilder()
+      bucketBuilder.setNumBuckets(n)
+      bucketColumnNames.foreach(names => bucketBuilder.addAllBucketColumnNames(names.asJava))
+      builder.setBucketBy(bucketBuilder)
+    })
+
+    extraOptions.foreach { case (k, v) =>
+      builder.putOptions(k, v)
+    }
+
+    ds.sparkSession.execute(proto.Command.newBuilder().setWriteOperation(builder).build())
+  }
+
+  /** @inheritdoc */
+  def insertInto(tableName: String): Unit = {
+    executeWriteOperation(builder => {
+      builder.setTable(
+        proto.WriteOperation.SaveTable
+          .newBuilder()
+          .setTableName(tableName)
+          .setSaveMethod(
+            proto.WriteOperation.SaveTable.TableSaveMethod.TABLE_SAVE_METHOD_INSERT_INTO))
+    })
+  }
+
+  /** @inheritdoc */
+  def saveAsTable(tableName: String): Unit = {
+    executeWriteOperation(builder => {
+      builder.setTable(
+        proto.WriteOperation.SaveTable
+          .newBuilder()
+          .setTableName(tableName)
+          .setSaveMethod(
+            proto.WriteOperation.SaveTable.TableSaveMethod.TABLE_SAVE_METHOD_SAVE_AS_TABLE))
+    })
+  }
+}
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDatasetSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDatasetSuite.scala
index 1b166f8ace1d..04367d3b95f1 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDatasetSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDatasetSuite.scala
@@ -71,35 +71,46 @@ class ClientDatasetSuite extends ConnectFunSuite with BeforeAndAfterEach {
   test("write") {
     val df = ss.newDataFrame(_ => ()).limit(10)
 
-    val builder = proto.WriteOperation.newBuilder()
-    builder
+    def toPlan(builder: proto.WriteOperation.Builder): proto.Plan = {
+      proto.Plan
+        .newBuilder()
+        .setCommand(proto.Command.newBuilder().setWriteOperation(builder))
+        .build()
+    }
+
+    val builder = proto.WriteOperation
+      .newBuilder()
       .setInput(df.plan.getRoot)
       .setPath("my/test/path")
       .setMode(proto.WriteOperation.SaveMode.SAVE_MODE_ERROR_IF_EXISTS)
       .setSource("parquet")
-      .addSortColumnNames("col1")
-      .addPartitioningColumns("col99")
-      .setBucketBy(
-        proto.WriteOperation.BucketBy
-          .newBuilder()
-          .setNumBuckets(2)
-          .addBucketColumnNames("col1")
-          .addBucketColumnNames("col2"))
-      .addClusteringColumns("col3")
 
-    val expectedPlan = proto.Plan
-      .newBuilder()
-      .setCommand(proto.Command.newBuilder().setWriteOperation(builder))
-      .build()
+    val partitionedPlan = toPlan(
+      builder
+        .clone()
+        .addSortColumnNames("col1")
+        .addPartitioningColumns("col99")
+        .setBucketBy(
+          proto.WriteOperation.BucketBy
+            .newBuilder()
+            .setNumBuckets(2)
+            .addBucketColumnNames("col1")
+            .addBucketColumnNames("col2")))
 
     df.write
       .sortBy("col1")
       .partitionBy("col99")
       .bucketBy(2, "col1", "col2")
+      .parquet("my/test/path")
+    val actualPartitionedPlan = service.getAndClearLatestInputPlan()
+    assert(actualPartitionedPlan.equals(partitionedPlan))
+
+    val clusteredPlan = toPlan(builder.clone().addClusteringColumns("col3"))
+    df.write
       .clusterBy("col3")
       .parquet("my/test/path")
-    val actualPlan = service.getAndClearLatestInputPlan()
-    assert(actualPlan.equals(expectedPlan))
+    val actualClusteredPlan = service.getAndClearLatestInputPlan()
+    assert(actualClusteredPlan.equals(clusteredPlan))
   }
 
   test("write jdbc") {
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index fe4a08971509..21638e481630 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -124,6 +124,9 @@ object MimaExcludes {
     // SPARK-49423: Consolidate Observation in sql/api
     ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.Observation"),
     ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.Observation$"),
+
+    // SPARK-49425: Create a shared DataFrameWriter interface.
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrameWriter")
   )
 
   // Default exclude rules
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/api/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
new file mode 100644
index 000000000000..96855ee5ad16
--- /dev/null
+++ b/sql/api/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import java.util
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.annotation.Stable
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.errors.CompilationErrors
+
+/**
+ * Interface used to write a [[org.apache.spark.sql.api.Dataset]] to external storage systems
+ * (e.g. file systems, key-value stores, etc). Use `Dataset.write` to access this.
+ *
+ * @since 1.4.0
+ */
+@Stable
+abstract class DataFrameWriter[T] {
+
+  /**
+   * Specifies the behavior when data or table already exists. Options include:
+   * <ul>
+   * <li>`SaveMode.Overwrite`: overwrite the existing data.</li>
+   * <li>`SaveMode.Append`: append the data.</li>
+   * <li>`SaveMode.Ignore`: ignore the operation (i.e. no-op).</li>
+   * <li>`SaveMode.ErrorIfExists`: throw an exception at runtime.</li>
+   * </ul>
+   * <p>
+   * The default option is `ErrorIfExists`.
+   *
+   * @since 1.4.0
+   */
+  def mode(saveMode: SaveMode): this.type = {
+    this.mode = saveMode
+    this
+  }
+
+  /**
+   * Specifies the behavior when data or table already exists. Options include:
+   * <ul>
+   * <li>`overwrite`: overwrite the existing data.</li>
+   * <li>`append`: append the data.</li>
+   * <li>`ignore`: ignore the operation (i.e. no-op).</li>
+   * <li>`error` or `errorifexists`: default option, throw an exception at runtime.</li>
+   * </ul>
+   *
+   * @since 1.4.0
+   */
+  def mode(saveMode: String): this.type = {
+    saveMode.toLowerCase(util.Locale.ROOT) match {
+      case "overwrite" => mode(SaveMode.Overwrite)
+      case "append" => mode(SaveMode.Append)
+      case "ignore" => mode(SaveMode.Ignore)
+      case "error" | "errorifexists" | "default" => 
mode(SaveMode.ErrorIfExists)
+      case _ => throw CompilationErrors.invalidSaveModeError(saveMode)
+    }
+  }
+
+  /**
+   * Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
+   *
+   * @since 1.4.0
+   */
+  def format(source: String): this.type = {
+    this.source = source
+    this
+  }
+
+  /**
+   * Adds an output option for the underlying data source.
+   *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
+   * @since 1.4.0
+   */
+  def option(key: String, value: String): this.type = {
+    this.extraOptions = this.extraOptions + (key -> value)
+    this
+  }
+
+  /**
+   * Adds an output option for the underlying data source.
+   *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
+   * @since 2.0.0
+   */
+  def option(key: String, value: Boolean): this.type = option(key, value.toString)
+
+  /**
+   * Adds an output option for the underlying data source.
+   *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
+   * @since 2.0.0
+   */
+  def option(key: String, value: Long): this.type = option(key, value.toString)
+
+  /**
+   * Adds an output option for the underlying data source.
+   *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
+   * @since 2.0.0
+   */
+  def option(key: String, value: Double): this.type = option(key, value.toString)
+
+  /**
+   * (Scala-specific) Adds output options for the underlying data source.
+   *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
+   * @since 1.4.0
+   */
+  def options(options: scala.collection.Map[String, String]): this.type = {
+    this.extraOptions ++= options
+    this
+  }
+
+  /**
+   * Adds output options for the underlying data source.
+   *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
+   * @since 1.4.0
+   */
+  def options(options: util.Map[String, String]): this.type = {
+    this.options(options.asScala)
+    this
+  }
+
+  /**
+   * Partitions the output by the given columns on the file system. If specified, the output is
+   * laid out on the file system similar to Hive's partitioning scheme. As an example, when we
+   * partition a dataset by year and then month, the directory layout would look like:
+   * <ul>
+   * <li>year=2016/month=01/</li>
+   * <li>year=2016/month=02/</li>
+   * </ul>
+   *
+   * Partitioning is one of the most widely used techniques to optimize physical data layout.
+   * It provides a coarse-grained index for skipping unnecessary data reads when queries have
+   * predicates on the partitioned columns. In order for partitioning to work well, the number
+   * of distinct values in each column should typically be less than tens of thousands.
+   *
+   * This is applicable for all file-based data sources (e.g. Parquet, JSON) starting with Spark
+   * 2.1.0.
+   *
+   * @since 1.4.0
+   */
+  @scala.annotation.varargs
+  def partitionBy(colNames: String*): this.type = {
+    this.partitioningColumns = Option(colNames)
+    validatePartitioning()
+    this
+  }
+
+  /**
+   * Buckets the output by the given columns. If specified, the output is laid out on the file
+   * system similar to Hive's bucketing scheme, but with a different bucket hash function
+   * and is not compatible with Hive's bucketing.
+   *
+   * This is applicable for all file-based data sources (e.g. Parquet, JSON) starting with Spark
+   * 2.1.0.
+   *
+   * @since 2.0
+   */
+  @scala.annotation.varargs
+  def bucketBy(numBuckets: Int, colName: String, colNames: String*): this.type = {
+    this.numBuckets = Option(numBuckets)
+    this.bucketColumnNames = Option(colName +: colNames)
+    validatePartitioning()
+    this
+  }
+
+  /**
+   * Sorts the output in each bucket by the given columns.
+   *
+   * This is applicable for all file-based data sources (e.g. Parquet, JSON) starting with Spark
+   * 2.1.0.
+   *
+   * @since 2.0
+   */
+  @scala.annotation.varargs
+  def sortBy(colName: String, colNames: String*): this.type = {
+    this.sortColumnNames = Option(colName +: colNames)
+    this
+  }
+
+  /**
+   * Clusters the output by the given columns on the storage. The rows with matching values in the
+   * specified clustering columns will be consolidated within the same group.
+   *
+   * For instance, if you cluster a dataset by date, the data sharing the same date will be stored
+   * together in a file. This arrangement improves query efficiency when you apply selective
+   * filters to these clustering columns, thanks to data skipping.
+   *
+   * @since 4.0
+   */
+  @scala.annotation.varargs
+  def clusterBy(colName: String, colNames: String*): this.type = {
+    this.clusteringColumns = Option(colName +: colNames)
+    validatePartitioning()
+    this
+  }
+
+  /**
+   * Saves the content of the `DataFrame` at the specified path.
+   *
+   * @since 1.4.0
+   */
+  def save(path: String): Unit
+
+  /**
+   * Saves the content of the `DataFrame` as the specified table.
+   *
+   * @since 1.4.0
+   */
+  def save(): Unit
+
+  /**
+   * Inserts the content of the `DataFrame` to the specified table. It requires that
+   * the schema of the `DataFrame` is the same as the schema of the table.
+   *
+   * @note Unlike `saveAsTable`, `insertInto` ignores the column names and just uses position-based
+   *       resolution. For example:
+   * @note SaveMode.ErrorIfExists and SaveMode.Ignore behave as SaveMode.Append in `insertInto` as
+   *       `insertInto` is not a table creating operation.
+   *
+   * {{{
+   *    scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1")
+   *    scala> Seq((3, 4)).toDF("j", "i").write.insertInto("t1")
+   *    scala> Seq((5, 6)).toDF("a", "b").write.insertInto("t1")
+   *    scala> sql("select * from t1").show
+   *    +---+---+
+   *    |  i|  j|
+   *    +---+---+
+   *    |  5|  6|
+   *    |  3|  4|
+   *    |  1|  2|
+   *    +---+---+
+   * }}}
+   *
+   *       Because it inserts data to an existing table, format or options will be ignored.
+   * @since 1.4.0
+   */
+  def insertInto(tableName: String): Unit
+
+  /**
+   * Saves the content of the `DataFrame` as the specified table.
+   *
+   * In the case the table already exists, behavior of this function depends on the
+   * save mode, specified by the `mode` function (default to throwing an exception).
+   * When `mode` is `Overwrite`, the schema of the `DataFrame` does not need to be
+   * the same as that of the existing table.
+   *
+   * When `mode` is `Append`, if there is an existing table, we will use the format and options of
+   * the existing table. The column order in the schema of the `DataFrame` doesn't need to be same
+   * as that of the existing table. Unlike `insertInto`, `saveAsTable` will use the column names to
+   * find the correct column positions. For example:
+   *
+   * {{{
+   *    scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1")
+   *    scala> Seq((3, 4)).toDF("j", "i").write.mode("append").saveAsTable("t1")
+   *    scala> sql("select * from t1").show
+   *    +---+---+
+   *    |  i|  j|
+   *    +---+---+
+   *    |  1|  2|
+   *    |  4|  3|
+   *    +---+---+
+   * }}}
+   *
+   * In this method, save mode is used to determine the behavior if the data source table exists in
+   * Spark catalog. We will always overwrite the underlying data of data source (e.g. a table in
+   * JDBC data source) if the table doesn't exist in Spark catalog, and will always append to the
+   * underlying data of data source if the table already exists.
+   *
+   * When the DataFrame is created from a non-partitioned `HadoopFsRelation` with a single input
+   * path, and the data source provider can be mapped to an existing Hive builtin SerDe (i.e. ORC
+   * and Parquet), the table is persisted in a Hive compatible format, which means other systems
+   * like Hive will be able to read this table. Otherwise, the table is persisted in a Spark SQL
+   * specific format.
+   *
+   * @since 1.4.0
+   */
+  def saveAsTable(tableName: String): Unit
+
+  /**
+   * Saves the content of the `DataFrame` to an external database table via JDBC. In the case the
+   * table already exists in the external database, behavior of this function depends on the
+   * save mode, specified by the `mode` function (default to throwing an exception).
+   *
+   * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
+   * your external database systems.
+   *
+   * JDBC-specific option and parameter documentation for storing tables via JDBC in
+   * <a href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option";>
+   *   Data Source Option</a> in the version you use.
+   *
+   * @param table Name of the table in the external database.
+   * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
+   *                             tag/value. Normally at least a "user" and "password" property
+   *                             should be included. "batchsize" can be used to control the
+   *                             number of rows per insert. "isolationLevel" can be one of
+   *                             "NONE", "READ_COMMITTED", "READ_UNCOMMITTED", "REPEATABLE_READ",
+   *                             or "SERIALIZABLE", corresponding to standard transaction
+   *                             isolation levels defined by JDBC's Connection object, with default
+   *                             of "READ_UNCOMMITTED".
+   * @since 1.4.0
+   */
+  def jdbc(url: String, table: String, connectionProperties: util.Properties): Unit = {
+    assertNotPartitioned("jdbc")
+    assertNotBucketed("jdbc")
+    assertNotClustered("jdbc")
+    // connectionProperties should override settings in extraOptions.
+    this.extraOptions ++= connectionProperties.asScala
+    // explicit url and dbtable should override all
+    this.extraOptions ++= Seq("url" -> url, "dbtable" -> table)
+    format("jdbc").save()
+  }
+
+  /**
+   * Saves the content of the `DataFrame` in JSON format (<a href="http://jsonlines.org/";>
+   * JSON Lines text format or newline-delimited JSON</a>) at the specified path.
+   * This is equivalent to:
+   * {{{
+   *   format("json").save(path)
+   * }}}
+   *
+   * You can find the JSON-specific options for writing JSON files in
+   * <a href="https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option";>
+   *   Data Source Option</a> in the version you use.
+   *
+   * @since 1.4.0
+   */
+  def json(path: String): Unit = {
+    format("json").save(path)
+  }
+
+  /**
+   * Saves the content of the `DataFrame` in Parquet format at the specified path.
+   * This is equivalent to:
+   * {{{
+   *   format("parquet").save(path)
+   * }}}
+   *
+   * Parquet-specific option(s) for writing Parquet files can be found in
+   * <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option";>
+   *   Data Source Option</a> in the version you use.
+   *
+   * @since 1.4.0
+   */
+  def parquet(path: String): Unit = {
+    format("parquet").save(path)
+  }
+
+  /**
+   * Saves the content of the `DataFrame` in ORC format at the specified path.
+   * This is equivalent to:
+   * {{{
+   *   format("orc").save(path)
+   * }}}
+   *
+   * ORC-specific option(s) for writing ORC files can be found in
+   * <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-orc.html#data-source-option";>
+   *   Data Source Option</a> in the version you use.
+   *
+   * @since 1.5.0
+   */
+  def orc(path: String): Unit = {
+    format("orc").save(path)
+  }
+
+  /**
+   * Saves the content of the `DataFrame` in a text file at the specified path.
+   * The DataFrame must have only one column that is of string type.
+   * Each row becomes a new line in the output file. For example:
+   * {{{
+   *   // Scala:
+   *   df.write.text("/path/to/output")
+   *
+   *   // Java:
+   *   df.write().text("/path/to/output")
+   * }}}
+   * The text files will be encoded as UTF-8.
+   *
+   * You can find the text-specific options for writing text files in
+   * <a href="https://spark.apache.org/docs/latest/sql-data-sources-text.html#data-source-option";>
+   *   Data Source Option</a> in the version you use.
+   *
+   * @since 1.6.0
+   */
+  def text(path: String): Unit = {
+    format("text").save(path)
+  }
+
+  /**
+   * Saves the content of the `DataFrame` in CSV format at the specified path.
+   * This is equivalent to:
+   * {{{
+   *   format("csv").save(path)
+   * }}}
+   *
+   * You can find the CSV-specific options for writing CSV files in
+   * <a href="https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option";>
+   *   Data Source Option</a> in the version you use.
+   *
+   * @since 2.0.0
+   */
+  def csv(path: String): Unit = {
+    format("csv").save(path)
+  }
+
+  /**
+   * Saves the content of the `DataFrame` in XML format at the specified path.
+   * This is equivalent to:
+   * {{{
+   *   format("xml").save(path)
+   * }}}
+   *
+   * Note that writing a XML file from `DataFrame` having a field `ArrayType` with
+   * its element as `ArrayType` would have an additional nested field for the element.
+   * For example, the `DataFrame` having a field below,
+   *
+   *    {@code fieldA [[data1], [data2]]}
+   *
+   * would produce a XML file below.
+   *    {@code
+   *    <fieldA>
+   *        <item>data1</item>
+   *    </fieldA>
+   *    <fieldA>
+   *        <item>data2</item>
+   *    </fieldA>}
+   *
+   * Namely, roundtrip in writing and reading can end up in different schema structure.
+   *
+   * You can find the XML-specific options for writing XML files in
+   * <a href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option";>
+   * Data Source Option</a> in the version you use.
+   */
+  def xml(path: String): Unit = {
+    format("xml").save(path)
+  }
+
+  protected def isBucketed(): Boolean = {
+    if (sortColumnNames.isDefined && numBuckets.isEmpty) {
+      throw CompilationErrors.sortByWithoutBucketingError()
+    }
+    numBuckets.isDefined
+  }
+
+  protected def assertNotBucketed(operation: String): Unit = {
+    if (isBucketed()) {
+      if (sortColumnNames.isEmpty) {
+        throw CompilationErrors.bucketByUnsupportedByOperationError(operation)
+      } else {
+        throw CompilationErrors.bucketByAndSortByUnsupportedByOperationError(operation)
+      }
+    }
+  }
+
+  protected def assertNotPartitioned(operation: String): Unit = {
+    if (partitioningColumns.isDefined) {
+      throw CompilationErrors.operationNotSupportPartitioningError(operation)
+    }
+  }
+
+  protected def assertNotClustered(operation: String): Unit = {
+    if (clusteringColumns.isDefined) {
+      throw CompilationErrors.operationNotSupportClusteringError(operation)
+    }
+  }
+
+  /**
+   * Validate that clusterBy is not used with partitionBy or bucketBy.
+   */
+  protected def validatePartitioning(): Unit = {
+    if (clusteringColumns.nonEmpty) {
+      if (partitioningColumns.nonEmpty) {
+        throw CompilationErrors.clusterByWithPartitionedBy()
+      }
+      if (isBucketed()) {
+        throw CompilationErrors.clusterByWithBucketing()
+      }
+    }
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////
+  // Builder pattern config options
+  ///////////////////////////////////////////////////////////////////////////////////////
+
+  protected var source: String = ""
+
+  protected var mode: SaveMode = SaveMode.ErrorIfExists
+
+  protected var extraOptions: CaseInsensitiveMap[String] = CaseInsensitiveMap[String](Map.empty)
+
+  protected var partitioningColumns: Option[Seq[String]] = None
+
+  protected var bucketColumnNames: Option[Seq[String]] = None
+
+  protected var numBuckets: Option[Int] = None
+
+  protected var sortColumnNames: Option[Seq[String]] = None
+
+  protected var clusteringColumns: Option[Seq[String]] = None
+}
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
index a5e125733a29..226860df6813 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
@@ -23,7 +23,7 @@ import _root_.java.util
 
 import org.apache.spark.annotation.{DeveloperApi, Stable}
 import org.apache.spark.api.java.function.{FilterFunction, FlatMapFunction, ForeachFunction, ForeachPartitionFunction, MapFunction, MapPartitionsFunction, ReduceFunction}
-import org.apache.spark.sql.{functions, AnalysisException, Column, Encoder, Observation, Row, TypedColumn}
+import org.apache.spark.sql.{functions, AnalysisException, Column, DataFrameWriter, Encoder, Observation, Row, TypedColumn}
 import org.apache.spark.sql.types.{Metadata, StructType}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.ArrayImplicits._
@@ -2861,4 +2861,12 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable {
    */
   @DeveloperApi
   def semanticHash(): Int
+
+  /**
+   * Interface for saving the content of the non-streaming Dataset out into external storage.
+   *
+   * @group basic
+   * @since 1.6.0
+   */
+  def write: DataFrameWriter[T]
 }
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/errors/CompilationErrors.scala b/sql/api/src/main/scala/org/apache/spark/sql/errors/CompilationErrors.scala
index efefdf3ea21e..6034c4190631 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/errors/CompilationErrors.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/errors/CompilationErrors.scala
@@ -77,6 +77,55 @@ private[sql] trait CompilationErrors extends DataTypeErrorsBase {
         "intMinValue" -> toSQLValue(Int.MinValue),
         "intMaxValue" -> toSQLValue(Int.MaxValue)))
   }
+
+  def invalidSaveModeError(saveMode: String): Throwable = {
+    new AnalysisException(
+      errorClass = "INVALID_SAVE_MODE",
+      messageParameters = Map("mode" -> toDSOption(saveMode))
+    )
+  }
+
+  def sortByWithoutBucketingError(): Throwable = {
+    new AnalysisException(
+      errorClass = "SORT_BY_WITHOUT_BUCKETING",
+      messageParameters = Map.empty)
+  }
+
+  def bucketByUnsupportedByOperationError(operation: String): Throwable = {
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1312",
+      messageParameters = Map("operation" -> operation))
+  }
+
+  def bucketByAndSortByUnsupportedByOperationError(operation: String): Throwable = {
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1313",
+      messageParameters = Map("operation" -> operation))
+  }
+
+  def operationNotSupportPartitioningError(operation: String): Throwable = {
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1197",
+      messageParameters = Map("operation" -> operation))
+  }
+
+  def operationNotSupportClusteringError(operation: String): Throwable = {
+    new AnalysisException(
+      errorClass = "CLUSTERING_NOT_SUPPORTED",
+      messageParameters = Map("operation" -> operation))
+  }
+
+  def clusterByWithPartitionedBy(): Throwable = {
+    new AnalysisException(
+      errorClass = "SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED",
+      messageParameters = Map.empty)
+  }
+
+  def clusterByWithBucketing(): Throwable = {
+    new AnalysisException(
+      errorClass = "SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED",
+      messageParameters = Map.empty)
+  }
 }
 
 private[sql] object CompilationErrors extends CompilationErrors
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrorsBase.scala b/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrorsBase.scala
index 930f92db2682..d22f35b3fe50 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrorsBase.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrorsBase.scala
@@ -96,4 +96,8 @@ private[sql] trait DataTypeErrorsBase {
   def getQueryContext(context: QueryContext): Array[QueryContext] = {
     if (context == null) Array.empty else Array(context)
   }
+
+  def toDSOption(option: String): String = {
+    quoteByDefault(option)
+  }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 613e7cff1e42..fa8ea2f5289f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2231,12 +2231,6 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
         "validColumnNames" -> validColumnNames.mkString(", ")))
   }
 
-  def operationNotSupportPartitioningError(operation: String): Throwable = {
-    new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1197",
-      messageParameters = Map("operation" -> operation))
-  }
-
   def mixedRefsInAggFunc(funcStr: String, origin: Origin): Throwable = {
     new AnalysisException(
       errorClass =
@@ -3266,13 +3260,6 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
         "config" -> SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key))
   }
 
-  def invalidSaveModeError(saveMode: String): Throwable = {
-    new AnalysisException(
-      errorClass = "INVALID_SAVE_MODE",
-      messageParameters = Map("mode" -> toDSOption(saveMode))
-    )
-  }
-
   def invalidSingleVariantColumn(): Throwable = {
     new AnalysisException(
       errorClass = "INVALID_SINGLE_VARIANT_COLUMN",
@@ -3299,24 +3286,6 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
       messageParameters = Map("quote" -> quote))
   }
 
-  def sortByWithoutBucketingError(): Throwable = {
-    new AnalysisException(
-      errorClass = "SORT_BY_WITHOUT_BUCKETING",
-      messageParameters = Map.empty)
-  }
-
-  def bucketByUnsupportedByOperationError(operation: String): Throwable = {
-    new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1312",
-      messageParameters = Map("operation" -> operation))
-  }
-
-  def bucketByAndSortByUnsupportedByOperationError(operation: String): 
Throwable = {
-    new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1313",
-      messageParameters = Map("operation" -> operation))
-  }
-
   def tableAlreadyExistsError(tableIdent: TableIdentifier): Throwable = {
     new TableAlreadyExistsException(tableIdent.nameParts)
   }
@@ -4127,22 +4096,4 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
       messageParameters = Map("functionName" -> functionName)
     )
   }
-
-  def operationNotSupportClusteringError(operation: String): Throwable = {
-    new AnalysisException(
-      errorClass = "CLUSTERING_NOT_SUPPORTED",
-      messageParameters = Map("operation" -> operation))
-  }
-
-  def clusterByWithPartitionedBy(): Throwable = {
-    new AnalysisException(
-      errorClass = "SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED",
-      messageParameters = Map.empty)
-  }
-
-  def clusterByWithBucketing(): Throwable = {
-    new AnalysisException(
-      errorClass = "SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED",
-      messageParameters = Map.empty)
-  }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryErrorsBase.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryErrorsBase.scala
index b18937257bae..350c709de07c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryErrorsBase.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryErrorsBase.scala
@@ -47,10 +47,6 @@ private[sql] trait QueryErrorsBase extends 
DataTypeErrorsBase {
     quoteByDefault(conf)
   }
 
-  def toDSOption(option: String): String = {
-    quoteByDefault(option)
-  }
-
   def toSQLExpr(e: Expression): String = {
     quoteByDefault(toPrettySQL(e))
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 597decdbc740..5288d77d40b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -60,8 +60,8 @@ import 
org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
DataSourceV2ScanRelation, FileTable}
 import org.apache.spark.sql.execution.python.EvaluatePython
 import org.apache.spark.sql.execution.stat.StatFunctions
+import org.apache.spark.sql.internal.{DataFrameWriterImpl, SQLConf}
 import org.apache.spark.sql.internal.ExpressionUtils.column
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.TypedAggUtils.withInputType
 import org.apache.spark.sql.streaming.DataStreamWriter
 import org.apache.spark.sql.types._
@@ -1614,19 +1614,14 @@ class Dataset[T] private[sql](
     }
   }
 
-  /**
-   * Interface for saving the content of the non-streaming Dataset out into 
external storage.
-   *
-   * @group basic
-   * @since 1.6.0
-   */
+  /** @inheritdoc */
   def write: DataFrameWriter[T] = {
     if (isStreaming) {
       logicalPlan.failAnalysis(
         errorClass = "CALL_ON_STREAMING_DATASET_UNSUPPORTED",
         messageParameters = Map("methodName" -> toSQLId("write")))
     }
-    new DataFrameWriter[T](this)
+    new DataFrameWriterImpl[T](this)
   }
 
   /**
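
In the Classic implementation, write now fails analysis for streaming Datasets and otherwise hands back a DataFrameWriterImpl behind the shared interface. A sketch of the two paths, assuming an active SparkSession named spark; paths and source names are illustrative:

    // Batch path: returns a DataFrameWriter backed by DataFrameWriterImpl.
    spark.range(5).write.format("json").save("/tmp/batch-out")

    // Streaming path: calling .write raises CALL_ON_STREAMING_DATASET_UNSUPPORTED,
    // so streaming Datasets go through writeStream instead.
    val streamingDf = spark.readStream.format("rate").load()
    streamingDf.writeStream.format("console").start()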
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
similarity index 60%
rename from sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
index 60734efbf5bb..7248a2d3f056 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
@@ -15,22 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql
+package org.apache.spark.sql.internal
 
-import java.util.{Locale, Properties}
+import java.util.Locale
 
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.annotation.Stable
+import org.apache.spark.sql.{DataFrameWriter, Dataset, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, 
NoSuchTableException, UnresolvedIdentifier, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, 
CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OptionList, 
OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, 
UnresolvedTableSpec}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-import org.apache.spark.sql.connector.catalog.{CatalogPlugin, 
CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, Table, 
TableCatalog, TableProvider, V1Table}
+import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.TableCapability._
-import org.apache.spark.sql.connector.catalog.TableWritePrivilege
 import org.apache.spark.sql.connector.catalog.TableWritePrivilege._
 import org.apache.spark.sql.connector.expressions.{ClusterByTransform, 
FieldReference, IdentityTransform, Transform}
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -38,7 +38,6 @@ import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, 
DataSourceUtils, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.v2._
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
@@ -52,202 +51,58 @@ import org.apache.spark.util.ArrayImplicits._
  * @since 1.4.0
  */
 @Stable
-final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
+final class DataFrameWriterImpl[T] private[sql](ds: Dataset[T]) extends 
DataFrameWriter[T] {
+  format(ds.sparkSession.sessionState.conf.defaultDataSourceName)
 
   private val df = ds.toDF()
 
-  /**
-   * Specifies the behavior when data or table already exists. Options include:
-   * <ul>
-   * <li>`SaveMode.Overwrite`: overwrite the existing data.</li>
-   * <li>`SaveMode.Append`: append the data.</li>
-   * <li>`SaveMode.Ignore`: ignore the operation (i.e. no-op).</li>
-   * <li>`SaveMode.ErrorIfExists`: throw an exception at runtime.</li>
-   * </ul>
-   * <p>
-   * The default option is `ErrorIfExists`.
-   *
-   * @since 1.4.0
-   */
-  def mode(saveMode: SaveMode): DataFrameWriter[T] = {
-    this.mode = saveMode
-    this
-  }
+  /** @inheritdoc */
+  override def mode(saveMode: SaveMode): this.type = super.mode(saveMode)
 
-  /**
-   * Specifies the behavior when data or table already exists. Options include:
-   * <ul>
-   * <li>`overwrite`: overwrite the existing data.</li>
-   * <li>`append`: append the data.</li>
-   * <li>`ignore`: ignore the operation (i.e. no-op).</li>
-   * <li>`error` or `errorifexists`: default option, throw an exception at 
runtime.</li>
-   * </ul>
-   *
-   * @since 1.4.0
-   */
-  def mode(saveMode: String): DataFrameWriter[T] = {
-    saveMode.toLowerCase(Locale.ROOT) match {
-      case "overwrite" => mode(SaveMode.Overwrite)
-      case "append" => mode(SaveMode.Append)
-      case "ignore" => mode(SaveMode.Ignore)
-      case "error" | "errorifexists" | "default" => 
mode(SaveMode.ErrorIfExists)
-      case _ => throw QueryCompilationErrors.invalidSaveModeError(saveMode)
-    }
-  }
+  /** @inheritdoc */
+  override def mode(saveMode: String): this.type = super.mode(saveMode)
 
-  /**
-   * Specifies the underlying output data source. Built-in options include 
"parquet", "json", etc.
-   *
-   * @since 1.4.0
-   */
-  def format(source: String): DataFrameWriter[T] = {
-    this.source = source
-    this
-  }
+  /** @inheritdoc */
+  override def format(source: String): this.type = super.format(source)
 
-  /**
-   * Adds an output option for the underlying data source.
-   *
-   * All options are maintained in a case-insensitive way in terms of key 
names.
-   * If a new option has the same key case-insensitively, it will override the 
existing option.
-   *
-   * @since 1.4.0
-   */
-  def option(key: String, value: String): DataFrameWriter[T] = {
-    this.extraOptions = this.extraOptions + (key -> value)
-    this
-  }
+  /** @inheritdoc */
+  override def option(key: String, value: String): this.type = 
super.option(key, value)
 
-  /**
-   * Adds an output option for the underlying data source.
-   *
-   * All options are maintained in a case-insensitive way in terms of key 
names.
-   * If a new option has the same key case-insensitively, it will override the 
existing option.
-   *
-   * @since 2.0.0
-   */
-  def option(key: String, value: Boolean): DataFrameWriter[T] = option(key, 
value.toString)
+  /** @inheritdoc */
+  override def option(key: String, value: Boolean): this.type = 
super.option(key, value)
 
-  /**
-   * Adds an output option for the underlying data source.
-   *
-   * All options are maintained in a case-insensitive way in terms of key 
names.
-   * If a new option has the same key case-insensitively, it will override the 
existing option.
-   *
-   * @since 2.0.0
-   */
-  def option(key: String, value: Long): DataFrameWriter[T] = option(key, 
value.toString)
+  /** @inheritdoc */
+  override def option(key: String, value: Long): this.type = super.option(key, 
value)
 
-  /**
-   * Adds an output option for the underlying data source.
-   *
-   * All options are maintained in a case-insensitive way in terms of key 
names.
-   * If a new option has the same key case-insensitively, it will override the 
existing option.
-   *
-   * @since 2.0.0
-   */
-  def option(key: String, value: Double): DataFrameWriter[T] = option(key, 
value.toString)
+  /** @inheritdoc */
+  override def option(key: String, value: Double): this.type = 
super.option(key, value)
 
-  /**
-   * (Scala-specific) Adds output options for the underlying data source.
-   *
-   * All options are maintained in a case-insensitive way in terms of key 
names.
-   * If a new option has the same key case-insensitively, it will override the 
existing option.
-   *
-   * @since 1.4.0
-   */
-  def options(options: scala.collection.Map[String, String]): 
DataFrameWriter[T] = {
-    this.extraOptions ++= options
-    this
-  }
+  /** @inheritdoc */
+  override def options(options: scala.collection.Map[String, String]): 
this.type =
+    super.options(options)
 
-  /**
-   * Adds output options for the underlying data source.
-   *
-   * All options are maintained in a case-insensitive way in terms of key 
names.
-   * If a new option has the same key case-insensitively, it will override the 
existing option.
-   *
-   * @since 1.4.0
-   */
-  def options(options: java.util.Map[String, String]): DataFrameWriter[T] = {
-    this.options(options.asScala)
-    this
-  }
+  /** @inheritdoc */
+  override def options(options: java.util.Map[String, String]): this.type =
+    super.options(options)
 
-  /**
-   * Partitions the output by the given columns on the file system. If 
specified, the output is
-   * laid out on the file system similar to Hive's partitioning scheme. As an 
example, when we
-   * partition a dataset by year and then month, the directory layout would 
look like:
-   * <ul>
-   * <li>year=2016/month=01/</li>
-   * <li>year=2016/month=02/</li>
-   * </ul>
-   *
-   * Partitioning is one of the most widely used techniques to optimize 
physical data layout.
-   * It provides a coarse-grained index for skipping unnecessary data reads 
when queries have
-   * predicates on the partitioned columns. In order for partitioning to work 
well, the number
-   * of distinct values in each column should typically be less than tens of 
thousands.
-   *
-   * This is applicable for all file-based data sources (e.g. Parquet, JSON) 
starting with Spark
-   * 2.1.0.
-   *
-   * @since 1.4.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def partitionBy(colNames: String*): DataFrameWriter[T] = {
-    this.partitioningColumns = Option(colNames)
-    validatePartitioning()
-    this
-  }
+  override def partitionBy(colNames: String*): this.type = 
super.partitionBy(colNames: _*)
 
-  /**
-   * Buckets the output by the given columns. If specified, the output is laid 
out on the file
-   * system similar to Hive's bucketing scheme, but with a different bucket 
hash function
-   * and is not compatible with Hive's bucketing.
-   *
-   * This is applicable for all file-based data sources (e.g. Parquet, JSON) 
starting with Spark
-   * 2.1.0.
-   *
-   * @since 2.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def bucketBy(numBuckets: Int, colName: String, colNames: String*): 
DataFrameWriter[T] = {
-    this.numBuckets = Option(numBuckets)
-    this.bucketColumnNames = Option(colName +: colNames)
-    validatePartitioning()
-    this
-  }
+  override def bucketBy(numBuckets: Int, colName: String, colNames: String*): 
this.type =
+    super.bucketBy(numBuckets, colName, colNames: _*)
 
-  /**
-   * Sorts the output in each bucket by the given columns.
-   *
-   * This is applicable for all file-based data sources (e.g. Parquet, JSON) 
starting with Spark
-   * 2.1.0.
-   *
-   * @since 2.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def sortBy(colName: String, colNames: String*): DataFrameWriter[T] = {
-    this.sortColumnNames = Option(colName +: colNames)
-    this
-  }
+  override def sortBy(colName: String, colNames: String*): this.type =
+    super.sortBy(colName, colNames: _*)
 
-  /**
-   * Clusters the output by the given columns on the storage. The rows with 
matching values in the
-   * specified clustering columns will be consolidated within the same group.
-   *
-   * For instance, if you cluster a dataset by date, the data sharing the same 
date will be stored
-   * together in a file. This arrangement improves query efficiency when you 
apply selective
-   * filters to these clustering columns, thanks to data skipping.
-   *
-   * @since 4.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def clusterBy(colName: String, colNames: String*): DataFrameWriter[T] = {
-    this.clusteringColumns = Option(colName +: colNames)
-    validatePartitioning()
-    this
-  }
+  override def clusterBy(colName: String, colNames: String*): this.type =
+    super.clusterBy(colName, colNames: _*)
 
   /**
    * Saves the content of the `DataFrame` at the specified path.
@@ -522,37 +377,12 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
   }
 
   private def getBucketSpec: Option[BucketSpec] = {
-    if (sortColumnNames.isDefined && numBuckets.isEmpty) {
-      throw QueryCompilationErrors.sortByWithoutBucketingError()
-    }
-
+    isBucketed()
     numBuckets.map { n =>
       BucketSpec(n, bucketColumnNames.get, sortColumnNames.getOrElse(Nil))
     }
   }
 
-  private def assertNotBucketed(operation: String): Unit = {
-    if (getBucketSpec.isDefined) {
-      if (sortColumnNames.isEmpty) {
-        throw 
QueryCompilationErrors.bucketByUnsupportedByOperationError(operation)
-      } else {
-        throw 
QueryCompilationErrors.bucketByAndSortByUnsupportedByOperationError(operation)
-      }
-    }
-  }
-
-  private def assertNotPartitioned(operation: String): Unit = {
-    if (partitioningColumns.isDefined) {
-      throw 
QueryCompilationErrors.operationNotSupportPartitioningError(operation)
-    }
-  }
-
-  private def assertNotClustered(operation: String): Unit = {
-    if (clusteringColumns.isDefined) {
-      throw 
QueryCompilationErrors.operationNotSupportClusteringError(operation)
-    }
-  }
-
   /**
    * Saves the content of the `DataFrame` as the specified table.
    *
@@ -773,180 +603,6 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
       s" - table: ${existingTable.partitioning().mkString(", ")}")
   }
 
-  /**
-   * Validate that clusterBy is not used with partitionBy or bucketBy.
-   */
-  private def validatePartitioning(): Unit = {
-    if (clusteringColumns.nonEmpty) {
-      if (partitioningColumns.nonEmpty) {
-        throw QueryCompilationErrors.clusterByWithPartitionedBy()
-      }
-      if (getBucketSpec.nonEmpty) {
-        throw QueryCompilationErrors.clusterByWithBucketing()
-      }
-    }
-  }
-
-  /**
-   * Saves the content of the `DataFrame` to an external database table via 
JDBC. In the case the
-   * table already exists in the external database, behavior of this function 
depends on the
-   * save mode, specified by the `mode` function (default to throwing an 
exception).
-   *
-   * Don't create too many partitions in parallel on a large cluster; 
otherwise Spark might crash
-   * your external database systems.
-   *
-   * JDBC-specific option and parameter documentation for storing tables via 
JDBC in
-   * <a 
href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option";>
-   *   Data Source Option</a> in the version you use.
-   *
-   * @param table Name of the table in the external database.
-   * @param connectionProperties JDBC database connection arguments, a list of 
arbitrary string
-   *                             tag/value. Normally at least a "user" and 
"password" property
-   *                             should be included. "batchsize" can be used 
to control the
-   *                             number of rows per insert. "isolationLevel" 
can be one of
-   *                             "NONE", "READ_COMMITTED", "READ_UNCOMMITTED", 
"REPEATABLE_READ",
-   *                             or "SERIALIZABLE", corresponding to standard 
transaction
-   *                             isolation levels defined by JDBC's Connection 
object, with default
-   *                             of "READ_UNCOMMITTED".
-   * @since 1.4.0
-   */
-  def jdbc(url: String, table: String, connectionProperties: Properties): Unit 
= {
-    assertNotPartitioned("jdbc")
-    assertNotBucketed("jdbc")
-    assertNotClustered("jdbc")
-    // connectionProperties should override settings in extraOptions.
-    this.extraOptions ++= connectionProperties.asScala
-    // explicit url and dbtable should override all
-    this.extraOptions ++= Seq("url" -> url, "dbtable" -> table)
-    format("jdbc").save()
-  }
-
-  /**
-   * Saves the content of the `DataFrame` in JSON format (<a 
href="http://jsonlines.org/";>
-   * JSON Lines text format or newline-delimited JSON</a>) at the specified 
path.
-   * This is equivalent to:
-   * {{{
-   *   format("json").save(path)
-   * }}}
-   *
-   * You can find the JSON-specific options for writing JSON files in
-   * <a 
href="https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option";>
-   *   Data Source Option</a> in the version you use.
-   *
-   * @since 1.4.0
-   */
-  def json(path: String): Unit = {
-    format("json").save(path)
-  }
-
-  /**
-   * Saves the content of the `DataFrame` in Parquet format at the specified 
path.
-   * This is equivalent to:
-   * {{{
-   *   format("parquet").save(path)
-   * }}}
-   *
-   * Parquet-specific option(s) for writing Parquet files can be found in
-   * <a href=
-   *   
"https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option";>
-   *   Data Source Option</a> in the version you use.
-   *
-   * @since 1.4.0
-   */
-  def parquet(path: String): Unit = {
-    format("parquet").save(path)
-  }
-
-  /**
-   * Saves the content of the `DataFrame` in ORC format at the specified path.
-   * This is equivalent to:
-   * {{{
-   *   format("orc").save(path)
-   * }}}
-   *
-   * ORC-specific option(s) for writing ORC files can be found in
-   * <a href=
-   *   
"https://spark.apache.org/docs/latest/sql-data-sources-orc.html#data-source-option";>
-   *   Data Source Option</a> in the version you use.
-   *
-   * @since 1.5.0
-   */
-  def orc(path: String): Unit = {
-    format("orc").save(path)
-  }
-
-  /**
-   * Saves the content of the `DataFrame` in a text file at the specified path.
-   * The DataFrame must have only one column that is of string type.
-   * Each row becomes a new line in the output file. For example:
-   * {{{
-   *   // Scala:
-   *   df.write.text("/path/to/output")
-   *
-   *   // Java:
-   *   df.write().text("/path/to/output")
-   * }}}
-   * The text files will be encoded as UTF-8.
-   *
-   * You can find the text-specific options for writing text files in
-   * <a 
href="https://spark.apache.org/docs/latest/sql-data-sources-text.html#data-source-option";>
-   *   Data Source Option</a> in the version you use.
-   *
-   * @since 1.6.0
-   */
-  def text(path: String): Unit = {
-    format("text").save(path)
-  }
-
-  /**
-   * Saves the content of the `DataFrame` in CSV format at the specified path.
-   * This is equivalent to:
-   * {{{
-   *   format("csv").save(path)
-   * }}}
-   *
-   * You can find the CSV-specific options for writing CSV files in
-   * <a 
href="https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option";>
-   *   Data Source Option</a> in the version you use.
-   *
-   * @since 2.0.0
-   */
-  def csv(path: String): Unit = {
-    format("csv").save(path)
-  }
-
-  /**
-   * Saves the content of the `DataFrame` in XML format at the specified path.
-   * This is equivalent to:
-   * {{{
-   *   format("xml").save(path)
-   * }}}
-   *
-   * Note that writing a XML file from `DataFrame` having a field `ArrayType` 
with
-   * its element as `ArrayType` would have an additional nested field for the 
element.
-   * For example, the `DataFrame` having a field below,
-   *
-   *    {@code fieldA [[data1], [data2]]}
-   *
-   * would produce a XML file below.
-   *    {@code
-   *    <fieldA>
-   *        <item>data1</item>
-   *    </fieldA>
-   *    <fieldA>
-   *        <item>data2</item>
-   *    </fieldA>}
-   *
-   * Namely, roundtrip in writing and reading can end up in different schema 
structure.
-   *
-   * You can find the XML-specific options for writing XML files in
-   * <a 
href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option";>
-   * Data Source Option</a> in the version you use.
-   */
-  def xml(path: String): Unit = {
-    format("xml").save(path)
-  }
-
   /**
    * Wrap a DataFrameWriter action to track the QueryExecution and time cost, 
then report to the
    * user-registered callback functions.
@@ -963,24 +619,4 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
       case other => other
     }
   }
-
-  
///////////////////////////////////////////////////////////////////////////////////////
-  // Builder pattern config options
-  
///////////////////////////////////////////////////////////////////////////////////////
-
-  private var source: String = 
df.sparkSession.sessionState.conf.defaultDataSourceName
-
-  private var mode: SaveMode = SaveMode.ErrorIfExists
-
-  private var extraOptions = CaseInsensitiveMap[String](Map.empty)
-
-  private var partitioningColumns: Option[Seq[String]] = None
-
-  private var bucketColumnNames: Option[Seq[String]] = None
-
-  private var numBuckets: Option[Int] = None
-
-  private var sortColumnNames: Option[Seq[String]] = None
-
-  private var clusteringColumns: Option[Seq[String]] = None
 }
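
After the rename, the option setters, the format shortcuts (json, parquet, orc, text, csv, xml, jdbc) and the builder-state fields live on the shared DataFrameWriter in sql/api; DataFrameWriterImpl only narrows the return types and keeps the execution logic. A usage sketch that still flows through the shared validation, with illustrative column and table names:

    // clusterBy may not be combined with partitionBy or bucketBy; the shared
    // writer is expected to enforce this (validatePartitioning/isBucketed)
    // before any execution happens in the impl.
    df.write
      .partitionBy("year", "month")
      .bucketBy(8, "user_id")
      .sortBy("user_id")
      .mode("overwrite")
      .saveAsTable("events_bucketed")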
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 70d7797ba89f..b1c41033fd76 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1967,7 +1967,7 @@ class DataFrameSuite extends QueryTest
   test("SPARK-29442 Set `default` mode should override the existing mode") {
     val df = Seq(Tuple1(1)).toDF()
     val writer = df.write.mode("overwrite").mode("default")
-    val modeField = 
classOf[DataFrameWriter[Tuple1[Int]]].getDeclaredField("mode")
+    val modeField = classOf[DataFrameWriter[_]].getDeclaredField("mode")
     modeField.setAccessible(true)
     assert(SaveMode.ErrorIfExists === 
modeField.get(writer).asInstanceOf[SaveMode])
   }

