This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 85b351bba320 [SPARK-55976][SQL] Use Set instead of Seq for write
privileges
85b351bba320 is described below
commit 85b351bba320dedb29534dd8c409ff9087d8aa66
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Mar 12 13:23:21 2026 -0700
[SPARK-55976][SQL] Use Set instead of Seq for write privileges
### What changes were proposed in this pull request?
This PR migrates the code handling write privileges to use `Set` instead of
`Seq`.
### Why are the changes needed?
The current implementation uses `Seq` throughout the codebase, but this
choice is semantically incorrect and introduces unnecessary overhead:
1. The underlying Java API in `TableCatalog` expects
`Set<TableWritePrivilege>`:
```
default Table loadTable(
Identifier ident,
Set<TableWritePrivilege> writePrivileges) throws NoSuchTableException
```
2. The code converts to `Set` internally to deduplicate, then immediately
converts back to `Seq`:
```
(matchedActions ++ notMatchedActions ++ notMatchedBySourceActions)
.collect { ... }
.toSet // deduplicate
.toSeq // convert back for no reason
```
3. `TableWritePrivilege` is an enum with 3 values (INSERT, UPDATE,
DELETE) representing capability flags, not ordered operations. Using `Seq`
suggests that the order matters (it doesn't) and that duplicates are meaningful
(they aren't).
4. At the boundary between Scala and Java, we already convert to `Set`:
```
val writePrivileges = writePrivilegesString.get.split(",")
.map(_.trim)
.map(TableWritePrivilege.valueOf)
.toSet // Already converting to Set
.asJava
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
### Was this patch authored or co-authored using generative AI tooling?
Claude Sonnet 4.5 (claude-sonnet-4-5-20250929)
Closes #54773 from aokolnychyi/spark-55976.
Authored-by: Anton Okolnychyi <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../spark/sql/catalyst/analysis/unresolved.scala | 2 +-
.../spark/sql/catalyst/parser/AstBuilder.scala | 25 +++++++++++-----------
.../sql/catalyst/plans/logical/v2Commands.scala | 5 ++---
.../apache/spark/sql/classic/DataFrameWriter.scala | 6 +++---
.../spark/sql/classic/DataFrameWriterV2.scala | 6 +++---
.../command/v2/CreateFlowCommandSuite.scala | 2 +-
6 files changed, 23 insertions(+), 23 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 5726c573ee7d..622b0c319f99 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -125,7 +125,7 @@ case class UnresolvedRelation(
override def name: String = tableName
- def requireWritePrivileges(privileges: Seq[TableWritePrivilege]):
UnresolvedRelation = {
+ def requireWritePrivileges(privileges: Set[TableWritePrivilege]):
UnresolvedRelation = {
if (privileges.nonEmpty) {
val newOptions = new java.util.HashMap[String, String]
newOptions.putAll(options)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index a12add91f10e..36c9ef453cea 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -20,7 +20,8 @@ package org.apache.spark.sql.catalyst.parser
import java.util.{List, Locale}
import java.util.concurrent.TimeUnit
-import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer, Set}
+import scala.collection.mutable
+import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer}
import scala.jdk.CollectionConverters._
import org.antlr.v4.runtime.{ParserRuleContext, RuleContext, Token}
@@ -904,7 +905,7 @@ class AstBuilder extends DataTypeAstBuilder
ctx = insertParams.relationCtx,
ident = ident,
optionsClause = insertParams.options,
- writePrivileges = Seq(TableWritePrivilege.INSERT),
+ writePrivileges = Set(TableWritePrivilege.INSERT),
isStreaming = false),
partitionSpec = insertParams.partitionSpec,
userSpecifiedCols = insertParams.userSpecifiedCols,
@@ -922,7 +923,7 @@ class AstBuilder extends DataTypeAstBuilder
ctx = insertParams.relationCtx,
ident = ident,
optionsClause = insertParams.options,
- writePrivileges = Seq(TableWritePrivilege.INSERT,
TableWritePrivilege.DELETE),
+ writePrivileges = Set(TableWritePrivilege.INSERT,
TableWritePrivilege.DELETE),
isStreaming = false),
partitionSpec = insertParams.partitionSpec,
userSpecifiedCols = insertParams.userSpecifiedCols,
@@ -936,7 +937,7 @@ class AstBuilder extends DataTypeAstBuilder
val options = Option(ctx.optionsClause())
withIdentClause(ctx.identifierReference, Seq(query), (ident,
otherPlans) => {
val table = createUnresolvedRelation(ctx.identifierReference, ident,
options,
- Seq(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE),
isStreaming = false)
+ Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE),
isStreaming = false)
val deleteExpr = expression(ctx.whereClause().booleanExpression())
val isByName = ctx.NAME() != null
val schemaEvolutionWriteOption: Map[String, String] =
@@ -1049,7 +1050,7 @@ class AstBuilder extends DataTypeAstBuilder
override def visitDeleteFromTable(
ctx: DeleteFromTableContext): LogicalPlan = withOrigin(ctx) {
val table = createUnresolvedRelation(
- ctx.identifierReference, writePrivileges =
Seq(TableWritePrivilege.DELETE))
+ ctx.identifierReference, writePrivileges =
Set(TableWritePrivilege.DELETE))
val tableAlias = getTableAliasWithoutColumnAlias(ctx.tableAlias(),
"DELETE")
val aliasedTable = tableAlias.map(SubqueryAlias(_, table)).getOrElse(table)
val predicate = if (ctx.whereClause() != null) {
@@ -1062,7 +1063,7 @@ class AstBuilder extends DataTypeAstBuilder
override def visitUpdateTable(ctx: UpdateTableContext): LogicalPlan =
withOrigin(ctx) {
val table = createUnresolvedRelation(
- ctx.identifierReference, writePrivileges =
Seq(TableWritePrivilege.UPDATE))
+ ctx.identifierReference, writePrivileges =
Set(TableWritePrivilege.UPDATE))
val tableAlias = getTableAliasWithoutColumnAlias(ctx.tableAlias(),
"UPDATE")
val aliasedTable = tableAlias.map(SubqueryAlias(_, table)).getOrElse(table)
val assignments = withAssignments(ctx.setClause().assignmentList())
@@ -2513,7 +2514,7 @@ class AstBuilder extends DataTypeAstBuilder
ctx = ctx,
ident = ident,
optionsClause = Option(ctx.optionsClause),
- writePrivileges = Seq.empty,
+ writePrivileges = Set.empty,
isStreaming = true)
val table = mayApplyAliasPlan(ctx.tableAlias, relation)
@@ -3854,7 +3855,7 @@ class AstBuilder extends DataTypeAstBuilder
private def createUnresolvedRelation(
ctx: IdentifierReferenceContext,
optionsClause: Option[OptionsClauseContext] = None,
- writePrivileges: Seq[TableWritePrivilege] = Nil): LogicalPlan =
withOrigin(ctx) {
+ writePrivileges: Set[TableWritePrivilege] = Set.empty): LogicalPlan =
withOrigin(ctx) {
val options = resolveOptions(optionsClause)
withIdentClause(ctx, parts => {
val relation = new UnresolvedRelation(parts, options, isStreaming =
false)
@@ -3869,7 +3870,7 @@ class AstBuilder extends DataTypeAstBuilder
ctx: ParserRuleContext,
ident: Seq[String],
optionsClause: Option[OptionsClauseContext],
- writePrivileges: Seq[TableWritePrivilege],
+ writePrivileges: Set[TableWritePrivilege],
isStreaming: Boolean): UnresolvedRelation = withOrigin(ctx) {
val options = resolveOptions(optionsClause)
val relation = new UnresolvedRelation(ident, options, isStreaming)
@@ -3949,8 +3950,8 @@ class AstBuilder extends DataTypeAstBuilder
ctx: ParserRuleContext,
calendarInterval: CalendarInterval,
units: Seq[String]): Literal = {
- val yearMonthFields = Set.empty[Byte]
- val dayTimeFields = Set.empty[Byte]
+ val yearMonthFields = mutable.Set.empty[Byte]
+ val dayTimeFields = mutable.Set.empty[Byte]
for (unit <- units) {
if (YearMonthIntervalType.stringToField.contains(unit)) {
yearMonthFields += YearMonthIntervalType.stringToField(unit)
@@ -6100,7 +6101,7 @@ class AstBuilder extends DataTypeAstBuilder
ctx.identifierReference,
ident,
None,
- writePrivileges = Nil,
+ writePrivileges = Set.empty,
isStreaming = false),
ident, isLazy, options)
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index a38e067861d4..6be0c4800ca4 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -937,7 +937,7 @@ case class MergeIntoTable(
object MergeIntoTable {
- def getWritePrivileges(merge: MergeIntoTable): Seq[TableWritePrivilege] = {
+ def getWritePrivileges(merge: MergeIntoTable): Set[TableWritePrivilege] = {
getWritePrivileges(
merge.matchedActions,
merge.notMatchedActions,
@@ -947,7 +947,7 @@ object MergeIntoTable {
def getWritePrivileges(
matchedActions: Iterable[MergeAction],
notMatchedActions: Iterable[MergeAction],
- notMatchedBySourceActions: Iterable[MergeAction]):
Seq[TableWritePrivilege] = {
+ notMatchedBySourceActions: Iterable[MergeAction]):
Set[TableWritePrivilege] = {
(matchedActions ++ notMatchedActions ++ notMatchedBySourceActions)
.collect {
case _: DeleteAction => TableWritePrivilege.DELETE
@@ -955,7 +955,6 @@ object MergeIntoTable {
case _: InsertAction | _: InsertStarAction =>
TableWritePrivilege.INSERT
}
.toSet
- .toSeq
}
def schemaChanges(
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala
index 079e92451021..f0359b33f431 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala
@@ -372,9 +372,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T])
extends sql.DataFram
ifPartitionNotExists = false)
}
- private def getWritePrivileges: Seq[TableWritePrivilege] = curmode match {
- case SaveMode.Overwrite => Seq(INSERT, DELETE)
- case _ => Seq(INSERT)
+ private def getWritePrivileges: Set[TableWritePrivilege] = curmode match {
+ case SaveMode.Overwrite => Set(INSERT, DELETE)
+ case _ => Set(INSERT)
}
private def getBucketSpec: Option[BucketSpec] = {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriterV2.scala
b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriterV2.scala
index 1309b346ffa5..169822db96c2 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriterV2.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriterV2.scala
@@ -194,7 +194,7 @@ final class DataFrameWriterV2[T] private[sql](table:
String, ds: Dataset[T])
private[sql] def appendCommand(): LogicalPlan = {
AppendData.byName(
- UnresolvedRelation(tableName).requireWritePrivileges(Seq(INSERT)),
+ UnresolvedRelation(tableName).requireWritePrivileges(Set(INSERT)),
logicalPlan, options.toMap)
}
@@ -206,7 +206,7 @@ final class DataFrameWriterV2[T] private[sql](table:
String, ds: Dataset[T])
private[sql] def overwriteCommand(condition: Column): LogicalPlan = {
OverwriteByExpression.byName(
- UnresolvedRelation(tableName).requireWritePrivileges(Seq(INSERT,
DELETE)),
+ UnresolvedRelation(tableName).requireWritePrivileges(Set(INSERT,
DELETE)),
logicalPlan, expression(condition), options.toMap)
}
@@ -218,7 +218,7 @@ final class DataFrameWriterV2[T] private[sql](table:
String, ds: Dataset[T])
private[sql] def overwritePartitionsCommand(): LogicalPlan = {
OverwritePartitionsDynamic.byName(
- UnresolvedRelation(tableName).requireWritePrivileges(Seq(INSERT,
DELETE)),
+ UnresolvedRelation(tableName).requireWritePrivileges(Set(INSERT,
DELETE)),
logicalPlan, options.toMap)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CreateFlowCommandSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CreateFlowCommandSuite.scala
index dd000ab8fd6d..5298434cfe6f 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CreateFlowCommandSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CreateFlowCommandSuite.scala
@@ -61,7 +61,7 @@ class CreateFlowCommandSuite extends CommandSuiteBase with
AnalysisTest {
val cmd = plan.asInstanceOf[CreateFlowCommand]
assert(cmd.right == InsertIntoStatement(
table = UnresolvedRelation(Seq("a"))
- .requireWritePrivileges(Seq(TableWritePrivilege.INSERT,
TableWritePrivilege.DELETE)),
+ .requireWritePrivileges(Set(TableWritePrivilege.INSERT,
TableWritePrivilege.DELETE)),
partitionSpec = Map("col1" -> None),
userSpecifiedCols = Seq.empty,
query = parser.parsePlan("SELECT col1, col2 FROM b"),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]