This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bafce5de394b [SPARK-48761][SQL] Introduce clusterBy DataFrameWriter API for Scala
bafce5de394b is described below

commit bafce5de394bafe63226ab5dae0ce3ac4245a793
Author: Jiaheng Tang <[email protected]>
AuthorDate: Thu Jul 25 12:58:11 2024 +0800

    [SPARK-48761][SQL] Introduce clusterBy DataFrameWriter API for Scala
    
    ### What changes were proposed in this pull request?
    
    Introduce a new `clusterBy` DataFrameWriter API in Scala. This PR adds the API to both DataFrameWriter V1 and V2, as well as Spark Connect.
    
    ### Why are the changes needed?
    
    Introduce more ways for users to interact with clustered tables.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it adds a new `clusterBy` DataFrameWriter API in Scala that allows specifying clustering columns when writing DataFrames.
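    
    For example, after this change clustering columns can be specified when writing a DataFrame (an illustrative sketch only; `df`, the table names, and the column name below are placeholders, not part of this patch):
    
    ```scala
    // DataFrameWriter (V1): cluster the output by a column when saving as a table.
    df.write
      .clusterBy("clusterCol")
      .saveAsTable("clustered_table")
    
    // DataFrameWriterV2: cluster the output when creating a table via writeTo.
    df.writeTo("testcat.clustered_table")
      .clusterBy("clusterCol")
      .create()
    ```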
    
    ### How was this patch tested?
    
    New unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #47301 from zedtang/clusterby-scala-api.
    
    Authored-by: Jiaheng Tang <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../src/main/resources/error/error-conditions.json | 14 ++++
 .../sql/connect/planner/SparkConnectPlanner.scala  | 10 +++
 .../org/apache/spark/sql/connect/dsl/package.scala |  4 ++
 .../connect/planner/SparkConnectProtoSuite.scala   | 42 ++++++++++++
 .../org/apache/spark/sql/DataFrameWriter.scala     | 19 ++++++
 .../org/apache/spark/sql/DataFrameWriterV2.scala   | 23 +++++++
 .../org/apache/spark/sql/ClientDatasetSuite.scala  |  4 ++
 project/MimaExcludes.scala                         |  4 +-
 .../spark/sql/catalyst/catalog/interface.scala     | 31 +++++++++
 .../spark/sql/errors/QueryCompilationErrors.scala  | 30 +++++++++
 .../sql/connector/catalog/InMemoryBaseTable.scala  |  4 ++
 .../org/apache/spark/sql/DataFrameWriter.scala     | 66 +++++++++++++++++--
 .../org/apache/spark/sql/DataFrameWriterV2.scala   | 40 ++++++++++-
 .../execution/datasources/DataSourceUtils.scala    |  5 ++
 .../spark/sql/execution/datasources/rules.scala    | 19 +++++-
 .../apache/spark/sql/DataFrameWriterV2Suite.scala  | 50 +++++++++++++-
 .../execution/command/DescribeTableSuiteBase.scala | 51 ++++++++++++++
 .../sql/test/DataFrameReaderWriterSuite.scala      | 77 +++++++++++++++++++++-
 18 files changed, 482 insertions(+), 11 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 44f0a59a4b48..65e063518054 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -471,6 +471,20 @@
     ],
     "sqlState" : "0A000"
   },
+  "CLUSTERING_COLUMNS_MISMATCH" : {
+    "message" : [
+      "Specified clustering does not match that of the existing table 
<tableName>.",
+      "Specified clustering columns: [<specifiedClusteringString>].",
+      "Existing clustering columns: [<existingClusteringString>]."
+    ],
+    "sqlState" : "42P10"
+  },
+  "CLUSTERING_NOT_SUPPORTED" : {
+    "message" : [
+      "'<operation>' does not support clustering."
+    ],
+    "sqlState" : "42000"
+  },
   "CODEC_NOT_AVAILABLE" : {
     "message" : [
       "The codec <codecName> is not available."
diff --git a/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 013f0d83391c..e790a25ec97f 100644
--- a/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -3090,6 +3090,11 @@ class SparkConnectPlanner(
       w.partitionBy(names.toSeq: _*)
     }
 
+    if (writeOperation.getClusteringColumnsCount > 0) {
+      val names = writeOperation.getClusteringColumnsList.asScala
+      w.clusterBy(names.head, names.tail.toSeq: _*)
+    }
+
     if (writeOperation.hasSource) {
       w.format(writeOperation.getSource)
     }
@@ -3153,6 +3158,11 @@ class SparkConnectPlanner(
       w.partitionedBy(names.head, names.tail: _*)
     }
 
+    if (writeOperation.getClusteringColumnsCount > 0) {
+      val names = writeOperation.getClusteringColumnsList.asScala
+      w.clusterBy(names.head, names.tail.toSeq: _*)
+    }
+
     writeOperation.getMode match {
       case proto.WriteOperationV2.Mode.MODE_CREATE =>
         if (writeOperation.hasProvider) {
diff --git a/connect/server/src/test/scala/org/apache/spark/sql/connect/dsl/package.scala b/connect/server/src/test/scala/org/apache/spark/sql/connect/dsl/package.scala
index 3edb63ee8e81..fdbfc39cc9a5 100644
--- a/connect/server/src/test/scala/org/apache/spark/sql/connect/dsl/package.scala
+++ b/connect/server/src/test/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -219,6 +219,7 @@ package object dsl {
           mode: Option[String] = None,
           sortByColumns: Seq[String] = Seq.empty,
           partitionByCols: Seq[String] = Seq.empty,
+          clusterByCols: Seq[String] = Seq.empty,
           bucketByCols: Seq[String] = Seq.empty,
           numBuckets: Option[Int] = None): Command = {
         val writeOp = WriteOperation.newBuilder()
@@ -242,6 +243,7 @@ package object dsl {
         }
         sortByColumns.foreach(writeOp.addSortColumnNames(_))
         partitionByCols.foreach(writeOp.addPartitioningColumns(_))
+        clusterByCols.foreach(writeOp.addClusteringColumns(_))
 
         if (numBuckets.nonEmpty && bucketByCols.nonEmpty) {
           val op = WriteOperation.BucketBy.newBuilder()
@@ -272,6 +274,7 @@ package object dsl {
           options: Map[String, String] = Map.empty,
           tableProperties: Map[String, String] = Map.empty,
           partitionByCols: Seq[Expression] = Seq.empty,
+          clusterByCols: Seq[String] = Seq.empty,
           mode: Option[String] = None,
           overwriteCondition: Option[Expression] = None): Command = {
         val writeOp = WriteOperationV2.newBuilder()
@@ -279,6 +282,7 @@ package object dsl {
         tableName.foreach(writeOp.setTableName)
         provider.foreach(writeOp.setProvider)
         partitionByCols.foreach(writeOp.addPartitioningColumns)
+        clusterByCols.foreach(writeOp.addClusteringColumns)
         options.foreach { case (k, v) =>
           writeOp.putOptions(k, v)
         }
diff --git a/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala b/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
index 6721555220fe..190f8cde16f5 100644
--- a/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
+++ b/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
@@ -596,6 +596,48 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
     }
   }
 
+  test("Write with clustering") {
+    // Cluster by existing column.
+    withTable("testtable") {
+      transform(
+        localRelation.write(
+          tableName = Some("testtable"),
+          tableSaveMethod = Some("save_as_table"),
+          format = Some("parquet"),
+          clusterByCols = Seq("id")))
+    }
+
+    // Cluster by non-existing column.
+    assertThrows[AnalysisException](
+      transform(
+        localRelation
+          .write(
+            tableName = Some("testtable"),
+            tableSaveMethod = Some("save_as_table"),
+            format = Some("parquet"),
+            clusterByCols = Seq("noid"))))
+  }
+
+  test("Write V2 with clustering") {
+    // Cluster by existing column.
+    withTable("testtable") {
+      transform(
+        localRelation.writeV2(
+          tableName = Some("testtable"),
+          mode = Some("MODE_CREATE"),
+          clusterByCols = Seq("id")))
+    }
+
+    // Cluster by non-existing column.
+    assertThrows[AnalysisException](
+      transform(
+        localRelation
+          .writeV2(
+            tableName = Some("testtable"),
+            mode = Some("MODE_CREATE"),
+            clusterByCols = Seq("noid"))))
+  }
+
   test("Write with invalid bucketBy configuration") {
     val cmd = localRelation.write(bucketByCols = Seq("id"), numBuckets = Some(0))
     assertThrows[InvalidCommandInput] {
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index b499c2a5d909..6f64f0204602 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -201,6 +201,22 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
     this
   }
 
+  /**
+   * Clusters the output by the given columns on the storage. The rows with matching values in the
+   * specified clustering columns will be consolidated within the same group.
+   *
+   * For instance, if you cluster a dataset by date, the data sharing the same date will be stored
+   * together in a file. This arrangement improves query efficiency when you apply selective
+   * filters to these clustering columns, thanks to data skipping.
+   *
+   * @since 4.0.0
+   */
+  @scala.annotation.varargs
+  def clusterBy(colName: String, colNames: String*): DataFrameWriter[T] = {
+    this.clusteringColumns = Option(colName +: colNames)
+    this
+  }
+
   /**
    * Saves the content of the `DataFrame` at the specified path.
    *
@@ -242,6 +258,7 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
     source.foreach(builder.setSource)
     sortColumnNames.foreach(names => builder.addAllSortColumnNames(names.asJava))
     partitioningColumns.foreach(cols => builder.addAllPartitioningColumns(cols.asJava))
+    clusteringColumns.foreach(cols => builder.addAllClusteringColumns(cols.asJava))
 
     numBuckets.foreach(n => {
       val bucketBuilder = proto.WriteOperation.BucketBy.newBuilder()
@@ -509,4 +526,6 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
   private var numBuckets: Option[Int] = None
 
   private var sortColumnNames: Option[Seq[String]] = None
+
+  private var clusteringColumns: Option[Seq[String]] = None
 }
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
index c419c9079f02..dd0fc2fb35d2 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
@@ -41,6 +41,8 @@ final class DataFrameWriterV2[T] private[sql] (table: String, ds: Dataset[T])
 
   private var partitioning: Option[Seq[proto.Expression]] = None
 
+  private var clustering: Option[Seq[String]] = None
+
   private var overwriteCondition: Option[proto.Expression] = None
 
   override def using(provider: String): CreateTableWriter[T] = {
@@ -77,6 +79,12 @@ final class DataFrameWriterV2[T] private[sql] (table: String, ds: Dataset[T])
     this
   }
 
+  @scala.annotation.varargs
+  override def clusterBy(colName: String, colNames: String*): CreateTableWriter[T] = {
+    this.clustering = Some(colName +: colNames)
+    this
+  }
+
   override def create(): Unit = {
     executeWriteOperation(proto.WriteOperationV2.Mode.MODE_CREATE)
   }
@@ -133,6 +141,7 @@ final class DataFrameWriterV2[T] private[sql] (table: String, ds: Dataset[T])
     provider.foreach(builder.setProvider)
 
     partitioning.foreach(columns => builder.addAllPartitioningColumns(columns.asJava))
+    clustering.foreach(columns => builder.addAllClusteringColumns(columns.asJava))
 
     options.foreach { case (k, v) =>
       builder.putOptions(k, v)
@@ -252,8 +261,22 @@ trait CreateTableWriter[T] extends WriteConfigMethods[CreateTableWriter[T]] {
    *
    * @since 3.4.0
    */
+  @scala.annotation.varargs
   def partitionedBy(column: Column, columns: Column*): CreateTableWriter[T]
 
+  /**
+   * Clusters the output by the given columns on the storage. The rows with matching values in the
+   * specified clustering columns will be consolidated within the same group.
+   *
+   * For instance, if you cluster a dataset by date, the data sharing the same date will be stored
+   * together in a file. This arrangement improves query efficiency when you apply selective
+   * filters to these clustering columns, thanks to data skipping.
+   *
+   * @since 4.0.0
+   */
+  @scala.annotation.varargs
+  def clusterBy(colName: String, colNames: String*): CreateTableWriter[T]
+
   /**
   * Specifies a provider for the underlying output data source. Spark's default catalog supports
    * "parquet", "json", etc.
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDatasetSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDatasetSuite.scala
index 9d6f07cf603a..c69cbcf6332e 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDatasetSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDatasetSuite.scala
@@ -85,6 +85,7 @@ class ClientDatasetSuite extends ConnectFunSuite with BeforeAndAfterEach {
           .setNumBuckets(2)
           .addBucketColumnNames("col1")
           .addBucketColumnNames("col2"))
+      .addClusteringColumns("col3")
 
     val expectedPlan = proto.Plan
       .newBuilder()
@@ -95,6 +96,7 @@ class ClientDatasetSuite extends ConnectFunSuite with BeforeAndAfterEach {
       .sortBy("col1")
       .partitionBy("col99")
       .bucketBy(2, "col1", "col2")
+      .clusterBy("col3")
       .parquet("my/test/path")
     val actualPlan = service.getAndClearLatestInputPlan()
     assert(actualPlan.equals(expectedPlan))
@@ -136,6 +138,7 @@ class ClientDatasetSuite extends ConnectFunSuite with BeforeAndAfterEach {
       .setTableName("t1")
       .addPartitioningColumns(col("col99").expr)
       .setProvider("json")
+      .addClusteringColumns("col3")
       .putTableProperties("key", "value")
       .putOptions("key2", "value2")
       .setMode(proto.WriteOperationV2.Mode.MODE_CREATE_OR_REPLACE)
@@ -147,6 +150,7 @@ class ClientDatasetSuite extends ConnectFunSuite with BeforeAndAfterEach {
 
     df.writeTo("t1")
       .partitionedBy(col("col99"))
+      .clusterBy("col3")
       .using("json")
       .tableProperty("key", "value")
       .options(Map("key2" -> "value2"))
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 8cf872e4dd0f..c126d12b1473 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -100,7 +100,9 @@ object MimaExcludes {
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext#implicits._sqlContext"),
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits._sqlContext"),
     ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.session"),
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SparkSession#implicits._sqlContext")
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SparkSession#implicits._sqlContext"),
+    // SPARK-48761: Add clusterBy() to CreateTableWriter.
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.CreateTableWriter.clusterBy")
   )
 
   // Default exclude rules
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index c281b0df8a6d..dcd1d3137da3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -197,10 +197,22 @@ object ClusterBySpec {
     ret
   }
 
+  /**
+   * Converts the clustering column property to a ClusterBySpec.
+   */
   def fromProperty(columns: String): ClusterBySpec = {
     ClusterBySpec(mapper.readValue[Seq[Seq[String]]](columns).map(FieldReference(_)))
   }
 
+  /**
+   * Converts a ClusterBySpec to a clustering column property map entry, with validation
+   * of the column names against the schema.
+   *
+   * @param schema the schema of the table.
+   * @param clusterBySpec the ClusterBySpec to be converted to a property.
+   * @param resolver the resolver used to match the column names.
+   * @return a map entry for the clustering column property.
+   */
   def toProperty(
       schema: StructType,
       clusterBySpec: ClusterBySpec,
@@ -209,10 +221,25 @@ object ClusterBySpec {
       normalizeClusterBySpec(schema, clusterBySpec, resolver).toJson
   }
 
+  /**
+   * Converts a ClusterBySpec to a clustering column property map entry, without validating
+   * the column names against the schema.
+   *
+   * @param clusterBySpec existing ClusterBySpec to be converted to properties.
+   * @return a map entry for the clustering column property.
+   */
+  def toPropertyWithoutValidation(clusterBySpec: ClusterBySpec): (String, String) = {
+    (CatalogTable.PROP_CLUSTERING_COLUMNS -> clusterBySpec.toJson)
+  }
+
   private def normalizeClusterBySpec(
       schema: StructType,
       clusterBySpec: ClusterBySpec,
       resolver: Resolver): ClusterBySpec = {
+    if (schema.isEmpty) {
+      return clusterBySpec
+    }
+
     val normalizedColumns = clusterBySpec.columnNames.map { columnName =>
       val position = SchemaUtils.findColumnPosition(
         columnName.fieldNames().toImmutableArraySeq, schema, resolver)
@@ -239,6 +266,10 @@ object ClusterBySpec {
     val normalizedClusterBySpec = normalizeClusterBySpec(schema, clusterBySpec, resolver)
     ClusterByTransform(normalizedClusterBySpec.columnNames)
   }
+
+  def fromColumnNames(names: Seq[String]): ClusterBySpec = {
+    ClusterBySpec(names.map(FieldReference(_)))
+  }
 }
 
 /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 9de4fa2f6b26..f4169d8054fe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -1866,6 +1866,18 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
         "existingBucketString" -> existingBucketString))
   }
 
+  def mismatchedTableClusteringError(
+      tableName: String,
+      specifiedClusteringString: String,
+      existingClusteringString: String): Throwable = {
+    new AnalysisException(
+      errorClass = "CLUSTERING_COLUMNS_MISMATCH",
+      messageParameters = Map(
+        "tableName" -> tableName,
+        "specifiedClusteringString" -> specifiedClusteringString,
+        "existingClusteringString" -> existingClusteringString))
+  }
+
   def specifyPartitionNotAllowedWhenTableSchemaNotDefinedError(): Throwable = {
     new AnalysisException(
       errorClass = "_LEGACY_ERROR_TEMP_1165",
@@ -4100,4 +4112,22 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
       messageParameters = Map("functionName" -> functionName)
     )
   }
+
+  def operationNotSupportClusteringError(operation: String): Throwable = {
+    new AnalysisException(
+      errorClass = "CLUSTERING_NOT_SUPPORTED",
+      messageParameters = Map("operation" -> operation))
+  }
+
+  def clusterByWithPartitionedBy(): Throwable = {
+    new AnalysisException(
+      errorClass = "SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED",
+      messageParameters = Map.empty)
+  }
+
+  def clusterByWithBucketing(): Throwable = {
+    new AnalysisException(
+      errorClass = "SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED",
+      messageParameters = Map.empty)
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
index 505a5a616920..852e39931626 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
@@ -194,6 +194,10 @@ abstract class InMemoryBaseTable(
           case (v, t) =>
             throw new IllegalArgumentException(s"Match: unsupported argument(s) type - ($v, $t)")
         }
+      case ClusterByTransform(columnNames) =>
+        columnNames.map { colName =>
+          extractor(colName.fieldNames, cleanedSchema, row)._1
+        }
     }.toImmutableArraySeq
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 2d6d5f0e8b2b..991487170f17 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSel
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, Table, TableCatalog, TableProvider, V1Table}
 import org.apache.spark.sql.connector.catalog.TableCapability._
-import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
+import org.apache.spark.sql.connector.expressions.{ClusterByTransform, FieldReference, IdentityTransform, Transform}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -193,6 +193,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   @scala.annotation.varargs
   def partitionBy(colNames: String*): DataFrameWriter[T] = {
     this.partitioningColumns = Option(colNames)
+    validatePartitioning()
     this
   }
 
@@ -210,6 +211,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T] = {
     this.numBuckets = Option(numBuckets)
     this.bucketColumnNames = Option(colName +: colNames)
+    validatePartitioning()
     this
   }
 
@@ -227,6 +229,23 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     this
   }
 
+  /**
+   * Clusters the output by the given columns on the storage. The rows with matching values in the
+   * specified clustering columns will be consolidated within the same group.
+   *
+   * For instance, if you cluster a dataset by date, the data sharing the same date will be stored
+   * together in a file. This arrangement improves query efficiency when you apply selective
+   * filters to these clustering columns, thanks to data skipping.
+   *
+   * @since 4.0.0
+   */
+  @scala.annotation.varargs
+  def clusterBy(colName: String, colNames: String*): DataFrameWriter[T] = {
+    this.clusteringColumns = Option(colName +: colNames)
+    validatePartitioning()
+    this
+  }
+
   /**
    * Saves the content of the `DataFrame` at the specified path.
    *
@@ -377,6 +396,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         DataSourceUtils.PARTITIONING_COLUMNS_KEY ->
         DataSourceUtils.encodePartitioningColumns(columns))
     }
+    clusteringColumns.foreach { columns =>
+      extraOptions = extraOptions + (
+        DataSourceUtils.CLUSTERING_COLUMNS_KEY ->
+        DataSourceUtils.encodePartitioningColumns(columns))
+    }
 
     val optionsWithPath = getOptionsWithPath(path)
 
@@ -515,6 +539,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     }
   }
 
+  private def assertNotClustered(operation: String): Unit = {
+    if (clusteringColumns.isDefined) {
+      throw QueryCompilationErrors.operationNotSupportClusteringError(operation)
+    }
+  }
+
   /**
    * Saves the content of the `DataFrame` as the specified table.
    *
@@ -688,6 +718,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       CatalogTableType.MANAGED
     }
 
+    val properties = if (clusteringColumns.isEmpty) {
+      Map.empty[String, String]
+    } else {
+      Map(ClusterBySpec.toPropertyWithoutValidation(
+        ClusterBySpec.fromColumnNames(clusteringColumns.get)))
+    }
+
     val tableDesc = CatalogTable(
       identifier = tableIdent,
       tableType = tableType,
@@ -695,7 +732,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       schema = new StructType,
       provider = Some(source),
       partitionColumnNames = partitioningColumns.getOrElse(Nil),
-      bucketSpec = getBucketSpec)
+      bucketSpec = getBucketSpec,
+      properties = properties)
 
     runCommand(df.sparkSession)(
       CreateTable(tableDesc, mode, Some(df.logicalPlan)))
@@ -708,7 +746,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     }.getOrElse(Seq.empty[Transform])
     val bucketing =
       getBucketSpec.map(spec => CatalogV2Implicits.BucketSpecHelper(spec).asTransform).toSeq
-    partitioning ++ bucketing
+    val clustering = clusteringColumns.map { colNames =>
+      ClusterByTransform(colNames.map(FieldReference(_)))
+    }
+    partitioning ++ bucketing ++ clustering
   }
 
   /**
@@ -719,11 +760,25 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     val v2Partitions = partitioningAsV2
     if (v2Partitions.isEmpty) return
     require(v2Partitions.sameElements(existingTable.partitioning()),
-      "The provided partitioning does not match of the table.\n" +
+      "The provided partitioning or clustering columns do not match the 
existing table's.\n" +
       s" - provided: ${v2Partitions.mkString(", ")}\n" +
       s" - table: ${existingTable.partitioning().mkString(", ")}")
   }
 
+  /**
+   * Validate that clusterBy is not used with partitionBy or bucketBy.
+   */
+  private def validatePartitioning(): Unit = {
+    if (clusteringColumns.nonEmpty) {
+      if (partitioningColumns.nonEmpty) {
+        throw QueryCompilationErrors.clusterByWithPartitionedBy()
+      }
+      if (getBucketSpec.nonEmpty) {
+        throw QueryCompilationErrors.clusterByWithBucketing()
+      }
+    }
+  }
+
   /**
   * Saves the content of the `DataFrame` to an external database table via JDBC. In the case the
   * table already exists in the external database, behavior of this function depends on the
@@ -750,6 +805,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
     assertNotPartitioned("jdbc")
     assertNotBucketed("jdbc")
+    assertNotClustered("jdbc")
     // connectionProperties should override settings in extraOptions.
     this.extraOptions ++= connectionProperties.asScala
     // explicit url and dbtable should override all
@@ -917,4 +973,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   private var numBuckets: Option[Int] = None
 
   private var sortColumnNames: Option[Seq[String]] = None
+
+  private var clusteringColumns: Option[Seq[String]] = None
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
index b68a13ba2159..df1f8b5c6dfe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
@@ -24,7 +24,7 @@ import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException, UnresolvedIdentifier, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years}
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OptionList, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, UnresolvedTableSpec}
-import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform}
+import org.apache.spark.sql.connector.expressions.{ClusterByTransform, FieldReference, LogicalExpressions, NamedReference, Transform}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.types.IntegerType
@@ -54,6 +54,8 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
 
   private var partitioning: Option[Seq[Transform]] = None
 
+  private var clustering: Option[ClusterByTransform] = None
+
   override def using(provider: String): CreateTableWriter[T] = {
     this.provider = Some(provider)
     this
@@ -104,9 +106,27 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
     }
 
     this.partitioning = Some(asTransforms)
+    validatePartitioning()
+    this
+  }
+
+  @scala.annotation.varargs
+  override def clusterBy(colName: String, colNames: String*): CreateTableWriter[T] = {
+    this.clustering =
+      Some(ClusterByTransform((colName +: colNames).map(col => FieldReference(col))))
+    validatePartitioning()
     this
   }
 
+  /**
+   * Validate that clusterBy is not used with partitionBy.
+   */
+  private def validatePartitioning(): Unit = {
+    if (partitioning.nonEmpty && clustering.nonEmpty) {
+      throw QueryCompilationErrors.clusterByWithPartitionedBy()
+    }
+  }
+
   override def create(): Unit = {
     val tableSpec = UnresolvedTableSpec(
       properties = properties.toMap,
@@ -119,7 +139,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
     runCommand(
       CreateTableAsSelect(
         UnresolvedIdentifier(tableName),
-        partitioning.getOrElse(Seq.empty),
+        partitioning.getOrElse(Seq.empty) ++ clustering,
         logicalPlan,
         tableSpec,
         options.toMap,
@@ -207,7 +227,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
       external = false)
     runCommand(ReplaceTableAsSelect(
       UnresolvedIdentifier(tableName),
-      partitioning.getOrElse(Seq.empty),
+      partitioning.getOrElse(Seq.empty) ++ clustering,
       logicalPlan,
       tableSpec,
       writeOptions = options.toMap,
@@ -328,8 +348,22 @@ trait CreateTableWriter[T] extends WriteConfigMethods[CreateTableWriter[T]] {
    *
    * @since 3.0.0
    */
+  @scala.annotation.varargs
   def partitionedBy(column: Column, columns: Column*): CreateTableWriter[T]
 
+  /**
+   * Clusters the output by the given columns on the storage. The rows with matching values in
+   * the specified clustering columns will be consolidated within the same group.
+   *
+   * For instance, if you cluster a dataset by date, the data sharing the same date will be stored
+   * together in a file. This arrangement improves query efficiency when you apply selective
+   * filters to these clustering columns, thanks to data skipping.
+   *
+   * @since 4.0.0
+   */
+  @scala.annotation.varargs
+  def clusterBy(colName: String, colNames: String*): CreateTableWriter[T]
+
   /**
   * Specifies a provider for the underlying output data source. Spark's default catalog supports
    * "parquet", "json", etc.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index c80dc8307967..81eadcc263c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -52,6 +52,11 @@ object DataSourceUtils extends PredicateHelper {
    */
   val PARTITION_OVERWRITE_MODE = "partitionOverwriteMode"
 
+  /**
+   * The key to use for storing clusterBy columns as options.
+   */
+  val CLUSTERING_COLUMNS_KEY = "__clustering_columns"
+
   /**
    * Utility methods for converting partitionBy columns to options and back.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 5265c24b140d..9bc0793650ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -204,6 +204,18 @@ case class PreprocessTableCreation(catalog: SessionCatalog) extends Rule[Logical
           tableName, specifiedBucketString, existingBucketString)
       }
 
+      // Check if the specified clustering columns match the existing table.
+      val specifiedClusterBySpec = tableDesc.clusterBySpec
+      val existingClusterBySpec = existingTable.clusterBySpec
+      if (specifiedClusterBySpec != existingClusterBySpec) {
+        val specifiedClusteringString =
+          specifiedClusterBySpec.map(_.toString).getOrElse("")
+        val existingClusteringString =
+          existingClusterBySpec.map(_.toString).getOrElse("")
+        throw QueryCompilationErrors.mismatchedTableClusteringError(
+          tableName, specifiedClusteringString, existingClusteringString)
+      }
+
       val newQuery = if (adjustedColumns != query.output) {
         Project(adjustedColumns, query)
       } else {
@@ -325,7 +337,12 @@ case class PreprocessTableCreation(catalog: SessionCatalog) extends Rule[Logical
       }
     }
 
-    table.copy(partitionColumnNames = normalizedPartCols, bucketSpec = normalizedBucketSpec)
+    val normalizedProperties = table.properties ++ table.clusterBySpec.map { spec =>
+      ClusterBySpec.toProperty(schema, spec, conf.resolver)
+    }
+
+    table.copy(partitionColumnNames = normalizedPartCols, bucketSpec = normalizedBucketSpec,
+      properties = normalizedProperties)
   }
 
   private def normalizePartitionColumns(schema: StructType, table: CatalogTable): Seq[String] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
index 44d47abc93fa..2275d8c21397 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, Ove
 import org.apache.spark.sql.connector.InMemoryV1Provider
 import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTable, InMemoryTableCatalog, TableCatalog}
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
-import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform}
+import org.apache.spark.sql.connector.expressions.{BucketTransform, ClusterByTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.streaming.MemoryStream
@@ -524,6 +524,18 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
         Seq(BucketTransform(LiteralValue(4, IntegerType), Seq(FieldReference("id")))))
   }
 
+  test("Create: cluster by") {
+    spark.table("source")
+      .writeTo("testcat.table_name")
+      .clusterBy("id")
+      .create()
+
+    val table = catalog("testcat").loadTable(Identifier.of(Array(), "table_name"))
+
+    assert(table.name === "testcat.table_name")
+    assert(table.partitioning === Seq(ClusterByTransform(Seq(FieldReference("id")))))
+  }
+
   test("Create: fail if table already exists") {
     spark.sql(
       "CREATE TABLE testcat.table_name (id bigint, data string) USING foo 
PARTITIONED BY (id)")
@@ -634,6 +646,42 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
     assert(replaced.properties === defaultOwnership.asJava)
   }
 
+  test("Replace: clustered table") {
+    spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING 
foo")
+    spark.sql("INSERT INTO TABLE testcat.table_name SELECT * FROM source")
+
+    checkAnswer(
+      spark.table("testcat.table_name"),
+      Seq(Row(1L, "a"), Row(2L, "b"), Row(3L, "c")))
+
+    val table = catalog("testcat").loadTable(Identifier.of(Array(), "table_name"))
+
+    // validate the initial table
+    assert(table.name === "testcat.table_name")
+    assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
+    assert(table.partitioning.isEmpty)
+    assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)
+
+    spark.table("source2")
+        .withColumn("even_or_odd", when(($"id" % 2) === 0, 
"even").otherwise("odd"))
+        .writeTo("testcat.table_name").clusterBy("id").replace()
+
+    checkAnswer(
+      spark.table("testcat.table_name"),
+      Seq(Row(4L, "d", "even"), Row(5L, "e", "odd"), Row(6L, "f", "even")))
+
+    val replaced = catalog("testcat").loadTable(Identifier.of(Array(), "table_name"))
+
+    // validate the replacement table
+    assert(replaced.name === "testcat.table_name")
+    assert(replaced.schema === new StructType()
+        .add("id", LongType)
+        .add("data", StringType)
+        .add("even_or_odd", StringType))
+    assert(replaced.partitioning === Seq(ClusterByTransform(Seq(FieldReference("id")))))
+    assert(replaced.properties === defaultOwnership.asJava)
+  }
+
   test("Replace: fail if table does not exist") {
     val exc = intercept[CannotReplaceMissingTableException] {
       spark.table("source").writeTo("testcat.table_name").replace()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
index 02e8a5e68999..c4e9ff93ef85 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.command
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
+import org.apache.spark.sql.functions.{col, struct}
 import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StringType, StructType}
 
 /**
@@ -242,4 +243,54 @@ trait DescribeTableSuiteBase extends QueryTest with DDLCommandTestUtils {
           Row("# col_name", "data_type", "comment")))
     }
   }
+
+  test("describe a clustered table - dataframe writer v1") {
+    withNamespaceAndTable("ns", "tbl") { tbl =>
+      val df = spark.range(10).select(
+        col("id").cast("string").as("col1"),
+        struct(col("id").cast("int").as("x"), col("id").cast("int").as("y")).as("col2"))
+      df.write.mode("append").clusterBy("col1", "col2.x").saveAsTable(tbl)
+      val descriptionDf = sql(s"DESC $tbl")
+
+      descriptionDf.show(false)
+      assert(descriptionDf.schema.map(field => (field.name, field.dataType)) === Seq(
+        ("col_name", StringType),
+        ("data_type", StringType),
+        ("comment", StringType)))
+      QueryTest.checkAnswer(
+        descriptionDf,
+        Seq(
+          Row("col1", "string", null),
+          Row("col2", "struct<x:int,y:int>", null),
+          Row("# Clustering Information", "", ""),
+          Row("# col_name", "data_type", "comment"),
+          Row("col2.x", "int", null),
+          Row("col1", "string", null)))
+    }
+  }
+
+  test("describe a clustered table - dataframe writer v2") {
+    withNamespaceAndTable("ns", "tbl") { tbl =>
+      val df = spark.range(10).select(
+        col("id").cast("string").as("col1"),
+        struct(col("id").cast("int").as("x"), col("id").cast("int").as("y")).as("col2"))
+      df.writeTo(tbl).clusterBy("col1", "col2.x").create()
+      val descriptionDf = sql(s"DESC $tbl")
+
+      descriptionDf.show(false)
+      assert(descriptionDf.schema.map(field => (field.name, field.dataType)) === Seq(
+        ("col_name", StringType),
+        ("data_type", StringType),
+        ("comment", StringType)))
+      QueryTest.checkAnswer(
+        descriptionDf,
+        Seq(
+          Row("col1", "string", null),
+          Row("col2", "struct<x:int,y:int>", null),
+          Row("# Clustering Information", "", ""),
+          Row("# col_name", "data_type", "comment"),
+          Row("col2.x", "int", null),
+          Row("col1", "string", null)))
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 603ee74ce333..377c422b22ac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -286,6 +286,75 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
     assert(DataSourceUtils.decodePartitioningColumns(partColumns) === Seq("col1", "col2"))
   }
 
+  test("pass clusterBy as options") {
+    Seq(1).toDF().write
+      .format("org.apache.spark.sql.test")
+      .clusterBy("col1", "col2")
+      .save()
+
+    val clusteringColumns = LastOptions.parameters(DataSourceUtils.CLUSTERING_COLUMNS_KEY)
+    assert(DataSourceUtils.decodePartitioningColumns(clusteringColumns) === Seq("col1", "col2"))
+  }
+
+  test("Clustering columns should match when appending to existing data source 
tables") {
+    import testImplicits._
+    val df = Seq((1, 2, 3)).toDF("a", "b", "c")
+    withTable("clusteredTable") {
+      df.write.mode("overwrite").clusterBy("a", 
"b").saveAsTable("clusteredTable")
+      // Misses some clustering columns
+      checkError(
+        exception = intercept[AnalysisException] {
+          df.write.mode("append").clusterBy("a").saveAsTable("clusteredTable")
+        },
+        errorClass = "CLUSTERING_COLUMNS_MISMATCH",
+        parameters = Map(
+          "tableName" -> "spark_catalog.default.clusteredtable",
+          "specifiedClusteringString" -> """[["a"]]""",
+          "existingClusteringString" -> """[["a"],["b"]]""")
+      )
+      // Wrong order
+      checkError(
+        exception = intercept[AnalysisException] {
+          df.write.mode("append").clusterBy("b", 
"a").saveAsTable("clusteredTable")
+        },
+        errorClass = "CLUSTERING_COLUMNS_MISMATCH",
+        parameters = Map(
+          "tableName" -> "spark_catalog.default.clusteredtable",
+          "specifiedClusteringString" -> """[["b"],["a"]]""",
+          "existingClusteringString" -> """[["a"],["b"]]""")
+      )
+      // Clustering columns not specified
+      checkError(
+        exception = intercept[AnalysisException] {
+          df.write.mode("append").saveAsTable("clusteredTable")
+        },
+        errorClass = "CLUSTERING_COLUMNS_MISMATCH",
+        parameters = Map(
+          "tableName" -> "spark_catalog.default.clusteredtable",
+          "specifiedClusteringString" -> "", "existingClusteringString" -> 
"""[["a"],["b"]]""")
+      )
+      assert(sql("select * from clusteredTable").collect().length == 1)
+      // Inserts new data successfully when clustering columns are correctly specified in
+      // clusterBy(...).
+      Seq((4, 5, 6)).toDF("a", "b", "c")
+        .write
+        .mode("append")
+        .clusterBy("a", "b")
+        .saveAsTable("clusteredTable")
+
+      Seq((7, 8, 9)).toDF("a", "b", "c")
+        .write
+        .mode("append")
+        .clusterBy("a", "b")
+        .saveAsTable("clusteredTable")
+
+      checkAnswer(
+        sql("select a, b, c from clusteredTable"),
+        Row(1, 2, 3) :: Row(4, 5, 6) :: Row(7, 8, 9) :: Nil
+      )
+    }
+  }
+
   test ("SPARK-29537: throw exception when user defined a wrong base path") {
     withTempPath { p =>
       val path = new Path(p.toURI).toString
@@ -490,7 +559,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
     assert(LastOptions.parameters("doubleOpt") == "6.7")
   }
 
-  test("check jdbc() does not support partitioning, bucketBy or sortBy") {
+  test("check jdbc() does not support partitioning, bucketBy, clusterBy or 
sortBy") {
     val df = spark.read.text(Utils.createTempDir(namePrefix = 
"text").getCanonicalPath)
 
     var w = df.write.partitionBy("value")
@@ -505,6 +574,12 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
       assert(e.getMessage.toLowerCase(Locale.ROOT).contains(s.toLowerCase(Locale.ROOT)))
     }
 
+    w = df.write.clusterBy("value")
+    e = intercept[AnalysisException](w.jdbc(null, null, null))
+    Seq("jdbc", "clustering").foreach { s =>
+      
assert(e.getMessage.toLowerCase(Locale.ROOT).contains(s.toLowerCase(Locale.ROOT)))
+    }
+
     w = df.write.sortBy("value")
     e = intercept[AnalysisException](w.jdbc(null, null, null))
     Seq("sortBy must be used together with bucketBy").foreach { s =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

