This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d4e2dac3465c [SPARK-49425][CONNECT][SQL] Create a shared DataFrameWriter
d4e2dac3465c is described below
commit d4e2dac3465cae7a2975de50892382de5af9a9b5
Author: Herman van Hovell <[email protected]>
AuthorDate: Tue Sep 3 00:45:08 2024 -0400
[SPARK-49425][CONNECT][SQL] Create a shared DataFrameWriter
### What changes were proposed in this pull request?
This PR adds a shared interface for DataFrameWriter.
### Why are the changes needed?
We are creating a shared interface for the Classic and Connect Scala SQL APIs.
This class is part of that effort.
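As a rough sketch of the pattern (simplified, with invented names; not the actual Spark classes): the shared abstract writer in `sql/api` owns the fluent configuration state, and each backend only implements the execution methods.

```scala
// Simplified sketch of the shared-writer pattern; names are illustrative only.
abstract class SharedWriter[T] {
  protected var source: String = ""
  protected var options: Map[String, String] = Map.empty

  // Shared fluent setters return this.type so subclasses keep their own type.
  def format(src: String): this.type = { source = src; this }
  def option(key: String, value: String): this.type = { options += (key -> value); this }

  // Backend-specific execution: Classic builds a logical plan, Connect a protobuf WriteOperation.
  def save(path: String): Unit
}

final class ConnectSharedWriter[T] extends SharedWriter[T] {
  def save(path: String): Unit =
    println(s"send WriteOperation(source=$source, path=$path, options=$options)")
}
```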
### Does this PR introduce _any_ user-facing change?
No
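For context, a minimal usage sketch (assuming a local `SparkSession`; the object name and output path are illustrative): these fluent calls keep the same signatures, they simply resolve to the shared `DataFrameWriter` with a `DataFrameWriterImpl` behind `Dataset.write`.

```scala
import org.apache.spark.sql.SparkSession

// Usage sketch: the user-facing fluent writer API is unchanged by this refactor.
object WriterUsage {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("writer-usage").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "value")
    df.write
      .mode("overwrite")             // shared: implemented in the sql/api base class
      .partitionBy("id")             // shared: builder state kept in the base class
      .parquet("/tmp/writer-usage")  // base-class helper: format("parquet").save(path); save() is backend-specific
    spark.stop()
  }
}
```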
### How was this patch tested?
Existing tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #47922 from hvanhovell/SPARK-49425.
Authored-by: Herman van Hovell <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
---
.../org/apache/spark/sql/DataFrameWriter.scala | 531 --------------------
.../main/scala/org/apache/spark/sql/Dataset.scala | 11 +-
.../spark/sql/internal/DataFrameWriterImpl.scala | 157 ++++++
.../org/apache/spark/sql/ClientDatasetSuite.scala | 45 +-
project/MimaExcludes.scala | 3 +
.../org/apache/spark/sql/DataFrameWriter.scala | 533 +++++++++++++++++++++
.../scala/org/apache/spark/sql/api/Dataset.scala | 10 +-
.../spark/sql/errors/CompilationErrors.scala | 49 ++
.../spark/sql/errors/DataTypeErrorsBase.scala | 4 +
.../spark/sql/errors/QueryCompilationErrors.scala | 49 --
.../apache/spark/sql/errors/QueryErrorsBase.scala | 4 -
.../main/scala/org/apache/spark/sql/Dataset.scala | 11 +-
.../DataFrameWriterImpl.scala} | 442 ++---------------
.../org/apache/spark/sql/DataFrameSuite.scala | 2 +-
14 files changed, 829 insertions(+), 1022 deletions(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
deleted file mode 100644
index 6f64f0204602..000000000000
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ /dev/null
@@ -1,531 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.util.{Locale, Properties}
-
-import scala.jdk.CollectionConverters._
-
-import org.apache.spark.annotation.Stable
-import org.apache.spark.connect.proto
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-
-/**
- * Interface used to write a [[Dataset]] to external storage systems (e.g.
file systems, key-value
- * stores, etc). Use `Dataset.write` to access this.
- *
- * @since 3.4.0
- */
-@Stable
-final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
-
- /**
- * Specifies the behavior when data or table already exists. Options
include: <ul>
- * <li>`SaveMode.Overwrite`: overwrite the existing data.</li>
<li>`SaveMode.Append`: append the
- * data.</li> <li>`SaveMode.Ignore`: ignore the operation (i.e. no-op).</li>
- * <li>`SaveMode.ErrorIfExists`: throw an exception at runtime.</li> </ul>
<p> The default
- * option is `ErrorIfExists`.
- *
- * @since 3.4.0
- */
- def mode(saveMode: SaveMode): DataFrameWriter[T] = {
- this.mode = saveMode
- this
- }
-
- /**
- * Specifies the behavior when data or table already exists. Options
include: <ul>
- * <li>`overwrite`: overwrite the existing data.</li> <li>`append`: append
the data.</li>
- * <li>`ignore`: ignore the operation (i.e. no-op).</li> <li>`error` or
`errorifexists`: default
- * option, throw an exception at runtime.</li> </ul>
- *
- * @since 3.4.0
- */
- def mode(saveMode: String): DataFrameWriter[T] = {
- saveMode.toLowerCase(Locale.ROOT) match {
- case "overwrite" => mode(SaveMode.Overwrite)
- case "append" => mode(SaveMode.Append)
- case "ignore" => mode(SaveMode.Ignore)
- case "error" | "errorifexists" | "default" =>
mode(SaveMode.ErrorIfExists)
- case _ =>
- throw new IllegalArgumentException(s"Unknown save mode: $saveMode.
Accepted " +
- "save modes are 'overwrite', 'append', 'ignore', 'error',
'errorifexists', 'default'.")
- }
- }
-
- /**
- * Specifies the underlying output data source. Built-in options include
"parquet", "json", etc.
- *
- * @since 3.4.0
- */
- def format(source: String): DataFrameWriter[T] = {
- this.source = Some(source)
- this
- }
-
- /**
- * Adds an output option for the underlying data source.
- *
- * All options are maintained in a case-insensitive way in terms of key
names. If a new option
- * has the same key case-insensitively, it will override the existing option.
- *
- * @since 3.4.0
- */
- def option(key: String, value: String): DataFrameWriter[T] = {
- this.extraOptions = this.extraOptions + (key -> value)
- this
- }
-
- /**
- * Adds an output option for the underlying data source.
- *
- * All options are maintained in a case-insensitive way in terms of key
names. If a new option
- * has the same key case-insensitively, it will override the existing option.
- *
- * @since 3.4.0
- */
- def option(key: String, value: Boolean): DataFrameWriter[T] = option(key,
value.toString)
-
- /**
- * Adds an output option for the underlying data source.
- *
- * All options are maintained in a case-insensitive way in terms of key
names. If a new option
- * has the same key case-insensitively, it will override the existing option.
- *
- * @since 3.4.0
- */
- def option(key: String, value: Long): DataFrameWriter[T] = option(key,
value.toString)
-
- /**
- * Adds an output option for the underlying data source.
- *
- * All options are maintained in a case-insensitive way in terms of key
names. If a new option
- * has the same key case-insensitively, it will override the existing option.
- *
- * @since 3.4.0
- */
- def option(key: String, value: Double): DataFrameWriter[T] = option(key,
value.toString)
-
- /**
- * (Scala-specific) Adds output options for the underlying data source.
- *
- * All options are maintained in a case-insensitive way in terms of key
names. If a new option
- * has the same key case-insensitively, it will override the existing option.
- *
- * @since 3.4.0
- */
- def options(options: scala.collection.Map[String, String]):
DataFrameWriter[T] = {
- this.extraOptions ++= options
- this
- }
-
- /**
- * Adds output options for the underlying data source.
- *
- * All options are maintained in a case-insensitive way in terms of key
names. If a new option
- * has the same key case-insensitively, it will override the existing option.
- *
- * @since 3.4.0
- */
- def options(options: java.util.Map[String, String]): DataFrameWriter[T] = {
- this.options(options.asScala)
- this
- }
-
- /**
- * Partitions the output by the given columns on the file system. If
specified, the output is
- * laid out on the file system similar to Hive's partitioning scheme. As an
example, when we
- * partition a dataset by year and then month, the directory layout would
look like: <ul>
- * <li>year=2016/month=01/</li> <li>year=2016/month=02/</li> </ul>
- *
- * Partitioning is one of the most widely used techniques to optimize
physical data layout. It
- * provides a coarse-grained index for skipping unnecessary data reads when
queries have
- * predicates on the partitioned columns. In order for partitioning to work
well, the number of
- * distinct values in each column should typically be less than tens of
thousands.
- *
- * This is applicable for all file-based data sources (e.g. Parquet, JSON)
starting with Spark
- * 2.1.0.
- *
- * @since 3.4.0
- */
- @scala.annotation.varargs
- def partitionBy(colNames: String*): DataFrameWriter[T] = {
- this.partitioningColumns = Option(colNames)
- this
- }
-
- /**
- * Buckets the output by the given columns. If specified, the output is laid
out on the file
- * system similar to Hive's bucketing scheme, but with a different bucket
hash function and is
- * not compatible with Hive's bucketing.
- *
- * This is applicable for all file-based data sources (e.g. Parquet, JSON)
starting with Spark
- * 2.1.0.
- *
- * @since 3.4.0
- */
- @scala.annotation.varargs
- def bucketBy(numBuckets: Int, colName: String, colNames: String*):
DataFrameWriter[T] = {
- require(numBuckets > 0, "The numBuckets should be > 0.")
- this.numBuckets = Option(numBuckets)
- this.bucketColumnNames = Option(colName +: colNames)
- this
- }
-
- /**
- * Sorts the output in each bucket by the given columns.
- *
- * This is applicable for all file-based data sources (e.g. Parquet, JSON)
starting with Spark
- * 2.1.0.
- *
- * @since 3.4.0
- */
- @scala.annotation.varargs
- def sortBy(colName: String, colNames: String*): DataFrameWriter[T] = {
- this.sortColumnNames = Option(colName +: colNames)
- this
- }
-
- /**
- * Clusters the output by the given columns on the storage. The rows with
matching values in the
- * specified clustering columns will be consolidated within the same group.
- *
- * For instance, if you cluster a dataset by date, the data sharing the same
date will be stored
- * together in a file. This arrangement improves query efficiency when you
apply selective
- * filters to these clustering columns, thanks to data skipping.
- *
- * @since 4.0.0
- */
- @scala.annotation.varargs
- def clusterBy(colName: String, colNames: String*): DataFrameWriter[T] = {
- this.clusteringColumns = Option(colName +: colNames)
- this
- }
-
- /**
- * Saves the content of the `DataFrame` at the specified path.
- *
- * @since 3.4.0
- */
- def save(path: String): Unit = {
- saveInternal(Some(path))
- }
-
- /**
- * Saves the content of the `DataFrame` as the specified table.
- *
- * @since 3.4.0
- */
- def save(): Unit = saveInternal(None)
-
- private def saveInternal(path: Option[String]): Unit = {
- executeWriteOperation(builder => path.foreach(builder.setPath))
- }
-
- private def executeWriteOperation(f: proto.WriteOperation.Builder => Unit):
Unit = {
- val builder = proto.WriteOperation.newBuilder()
-
- builder.setInput(ds.plan.getRoot)
-
- // Set path or table
- f(builder)
-
- // Cannot both be set
- require(!(builder.hasPath && builder.hasTable))
-
- builder.setMode(mode match {
- case SaveMode.Append => proto.WriteOperation.SaveMode.SAVE_MODE_APPEND
- case SaveMode.Overwrite =>
proto.WriteOperation.SaveMode.SAVE_MODE_OVERWRITE
- case SaveMode.Ignore => proto.WriteOperation.SaveMode.SAVE_MODE_IGNORE
- case SaveMode.ErrorIfExists =>
proto.WriteOperation.SaveMode.SAVE_MODE_ERROR_IF_EXISTS
- })
-
- source.foreach(builder.setSource)
- sortColumnNames.foreach(names =>
builder.addAllSortColumnNames(names.asJava))
- partitioningColumns.foreach(cols =>
builder.addAllPartitioningColumns(cols.asJava))
- clusteringColumns.foreach(cols =>
builder.addAllClusteringColumns(cols.asJava))
-
- numBuckets.foreach(n => {
- val bucketBuilder = proto.WriteOperation.BucketBy.newBuilder()
- bucketBuilder.setNumBuckets(n)
- bucketColumnNames.foreach(names =>
bucketBuilder.addAllBucketColumnNames(names.asJava))
- builder.setBucketBy(bucketBuilder)
- })
-
- extraOptions.foreach { case (k, v) =>
- builder.putOptions(k, v)
- }
-
-
ds.sparkSession.execute(proto.Command.newBuilder().setWriteOperation(builder).build())
- }
-
- /**
- * Inserts the content of the `DataFrame` to the specified table. It
requires that the schema of
- * the `DataFrame` is the same as the schema of the table.
- *
- * @note
- * Unlike `saveAsTable`, `insertInto` ignores the column names and just
uses position-based
- * resolution. For example:
- *
- * @note
- * SaveMode.ErrorIfExists and SaveMode.Ignore behave as SaveMode.Append in
`insertInto` as
- * `insertInto` is not a table creating operation.
- *
- * {{{
- * scala> Seq((1, 2)).toDF("i",
"j").write.mode("overwrite").saveAsTable("t1")
- * scala> Seq((3, 4)).toDF("j", "i").write.insertInto("t1")
- * scala> Seq((5, 6)).toDF("a", "b").write.insertInto("t1")
- * scala> sql("select * from t1").show
- * +---+---+
- * | i| j|
- * +---+---+
- * | 5| 6|
- * | 3| 4|
- * | 1| 2|
- * +---+---+
- * }}}
- *
- * Because it inserts data to an existing table, format or options will be
ignored.
- *
- * @since 3.4.0
- */
- def insertInto(tableName: String): Unit = {
- executeWriteOperation(builder => {
- builder.setTable(
- proto.WriteOperation.SaveTable
- .newBuilder()
- .setTableName(tableName)
- .setSaveMethod(
-
proto.WriteOperation.SaveTable.TableSaveMethod.TABLE_SAVE_METHOD_INSERT_INTO))
- })
- }
-
- /**
- * Saves the content of the `DataFrame` as the specified table.
- *
- * In the case the table already exists, behavior of this function depends
on the save mode,
- * specified by the `mode` function (default to throwing an exception). When
`mode` is
- * `Overwrite`, the schema of the `DataFrame` does not need to be the same
as that of the
- * existing table.
- *
- * When `mode` is `Append`, if there is an existing table, we will use the
format and options of
- * the existing table. The column order in the schema of the `DataFrame`
doesn't need to be same
- * as that of the existing table. Unlike `insertInto`, `saveAsTable` will
use the column names
- * to find the correct column positions. For example:
- *
- * {{{
- * scala> Seq((1, 2)).toDF("i",
"j").write.mode("overwrite").saveAsTable("t1")
- * scala> Seq((3, 4)).toDF("j",
"i").write.mode("append").saveAsTable("t1")
- * scala> sql("select * from t1").show
- * +---+---+
- * | i| j|
- * +---+---+
- * | 1| 2|
- * | 4| 3|
- * +---+---+
- * }}}
- *
- * In this method, save mode is used to determine the behavior if the data
source table exists
- * in Spark catalog. We will always overwrite the underlying data of data
source (e.g. a table
- * in JDBC data source) if the table doesn't exist in Spark catalog, and
will always append to
- * the underlying data of data source if the table already exists.
- *
- * When the DataFrame is created from a non-partitioned `HadoopFsRelation`
with a single input
- * path, and the data source provider can be mapped to an existing Hive
builtin SerDe (i.e. ORC
- * and Parquet), the table is persisted in a Hive compatible format, which
means other systems
- * like Hive will be able to read this table. Otherwise, the table is
persisted in a Spark SQL
- * specific format.
- *
- * @since 3.4.0
- */
- def saveAsTable(tableName: String): Unit = {
- executeWriteOperation(builder => {
- builder.setTable(
- proto.WriteOperation.SaveTable
- .newBuilder()
- .setTableName(tableName)
- .setSaveMethod(
-
proto.WriteOperation.SaveTable.TableSaveMethod.TABLE_SAVE_METHOD_SAVE_AS_TABLE))
- })
- }
-
- /**
- * Saves the content of the `DataFrame` to an external database table via
JDBC. In the case the
- * table already exists in the external database, behavior of this function
depends on the save
- * mode, specified by the `mode` function (default to throwing an exception).
- *
- * Don't create too many partitions in parallel on a large cluster;
otherwise Spark might crash
- * your external database systems.
- *
- * JDBC-specific option and parameter documentation for storing tables via
JDBC in <a
- *
href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @param table
- * Name of the table in the external database.
- * @param connectionProperties
- * JDBC database connection arguments, a list of arbitrary string
tag/value. Normally at least
- * a "user" and "password" property should be included. "batchsize" can be
used to control the
- * number of rows per insert. "isolationLevel" can be one of "NONE",
"READ_COMMITTED",
- * "READ_UNCOMMITTED", "REPEATABLE_READ", or "SERIALIZABLE", corresponding
to standard
- * transaction isolation levels defined by JDBC's Connection object, with
default of
- * "READ_UNCOMMITTED".
- * @since 3.4.0
- */
- def jdbc(url: String, table: String, connectionProperties: Properties): Unit
= {
- // connectionProperties should override settings in extraOptions.
- this.extraOptions ++= connectionProperties.asScala
- // explicit url and dbtable should override all
- this.extraOptions ++= Seq("url" -> url, "dbtable" -> table)
- format("jdbc").save()
- }
-
- /**
- * Saves the content of the `DataFrame` in JSON format (<a
href="http://jsonlines.org/"> JSON
- * Lines text format or newline-delimited JSON</a>) at the specified path.
This is equivalent
- * to:
- * {{{
- * format("json").save(path)
- * }}}
- *
- * You can find the JSON-specific options for writing JSON files in <a
- *
href="https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 3.4.0
- */
- def json(path: String): Unit = {
- format("json").save(path)
- }
-
- /**
- * Saves the content of the `DataFrame` in Parquet format at the specified
path. This is
- * equivalent to:
- * {{{
- * format("parquet").save(path)
- * }}}
- *
- * Parquet-specific option(s) for writing Parquet files can be found in <a
href=
- *
"https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option">
Data
- * Source Option</a> in the version you use.
- *
- * @since 3.4.0
- */
- def parquet(path: String): Unit = {
- format("parquet").save(path)
- }
-
- /**
- * Saves the content of the `DataFrame` in ORC format at the specified path.
This is equivalent
- * to:
- * {{{
- * format("orc").save(path)
- * }}}
- *
- * ORC-specific option(s) for writing ORC files can be found in <a href=
- *
"https://spark.apache.org/docs/latest/sql-data-sources-orc.html#data-source-option">
Data
- * Source Option</a> in the version you use.
- *
- * @since 3.4.0
- */
- def orc(path: String): Unit = {
- format("orc").save(path)
- }
-
- /**
- * Saves the content of the `DataFrame` in a text file at the specified
path. The DataFrame must
- * have only one column that is of string type. Each row becomes a new line
in the output file.
- * For example:
- * {{{
- * // Scala:
- * df.write.text("/path/to/output")
- *
- * // Java:
- * df.write().text("/path/to/output")
- * }}}
- * The text files will be encoded as UTF-8.
- *
- * You can find the text-specific options for writing text files in <a
- *
href="https://spark.apache.org/docs/latest/sql-data-sources-text.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 3.4.0
- */
- def text(path: String): Unit = {
- format("text").save(path)
- }
-
- /**
- * Saves the content of the `DataFrame` in CSV format at the specified path.
This is equivalent
- * to:
- * {{{
- * format("csv").save(path)
- * }}}
- *
- * You can find the CSV-specific options for writing CSV files in <a
- *
href="https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 3.4.0
- */
- def csv(path: String): Unit = {
- format("csv").save(path)
- }
-
- /**
- * Saves the content of the `DataFrame` in XML format at the specified path.
This is equivalent
- * to:
- * {{{
- * format("xml").save(path)
- * }}}
- *
- * Note that writing a XML file from `DataFrame` having a field `ArrayType`
with its element as
- * `ArrayType` would have an additional nested field for the element.
- *
- * Namely, roundtrip in writing and reading can end up in different schema
structure.
- *
- * You can find the XML-specific options for writing XML files in <a
- *
href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 4.0.0
- */
- def xml(path: String): Unit = {
- format("xml").save(path)
- }
-
-
///////////////////////////////////////////////////////////////////////////////////////
- // Builder pattern config options
-
///////////////////////////////////////////////////////////////////////////////////////
-
- private var source: Option[String] = None
-
- private var mode: SaveMode = SaveMode.ErrorIfExists
-
- private var extraOptions = CaseInsensitiveMap[String](Map.empty)
-
- private var partitioningColumns: Option[Seq[String]] = None
-
- private var bucketColumnNames: Option[Seq[String]] = None
-
- private var numBuckets: Option[Int] = None
-
- private var sortColumnNames: Option[Seq[String]] = None
-
- private var clusteringColumns: Option[Seq[String]] = None
-}
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 3b10978b7c8b..d18a76b06a48 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, StorageLevel
import org.apache.spark.sql.errors.DataTypeErrors.toSQLId
import org.apache.spark.sql.expressions.SparkUserDefinedFunction
import org.apache.spark.sql.functions.{struct, to_json}
-import org.apache.spark.sql.internal.{ColumnNodeToProtoConverter,
UnresolvedAttribute, UnresolvedRegex}
+import org.apache.spark.sql.internal.{ColumnNodeToProtoConverter,
DataFrameWriterImpl, UnresolvedAttribute, UnresolvedRegex}
import org.apache.spark.sql.streaming.DataStreamWriter
import org.apache.spark.sql.types.{Metadata, StructType}
import org.apache.spark.storage.StorageLevel
@@ -1060,14 +1060,9 @@ class Dataset[T] private[sql] (
.asScala
.toArray
- /**
- * Interface for saving the content of the non-streaming Dataset out into
external storage.
- *
- * @group basic
- * @since 3.4.0
- */
+ /** @inheritdoc */
def write: DataFrameWriter[T] = {
- new DataFrameWriter[T](this)
+ new DataFrameWriterImpl[T](this)
}
/**
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
new file mode 100644
index 000000000000..58fbfea48afe
--- /dev/null
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.annotation.Stable
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.{DataFrameWriter, Dataset, SaveMode}
+
+/**
+ * Interface used to write a [[Dataset]] to external storage systems (e.g.
file systems, key-value
+ * stores, etc). Use `Dataset.write` to access this.
+ *
+ * @since 3.4.0
+ */
+@Stable
+final class DataFrameWriterImpl[T] private[sql] (ds: Dataset[T]) extends
DataFrameWriter[T] {
+
+ /** @inheritdoc */
+ override def mode(saveMode: SaveMode): this.type = super.mode(saveMode)
+
+ /** @inheritdoc */
+ override def mode(saveMode: String): this.type = super.mode(saveMode)
+
+ /** @inheritdoc */
+ override def format(source: String): this.type = super.format(source)
+
+ /** @inheritdoc */
+ override def option(key: String, value: String): this.type =
super.option(key, value)
+
+ /** @inheritdoc */
+ override def option(key: String, value: Boolean): this.type =
super.option(key, value)
+
+ /** @inheritdoc */
+ override def option(key: String, value: Long): this.type = super.option(key,
value)
+
+ /** @inheritdoc */
+ override def option(key: String, value: Double): this.type =
super.option(key, value)
+
+ /** @inheritdoc */
+ override def options(options: scala.collection.Map[String, String]):
this.type =
+ super.options(options)
+
+ /** @inheritdoc */
+ override def options(options: java.util.Map[String, String]): this.type =
+ super.options(options)
+
+ /** @inheritdoc */
+ @scala.annotation.varargs
+ override def partitionBy(colNames: String*): this.type =
super.partitionBy(colNames: _*)
+
+ /** @inheritdoc */
+ @scala.annotation.varargs
+ override def bucketBy(numBuckets: Int, colName: String, colNames: String*):
this.type =
+ super.bucketBy(numBuckets, colName, colNames: _*)
+
+ /** @inheritdoc */
+ @scala.annotation.varargs
+ override def sortBy(colName: String, colNames: String*): this.type =
+ super.sortBy(colName, colNames: _*)
+
+ /** @inheritdoc */
+ @scala.annotation.varargs
+ override def clusterBy(colName: String, colNames: String*): this.type =
+ super.clusterBy(colName, colNames: _*)
+
+ /** @inheritdoc */
+ def save(path: String): Unit = {
+ saveInternal(Some(path))
+ }
+
+ /** @inheritdoc */
+ def save(): Unit = saveInternal(None)
+
+ private def saveInternal(path: Option[String]): Unit = {
+ executeWriteOperation(builder => path.foreach(builder.setPath))
+ }
+
+ private def executeWriteOperation(f: proto.WriteOperation.Builder => Unit):
Unit = {
+ val builder = proto.WriteOperation.newBuilder()
+
+ builder.setInput(ds.plan.getRoot)
+
+ // Set path or table
+ f(builder)
+
+ // Cannot both be set
+ require(!(builder.hasPath && builder.hasTable))
+
+ builder.setMode(mode match {
+ case SaveMode.Append => proto.WriteOperation.SaveMode.SAVE_MODE_APPEND
+ case SaveMode.Overwrite =>
proto.WriteOperation.SaveMode.SAVE_MODE_OVERWRITE
+ case SaveMode.Ignore => proto.WriteOperation.SaveMode.SAVE_MODE_IGNORE
+ case SaveMode.ErrorIfExists =>
proto.WriteOperation.SaveMode.SAVE_MODE_ERROR_IF_EXISTS
+ })
+
+ if (source.nonEmpty) {
+ builder.setSource(source)
+ }
+ sortColumnNames.foreach(names =>
builder.addAllSortColumnNames(names.asJava))
+ partitioningColumns.foreach(cols =>
builder.addAllPartitioningColumns(cols.asJava))
+ clusteringColumns.foreach(cols =>
builder.addAllClusteringColumns(cols.asJava))
+
+ numBuckets.foreach(n => {
+ val bucketBuilder = proto.WriteOperation.BucketBy.newBuilder()
+ bucketBuilder.setNumBuckets(n)
+ bucketColumnNames.foreach(names =>
bucketBuilder.addAllBucketColumnNames(names.asJava))
+ builder.setBucketBy(bucketBuilder)
+ })
+
+ extraOptions.foreach { case (k, v) =>
+ builder.putOptions(k, v)
+ }
+
+
ds.sparkSession.execute(proto.Command.newBuilder().setWriteOperation(builder).build())
+ }
+
+ /** @inheritdoc */
+ def insertInto(tableName: String): Unit = {
+ executeWriteOperation(builder => {
+ builder.setTable(
+ proto.WriteOperation.SaveTable
+ .newBuilder()
+ .setTableName(tableName)
+ .setSaveMethod(
+
proto.WriteOperation.SaveTable.TableSaveMethod.TABLE_SAVE_METHOD_INSERT_INTO))
+ })
+ }
+
+ /** @inheritdoc */
+ def saveAsTable(tableName: String): Unit = {
+ executeWriteOperation(builder => {
+ builder.setTable(
+ proto.WriteOperation.SaveTable
+ .newBuilder()
+ .setTableName(tableName)
+ .setSaveMethod(
+
proto.WriteOperation.SaveTable.TableSaveMethod.TABLE_SAVE_METHOD_SAVE_AS_TABLE))
+ })
+ }
+}
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDatasetSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDatasetSuite.scala
index 1b166f8ace1d..04367d3b95f1 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDatasetSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDatasetSuite.scala
@@ -71,35 +71,46 @@ class ClientDatasetSuite extends ConnectFunSuite with BeforeAndAfterEach {
test("write") {
val df = ss.newDataFrame(_ => ()).limit(10)
- val builder = proto.WriteOperation.newBuilder()
- builder
+ def toPlan(builder: proto.WriteOperation.Builder): proto.Plan = {
+ proto.Plan
+ .newBuilder()
+ .setCommand(proto.Command.newBuilder().setWriteOperation(builder))
+ .build()
+ }
+
+ val builder = proto.WriteOperation
+ .newBuilder()
.setInput(df.plan.getRoot)
.setPath("my/test/path")
.setMode(proto.WriteOperation.SaveMode.SAVE_MODE_ERROR_IF_EXISTS)
.setSource("parquet")
- .addSortColumnNames("col1")
- .addPartitioningColumns("col99")
- .setBucketBy(
- proto.WriteOperation.BucketBy
- .newBuilder()
- .setNumBuckets(2)
- .addBucketColumnNames("col1")
- .addBucketColumnNames("col2"))
- .addClusteringColumns("col3")
- val expectedPlan = proto.Plan
- .newBuilder()
- .setCommand(proto.Command.newBuilder().setWriteOperation(builder))
- .build()
+ val partitionedPlan = toPlan(
+ builder
+ .clone()
+ .addSortColumnNames("col1")
+ .addPartitioningColumns("col99")
+ .setBucketBy(
+ proto.WriteOperation.BucketBy
+ .newBuilder()
+ .setNumBuckets(2)
+ .addBucketColumnNames("col1")
+ .addBucketColumnNames("col2")))
df.write
.sortBy("col1")
.partitionBy("col99")
.bucketBy(2, "col1", "col2")
+ .parquet("my/test/path")
+ val actualPartitionedPlan = service.getAndClearLatestInputPlan()
+ assert(actualPartitionedPlan.equals(partitionedPlan))
+
+ val clusteredPlan = toPlan(builder.clone().addClusteringColumns("col3"))
+ df.write
.clusterBy("col3")
.parquet("my/test/path")
- val actualPlan = service.getAndClearLatestInputPlan()
- assert(actualPlan.equals(expectedPlan))
+ val actualClusteredPlan = service.getAndClearLatestInputPlan()
+ assert(actualClusteredPlan.equals(clusteredPlan))
}
test("write jdbc") {
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index fe4a08971509..21638e481630 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -124,6 +124,9 @@ object MimaExcludes {
// SPARK-49423: Consolidate Observation in sql/api
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.Observation"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.Observation$"),
+
+ // SPARK-49425: Create a shared DataFrameWriter interface.
+
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrameWriter")
)
// Default exclude rules
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/api/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
new file mode 100644
index 000000000000..96855ee5ad16
--- /dev/null
+++ b/sql/api/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import java.util
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.annotation.Stable
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.errors.CompilationErrors
+
+/**
+ * Interface used to write a [[org.apache.spark.sql.api.Dataset]] to external
storage systems
+ * (e.g. file systems, key-value stores, etc). Use `Dataset.write` to access
this.
+ *
+ * @since 1.4.0
+ */
+@Stable
+abstract class DataFrameWriter[T] {
+
+ /**
+ * Specifies the behavior when data or table already exists. Options include:
+ * <ul>
+ * <li>`SaveMode.Overwrite`: overwrite the existing data.</li>
+ * <li>`SaveMode.Append`: append the data.</li>
+ * <li>`SaveMode.Ignore`: ignore the operation (i.e. no-op).</li>
+ * <li>`SaveMode.ErrorIfExists`: throw an exception at runtime.</li>
+ * </ul>
+ * <p>
+ * The default option is `ErrorIfExists`.
+ *
+ * @since 1.4.0
+ */
+ def mode(saveMode: SaveMode): this.type = {
+ this.mode = saveMode
+ this
+ }
+
+ /**
+ * Specifies the behavior when data or table already exists. Options include:
+ * <ul>
+ * <li>`overwrite`: overwrite the existing data.</li>
+ * <li>`append`: append the data.</li>
+ * <li>`ignore`: ignore the operation (i.e. no-op).</li>
+ * <li>`error` or `errorifexists`: default option, throw an exception at
runtime.</li>
+ * </ul>
+ *
+ * @since 1.4.0
+ */
+ def mode(saveMode: String): this.type = {
+ saveMode.toLowerCase(util.Locale.ROOT) match {
+ case "overwrite" => mode(SaveMode.Overwrite)
+ case "append" => mode(SaveMode.Append)
+ case "ignore" => mode(SaveMode.Ignore)
+ case "error" | "errorifexists" | "default" =>
mode(SaveMode.ErrorIfExists)
+ case _ => throw CompilationErrors.invalidSaveModeError(saveMode)
+ }
+ }
+
+ /**
+ * Specifies the underlying output data source. Built-in options include
"parquet", "json", etc.
+ *
+ * @since 1.4.0
+ */
+ def format(source: String): this.type = {
+ this.source = source
+ this
+ }
+
+ /**
+ * Adds an output option for the underlying data source.
+ *
+ * All options are maintained in a case-insensitive way in terms of key
names.
+ * If a new option has the same key case-insensitively, it will override the
existing option.
+ *
+ * @since 1.4.0
+ */
+ def option(key: String, value: String): this.type = {
+ this.extraOptions = this.extraOptions + (key -> value)
+ this
+ }
+
+ /**
+ * Adds an output option for the underlying data source.
+ *
+ * All options are maintained in a case-insensitive way in terms of key
names.
+ * If a new option has the same key case-insensitively, it will override the
existing option.
+ *
+ * @since 2.0.0
+ */
+ def option(key: String, value: Boolean): this.type = option(key,
value.toString)
+
+ /**
+ * Adds an output option for the underlying data source.
+ *
+ * All options are maintained in a case-insensitive way in terms of key
names.
+ * If a new option has the same key case-insensitively, it will override the
existing option.
+ *
+ * @since 2.0.0
+ */
+ def option(key: String, value: Long): this.type = option(key, value.toString)
+
+ /**
+ * Adds an output option for the underlying data source.
+ *
+ * All options are maintained in a case-insensitive way in terms of key
names.
+ * If a new option has the same key case-insensitively, it will override the
existing option.
+ *
+ * @since 2.0.0
+ */
+ def option(key: String, value: Double): this.type = option(key,
value.toString)
+
+ /**
+ * (Scala-specific) Adds output options for the underlying data source.
+ *
+ * All options are maintained in a case-insensitive way in terms of key
names.
+ * If a new option has the same key case-insensitively, it will override the
existing option.
+ *
+ * @since 1.4.0
+ */
+ def options(options: scala.collection.Map[String, String]): this.type = {
+ this.extraOptions ++= options
+ this
+ }
+
+ /**
+ * Adds output options for the underlying data source.
+ *
+ * All options are maintained in a case-insensitive way in terms of key
names.
+ * If a new option has the same key case-insensitively, it will override the
existing option.
+ *
+ * @since 1.4.0
+ */
+ def options(options: util.Map[String, String]): this.type = {
+ this.options(options.asScala)
+ this
+ }
+
+ /**
+ * Partitions the output by the given columns on the file system. If
specified, the output is
+ * laid out on the file system similar to Hive's partitioning scheme. As an
example, when we
+ * partition a dataset by year and then month, the directory layout would
look like:
+ * <ul>
+ * <li>year=2016/month=01/</li>
+ * <li>year=2016/month=02/</li>
+ * </ul>
+ *
+ * Partitioning is one of the most widely used techniques to optimize
physical data layout.
+ * It provides a coarse-grained index for skipping unnecessary data reads
when queries have
+ * predicates on the partitioned columns. In order for partitioning to work
well, the number
+ * of distinct values in each column should typically be less than tens of
thousands.
+ *
+ * This is applicable for all file-based data sources (e.g. Parquet, JSON)
starting with Spark
+ * 2.1.0.
+ *
+ * @since 1.4.0
+ */
+ @scala.annotation.varargs
+ def partitionBy(colNames: String*): this.type = {
+ this.partitioningColumns = Option(colNames)
+ validatePartitioning()
+ this
+ }
+
+ /**
+ * Buckets the output by the given columns. If specified, the output is laid
out on the file
+ * system similar to Hive's bucketing scheme, but with a different bucket
hash function
+ * and is not compatible with Hive's bucketing.
+ *
+ * This is applicable for all file-based data sources (e.g. Parquet, JSON)
starting with Spark
+ * 2.1.0.
+ *
+ * @since 2.0
+ */
+ @scala.annotation.varargs
+ def bucketBy(numBuckets: Int, colName: String, colNames: String*): this.type
= {
+ this.numBuckets = Option(numBuckets)
+ this.bucketColumnNames = Option(colName +: colNames)
+ validatePartitioning()
+ this
+ }
+
+ /**
+ * Sorts the output in each bucket by the given columns.
+ *
+ * This is applicable for all file-based data sources (e.g. Parquet, JSON)
starting with Spark
+ * 2.1.0.
+ *
+ * @since 2.0
+ */
+ @scala.annotation.varargs
+ def sortBy(colName: String, colNames: String*): this.type = {
+ this.sortColumnNames = Option(colName +: colNames)
+ this
+ }
+
+ /**
+ * Clusters the output by the given columns on the storage. The rows with
matching values in the
+ * specified clustering columns will be consolidated within the same group.
+ *
+ * For instance, if you cluster a dataset by date, the data sharing the same
date will be stored
+ * together in a file. This arrangement improves query efficiency when you
apply selective
+ * filters to these clustering columns, thanks to data skipping.
+ *
+ * @since 4.0
+ */
+ @scala.annotation.varargs
+ def clusterBy(colName: String, colNames: String*): this.type = {
+ this.clusteringColumns = Option(colName +: colNames)
+ validatePartitioning()
+ this
+ }
+
+ /**
+ * Saves the content of the `DataFrame` at the specified path.
+ *
+ * @since 1.4.0
+ */
+ def save(path: String): Unit
+
+ /**
+ * Saves the content of the `DataFrame` as the specified table.
+ *
+ * @since 1.4.0
+ */
+ def save(): Unit
+
+ /**
+ * Inserts the content of the `DataFrame` to the specified table. It
requires that
+ * the schema of the `DataFrame` is the same as the schema of the table.
+ *
+ * @note Unlike `saveAsTable`, `insertInto` ignores the column names and
just uses position-based
+ * resolution. For example:
+ * @note SaveMode.ErrorIfExists and SaveMode.Ignore behave as
SaveMode.Append in `insertInto` as
+ * `insertInto` is not a table creating operation.
+ *
+ * {{{
+ * scala> Seq((1, 2)).toDF("i",
"j").write.mode("overwrite").saveAsTable("t1")
+ * scala> Seq((3, 4)).toDF("j", "i").write.insertInto("t1")
+ * scala> Seq((5, 6)).toDF("a", "b").write.insertInto("t1")
+ * scala> sql("select * from t1").show
+ * +---+---+
+ * | i| j|
+ * +---+---+
+ * | 5| 6|
+ * | 3| 4|
+ * | 1| 2|
+ * +---+---+
+ * }}}
+ *
+ * Because it inserts data to an existing table, format or options
will be ignored.
+ * @since 1.4.0
+ */
+ def insertInto(tableName: String): Unit
+
+ /**
+ * Saves the content of the `DataFrame` as the specified table.
+ *
+ * In the case the table already exists, behavior of this function depends
on the
+ * save mode, specified by the `mode` function (default to throwing an
exception).
+ * When `mode` is `Overwrite`, the schema of the `DataFrame` does not need
to be
+ * the same as that of the existing table.
+ *
+ * When `mode` is `Append`, if there is an existing table, we will use the
format and options of
+ * the existing table. The column order in the schema of the `DataFrame`
doesn't need to be same
+ * as that of the existing table. Unlike `insertInto`, `saveAsTable` will
use the column names to
+ * find the correct column positions. For example:
+ *
+ * {{{
+ * scala> Seq((1, 2)).toDF("i",
"j").write.mode("overwrite").saveAsTable("t1")
+ * scala> Seq((3, 4)).toDF("j",
"i").write.mode("append").saveAsTable("t1")
+ * scala> sql("select * from t1").show
+ * +---+---+
+ * | i| j|
+ * +---+---+
+ * | 1| 2|
+ * | 4| 3|
+ * +---+---+
+ * }}}
+ *
+ * In this method, save mode is used to determine the behavior if the data
source table exists in
+ * Spark catalog. We will always overwrite the underlying data of data
source (e.g. a table in
+ * JDBC data source) if the table doesn't exist in Spark catalog, and will
always append to the
+ * underlying data of data source if the table already exists.
+ *
+ * When the DataFrame is created from a non-partitioned `HadoopFsRelation`
with a single input
+ * path, and the data source provider can be mapped to an existing Hive
builtin SerDe (i.e. ORC
+ * and Parquet), the table is persisted in a Hive compatible format, which
means other systems
+ * like Hive will be able to read this table. Otherwise, the table is
persisted in a Spark SQL
+ * specific format.
+ *
+ * @since 1.4.0
+ */
+ def saveAsTable(tableName: String): Unit
+
+ /**
+ * Saves the content of the `DataFrame` to an external database table via
JDBC. In the case the
+ * table already exists in the external database, behavior of this function
depends on the
+ * save mode, specified by the `mode` function (default to throwing an
exception).
+ *
+ * Don't create too many partitions in parallel on a large cluster;
otherwise Spark might crash
+ * your external database systems.
+ *
+ * JDBC-specific option and parameter documentation for storing tables via
JDBC in
+ * <a
href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
+ * Data Source Option</a> in the version you use.
+ *
+ * @param table Name of the table in the external database.
+ * @param connectionProperties JDBC database connection arguments, a list of
arbitrary string
+ * tag/value. Normally at least a "user" and
"password" property
+ * should be included. "batchsize" can be used
to control the
+ * number of rows per insert. "isolationLevel"
can be one of
+ * "NONE", "READ_COMMITTED", "READ_UNCOMMITTED",
"REPEATABLE_READ",
+ * or "SERIALIZABLE", corresponding to standard
transaction
+ * isolation levels defined by JDBC's Connection
object, with default
+ * of "READ_UNCOMMITTED".
+ * @since 1.4.0
+ */
+ def jdbc(url: String, table: String, connectionProperties: util.Properties):
Unit = {
+ assertNotPartitioned("jdbc")
+ assertNotBucketed("jdbc")
+ assertNotClustered("jdbc")
+ // connectionProperties should override settings in extraOptions.
+ this.extraOptions ++= connectionProperties.asScala
+ // explicit url and dbtable should override all
+ this.extraOptions ++= Seq("url" -> url, "dbtable" -> table)
+ format("jdbc").save()
+ }
+
+ /**
+ * Saves the content of the `DataFrame` in JSON format (<a
href="http://jsonlines.org/">
+ * JSON Lines text format or newline-delimited JSON</a>) at the specified
path.
+ * This is equivalent to:
+ * {{{
+ * format("json").save(path)
+ * }}}
+ *
+ * You can find the JSON-specific options for writing JSON files in
+ * <a
href="https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option">
+ * Data Source Option</a> in the version you use.
+ *
+ * @since 1.4.0
+ */
+ def json(path: String): Unit = {
+ format("json").save(path)
+ }
+
+ /**
+ * Saves the content of the `DataFrame` in Parquet format at the specified
path.
+ * This is equivalent to:
+ * {{{
+ * format("parquet").save(path)
+ * }}}
+ *
+ * Parquet-specific option(s) for writing Parquet files can be found in
+ * <a href=
+ *
"https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option">
+ * Data Source Option</a> in the version you use.
+ *
+ * @since 1.4.0
+ */
+ def parquet(path: String): Unit = {
+ format("parquet").save(path)
+ }
+
+ /**
+ * Saves the content of the `DataFrame` in ORC format at the specified path.
+ * This is equivalent to:
+ * {{{
+ * format("orc").save(path)
+ * }}}
+ *
+ * ORC-specific option(s) for writing ORC files can be found in
+ * <a href=
+ *
"https://spark.apache.org/docs/latest/sql-data-sources-orc.html#data-source-option">
+ * Data Source Option</a> in the version you use.
+ *
+ * @since 1.5.0
+ */
+ def orc(path: String): Unit = {
+ format("orc").save(path)
+ }
+
+ /**
+ * Saves the content of the `DataFrame` in a text file at the specified path.
+ * The DataFrame must have only one column that is of string type.
+ * Each row becomes a new line in the output file. For example:
+ * {{{
+ * // Scala:
+ * df.write.text("/path/to/output")
+ *
+ * // Java:
+ * df.write().text("/path/to/output")
+ * }}}
+ * The text files will be encoded as UTF-8.
+ *
+ * You can find the text-specific options for writing text files in
+ * <a
href="https://spark.apache.org/docs/latest/sql-data-sources-text.html#data-source-option">
+ * Data Source Option</a> in the version you use.
+ *
+ * @since 1.6.0
+ */
+ def text(path: String): Unit = {
+ format("text").save(path)
+ }
+
+ /**
+ * Saves the content of the `DataFrame` in CSV format at the specified path.
+ * This is equivalent to:
+ * {{{
+ * format("csv").save(path)
+ * }}}
+ *
+ * You can find the CSV-specific options for writing CSV files in
+ * <a
href="https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option">
+ * Data Source Option</a> in the version you use.
+ *
+ * @since 2.0.0
+ */
+ def csv(path: String): Unit = {
+ format("csv").save(path)
+ }
+
+ /**
+ * Saves the content of the `DataFrame` in XML format at the specified path.
+ * This is equivalent to:
+ * {{{
+ * format("xml").save(path)
+ * }}}
+ *
+ * Note that writing a XML file from `DataFrame` having a field `ArrayType`
with
+ * its element as `ArrayType` would have an additional nested field for the
element.
+ * For example, the `DataFrame` having a field below,
+ *
+ * {@code fieldA [[data1], [data2]]}
+ *
+ * would produce a XML file below.
+ * {@code
+ * <fieldA>
+ * <item>data1</item>
+ * </fieldA>
+ * <fieldA>
+ * <item>data2</item>
+ * </fieldA>}
+ *
+ * Namely, roundtrip in writing and reading can end up in different schema
structure.
+ *
+ * You can find the XML-specific options for writing XML files in
+ * <a
href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
+ * Data Source Option</a> in the version you use.
+ */
+ def xml(path: String): Unit = {
+ format("xml").save(path)
+ }
+
+ protected def isBucketed(): Boolean = {
+ if (sortColumnNames.isDefined && numBuckets.isEmpty) {
+ throw CompilationErrors.sortByWithoutBucketingError()
+ }
+ numBuckets.isDefined
+ }
+
+ protected def assertNotBucketed(operation: String): Unit = {
+ if (isBucketed()) {
+ if (sortColumnNames.isEmpty) {
+ throw CompilationErrors.bucketByUnsupportedByOperationError(operation)
+ } else {
+ throw
CompilationErrors.bucketByAndSortByUnsupportedByOperationError(operation)
+ }
+ }
+ }
+
+ protected def assertNotPartitioned(operation: String): Unit = {
+ if (partitioningColumns.isDefined) {
+ throw CompilationErrors.operationNotSupportPartitioningError(operation)
+ }
+ }
+
+ protected def assertNotClustered(operation: String): Unit = {
+ if (clusteringColumns.isDefined) {
+ throw CompilationErrors.operationNotSupportClusteringError(operation)
+ }
+ }
+
+ /**
+ * Validate that clusterBy is not used with partitionBy or bucketBy.
+ */
+ protected def validatePartitioning(): Unit = {
+ if (clusteringColumns.nonEmpty) {
+ if (partitioningColumns.nonEmpty) {
+ throw CompilationErrors.clusterByWithPartitionedBy()
+ }
+ if (isBucketed()) {
+ throw CompilationErrors.clusterByWithBucketing()
+ }
+ }
+ }
+
+
///////////////////////////////////////////////////////////////////////////////////////
+ // Builder pattern config options
+
///////////////////////////////////////////////////////////////////////////////////////
+
+ protected var source: String = ""
+
+ protected var mode: SaveMode = SaveMode.ErrorIfExists
+
+ protected var extraOptions: CaseInsensitiveMap[String] =
CaseInsensitiveMap[String](Map.empty)
+
+ protected var partitioningColumns: Option[Seq[String]] = None
+
+ protected var bucketColumnNames: Option[Seq[String]] = None
+
+ protected var numBuckets: Option[Int] = None
+
+ protected var sortColumnNames: Option[Seq[String]] = None
+
+ protected var clusteringColumns: Option[Seq[String]] = None
+}
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
index a5e125733a29..226860df6813 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
@@ -23,7 +23,7 @@ import _root_.java.util
import org.apache.spark.annotation.{DeveloperApi, Stable}
import org.apache.spark.api.java.function.{FilterFunction, FlatMapFunction,
ForeachFunction, ForeachPartitionFunction, MapFunction, MapPartitionsFunction,
ReduceFunction}
-import org.apache.spark.sql.{functions, AnalysisException, Column, Encoder,
Observation, Row, TypedColumn}
+import org.apache.spark.sql.{functions, AnalysisException, Column,
DataFrameWriter, Encoder, Observation, Row, TypedColumn}
import org.apache.spark.sql.types.{Metadata, StructType}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.ArrayImplicits._
@@ -2861,4 +2861,12 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable {
*/
@DeveloperApi
def semanticHash(): Int
+
+ /**
+ * Interface for saving the content of the non-streaming Dataset out into
external storage.
+ *
+ * @group basic
+ * @since 1.6.0
+ */
+ def write: DataFrameWriter[T]
}
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/errors/CompilationErrors.scala b/sql/api/src/main/scala/org/apache/spark/sql/errors/CompilationErrors.scala
index efefdf3ea21e..6034c4190631 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/errors/CompilationErrors.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/errors/CompilationErrors.scala
@@ -77,6 +77,55 @@ private[sql] trait CompilationErrors extends DataTypeErrorsBase {
"intMinValue" -> toSQLValue(Int.MinValue),
"intMaxValue" -> toSQLValue(Int.MaxValue)))
}
+
+ def invalidSaveModeError(saveMode: String): Throwable = {
+ new AnalysisException(
+ errorClass = "INVALID_SAVE_MODE",
+ messageParameters = Map("mode" -> toDSOption(saveMode))
+ )
+ }
+
+ def sortByWithoutBucketingError(): Throwable = {
+ new AnalysisException(
+ errorClass = "SORT_BY_WITHOUT_BUCKETING",
+ messageParameters = Map.empty)
+ }
+
+ def bucketByUnsupportedByOperationError(operation: String): Throwable = {
+ new AnalysisException(
+ errorClass = "_LEGACY_ERROR_TEMP_1312",
+ messageParameters = Map("operation" -> operation))
+ }
+
+ def bucketByAndSortByUnsupportedByOperationError(operation: String):
Throwable = {
+ new AnalysisException(
+ errorClass = "_LEGACY_ERROR_TEMP_1313",
+ messageParameters = Map("operation" -> operation))
+ }
+
+ def operationNotSupportPartitioningError(operation: String): Throwable = {
+ new AnalysisException(
+ errorClass = "_LEGACY_ERROR_TEMP_1197",
+ messageParameters = Map("operation" -> operation))
+ }
+
+ def operationNotSupportClusteringError(operation: String): Throwable = {
+ new AnalysisException(
+ errorClass = "CLUSTERING_NOT_SUPPORTED",
+ messageParameters = Map("operation" -> operation))
+ }
+
+ def clusterByWithPartitionedBy(): Throwable = {
+ new AnalysisException(
+ errorClass = "SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED",
+ messageParameters = Map.empty)
+ }
+
+ def clusterByWithBucketing(): Throwable = {
+ new AnalysisException(
+ errorClass = "SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED",
+ messageParameters = Map.empty)
+ }
}
private[sql] object CompilationErrors extends CompilationErrors
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrorsBase.scala b/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrorsBase.scala
index 930f92db2682..d22f35b3fe50 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrorsBase.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrorsBase.scala
@@ -96,4 +96,8 @@ private[sql] trait DataTypeErrorsBase {
def getQueryContext(context: QueryContext): Array[QueryContext] = {
if (context == null) Array.empty else Array(context)
}
+
+ def toDSOption(option: String): String = {
+ quoteByDefault(option)
+ }
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 613e7cff1e42..fa8ea2f5289f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2231,12 +2231,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
"validColumnNames" -> validColumnNames.mkString(", ")))
}
- def operationNotSupportPartitioningError(operation: String): Throwable = {
- new AnalysisException(
- errorClass = "_LEGACY_ERROR_TEMP_1197",
- messageParameters = Map("operation" -> operation))
- }
-
def mixedRefsInAggFunc(funcStr: String, origin: Origin): Throwable = {
new AnalysisException(
errorClass =
@@ -3266,13 +3260,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
"config" -> SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key))
}
- def invalidSaveModeError(saveMode: String): Throwable = {
- new AnalysisException(
- errorClass = "INVALID_SAVE_MODE",
- messageParameters = Map("mode" -> toDSOption(saveMode))
- )
- }
-
def invalidSingleVariantColumn(): Throwable = {
new AnalysisException(
errorClass = "INVALID_SINGLE_VARIANT_COLUMN",
@@ -3299,24 +3286,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
messageParameters = Map("quote" -> quote))
}
- def sortByWithoutBucketingError(): Throwable = {
- new AnalysisException(
- errorClass = "SORT_BY_WITHOUT_BUCKETING",
- messageParameters = Map.empty)
- }
-
- def bucketByUnsupportedByOperationError(operation: String): Throwable = {
- new AnalysisException(
- errorClass = "_LEGACY_ERROR_TEMP_1312",
- messageParameters = Map("operation" -> operation))
- }
-
- def bucketByAndSortByUnsupportedByOperationError(operation: String):
Throwable = {
- new AnalysisException(
- errorClass = "_LEGACY_ERROR_TEMP_1313",
- messageParameters = Map("operation" -> operation))
- }
-
def tableAlreadyExistsError(tableIdent: TableIdentifier): Throwable = {
new TableAlreadyExistsException(tableIdent.nameParts)
}
@@ -4127,22 +4096,4 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
messageParameters = Map("functionName" -> functionName)
)
}
-
- def operationNotSupportClusteringError(operation: String): Throwable = {
- new AnalysisException(
- errorClass = "CLUSTERING_NOT_SUPPORTED",
- messageParameters = Map("operation" -> operation))
- }
-
- def clusterByWithPartitionedBy(): Throwable = {
- new AnalysisException(
- errorClass = "SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED",
- messageParameters = Map.empty)
- }
-
- def clusterByWithBucketing(): Throwable = {
- new AnalysisException(
- errorClass = "SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED",
- messageParameters = Map.empty)
- }
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryErrorsBase.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryErrorsBase.scala
index b18937257bae..350c709de07c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryErrorsBase.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryErrorsBase.scala
@@ -47,10 +47,6 @@ private[sql] trait QueryErrorsBase extends DataTypeErrorsBase {
quoteByDefault(conf)
}
- def toDSOption(option: String): String = {
- quoteByDefault(option)
- }
-
def toSQLExpr(e: Expression): String = {
quoteByDefault(toPrettySQL(e))
}
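Editor's note: toDSOption is removed from QueryErrorsBase here, and the `quoteByDefault(option)` addition at the top of this part of the patch suggests it moves down into the shared error base (DataTypeErrorsBase is the assumed home) so both the classic and shared error helpers can quote data source option names. A hedged sketch, reusing the removed body verbatim:

    // Assumed placement; the body is exactly the one removed above.
    def toDSOption(option: String): String = {
      quoteByDefault(option)
    }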
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 597decdbc740..5288d77d40b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -60,8 +60,8 @@ import
org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation,
DataSourceV2ScanRelation, FileTable}
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.execution.stat.StatFunctions
+import org.apache.spark.sql.internal.{DataFrameWriterImpl, SQLConf}
import org.apache.spark.sql.internal.ExpressionUtils.column
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.TypedAggUtils.withInputType
import org.apache.spark.sql.streaming.DataStreamWriter
import org.apache.spark.sql.types._
@@ -1614,19 +1614,14 @@ class Dataset[T] private[sql](
}
}
- /**
- * Interface for saving the content of the non-streaming Dataset out into
external storage.
- *
- * @group basic
- * @since 1.6.0
- */
+ /** @inheritdoc */
def write: DataFrameWriter[T] = {
if (isStreaming) {
logicalPlan.failAnalysis(
errorClass = "CALL_ON_STREAMING_DATASET_UNSUPPORTED",
messageParameters = Map("methodName" -> toSQLId("write")))
}
- new DataFrameWriter[T](this)
+ new DataFrameWriterImpl[T](this)
}
/**
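Editor's note: after this change `Dataset.write` keeps the return type `DataFrameWriter[T]`, but that name now refers to the shared abstract class, with `DataFrameWriterImpl` as the classic-backend instance. Caller code is unchanged; a short illustration, assuming an active SparkSession named `spark` (the output path is made up):

    import org.apache.spark.sql.{DataFrameWriter, Row, SaveMode}

    // Static type is the (now shared) DataFrameWriter[T]; the runtime instance
    // is org.apache.spark.sql.internal.DataFrameWriterImpl.
    val writer: DataFrameWriter[Row] = spark.range(10).toDF("id").write
    writer
      .mode(SaveMode.Overwrite)
      .format("parquet")
      .save("/tmp/spark-49425-example")   // illustrative path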
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
similarity index 60%
rename from sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
rename to
sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
index 60734efbf5bb..7248a2d3f056 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
@@ -15,22 +15,22 @@
* limitations under the License.
*/
-package org.apache.spark.sql
+package org.apache.spark.sql.internal
-import java.util.{Locale, Properties}
+import java.util.Locale
import scala.jdk.CollectionConverters._
import org.apache.spark.annotation.Stable
+import org.apache.spark.sql.{DataFrameWriter, Dataset, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases,
NoSuchTableException, UnresolvedIdentifier, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData,
CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OptionList,
OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect,
UnresolvedTableSpec}
+import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-import org.apache.spark.sql.connector.catalog.{CatalogPlugin,
CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, Table,
TableCatalog, TableProvider, V1Table}
+import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.TableCapability._
-import org.apache.spark.sql.connector.catalog.TableWritePrivilege
import org.apache.spark.sql.connector.catalog.TableWritePrivilege._
import org.apache.spark.sql.connector.expressions.{ClusterByTransform,
FieldReference, IdentityTransform, Transform}
import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -38,7 +38,6 @@ import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource,
DataSourceUtils, LogicalRelation}
import org.apache.spark.sql.execution.datasources.v2._
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.StructType
@@ -52,202 +51,58 @@ import org.apache.spark.util.ArrayImplicits._
* @since 1.4.0
*/
@Stable
-final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
+final class DataFrameWriterImpl[T] private[sql](ds: Dataset[T]) extends
DataFrameWriter[T] {
+ format(ds.sparkSession.sessionState.conf.defaultDataSourceName)
private val df = ds.toDF()
- /**
- * Specifies the behavior when data or table already exists. Options include:
- * <ul>
- * <li>`SaveMode.Overwrite`: overwrite the existing data.</li>
- * <li>`SaveMode.Append`: append the data.</li>
- * <li>`SaveMode.Ignore`: ignore the operation (i.e. no-op).</li>
- * <li>`SaveMode.ErrorIfExists`: throw an exception at runtime.</li>
- * </ul>
- * <p>
- * The default option is `ErrorIfExists`.
- *
- * @since 1.4.0
- */
- def mode(saveMode: SaveMode): DataFrameWriter[T] = {
- this.mode = saveMode
- this
- }
+ /** @inheritdoc */
+ override def mode(saveMode: SaveMode): this.type = super.mode(saveMode)
- /**
- * Specifies the behavior when data or table already exists. Options include:
- * <ul>
- * <li>`overwrite`: overwrite the existing data.</li>
- * <li>`append`: append the data.</li>
- * <li>`ignore`: ignore the operation (i.e. no-op).</li>
- * <li>`error` or `errorifexists`: default option, throw an exception at
runtime.</li>
- * </ul>
- *
- * @since 1.4.0
- */
- def mode(saveMode: String): DataFrameWriter[T] = {
- saveMode.toLowerCase(Locale.ROOT) match {
- case "overwrite" => mode(SaveMode.Overwrite)
- case "append" => mode(SaveMode.Append)
- case "ignore" => mode(SaveMode.Ignore)
- case "error" | "errorifexists" | "default" =>
mode(SaveMode.ErrorIfExists)
- case _ => throw QueryCompilationErrors.invalidSaveModeError(saveMode)
- }
- }
+ /** @inheritdoc */
+ override def mode(saveMode: String): this.type = super.mode(saveMode)
- /**
- * Specifies the underlying output data source. Built-in options include
"parquet", "json", etc.
- *
- * @since 1.4.0
- */
- def format(source: String): DataFrameWriter[T] = {
- this.source = source
- this
- }
+ /** @inheritdoc */
+ override def format(source: String): this.type = super.format(source)
- /**
- * Adds an output option for the underlying data source.
- *
- * All options are maintained in a case-insensitive way in terms of key
names.
- * If a new option has the same key case-insensitively, it will override the
existing option.
- *
- * @since 1.4.0
- */
- def option(key: String, value: String): DataFrameWriter[T] = {
- this.extraOptions = this.extraOptions + (key -> value)
- this
- }
+ /** @inheritdoc */
+ override def option(key: String, value: String): this.type =
super.option(key, value)
- /**
- * Adds an output option for the underlying data source.
- *
- * All options are maintained in a case-insensitive way in terms of key
names.
- * If a new option has the same key case-insensitively, it will override the
existing option.
- *
- * @since 2.0.0
- */
- def option(key: String, value: Boolean): DataFrameWriter[T] = option(key,
value.toString)
+ /** @inheritdoc */
+ override def option(key: String, value: Boolean): this.type =
super.option(key, value)
- /**
- * Adds an output option for the underlying data source.
- *
- * All options are maintained in a case-insensitive way in terms of key
names.
- * If a new option has the same key case-insensitively, it will override the
existing option.
- *
- * @since 2.0.0
- */
- def option(key: String, value: Long): DataFrameWriter[T] = option(key,
value.toString)
+ /** @inheritdoc */
+ override def option(key: String, value: Long): this.type = super.option(key,
value)
- /**
- * Adds an output option for the underlying data source.
- *
- * All options are maintained in a case-insensitive way in terms of key
names.
- * If a new option has the same key case-insensitively, it will override the
existing option.
- *
- * @since 2.0.0
- */
- def option(key: String, value: Double): DataFrameWriter[T] = option(key,
value.toString)
+ /** @inheritdoc */
+ override def option(key: String, value: Double): this.type =
super.option(key, value)
- /**
- * (Scala-specific) Adds output options for the underlying data source.
- *
- * All options are maintained in a case-insensitive way in terms of key
names.
- * If a new option has the same key case-insensitively, it will override the
existing option.
- *
- * @since 1.4.0
- */
- def options(options: scala.collection.Map[String, String]):
DataFrameWriter[T] = {
- this.extraOptions ++= options
- this
- }
+ /** @inheritdoc */
+ override def options(options: scala.collection.Map[String, String]):
this.type =
+ super.options(options)
- /**
- * Adds output options for the underlying data source.
- *
- * All options are maintained in a case-insensitive way in terms of key
names.
- * If a new option has the same key case-insensitively, it will override the
existing option.
- *
- * @since 1.4.0
- */
- def options(options: java.util.Map[String, String]): DataFrameWriter[T] = {
- this.options(options.asScala)
- this
- }
+ /** @inheritdoc */
+ override def options(options: java.util.Map[String, String]): this.type =
+ super.options(options)
- /**
- * Partitions the output by the given columns on the file system. If
specified, the output is
- * laid out on the file system similar to Hive's partitioning scheme. As an
example, when we
- * partition a dataset by year and then month, the directory layout would
look like:
- * <ul>
- * <li>year=2016/month=01/</li>
- * <li>year=2016/month=02/</li>
- * </ul>
- *
- * Partitioning is one of the most widely used techniques to optimize
physical data layout.
- * It provides a coarse-grained index for skipping unnecessary data reads
when queries have
- * predicates on the partitioned columns. In order for partitioning to work
well, the number
- * of distinct values in each column should typically be less than tens of
thousands.
- *
- * This is applicable for all file-based data sources (e.g. Parquet, JSON)
starting with Spark
- * 2.1.0.
- *
- * @since 1.4.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def partitionBy(colNames: String*): DataFrameWriter[T] = {
- this.partitioningColumns = Option(colNames)
- validatePartitioning()
- this
- }
+ override def partitionBy(colNames: String*): this.type =
super.partitionBy(colNames: _*)
- /**
- * Buckets the output by the given columns. If specified, the output is laid
out on the file
- * system similar to Hive's bucketing scheme, but with a different bucket
hash function
- * and is not compatible with Hive's bucketing.
- *
- * This is applicable for all file-based data sources (e.g. Parquet, JSON)
starting with Spark
- * 2.1.0.
- *
- * @since 2.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def bucketBy(numBuckets: Int, colName: String, colNames: String*):
DataFrameWriter[T] = {
- this.numBuckets = Option(numBuckets)
- this.bucketColumnNames = Option(colName +: colNames)
- validatePartitioning()
- this
- }
+ override def bucketBy(numBuckets: Int, colName: String, colNames: String*):
this.type =
+ super.bucketBy(numBuckets, colName, colNames: _*)
- /**
- * Sorts the output in each bucket by the given columns.
- *
- * This is applicable for all file-based data sources (e.g. Parquet, JSON)
starting with Spark
- * 2.1.0.
- *
- * @since 2.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def sortBy(colName: String, colNames: String*): DataFrameWriter[T] = {
- this.sortColumnNames = Option(colName +: colNames)
- this
- }
+ override def sortBy(colName: String, colNames: String*): this.type =
+ super.sortBy(colName, colNames: _*)
- /**
- * Clusters the output by the given columns on the storage. The rows with
matching values in the
- * specified clustering columns will be consolidated within the same group.
- *
- * For instance, if you cluster a dataset by date, the data sharing the same
date will be stored
- * together in a file. This arrangement improves query efficiency when you
apply selective
- * filters to these clustering columns, thanks to data skipping.
- *
- * @since 4.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def clusterBy(colName: String, colNames: String*): DataFrameWriter[T] = {
- this.clusteringColumns = Option(colName +: colNames)
- validatePartitioning()
- this
- }
+ override def clusterBy(colName: String, colNames: String*): this.type =
+ super.clusterBy(colName, colNames: _*)
/**
* Saves the content of the `DataFrame` at the specified path.
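Editor's note: every builder method in the hunk above keeps its behavior; the overrides exist only to narrow the return type to `this.type` and to pick up the scaladoc via `@inheritdoc`, while the option/partition/bucket/cluster state and its validation now live in the shared parent. Chained writer code is unaffected; a short usage sketch, assuming an active SparkSession named `spark` and an existing `events` table (all names and paths are illustrative):

    // Partitioned file output: the year=.../month=... directory layout from the
    // removed partitionBy scaladoc still applies, and clusterBy still cannot be
    // combined with partitionBy or bucketBy.
    spark.table("events").write
      .format("parquet")
      .mode("append")
      .option("compression", "snappy")
      .partitionBy("year", "month")
      .save("/tmp/events_by_month")

    // bucketBy/sortBy remain valid only for saveAsTable, exactly as before.
    spark.table("events").write
      .bucketBy(8, "user_id")
      .sortBy("event_time")
      .saveAsTable("events_bucketed")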
@@ -522,37 +377,12 @@ final class DataFrameWriter[T] private[sql](ds:
Dataset[T]) {
}
private def getBucketSpec: Option[BucketSpec] = {
- if (sortColumnNames.isDefined && numBuckets.isEmpty) {
- throw QueryCompilationErrors.sortByWithoutBucketingError()
- }
-
+ isBucketed()
numBuckets.map { n =>
BucketSpec(n, bucketColumnNames.get, sortColumnNames.getOrElse(Nil))
}
}
- private def assertNotBucketed(operation: String): Unit = {
- if (getBucketSpec.isDefined) {
- if (sortColumnNames.isEmpty) {
- throw
QueryCompilationErrors.bucketByUnsupportedByOperationError(operation)
- } else {
- throw
QueryCompilationErrors.bucketByAndSortByUnsupportedByOperationError(operation)
- }
- }
- }
-
- private def assertNotPartitioned(operation: String): Unit = {
- if (partitioningColumns.isDefined) {
- throw
QueryCompilationErrors.operationNotSupportPartitioningError(operation)
- }
- }
-
- private def assertNotClustered(operation: String): Unit = {
- if (clusteringColumns.isDefined) {
- throw
QueryCompilationErrors.operationNotSupportClusteringError(operation)
- }
- }
-
/**
* Saves the content of the `DataFrame` as the specified table.
*
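Editor's note: getBucketSpec now relies on an inherited isBucketed() call for the sortBy-without-bucketBy check, and the assertNotBucketed/assertNotPartitioned/assertNotClustered guards move to the shared class together with the state they inspect. A hedged sketch of the inherited check, assuming it keeps the semantics of the removed lines and throws via the relocated error helper:

    // Sketch of the assumed shared helper; the validation mirrors the removed
    // sortColumnNames/numBuckets check.
    protected def isBucketed(): Boolean = {
      if (sortColumnNames.isDefined && numBuckets.isEmpty) {
        throw CompilationErrors.sortByWithoutBucketingError()
      }
      numBuckets.isDefined
    }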
@@ -773,180 +603,6 @@ final class DataFrameWriter[T] private[sql](ds:
Dataset[T]) {
s" - table: ${existingTable.partitioning().mkString(", ")}")
}
- /**
- * Validate that clusterBy is not used with partitionBy or bucketBy.
- */
- private def validatePartitioning(): Unit = {
- if (clusteringColumns.nonEmpty) {
- if (partitioningColumns.nonEmpty) {
- throw QueryCompilationErrors.clusterByWithPartitionedBy()
- }
- if (getBucketSpec.nonEmpty) {
- throw QueryCompilationErrors.clusterByWithBucketing()
- }
- }
- }
-
- /**
- * Saves the content of the `DataFrame` to an external database table via
JDBC. In the case the
- * table already exists in the external database, behavior of this function
depends on the
- * save mode, specified by the `mode` function (default to throwing an
exception).
- *
- * Don't create too many partitions in parallel on a large cluster;
otherwise Spark might crash
- * your external database systems.
- *
- * JDBC-specific option and parameter documentation for storing tables via
JDBC in
- * <a
href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @param table Name of the table in the external database.
- * @param connectionProperties JDBC database connection arguments, a list of
arbitrary string
- * tag/value. Normally at least a "user" and
"password" property
- * should be included. "batchsize" can be used
to control the
- * number of rows per insert. "isolationLevel"
can be one of
- * "NONE", "READ_COMMITTED", "READ_UNCOMMITTED",
"REPEATABLE_READ",
- * or "SERIALIZABLE", corresponding to standard
transaction
- * isolation levels defined by JDBC's Connection
object, with default
- * of "READ_UNCOMMITTED".
- * @since 1.4.0
- */
- def jdbc(url: String, table: String, connectionProperties: Properties): Unit
= {
- assertNotPartitioned("jdbc")
- assertNotBucketed("jdbc")
- assertNotClustered("jdbc")
- // connectionProperties should override settings in extraOptions.
- this.extraOptions ++= connectionProperties.asScala
- // explicit url and dbtable should override all
- this.extraOptions ++= Seq("url" -> url, "dbtable" -> table)
- format("jdbc").save()
- }
-
- /**
- * Saves the content of the `DataFrame` in JSON format (<a
href="http://jsonlines.org/">
- * JSON Lines text format or newline-delimited JSON</a>) at the specified
path.
- * This is equivalent to:
- * {{{
- * format("json").save(path)
- * }}}
- *
- * You can find the JSON-specific options for writing JSON files in
- * <a
href="https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 1.4.0
- */
- def json(path: String): Unit = {
- format("json").save(path)
- }
-
- /**
- * Saves the content of the `DataFrame` in Parquet format at the specified
path.
- * This is equivalent to:
- * {{{
- * format("parquet").save(path)
- * }}}
- *
- * Parquet-specific option(s) for writing Parquet files can be found in
- * <a href=
- *
"https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 1.4.0
- */
- def parquet(path: String): Unit = {
- format("parquet").save(path)
- }
-
- /**
- * Saves the content of the `DataFrame` in ORC format at the specified path.
- * This is equivalent to:
- * {{{
- * format("orc").save(path)
- * }}}
- *
- * ORC-specific option(s) for writing ORC files can be found in
- * <a href=
- *
"https://spark.apache.org/docs/latest/sql-data-sources-orc.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 1.5.0
- */
- def orc(path: String): Unit = {
- format("orc").save(path)
- }
-
- /**
- * Saves the content of the `DataFrame` in a text file at the specified path.
- * The DataFrame must have only one column that is of string type.
- * Each row becomes a new line in the output file. For example:
- * {{{
- * // Scala:
- * df.write.text("/path/to/output")
- *
- * // Java:
- * df.write().text("/path/to/output")
- * }}}
- * The text files will be encoded as UTF-8.
- *
- * You can find the text-specific options for writing text files in
- * <a
href="https://spark.apache.org/docs/latest/sql-data-sources-text.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 1.6.0
- */
- def text(path: String): Unit = {
- format("text").save(path)
- }
-
- /**
- * Saves the content of the `DataFrame` in CSV format at the specified path.
- * This is equivalent to:
- * {{{
- * format("csv").save(path)
- * }}}
- *
- * You can find the CSV-specific options for writing CSV files in
- * <a
href="https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 2.0.0
- */
- def csv(path: String): Unit = {
- format("csv").save(path)
- }
-
- /**
- * Saves the content of the `DataFrame` in XML format at the specified path.
- * This is equivalent to:
- * {{{
- * format("xml").save(path)
- * }}}
- *
- * Note that writing a XML file from `DataFrame` having a field `ArrayType`
with
- * its element as `ArrayType` would have an additional nested field for the
element.
- * For example, the `DataFrame` having a field below,
- *
- * {@code fieldA [[data1], [data2]]}
- *
- * would produce a XML file below.
- * {@code
- * <fieldA>
- * <item>data1</item>
- * </fieldA>
- * <fieldA>
- * <item>data2</item>
- * </fieldA>}
- *
- * Namely, roundtrip in writing and reading can end up in different schema
structure.
- *
- * You can find the XML-specific options for writing XML files in
- * <a
href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
- * Data Source Option</a> in the version you use.
- */
- def xml(path: String): Unit = {
- format("xml").save(path)
- }
-
/**
* Wrap a DataFrameWriter action to track the QueryExecution and time cost,
then report to the
* user-registered callback functions.
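Editor's note: the format-specific shorthands removed above (jdbc, json, parquet, orc, text, csv, xml) and the clusterBy validation are thin wrappers over format(...).save(...) plus the shared option state, which is why they can move to the shared DataFrameWriter unchanged. User-facing calls stay as they were; for example, assuming an active SparkSession named `spark` (paths, table names, and JDBC settings are illustrative):

    import java.util.Properties

    val df = spark.read.parquet("/tmp/events_by_month")   // hypothetical input

    // Still equivalent to format("csv").save(path):
    df.write.mode("overwrite").csv("/tmp/events_csv")

    // The JDBC shorthand still rejects partitionBy/bucketBy/clusterBy before writing.
    val props = new Properties()
    props.setProperty("user", "spark")         // illustrative credentials
    props.setProperty("password", "secret")
    df.write.mode("append").jdbc("jdbc:postgresql://db:5432/analytics", "events", props)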
@@ -963,24 +619,4 @@ final class DataFrameWriter[T] private[sql](ds:
Dataset[T]) {
case other => other
}
}
-
-
///////////////////////////////////////////////////////////////////////////////////////
- // Builder pattern config options
-
///////////////////////////////////////////////////////////////////////////////////////
-
- private var source: String =
df.sparkSession.sessionState.conf.defaultDataSourceName
-
- private var mode: SaveMode = SaveMode.ErrorIfExists
-
- private var extraOptions = CaseInsensitiveMap[String](Map.empty)
-
- private var partitioningColumns: Option[Seq[String]] = None
-
- private var bucketColumnNames: Option[Seq[String]] = None
-
- private var numBuckets: Option[Int] = None
-
- private var sortColumnNames: Option[Seq[String]] = None
-
- private var clusteringColumns: Option[Seq[String]] = None
}
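Editor's note: the builder state deleted at the end of this file (source, mode, extraOptions, partitioning/bucketing/sort/cluster columns) now lives in the shared parent; the only classic-specific initialization left is the constructor call `format(ds.sparkSession.sessionState.conf.defaultDataSourceName)` near the top of the new file, which seeds the default source from the session config instead of a private var initializer. Observable behavior is the same; for instance (paths illustrative, and assuming the usual default):

    // spark.sql.sources.default is "parquet" unless overridden, so an explicit
    // format() call is still optional.
    spark.range(5).write.save("/tmp/default-format-output")
    spark.range(5).write.format("json").save("/tmp/json-output")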
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 70d7797ba89f..b1c41033fd76 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1967,7 +1967,7 @@ class DataFrameSuite extends QueryTest
test("SPARK-29442 Set `default` mode should override the existing mode") {
val df = Seq(Tuple1(1)).toDF()
val writer = df.write.mode("overwrite").mode("default")
- val modeField =
classOf[DataFrameWriter[Tuple1[Int]]].getDeclaredField("mode")
+ val modeField = classOf[DataFrameWriter[_]].getDeclaredField("mode")
modeField.setAccessible(true)
assert(SaveMode.ErrorIfExists ===
modeField.get(writer).asInstanceOf[SaveMode])
}
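Editor's note: the test now reflects on classOf[DataFrameWriter[_]] because the mode state is declared on the shared class rather than the classic one; the behavior under test is unchanged, since the string "default" maps back to SaveMode.ErrorIfExists and the later mode(...) call wins. In user code (assuming spark.implicits._ is in scope):

    // "default" resolves to SaveMode.ErrorIfExists, overriding the earlier call,
    // so saving fails if the target already exists.
    val writer = Seq(Tuple1(1)).toDF().write.mode("overwrite").mode("default")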