This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bad4b6c4bb [SPARK-45454][SQL] Set the table's default owner to 
current_user
4bad4b6c4bb is described below

commit 4bad4b6c4bb70a1a19c386466af134077e2106c1
Author: Yuming Wang <[email protected]>
AuthorDate: Sun Oct 8 09:31:29 2023 -0700

    [SPARK-45454][SQL] Set the table's default owner to current_user
    
    ### What changes were proposed in this pull request?
    
    This PR sets the table's default owner to `CURRENT_USER`.
    
    ### Why are the changes needed?
    
    In thrift server mode, the owner of a newly created table is inconsistent 
with the result of `SELECT CURRENT_USER();`: the owner is always the user who 
started the thrift server, not the user of the connected session.
    
    ### Does this PR introduce _any_ user-facing change?
    
    The table owner may be changed to `CURRENT_USER`.
    
    For example:
    ```
    Before this PR:
    yumwangG9L07H60PK spark-3.5.0-bin-hadoop3 % bin/beeline -u 
"jdbc:hive2://localhost:10000/" -n test_table_owner -e "create table t(id int) 
using parquet; desc formatted t;" | grep Owner
    Connecting to jdbc:hive2://localhost:10000/
    Connected to: Spark SQL (version 3.5.0)
    Driver: Hive JDBC (version 2.3.9)
    Transaction isolation: TRANSACTION_REPEATABLE_READ
    No rows selected (0.36 seconds)
    No rows selected (0.1 seconds)
    | Owner                         | yumwang                                   
         |          |
    16 rows selected (0.055 seconds)
    Beeline version 2.3.9 by Apache Hive
    Closing: 0: jdbc:hive2://localhost:10000/
    
    After this PR:
    yumwangG9L07H60PK spark-4.0.0-SNAPSHOT-bin-3.3.6 % bin/beeline -u 
"jdbc:hive2://localhost:10000/" -n test_table_owner -e "create table t(id int) 
using parquet; desc formatted t;" | grep Owner
    Connecting to jdbc:hive2://localhost:10000/
    Connected to: Spark SQL (version 4.0.0-SNAPSHOT)
    Driver: Hive JDBC (version 2.3.9)
    Transaction isolation: TRANSACTION_REPEATABLE_READ
    No rows selected (0.719 seconds)
    No rows selected (0.335 seconds)
    | Owner                         | test_table_owner                          
         |          |
    16 rows selected (0.065 seconds)
    Beeline version 2.3.9 by Apache Hive
    Closing: 0: jdbc:hive2://localhost:10000/
    ```
    
    ### How was this patch tested?
    
    Unit test and manual test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #43264 from wangyum/SPARK-45454.
    
    Authored-by: Yuming Wang <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../apache/spark/sql/catalyst/CurrentUserContext.scala  |  6 ++++++
 .../apache/spark/sql/catalyst/catalog/interface.scala   |  4 ++--
 .../spark/sql/catalyst/optimizer/finishAnalysis.scala   |  5 ++---
 .../spark/sql/connector/catalog/CatalogV2Util.scala     |  4 ++--
 .../spark/sql/catalyst/analysis/CatalogSuite.scala      | 17 +++++++++++++++++
 .../spark/sql/connector/DataSourceV2SQLSuite.scala      | 13 +++++++++++++
 .../ThriftServerWithSparkContextSuite.scala             | 16 ++++++++++++++++
 7 files changed, 58 insertions(+), 7 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CurrentUserContext.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CurrentUserContext.scala
index 16960db5f20..9dea473a34c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CurrentUserContext.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CurrentUserContext.scala
@@ -17,8 +17,14 @@
 
 package org.apache.spark.sql.catalyst
 
+import org.apache.spark.util.Utils
+
 object CurrentUserContext {
   val CURRENT_USER: InheritableThreadLocal[String] = new 
InheritableThreadLocal[String] {
     override protected def initialValue(): String = null
   }
+
+  def getCurrentUser: String = 
Option(CURRENT_USER.get()).getOrElse(Utils.getCurrentUserName())
+
+  def getCurrentUserOrEmpty: String = Option(CURRENT_USER.get()).getOrElse("")
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 4b04cfddbe8..26b38676b07 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -30,7 +30,7 @@ import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, 
SQLConfHelper, TableIdentifier}
+import org.apache.spark.sql.catalyst.{CurrentUserContext, FunctionIdentifier, 
InternalRow, SQLConfHelper, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, 
UnresolvedLeafNode}
 import 
org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
AttributeReference, Cast, ExprId, Literal}
@@ -241,7 +241,7 @@ case class CatalogTable(
     provider: Option[String] = None,
     partitionColumnNames: Seq[String] = Seq.empty,
     bucketSpec: Option[BucketSpec] = None,
-    owner: String = "",
+    owner: String = CurrentUserContext.getCurrentUserOrEmpty,
     createTime: Long = System.currentTimeMillis,
     lastAccessTime: Long = -1,
     createVersion: String = "",
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index 466781fa1de..4052ccd6496 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import java.time.{Instant, LocalDateTime}
 
-import org.apache.spark.sql.catalyst.CurrentUserContext.CURRENT_USER
+import org.apache.spark.sql.catalyst.CurrentUserContext
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, 
convertSpecialTimestamp, convertSpecialTimestampNTZ, instantToMicros, 
localDateTimeToMicros}
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
 
 
 /**
@@ -109,7 +108,7 @@ case class ReplaceCurrentLike(catalogManager: 
CatalogManager) extends Rule[Logic
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
     val currentNamespace = catalogManager.currentNamespace.quoted
     val currentCatalog = catalogManager.currentCatalog.name()
-    val currentUser = 
Option(CURRENT_USER.get()).getOrElse(Utils.getCurrentUserName())
+    val currentUser = CurrentUserContext.getCurrentUser
 
     plan.transformAllExpressionsWithPruning(_.containsPattern(CURRENT_LIKE)) {
       case CurrentDatabase() =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 72cb1e58c7e..f0f02c156ae 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -23,6 +23,7 @@ import java.util.Collections
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.CurrentUserContext
 import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, 
NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, 
NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
@@ -34,7 +35,6 @@ import org.apache.spark.sql.connector.expressions.LiteralValue
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.types.{ArrayType, MapType, Metadata, 
MetadataBuilder, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.apache.spark.util.Utils
 
 private[sql] object CatalogV2Util {
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
@@ -423,7 +423,7 @@ private[sql] object CatalogV2Util {
   }
 
   def withDefaultOwnership(properties: Map[String, String]): Map[String, 
String] = {
-    properties ++ Map(TableCatalog.PROP_OWNER -> Utils.getCurrentUserName())
+    properties ++ Map(TableCatalog.PROP_OWNER -> 
CurrentUserContext.getCurrentUser)
   }
 
   def getTableProviderCatalog(
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CatalogSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CatalogSuite.scala
index d670053ba1b..0ac25e628a8 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CatalogSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CatalogSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import org.apache.spark.sql.catalyst.CurrentUserContext.CURRENT_USER
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable, CatalogTableType}
 import org.apache.spark.sql.types.StructType
@@ -34,4 +35,20 @@ class CatalogSuite extends AnalysisTest {
       provider = Some("parquet"))
     table.toLinkedHashMap
   }
+
+  test("SPARK-45454: Set table owner to current_user") {
+    val testOwner = "test_table_owner"
+    try {
+      CURRENT_USER.set(testOwner)
+      val table = CatalogTable(
+        identifier = TableIdentifier("tbl", Some("db1")),
+        tableType = CatalogTableType.MANAGED,
+        storage = CatalogStorageFormat.empty,
+        schema = new StructType().add("col1", "int").add("col2", "string"),
+        provider = Some("parquet"))
+      assert(table.owner === testOwner)
+    } finally {
+      CURRENT_USER.remove()
+    }
+  }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 2855d7b06f5..ae639b272a2 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -26,6 +26,7 @@ import scala.jdk.CollectionConverters._
 
 import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.CurrentUserContext.CURRENT_USER
 import org.apache.spark.sql.catalyst.InternalRow
 import 
org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, 
NoSuchDatabaseException, NoSuchNamespaceException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.ParseException
@@ -3294,6 +3295,18 @@ class DataSourceV2SQLSuiteV1Filter
     }
   }
 
+  test("SPARK-45454: Set table owner to current_user if it is set") {
+    val testOwner = "test_table_owner"
+    try {
+      CURRENT_USER.set(testOwner)
+      spark.sql("CREATE TABLE testcat.table_name (id int) USING foo")
+      val table = 
catalog("testcat").asTableCatalog.loadTable(Identifier.of(Array(), 
"table_name"))
+      assert(table.properties.get(TableCatalog.PROP_OWNER) === testOwner)
+    } finally {
+      CURRENT_USER.remove()
+    }
+  }
+
   private def testNotSupportedV2Command(
       sqlCommand: String,
       sqlParams: String,
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
index 318328d71a8..0589f9de609 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
@@ -24,6 +24,7 @@ import org.apache.hive.service.cli.{GetInfoType, 
HiveSQLException, OperationHand
 
 import org.apache.spark.{ErrorMessageFormat, TaskKilled}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.internal.SQLConf
 
 trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
@@ -254,6 +255,21 @@ trait ThriftServerWithSparkContextSuite extends 
SharedThriftServer {
       assertThrows[SQLException](rs1.beforeFirst())
     }
   }
+
+  test("SPARK-45454: Set table owner to current_user") {
+    val testOwner = "test_table_owner"
+    val tableName = "t"
+    withTable(tableName) {
+      withCLIServiceClient(testOwner) { client =>
+        val sessionHandle = client.openSession(testOwner, "")
+        val confOverlay = new java.util.HashMap[java.lang.String, 
java.lang.String]
+        val exec: String => OperationHandle = 
client.executeStatement(sessionHandle, _, confOverlay)
+        exec(s"CREATE TABLE $tableName(id int) using parquet")
+        val owner = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).owner
+        assert(owner === testOwner)
+      }
+    }
+  }
 }
 
 class ThriftServerWithSparkContextInBinarySuite extends 
ThriftServerWithSparkContextSuite {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to