This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 85b351bba320 [SPARK-55976][SQL] Use Set instead of Seq for write privileges
85b351bba320 is described below

commit 85b351bba320dedb29534dd8c409ff9087d8aa66
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Mar 12 13:23:21 2026 -0700

    [SPARK-55976][SQL] Use Set instead of Seq for write privileges
    
    ### What changes were proposed in this pull request?
    
    This PR migrates the code that handles write privileges to use `Set` instead of `Seq`.
    
    ### Why are the changes needed?
    
      The current implementation uses `Seq` throughout the codebase, but this choice is semantically incorrect and introduces unnecessary overhead:
    
      1. The underlying Java API in `TableCatalog` expects `Set<TableWritePrivilege>`:
    ```
      default Table loadTable(
          Identifier ident,
          Set<TableWritePrivilege> writePrivileges) throws NoSuchTableException
    ```
      2. The code converts to `Set` internally to deduplicate, then immediately converts back to `Seq`:
    ```
      (matchedActions ++ notMatchedActions ++ notMatchedBySourceActions)
        .collect { ... }
        .toSet   // deduplicate
        .toSeq   // convert back for no reason
    ```
      3. `TableWritePrivilege` is an enum with 3 values (INSERT, UPDATE, DELETE) representing capability flags, not ordered operations. Using `Seq` suggests that the order matters (it doesn't) and that duplicates are meaningful (they aren't); see the sketch after this list.
      4. At the boundary between Scala and Java, we already convert to `Set`:
    
    ```
      val writePrivileges = writePrivilegesString.get.split(",")
        .map(_.trim)
        .map(TableWritePrivilege.valueOf)
        .toSet  // Already converting to Set
        .asJava
    ```
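
    As a minimal standalone sketch of point 3 (illustrative only, not part of this patch; it assumes `TableWritePrivilege` is importable from `org.apache.spark.sql.connector.catalog`), `Set` matches the order-insensitive, duplicate-free semantics of these capability flags:

    ```
      import org.apache.spark.sql.connector.catalog.TableWritePrivilege._

      // Sets compare by membership, so ordering and repetition are irrelevant,
      // which matches the "capability flags" meaning of the enum.
      assert(Set(INSERT, DELETE) == Set(DELETE, INSERT, INSERT))

      // Seqs compare by position, so the same privileges can compare unequal.
      assert(Seq(INSERT, DELETE) != Seq(DELETE, INSERT, INSERT))
    ```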
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Claude Sonnet 4.5 (claude-sonnet-4-5-20250929)
    
    Closes #54773 from aokolnychyi/spark-55976.
    
    Authored-by: Anton Okolnychyi <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../spark/sql/catalyst/analysis/unresolved.scala   |  2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 25 +++++++++++-----------
 .../sql/catalyst/plans/logical/v2Commands.scala    |  5 ++---
 .../apache/spark/sql/classic/DataFrameWriter.scala |  6 +++---
 .../spark/sql/classic/DataFrameWriterV2.scala      |  6 +++---
 .../command/v2/CreateFlowCommandSuite.scala        |  2 +-
 6 files changed, 23 insertions(+), 23 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 5726c573ee7d..622b0c319f99 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -125,7 +125,7 @@ case class UnresolvedRelation(
 
   override def name: String = tableName
 
-  def requireWritePrivileges(privileges: Seq[TableWritePrivilege]): UnresolvedRelation = {
+  def requireWritePrivileges(privileges: Set[TableWritePrivilege]): UnresolvedRelation = {
     if (privileges.nonEmpty) {
       val newOptions = new java.util.HashMap[String, String]
       newOptions.putAll(options)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index a12add91f10e..36c9ef453cea 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -20,7 +20,8 @@ package org.apache.spark.sql.catalyst.parser
 import java.util.{List, Locale}
 import java.util.concurrent.TimeUnit
 
-import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer, Set}
+import scala.collection.mutable
+import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer}
 import scala.jdk.CollectionConverters._
 
 import org.antlr.v4.runtime.{ParserRuleContext, RuleContext, Token}
@@ -904,7 +905,7 @@ class AstBuilder extends DataTypeAstBuilder
               ctx = insertParams.relationCtx,
               ident = ident,
               optionsClause = insertParams.options,
-              writePrivileges = Seq(TableWritePrivilege.INSERT),
+              writePrivileges = Set(TableWritePrivilege.INSERT),
               isStreaming = false),
             partitionSpec = insertParams.partitionSpec,
             userSpecifiedCols = insertParams.userSpecifiedCols,
@@ -922,7 +923,7 @@ class AstBuilder extends DataTypeAstBuilder
               ctx = insertParams.relationCtx,
               ident = ident,
               optionsClause = insertParams.options,
-              writePrivileges = Seq(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE),
+              writePrivileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE),
               isStreaming = false),
             partitionSpec = insertParams.partitionSpec,
             userSpecifiedCols = insertParams.userSpecifiedCols,
@@ -936,7 +937,7 @@ class AstBuilder extends DataTypeAstBuilder
         val options = Option(ctx.optionsClause())
         withIdentClause(ctx.identifierReference, Seq(query), (ident, otherPlans) => {
           val table = createUnresolvedRelation(ctx.identifierReference, ident, options,
-            Seq(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE), isStreaming = false)
+            Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE), isStreaming = false)
           val deleteExpr = expression(ctx.whereClause().booleanExpression())
           val isByName = ctx.NAME() != null
           val schemaEvolutionWriteOption: Map[String, String] =
@@ -1049,7 +1050,7 @@ class AstBuilder extends DataTypeAstBuilder
   override def visitDeleteFromTable(
       ctx: DeleteFromTableContext): LogicalPlan = withOrigin(ctx) {
     val table = createUnresolvedRelation(
-      ctx.identifierReference, writePrivileges = Seq(TableWritePrivilege.DELETE))
+      ctx.identifierReference, writePrivileges = Set(TableWritePrivilege.DELETE))
     val tableAlias = getTableAliasWithoutColumnAlias(ctx.tableAlias(), "DELETE")
     val aliasedTable = tableAlias.map(SubqueryAlias(_, table)).getOrElse(table)
     val predicate = if (ctx.whereClause() != null) {
@@ -1062,7 +1063,7 @@ class AstBuilder extends DataTypeAstBuilder
 
   override def visitUpdateTable(ctx: UpdateTableContext): LogicalPlan = withOrigin(ctx) {
     val table = createUnresolvedRelation(
-      ctx.identifierReference, writePrivileges = Seq(TableWritePrivilege.UPDATE))
+      ctx.identifierReference, writePrivileges = Set(TableWritePrivilege.UPDATE))
     val tableAlias = getTableAliasWithoutColumnAlias(ctx.tableAlias(), "UPDATE")
     val aliasedTable = tableAlias.map(SubqueryAlias(_, table)).getOrElse(table)
     val assignments = withAssignments(ctx.setClause().assignmentList())
@@ -2513,7 +2514,7 @@ class AstBuilder extends DataTypeAstBuilder
       ctx = ctx,
       ident = ident,
       optionsClause = Option(ctx.optionsClause),
-      writePrivileges = Seq.empty,
+      writePrivileges = Set.empty,
       isStreaming = true)
 
     val table = mayApplyAliasPlan(ctx.tableAlias, relation)
@@ -3854,7 +3855,7 @@ class AstBuilder extends DataTypeAstBuilder
   private def createUnresolvedRelation(
       ctx: IdentifierReferenceContext,
       optionsClause: Option[OptionsClauseContext] = None,
-      writePrivileges: Seq[TableWritePrivilege] = Nil): LogicalPlan = withOrigin(ctx) {
+      writePrivileges: Set[TableWritePrivilege] = Set.empty): LogicalPlan = withOrigin(ctx) {
     val options = resolveOptions(optionsClause)
     withIdentClause(ctx, parts => {
       val relation = new UnresolvedRelation(parts, options, isStreaming = false)
@@ -3869,7 +3870,7 @@ class AstBuilder extends DataTypeAstBuilder
       ctx: ParserRuleContext,
       ident: Seq[String],
       optionsClause: Option[OptionsClauseContext],
-      writePrivileges: Seq[TableWritePrivilege],
+      writePrivileges: Set[TableWritePrivilege],
       isStreaming: Boolean): UnresolvedRelation = withOrigin(ctx) {
     val options = resolveOptions(optionsClause)
     val relation = new UnresolvedRelation(ident, options, isStreaming)
@@ -3949,8 +3950,8 @@ class AstBuilder extends DataTypeAstBuilder
       ctx: ParserRuleContext,
       calendarInterval: CalendarInterval,
       units: Seq[String]): Literal = {
-    val yearMonthFields = Set.empty[Byte]
-    val dayTimeFields = Set.empty[Byte]
+    val yearMonthFields = mutable.Set.empty[Byte]
+    val dayTimeFields = mutable.Set.empty[Byte]
     for (unit <- units) {
       if (YearMonthIntervalType.stringToField.contains(unit)) {
         yearMonthFields += YearMonthIntervalType.stringToField(unit)
@@ -6100,7 +6101,7 @@ class AstBuilder extends DataTypeAstBuilder
             ctx.identifierReference,
             ident,
             None,
-            writePrivileges = Nil,
+            writePrivileges = Set.empty,
             isStreaming = false),
           ident, isLazy, options)
       }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index a38e067861d4..6be0c4800ca4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -937,7 +937,7 @@ case class MergeIntoTable(
 
 object MergeIntoTable {
 
-  def getWritePrivileges(merge: MergeIntoTable): Seq[TableWritePrivilege] = {
+  def getWritePrivileges(merge: MergeIntoTable): Set[TableWritePrivilege] = {
     getWritePrivileges(
       merge.matchedActions,
       merge.notMatchedActions,
@@ -947,7 +947,7 @@ object MergeIntoTable {
   def getWritePrivileges(
       matchedActions: Iterable[MergeAction],
       notMatchedActions: Iterable[MergeAction],
-      notMatchedBySourceActions: Iterable[MergeAction]): Seq[TableWritePrivilege] = {
+      notMatchedBySourceActions: Iterable[MergeAction]): Set[TableWritePrivilege] = {
     (matchedActions ++ notMatchedActions ++ notMatchedBySourceActions)
       .collect {
         case _: DeleteAction => TableWritePrivilege.DELETE
@@ -955,7 +955,6 @@ object MergeIntoTable {
        case _: InsertAction | _: InsertStarAction => TableWritePrivilege.INSERT
       }
       .toSet
-      .toSeq
   }
 
   def schemaChanges(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala
index 079e92451021..f0359b33f431 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala
@@ -372,9 +372,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram
       ifPartitionNotExists = false)
   }
 
-  private def getWritePrivileges: Seq[TableWritePrivilege] = curmode match {
-    case SaveMode.Overwrite => Seq(INSERT, DELETE)
-    case _ => Seq(INSERT)
+  private def getWritePrivileges: Set[TableWritePrivilege] = curmode match {
+    case SaveMode.Overwrite => Set(INSERT, DELETE)
+    case _ => Set(INSERT)
   }
 
   private def getBucketSpec: Option[BucketSpec] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriterV2.scala
index 1309b346ffa5..169822db96c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriterV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriterV2.scala
@@ -194,7 +194,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
 
   private[sql] def appendCommand(): LogicalPlan = {
     AppendData.byName(
-      UnresolvedRelation(tableName).requireWritePrivileges(Seq(INSERT)),
+      UnresolvedRelation(tableName).requireWritePrivileges(Set(INSERT)),
       logicalPlan, options.toMap)
   }
 
@@ -206,7 +206,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
 
   private[sql] def overwriteCommand(condition: Column): LogicalPlan = {
     OverwriteByExpression.byName(
-      UnresolvedRelation(tableName).requireWritePrivileges(Seq(INSERT, DELETE)),
+      UnresolvedRelation(tableName).requireWritePrivileges(Set(INSERT, DELETE)),
       logicalPlan, expression(condition), options.toMap)
   }
 
@@ -218,7 +218,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
 
   private[sql] def overwritePartitionsCommand(): LogicalPlan = {
     OverwritePartitionsDynamic.byName(
-      UnresolvedRelation(tableName).requireWritePrivileges(Seq(INSERT, DELETE)),
+      UnresolvedRelation(tableName).requireWritePrivileges(Set(INSERT, DELETE)),
       logicalPlan, options.toMap)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CreateFlowCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CreateFlowCommandSuite.scala
index dd000ab8fd6d..5298434cfe6f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CreateFlowCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CreateFlowCommandSuite.scala
@@ -61,7 +61,7 @@ class CreateFlowCommandSuite extends CommandSuiteBase with AnalysisTest {
     val cmd = plan.asInstanceOf[CreateFlowCommand]
     assert(cmd.right == InsertIntoStatement(
       table = UnresolvedRelation(Seq("a"))
-        .requireWritePrivileges(Seq(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE)),
+        .requireWritePrivileges(Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE)),
       partitionSpec = Map("col1" -> None),
       userSpecifiedCols = Seq.empty,
       query = parser.parsePlan("SELECT col1, col2 FROM b"),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
