This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4bad4b6c4bb [SPARK-45454][SQL] Set the table's default owner to current_user
4bad4b6c4bb is described below
commit 4bad4b6c4bb70a1a19c386466af134077e2106c1
Author: Yuming Wang <[email protected]>
AuthorDate: Sun Oct 8 09:31:29 2023 -0700
[SPARK-45454][SQL] Set the table's default owner to current_user
### What changes were proposed in this pull request?
This PR sets the table's default owner to `CURRENT_USER`.
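At its core, the change routes owner resolution through `CurrentUserContext`, the same thread-local holder that backs `SELECT CURRENT_USER()`. A minimal sketch of the pattern in the diff below, using `System.getProperty("user.name")` as a stand-in for Spark's `Utils.getCurrentUserName()`:
```scala
object CurrentUserContextSketch {
  // Per-thread user (inherited by child threads), set by the session layer
  // (e.g. the Thrift Server) before a statement runs.
  val CURRENT_USER: InheritableThreadLocal[String] =
    new InheritableThreadLocal[String] {
      override protected def initialValue(): String = null
    }

  // Fall back to the process user when no session user is set.
  def getCurrentUser: String =
    Option(CURRENT_USER.get()).getOrElse(System.getProperty("user.name"))

  // Fall back to "" so it stays cheap and safe as a case-class default.
  def getCurrentUserOrEmpty: String = Option(CURRENT_USER.get()).getOrElse("")
}
```
`CatalogTable.owner` now defaults to `getCurrentUserOrEmpty`, and `CatalogV2Util.withDefaultOwnership` uses `getCurrentUser`, as shown in the diff below.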
### Why are the changes needed?
In Thrift Server mode, the table owner is inconsistent with the result of `SELECT CURRENT_USER();`: the owner of the table is always the user who started the Thrift Server, rather than the connected session user.
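Conceptually, the fix relies on the session layer pinning the connected user on the statement's thread, so any table created while the statement runs records that user as its owner. A hedged sketch of the idea, not the Thrift Server's actual API, reusing the holder sketched above:
```scala
// Run `body` with CURRENT_USER pinned to the connected session user.
def runAsSessionUser[T](sessionUser: String)(body: => T): T = {
  CurrentUserContextSketch.CURRENT_USER.set(sessionUser)
  try body
  finally CurrentUserContextSketch.CURRENT_USER.remove() // avoid leaking into pooled threads
}

// e.g. runAsSessionUser("test_table_owner") { /* CREATE TABLE t(id int) USING parquet */ }
```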
### Does this PR introduce _any_ user-facing change?
The table owner may be changed to `CURRENT_USER`.
For example:
```
Before this PR:
yumwangG9L07H60PK spark-3.5.0-bin-hadoop3 % bin/beeline -u "jdbc:hive2://localhost:10000/" -n test_table_owner -e "create table t(id int) using parquet; desc formatted t;" | grep Owner
Connecting to jdbc:hive2://localhost:10000/
Connected to: Spark SQL (version 3.5.0)
Driver: Hive JDBC (version 2.3.9)
Transaction isolation: TRANSACTION_REPEATABLE_READ
No rows selected (0.36 seconds)
No rows selected (0.1 seconds)
| Owner | yumwang |
16 rows selected (0.055 seconds)
Beeline version 2.3.9 by Apache Hive
Closing: 0: jdbc:hive2://localhost:10000/
After this PR:
yumwangG9L07H60PK spark-4.0.0-SNAPSHOT-bin-3.3.6 % bin/beeline -u "jdbc:hive2://localhost:10000/" -n test_table_owner -e "create table t(id int) using parquet; desc formatted t;" | grep Owner
Connecting to jdbc:hive2://localhost:10000/
Connected to: Spark SQL (version 4.0.0-SNAPSHOT)
Driver: Hive JDBC (version 2.3.9)
Transaction isolation: TRANSACTION_REPEATABLE_READ
No rows selected (0.719 seconds)
No rows selected (0.335 seconds)
| Owner | test_table_owner |
16 rows selected (0.065 seconds)
Beeline version 2.3.9 by Apache Hive
Closing: 0: jdbc:hive2://localhost:10000/
```
### How was this patch tested?
Unit test and manual test.
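All three new tests follow the same shape, condensed here: pin the thread-local, create a table, assert on the recorded owner, and always clear the thread-local in `finally` so other tests are unaffected (shown with the sketch holder from above; the real tests use `CurrentUserContext.CURRENT_USER` as in the diff below):
```scala
val testOwner = "test_table_owner"
try {
  CurrentUserContextSketch.CURRENT_USER.set(testOwner)
  // Create a table here (CatalogTable(...), SQL, or a Thrift session)
  // and read back its owner; with the thread-local set, it must match.
  assert(CurrentUserContextSketch.getCurrentUserOrEmpty == testOwner)
} finally {
  CurrentUserContextSketch.CURRENT_USER.remove()
}
```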
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #43264 from wangyum/SPARK-45454.
Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../apache/spark/sql/catalyst/CurrentUserContext.scala | 6 ++++++
.../apache/spark/sql/catalyst/catalog/interface.scala | 4 ++--
.../spark/sql/catalyst/optimizer/finishAnalysis.scala | 5 ++---
.../spark/sql/connector/catalog/CatalogV2Util.scala | 4 ++--
.../spark/sql/catalyst/analysis/CatalogSuite.scala | 17 +++++++++++++++++
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 13 +++++++++++++
.../ThriftServerWithSparkContextSuite.scala | 16 ++++++++++++++++
7 files changed, 58 insertions(+), 7 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CurrentUserContext.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CurrentUserContext.scala
index 16960db5f20..9dea473a34c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CurrentUserContext.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CurrentUserContext.scala
@@ -17,8 +17,14 @@
package org.apache.spark.sql.catalyst
+import org.apache.spark.util.Utils
+
object CurrentUserContext {
val CURRENT_USER: InheritableThreadLocal[String] = new InheritableThreadLocal[String] {
override protected def initialValue(): String = null
}
+
+ def getCurrentUser: String = Option(CURRENT_USER.get()).getOrElse(Utils.getCurrentUserName())
+
+ def getCurrentUserOrEmpty: String = Option(CURRENT_USER.get()).getOrElse("")
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 4b04cfddbe8..26b38676b07 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -30,7 +30,7 @@ import org.json4s.jackson.JsonMethods._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, SQLConfHelper, TableIdentifier}
+import org.apache.spark.sql.catalyst.{CurrentUserContext, FunctionIdentifier, InternalRow, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedLeafNode}
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
@@ -241,7 +241,7 @@ case class CatalogTable(
provider: Option[String] = None,
partitionColumnNames: Seq[String] = Seq.empty,
bucketSpec: Option[BucketSpec] = None,
- owner: String = "",
+ owner: String = CurrentUserContext.getCurrentUserOrEmpty,
createTime: Long = System.currentTimeMillis,
lastAccessTime: Long = -1,
createVersion: String = "",
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index 466781fa1de..4052ccd6496 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
import java.time.{Instant, LocalDateTime}
-import org.apache.spark.sql.catalyst.CurrentUserContext.CURRENT_USER
+import org.apache.spark.sql.catalyst.CurrentUserContext
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
@@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, instantToMicros, localDateTimeToMicros}
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
/**
@@ -109,7 +108,7 @@ case class ReplaceCurrentLike(catalogManager: CatalogManager) extends Rule[Logic
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
val currentNamespace = catalogManager.currentNamespace.quoted
val currentCatalog = catalogManager.currentCatalog.name()
- val currentUser = Option(CURRENT_USER.get()).getOrElse(Utils.getCurrentUserName())
+ val currentUser = CurrentUserContext.getCurrentUser
plan.transformAllExpressionsWithPruning(_.containsPattern(CURRENT_LIKE)) {
case CurrentDatabase() =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 72cb1e58c7e..f0f02c156ae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -23,6 +23,7 @@ import java.util.Collections
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.CurrentUserContext
import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
@@ -34,7 +35,6 @@ import org.apache.spark.sql.connector.expressions.LiteralValue
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.{ArrayType, MapType, Metadata, MetadataBuilder, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.apache.spark.util.Utils
private[sql] object CatalogV2Util {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
@@ -423,7 +423,7 @@ private[sql] object CatalogV2Util {
}
def withDefaultOwnership(properties: Map[String, String]): Map[String, String] = {
- properties ++ Map(TableCatalog.PROP_OWNER -> Utils.getCurrentUserName())
+ properties ++ Map(TableCatalog.PROP_OWNER -> CurrentUserContext.getCurrentUser)
}
def getTableProviderCatalog(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CatalogSuite.scala
index d670053ba1b..0ac25e628a8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CatalogSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.analysis
+import org.apache.spark.sql.catalyst.CurrentUserContext.CURRENT_USER
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.types.StructType
@@ -34,4 +35,20 @@ class CatalogSuite extends AnalysisTest {
provider = Some("parquet"))
table.toLinkedHashMap
}
+
+ test("SPARK-45454: Set table owner to current_user") {
+ val testOwner = "test_table_owner"
+ try {
+ CURRENT_USER.set(testOwner)
+ val table = CatalogTable(
+ identifier = TableIdentifier("tbl", Some("db1")),
+ tableType = CatalogTableType.MANAGED,
+ storage = CatalogStorageFormat.empty,
+ schema = new StructType().add("col1", "int").add("col2", "string"),
+ provider = Some("parquet"))
+ assert(table.owner === testOwner)
+ } finally {
+ CURRENT_USER.remove()
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 2855d7b06f5..ae639b272a2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -26,6 +26,7 @@ import scala.jdk.CollectionConverters._
import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.CurrentUserContext.CURRENT_USER
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchDatabaseException, NoSuchNamespaceException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.ParseException
@@ -3294,6 +3295,18 @@ class DataSourceV2SQLSuiteV1Filter
}
}
+ test("SPARK-45454: Set table owner to current_user if it is set") {
+ val testOwner = "test_table_owner"
+ try {
+ CURRENT_USER.set(testOwner)
+ spark.sql("CREATE TABLE testcat.table_name (id int) USING foo")
+ val table = catalog("testcat").asTableCatalog.loadTable(Identifier.of(Array(), "table_name"))
+ assert(table.properties.get(TableCatalog.PROP_OWNER) === testOwner)
+ } finally {
+ CURRENT_USER.remove()
+ }
+ }
+
private def testNotSupportedV2Command(
sqlCommand: String,
sqlParams: String,
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
index 318328d71a8..0589f9de609 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
@@ -24,6 +24,7 @@ import org.apache.hive.service.cli.{GetInfoType, HiveSQLException, OperationHand
import org.apache.spark.{ErrorMessageFormat, TaskKilled}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.internal.SQLConf
trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
@@ -254,6 +255,21 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
assertThrows[SQLException](rs1.beforeFirst())
}
}
+
+ test("SPARK-45454: Set table owner to current_user") {
+ val testOwner = "test_table_owner"
+ val tableName = "t"
+ withTable(tableName) {
+ withCLIServiceClient(testOwner) { client =>
+ val sessionHandle = client.openSession(testOwner, "")
+ val confOverlay = new java.util.HashMap[java.lang.String, java.lang.String]
+ val exec: String => OperationHandle = client.executeStatement(sessionHandle, _, confOverlay)
+ exec(s"CREATE TABLE $tableName(id int) using parquet")
+ val owner = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).owner
+ assert(owner === testOwner)
+ }
+ }
+ }
}
class ThriftServerWithSparkContextInBinarySuite extends ThriftServerWithSparkContextSuite {