This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bb112e2089f5 [SPARK-52326][SQL] Add partitions related 
ExternalCatalogEvent and post them in corresponding operations
bb112e2089f5 is described below

commit bb112e2089f5028d3b9e0200a1211517ea139351
Author: Xiang Li <[email protected]>
AuthorDate: Wed Dec 17 09:25:34 2025 +0800

    [SPARK-52326][SQL] Add partitions related ExternalCatalogEvent and post 
them in corresponding operations
    
    ### What changes were proposed in this pull request?
    1. Add `partitions` related events, for the following external catalog 
operations: create / drop / alter / rename. A base "PartitionsEvent" is added 
by extending "TableEvent". One "PartitionsEvent" (and its subs) could contain 
the operation for one or more partitions. So it is named as "Partition`s`Event" 
(in the plural form of partition), so are its subs.
    
    2. Post those events in the corresponding external catalog operations
    
    ### Why are the changes needed?
    The operation list extracted from Spark events is of great help for users 
to summarize what has been done against a db/table/function/partition, for the 
purpose of logging and/or auditing.
    In 
[ExternalCatalogWithListener](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala),
 there are events posted against db, table and function for all registered 
listeners. But those operations against partition(s) do not have their events 
posted.
    
    ### Does this PR introduce _any_ user-facing change?
    With this change, partition(s)-related operations are posted to registered 
listeners as events.
    
    ### How was this patch tested?
    Enriched the unit tests and also tested on a local cluster.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #53439 from waterlx/partitions_event.
    
    Authored-by: Xiang Li <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../catalog/ExternalCatalogWithListener.scala      |  10 ++
 .../apache/spark/sql/catalyst/catalog/events.scala |  85 ++++++++++++++++
 .../catalog/ExternalCatalogEventSuite.scala        | 108 +++++++++++++++++++++
 3 files changed, 203 insertions(+)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala
index 33f088079caa..bfc47d2f348d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala
@@ -204,7 +204,10 @@ class ExternalCatalogWithListener(delegate: 
ExternalCatalog)
       table: String,
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = {
+    val partSpecs = parts.map(_.spec)
+    postToAll(CreatePartitionsPreEvent(db, table, partSpecs))
     delegate.createPartitions(db, table, parts, ignoreIfExists)
+    postToAll(CreatePartitionsEvent(db, table, partSpecs))
   }
 
   override def dropPartitions(
@@ -214,7 +217,9 @@ class ExternalCatalogWithListener(delegate: ExternalCatalog)
       ignoreIfNotExists: Boolean,
       purge: Boolean,
       retainData: Boolean): Unit = {
+    postToAll(DropPartitionsPreEvent(db, table, partSpecs))
     delegate.dropPartitions(db, table, partSpecs, ignoreIfNotExists, purge, 
retainData)
+    postToAll(DropPartitionsEvent(db, table, partSpecs))
   }
 
   override def renamePartitions(
@@ -222,14 +227,19 @@ class ExternalCatalogWithListener(delegate: 
ExternalCatalog)
       table: String,
       specs: Seq[TablePartitionSpec],
       newSpecs: Seq[TablePartitionSpec]): Unit = {
+    postToAll(RenamePartitionsPreEvent(db, table, specs, newSpecs))
     delegate.renamePartitions(db, table, specs, newSpecs)
+    postToAll(RenamePartitionsEvent(db, table, specs, newSpecs))
   }
 
   override def alterPartitions(
       db: String,
       table: String,
       parts: Seq[CatalogTablePartition]): Unit = {
+    val partSpecs = parts.map(_.spec)
+    postToAll(AlterPartitionsPreEvent(db, table, partSpecs))
     delegate.alterPartitions(db, table, parts)
+    postToAll(AlterPartitionsEvent(db, table, partSpecs))
   }
 
   override def getPartition(
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
index 974c225afbae..27f4f39aa5ab 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.catalyst.catalog
 
 import org.apache.spark.scheduler.SparkListenerEvent
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 
 /**
  * Event emitted by the external catalog when it is modified. Events are 
either fired before or
@@ -203,3 +204,87 @@ case class RenameFunctionEvent(
     name: String,
     newName: String)
   extends FunctionEvent
+
+/**
+ * Event fired when some partitions (of a table) are created, dropped, 
renamed, altered.
+ */
+trait PartitionsEvent extends TableEvent {
+  /**
+   * Specs of the partitions which are touched.
+   */
+  val partSpecs: Seq[TablePartitionSpec]
+}
+
+/**
+ * Event fired before some partitions (of a table) are created.
+ */
+case class CreatePartitionsPreEvent(
+    database: String,
+    name /* of table */: String,
+    partSpecs: Seq[TablePartitionSpec])
+  extends PartitionsEvent
+
+/**
+ * Event fired after some partitions (of a table) have been created.
+ */
+case class CreatePartitionsEvent(
+    database: String,
+    name /* of table */: String,
+    partSpecs: Seq[TablePartitionSpec])
+  extends PartitionsEvent
+
+/**
+ * Event fired before some partitions (of a table) are dropped.
+ */
+case class DropPartitionsPreEvent(
+    database: String,
+    name /* of table */ : String,
+    partSpecs: Seq[TablePartitionSpec])
+  extends PartitionsEvent
+
+/**
+ * Event fired after some partitions (of a table) have been dropped.
+ */
+case class DropPartitionsEvent(
+    database: String,
+    name /* of table */ : String,
+    partSpecs: Seq[TablePartitionSpec])
+  extends PartitionsEvent
+
+/**
+ * Event fired before some partitions (of a table) are renamed.
+ */
+case class RenamePartitionsPreEvent(
+    database: String,
+    name /* of table */ : String,
+    partSpecs: Seq[TablePartitionSpec],
+    newPartSpecs: Seq[TablePartitionSpec])
+  extends PartitionsEvent
+
+/**
+ * Event fired after some partitions (of a table) have been renamed.
+ */
+case class RenamePartitionsEvent(
+    database: String,
+    name /* of table */ : String,
+    partSpecs: Seq[TablePartitionSpec],
+    newPartSpecs: Seq[TablePartitionSpec])
+  extends PartitionsEvent
+
+/**
+ * Event fired before some partitions (of a table) are altered.
+ */
+case class AlterPartitionsPreEvent(
+    database: String,
+    name /* of table */ : String,
+    partSpecs: Seq[TablePartitionSpec])
+  extends PartitionsEvent
+
+/**
+ * Event fired after some partitions (of a table) have been altered.
+ */
+case class AlterPartitionsEvent(
+    database: String,
+    name /* of table */ : String,
+    partSpecs: Seq[TablePartitionSpec])
+  extends PartitionsEvent
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
index 15858bf2cc69..f332393e503f 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
@@ -209,4 +209,112 @@ class ExternalCatalogEventSuite extends SparkFunSuite {
     catalog.dropFunction("db5", "fn4")
     checkEvents(DropFunctionPreEvent("db5", "fn4") :: DropFunctionEvent("db5", 
"fn4") :: Nil)
   }
+
+  testWithCatalog("partitions") { (catalog, checkEvents) =>
+    // Prepare db
+    val db = "db1"
+    val dbUri = preparePath(Files.createTempDirectory(db + "_"))
+    val dbDefinition = CatalogDatabase(
+      name = db,
+      description = "",
+      locationUri = dbUri,
+      properties = Map.empty)
+
+    catalog.createDatabase(dbDefinition, ignoreIfExists = false)
+    checkEvents(
+      CreateDatabasePreEvent(db) ::
+      CreateDatabaseEvent(db) :: Nil)
+
+    // Prepare table
+    val table = "table1"
+    val tableUri = preparePath(Files.createTempDirectory(table + "_"))
+    val tableDefinition = CatalogTable(
+      identifier = TableIdentifier(table, Some(db)),
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat.empty.copy(locationUri = 
Option(tableUri)),
+      schema = new StructType()
+        .add("year", "int")
+        .add("month", "int")
+        .add("sales", "long"))
+
+    catalog.createTable(tableDefinition, ignoreIfExists = false)
+    checkEvents(
+      CreateTablePreEvent(db, table) ::
+      CreateTableEvent(db, table) :: Nil)
+
+    // Prepare partitions
+    val storageFormat = CatalogStorageFormat(
+      locationUri = Some(tableUri),
+      inputFormat = Some("tableInputFormat"),
+      outputFormat = Some("tableOutputFormat"),
+      serde = None,
+      compressed = false,
+      properties = Map.empty)
+    val parts = Seq(CatalogTablePartition(Map("year" -> "2025", "month" -> 
"Jan"), storageFormat))
+    val partSpecs = parts.map(_.spec)
+
+    val newPartSpecs = Seq(Map("year" -> "2026", "month" -> "Feb"))
+
+    // CREATE
+    catalog.createPartitions(db, table, parts, ignoreIfExists = false)
+    checkEvents(
+      CreatePartitionsPreEvent(db, table, partSpecs) ::
+      CreatePartitionsEvent(db, table, partSpecs) :: Nil)
+
+    // Re-create with ignoreIfExists as true
+    catalog.createPartitions(db, table, parts, ignoreIfExists = true)
+    checkEvents(
+      CreatePartitionsPreEvent(db, table, partSpecs) ::
+      CreatePartitionsEvent(db, table, partSpecs) :: Nil)
+
+    // createPartitions() failed because re-creating with ignoreIfExists as 
false, so PreEvent only
+    intercept[AnalysisException] {
+      catalog.createPartitions(db, table, parts, ignoreIfExists = false)
+    }
+    checkEvents(CreatePartitionsPreEvent(db, table, partSpecs) :: Nil)
+
+    // ALTER
+    catalog.alterPartitions(db, table, parts)
+    checkEvents(
+      AlterPartitionsPreEvent(db, table, partSpecs) ::
+      AlterPartitionsEvent(db, table, partSpecs) ::
+      Nil)
+
+    // RENAME
+    catalog.renamePartitions(db, table, partSpecs, newPartSpecs)
+    checkEvents(
+      RenamePartitionsPreEvent(db, table, partSpecs, newPartSpecs) ::
+      RenamePartitionsEvent(db, table, partSpecs, newPartSpecs) :: Nil)
+
+    // renamePartitions() failed because partitions have been renamed 
according to newPartSpecs,
+    // so PreEvent only
+    intercept[AnalysisException] {
+      catalog.renamePartitions(db, table, partSpecs, newPartSpecs)
+    }
+    checkEvents(RenamePartitionsPreEvent(db, table, partSpecs, newPartSpecs) 
:: Nil)
+
+    // DROP
+    // dropPartitions() failed
+    // because partition of (old) partSpecs do not exist and ignoreIfNotExists 
is false,
+    // So PreEvent only
+    intercept[AnalysisException] {
+      catalog.dropPartitions(db, table, partSpecs,
+        ignoreIfNotExists = false, purge = true, retainData = true)
+    }
+    checkEvents(DropPartitionsPreEvent(db, table, partSpecs) :: Nil)
+
+    // Drop the renamed partitions
+    catalog.dropPartitions(db, table, newPartSpecs,
+      ignoreIfNotExists = false, purge = true, retainData = true)
+    checkEvents(
+      DropPartitionsPreEvent(db, table, newPartSpecs) ::
+      DropPartitionsEvent(db, table, newPartSpecs) :: Nil)
+
+    // Re-drop with ignoreIfNotExists being true
+    catalog.dropPartitions(db, table, newPartSpecs,
+      ignoreIfNotExists = true, purge = true, retainData = true)
+    checkEvents(
+      DropPartitionsPreEvent(db, table, newPartSpecs) ::
+      DropPartitionsEvent(db, table, newPartSpecs) :: Nil)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to