This is an automated email from the ASF dual-hosted git repository.
bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new d0675a35a [KYUUBI #4879] Refactor and promote reflection utils and
clean up similar reflection methods
d0675a35a is described below
commit d0675a35a749baa45c1fc590cd62425ee23af0aa
Author: liangbowen <[email protected]>
AuthorDate: Tue Jun 6 18:59:18 2023 +0800
[KYUUBI #4879] Refactor and promote reflection utils and clean up similar
reflection methods
### _Why are the changes needed?_
- apply `ReflectUtils` and `Dyn*` to the engine and plugin modules
(e.g. Spark engine, Authz plugin, lineage plugin, beeline)
- remove redundant, near-duplicate methods for calling reflected methods or
getting field values
- unify the reflection helper methods with type-casting support:
`getField[T]` for getting field values from `getFields`, and `invokeAs[T]` for
invoking methods in `getMethods`.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #4879 from bowenliang123/reflect-use.
Closes #4879
c685fb67d [liangbowen] bug fix for "Cannot bind static field options" when
executing "bin/beeline"
fc1fdf1de [liangbowen] import
59c3dd032 [liangbowen] comment
c435c131d [liangbowen] reflect util usage
Authored-by: liangbowen <[email protected]>
Signed-off-by: liangbowen <[email protected]>
---
.../plugin/spark/authz/ranger/AccessRequest.scala | 8 +-
.../ranger/RuleReplaceShowObjectCommands.scala | 2 +-
.../plugin/spark/authz/serde/Descriptor.scala | 20 ++---
.../spark/authz/serde/databaseExtractors.scala | 8 +-
.../plugin/spark/authz/serde/tableExtractors.scala | 12 +--
.../plugin/spark/authz/util/AuthZUtils.scala | 10 ++-
.../spark/authz/util/RangerConfigProvider.scala | 8 +-
.../authz/ranger/RangerSparkExtensionSuite.scala | 2 +-
.../helper/SparkSQLLineageParseHelper.scala | 99 +++++++---------------
.../jdbc/connection/ConnectionProvider.scala | 13 +--
.../main/scala/org/apache/spark/ui/EngineTab.scala | 23 +++--
.../org/apache/kyuubi/KyuubiSQLException.scala | 8 +-
.../EngineSecuritySecretProvider.scala | 8 +-
.../org/apache/kyuubi/util/KyuubiHadoopUtils.scala | 12 +--
.../zookeeper/ZookeeperDiscoveryClientSuite.scala | 10 ++-
kyuubi-hive-beeline/pom.xml | 6 ++
.../org/apache/hive/beeline/KyuubiBeeLine.java | 57 ++++++-------
kyuubi-hive-jdbc/pom.xml | 5 ++
.../kyuubi/jdbc/hive/KyuubiSQLException.java | 6 +-
kyuubi-rest-client/pom.xml | 6 ++
.../client/auth/SpnegoAuthHeaderGenerator.java | 20 +++--
.../org/apache/kyuubi/plugin/PluginLoader.scala | 7 +-
.../apache/kyuubi/util/reflect/ReflectUtils.scala | 68 ++++++++++-----
.../KubernetesSparkBlockCleanerSuite.scala | 7 +-
24 files changed, 219 insertions(+), 206 deletions(-)
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessRequest.scala
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessRequest.scala
index 7d4999fde..8fc8028e6 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessRequest.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessRequest.scala
@@ -50,7 +50,7 @@ object AccessRequest {
"getRolesFromUserAndGroups",
(classOf[String], userName),
(classOf[JSet[String]], userGroups))
- invoke(req, "setUserRoles", (classOf[JSet[String]], roles))
+ invokeAs[Unit](req, "setUserRoles", (classOf[JSet[String]], roles))
} catch {
case _: Exception =>
}
@@ -61,7 +61,7 @@ object AccessRequest {
}
try {
val clusterName = invokeAs[String](SparkRangerAdminPlugin,
"getClusterName")
- invoke(req, "setClusterName", (classOf[String], clusterName))
+ invokeAs[Unit](req, "setClusterName", (classOf[String], clusterName))
} catch {
case _: Exception =>
}
@@ -74,8 +74,8 @@ object AccessRequest {
private def getUserGroupsFromUserStore(user: UserGroupInformation):
Option[JSet[String]] = {
try {
- val storeEnricher = invoke(SparkRangerAdminPlugin,
"getUserStoreEnricher")
- val userStore = invoke(storeEnricher, "getRangerUserStore")
+ val storeEnricher = invokeAs[AnyRef](SparkRangerAdminPlugin,
"getUserStoreEnricher")
+ val userStore = invokeAs[AnyRef](storeEnricher, "getRangerUserStore")
val userGroupMapping =
invokeAs[JHashMap[String, JSet[String]]](userStore,
"getUserGroupMapping")
Some(userGroupMapping.get(user.getShortUserName))
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleReplaceShowObjectCommands.scala
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleReplaceShowObjectCommands.scala
index 6e86ab9fb..bf762109c 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleReplaceShowObjectCommands.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleReplaceShowObjectCommands.scala
@@ -46,7 +46,7 @@ class RuleReplaceShowObjectCommands extends Rule[LogicalPlan]
{
case class FilteredShowTablesCommand(delegated: RunnableCommand)
extends FilteredShowObjectCommand(delegated) {
- private val isExtended = getFieldVal[Boolean](delegated, "isExtended")
+ private val isExtended = getField[Boolean](delegated, "isExtended")
override protected def isAllowed(r: Row, ugi: UserGroupInformation): Boolean
= {
val database = r.getString(0)
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/Descriptor.scala
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/Descriptor.scala
index 3eb3fecea..fc660ce14 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/Descriptor.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/Descriptor.scala
@@ -72,7 +72,7 @@ case class ColumnDesc(
fieldName: String,
fieldExtractor: String) extends Descriptor {
override def extract(v: AnyRef): Seq[String] = {
- val columnsVal = invoke(v, fieldName)
+ val columnsVal = invokeAs[AnyRef](v, fieldName)
val columnExtractor = lookupExtractor[ColumnExtractor](fieldExtractor)
columnExtractor(columnsVal)
}
@@ -91,7 +91,7 @@ case class DatabaseDesc(
catalogDesc: Option[CatalogDesc] = None,
isInput: Boolean = false) extends Descriptor {
override def extract(v: AnyRef): Database = {
- val databaseVal = invoke(v, fieldName)
+ val databaseVal = invokeAs[AnyRef](v, fieldName)
val databaseExtractor = lookupExtractor[DatabaseExtractor](fieldExtractor)
val db = databaseExtractor(databaseVal)
if (db.catalog.isEmpty && catalogDesc.nonEmpty) {
@@ -119,7 +119,7 @@ case class FunctionTypeDesc(
}
def extract(v: AnyRef, spark: SparkSession): FunctionType = {
- val functionTypeVal = invoke(v, fieldName)
+ val functionTypeVal = invokeAs[AnyRef](v, fieldName)
val functionTypeExtractor =
lookupExtractor[FunctionTypeExtractor](fieldExtractor)
functionTypeExtractor(functionTypeVal, spark)
}
@@ -145,7 +145,7 @@ case class FunctionDesc(
functionTypeDesc: Option[FunctionTypeDesc] = None,
isInput: Boolean = false) extends Descriptor {
override def extract(v: AnyRef): Function = {
- val functionVal = invoke(v, fieldName)
+ val functionVal = invokeAs[AnyRef](v, fieldName)
val functionExtractor = lookupExtractor[FunctionExtractor](fieldExtractor)
var function = functionExtractor(functionVal)
if (function.database.isEmpty) {
@@ -170,7 +170,7 @@ case class QueryDesc(
fieldName: String,
fieldExtractor: String = "LogicalPlanQueryExtractor") extends Descriptor {
override def extract(v: AnyRef): Option[LogicalPlan] = {
- val queryVal = invoke(v, fieldName)
+ val queryVal = invokeAs[AnyRef](v, fieldName)
val queryExtractor = lookupExtractor[QueryExtractor](fieldExtractor)
queryExtractor(queryVal)
}
@@ -192,7 +192,7 @@ case class TableTypeDesc(
}
def extract(v: AnyRef, spark: SparkSession): TableType = {
- val tableTypeVal = invoke(v, fieldName)
+ val tableTypeVal = invokeAs[AnyRef](v, fieldName)
val tableTypeExtractor =
lookupExtractor[TableTypeExtractor](fieldExtractor)
tableTypeExtractor(tableTypeVal, spark)
}
@@ -230,7 +230,7 @@ case class TableDesc(
}
def extract(v: AnyRef, spark: SparkSession): Option[Table] = {
- val tableVal = invoke(v, fieldName)
+ val tableVal = invokeAs[AnyRef](v, fieldName)
val tableExtractor = lookupExtractor[TableExtractor](fieldExtractor)
val maybeTable = tableExtractor(spark, tableVal)
maybeTable.map { t =>
@@ -257,7 +257,7 @@ case class ActionTypeDesc(
actionType: Option[String] = None) extends Descriptor {
override def extract(v: AnyRef): PrivilegeObjectActionType = {
actionType.map(PrivilegeObjectActionType.withName).getOrElse {
- val actionTypeVal = invoke(v, fieldName)
+ val actionTypeVal = invokeAs[AnyRef](v, fieldName)
val actionTypeExtractor =
lookupExtractor[ActionTypeExtractor](fieldExtractor)
actionTypeExtractor(actionTypeVal)
}
@@ -274,7 +274,7 @@ case class CatalogDesc(
fieldName: String = "catalog",
fieldExtractor: String = "CatalogPluginCatalogExtractor") extends
Descriptor {
override def extract(v: AnyRef): Option[String] = {
- val catalogVal = invoke(v, fieldName)
+ val catalogVal = invokeAs[AnyRef](v, fieldName)
val catalogExtractor = lookupExtractor[CatalogExtractor](fieldExtractor)
catalogExtractor(catalogVal)
}
@@ -292,7 +292,7 @@ case class ScanDesc(
val tableVal = if (fieldName == null) {
v
} else {
- invoke(v, fieldName)
+ invokeAs[AnyRef](v, fieldName)
}
val tableExtractor = lookupExtractor[TableExtractor](fieldExtractor)
val maybeTable = tableExtractor(spark, tableVal)
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/databaseExtractors.scala
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/databaseExtractors.scala
index f952c816e..713d3e3fb 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/databaseExtractors.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/databaseExtractors.scala
@@ -69,9 +69,9 @@ class StringSeqOptionDatabaseExtractor extends
DatabaseExtractor {
*/
class ResolvedNamespaceDatabaseExtractor extends DatabaseExtractor {
override def apply(v1: AnyRef): Database = {
- val catalogVal = invoke(v1, "catalog")
+ val catalogVal = invokeAs[AnyRef](v1, "catalog")
val catalog =
lookupExtractor[CatalogPluginCatalogExtractor].apply(catalogVal)
- val namespace = getFieldVal[Seq[String]](v1, "namespace")
+ val namespace = getField[Seq[String]](v1, "namespace")
Database(catalog, quote(namespace))
}
}
@@ -81,9 +81,9 @@ class ResolvedNamespaceDatabaseExtractor extends
DatabaseExtractor {
*/
class ResolvedDBObjectNameDatabaseExtractor extends DatabaseExtractor {
override def apply(v1: AnyRef): Database = {
- val catalogVal = invoke(v1, "catalog")
+ val catalogVal = invokeAs[AnyRef](v1, "catalog")
val catalog =
lookupExtractor[CatalogPluginCatalogExtractor].apply(catalogVal)
- val namespace = getFieldVal[Seq[String]](v1, "nameParts")
+ val namespace = getField[Seq[String]](v1, "nameParts")
Database(catalog, quote(namespace))
}
}
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
index 4579349ee..a8a08ed93 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
@@ -47,7 +47,7 @@ object TableExtractor {
*/
def getOwner(v: AnyRef): Option[String] = {
// org.apache.spark.sql.connector.catalog.Table
- val table = invoke(v, "table")
+ val table = invokeAs[AnyRef](v, "table")
val properties = invokeAs[JMap[String, String]](table,
"properties").asScala
properties.get("owner")
}
@@ -97,9 +97,9 @@ class CatalogTableOptionTableExtractor extends TableExtractor
{
*/
class ResolvedTableTableExtractor extends TableExtractor {
override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
- val catalogVal = invoke(v1, "catalog")
+ val catalogVal = invokeAs[AnyRef](v1, "catalog")
val catalog =
lookupExtractor[CatalogPluginCatalogExtractor].apply(catalogVal)
- val identifier = invoke(v1, "identifier")
+ val identifier = invokeAs[AnyRef](v1, "identifier")
val maybeTable = lookupExtractor[IdentifierTableExtractor].apply(spark,
identifier)
val maybeOwner = TableExtractor.getOwner(v1)
maybeTable.map(_.copy(catalog = catalog, owner = maybeOwner))
@@ -157,7 +157,7 @@ class LogicalRelationTableExtractor extends TableExtractor {
*/
class ResolvedDbObjectNameTableExtractor extends TableExtractor {
override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
- val catalogVal = invoke(v1, "catalog")
+ val catalogVal = invokeAs[AnyRef](v1, "catalog")
val catalog =
lookupExtractor[CatalogPluginCatalogExtractor].apply(catalogVal)
val nameParts = invokeAs[Seq[String]](v1, "nameParts")
val namespace = nameParts.init.toArray
@@ -173,9 +173,9 @@ class ResolvedIdentifierTableExtractor extends
TableExtractor {
override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
v1.getClass.getName match {
case "org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier" =>
- val catalogVal = invoke(v1, "catalog")
+ val catalogVal = invokeAs[AnyRef](v1, "catalog")
val catalog =
lookupExtractor[CatalogPluginCatalogExtractor].apply(catalogVal)
- val identifier = invoke(v1, "identifier")
+ val identifier = invokeAs[AnyRef](v1, "identifier")
val maybeTable =
lookupExtractor[IdentifierTableExtractor].apply(spark, identifier)
maybeTable.map(_.copy(catalog = catalog))
case _ => None
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
index e13968e2e..9ac3bfef3 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
@@ -32,6 +32,7 @@ import
org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, View}
import org.apache.kyuubi.plugin.spark.authz.AccessControlException
import org.apache.kyuubi.plugin.spark.authz.util.ReservedKeys._
import org.apache.kyuubi.util.SemanticVersion
+import org.apache.kyuubi.util.reflect.DynConstructors
import org.apache.kyuubi.util.reflect.ReflectUtils._
private[authz] object AuthZUtils {
@@ -61,7 +62,7 @@ private[authz] object AuthZUtils {
def hasResolvedPermanentView(plan: LogicalPlan): Boolean = {
plan match {
case view: View if view.resolved && isSparkV31OrGreater =>
- !getFieldVal[Boolean](view, "isTempView")
+ !getField[Boolean](view, "isTempView")
case _ =>
false
}
@@ -69,7 +70,12 @@ private[authz] object AuthZUtils {
lazy val isRanger21orGreater: Boolean = {
try {
- classOf[RangerBasePlugin].getConstructor(classOf[String],
classOf[String], classOf[String])
+ DynConstructors.builder().impl(
+ classOf[RangerBasePlugin],
+ classOf[String],
+ classOf[String],
+ classOf[String])
+ .buildChecked[RangerBasePlugin]()
true
} catch {
case _: NoSuchMethodException =>
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RangerConfigProvider.scala
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RangerConfigProvider.scala
index cb3a2371b..806914286 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RangerConfigProvider.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RangerConfigProvider.scala
@@ -34,18 +34,18 @@ trait RangerConfigProvider {
* org.apache.ranger.authorization.hadoop.config.RangerConfiguration
* for Ranger 2.0 and below
*/
- def getRangerConf: Configuration = {
+ val getRangerConf: Configuration = {
if (isRanger21orGreater) {
// for Ranger 2.1+
DynMethods.builder("getConfig")
.impl("org.apache.ranger.plugin.service.RangerBasePlugin")
- .build()
- .invoke[Configuration](this)
+ .buildChecked(this)
+ .invoke[Configuration]()
} else {
// for Ranger 2.0 and below
DynMethods.builder("getInstance")
.impl("org.apache.ranger.authorization.hadoop.config.RangerConfiguration")
- .buildStatic()
+ .buildStaticChecked()
.invoke[Configuration]()
}
}
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
index 89b81ccee..b5dcf63cb 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
@@ -458,7 +458,7 @@ class HiveCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
admin, {
val hiveTableRelation = sql(s"SELECT * FROM $table")
.queryExecution.optimizedPlan.collectLeaves().head.asInstanceOf[HiveTableRelation]
- assert(getFieldVal[Option[Statistics]](hiveTableRelation,
"tableStats").nonEmpty)
+ assert(getField[Option[Statistics]](hiveTableRelation,
"tableStats").nonEmpty)
})
}
}
diff --git
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
index f060cc994..f2806f216 100644
---
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
+++
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
@@ -18,7 +18,7 @@
package org.apache.kyuubi.plugin.lineage.helper
import scala.collection.immutable.ListMap
-import scala.util.{Failure, Success, Try}
+import scala.util.Try
import org.apache.spark.internal.Logging
import org.apache.spark.kyuubi.lineage.{LineageConf, SparkContextHelper}
@@ -38,6 +38,7 @@ import
org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, Data
import org.apache.kyuubi.plugin.lineage.Lineage
import
org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.isSparkVersionAtMost
+import org.apache.kyuubi.util.reflect.ReflectUtils._
trait LineageParser {
def sparkSession: SparkSession
@@ -189,7 +190,7 @@ trait LineageParser {
plan match {
// For command
case p if p.nodeName == "CommandResult" =>
- val commandPlan = getPlanField[LogicalPlan]("commandLogicalPlan", plan)
+ val commandPlan = getField[LogicalPlan](plan, "commandLogicalPlan")
extractColumnsLineage(commandPlan, parentColumnsLineage)
case p if p.nodeName == "AlterViewAsCommand" =>
val query =
@@ -198,22 +199,22 @@ trait LineageParser {
} else {
getQuery(plan)
}
- val view = getPlanField[TableIdentifier]("name", plan).unquotedString
+ val view = getField[TableIdentifier](plan, "name").unquotedString
extractColumnsLineage(query, parentColumnsLineage).map { case (k, v) =>
k.withName(s"$view.${k.name}") -> v
}
case p
if p.nodeName == "CreateViewCommand"
- && getPlanField[ViewType]("viewType", plan) == PersistedView =>
- val view = getPlanField[TableIdentifier]("name", plan).unquotedString
+ && getField[ViewType](plan, "viewType") == PersistedView =>
+ val view = getField[TableIdentifier](plan, "name").unquotedString
val outputCols =
- getPlanField[Seq[(String, Option[String])]]("userSpecifiedColumns",
plan).map(_._1)
+ getField[Seq[(String, Option[String])]](plan,
"userSpecifiedColumns").map(_._1)
val query =
if (isSparkVersionAtMost("3.1")) {
-
sparkSession.sessionState.analyzer.execute(getPlanField[LogicalPlan]("child",
plan))
+
sparkSession.sessionState.analyzer.execute(getField[LogicalPlan](plan, "child"))
} else {
- getPlanField[LogicalPlan]("plan", plan)
+ getField[LogicalPlan](plan, "plan")
}
extractColumnsLineage(query, parentColumnsLineage).zipWithIndex.map {
@@ -222,7 +223,7 @@ trait LineageParser {
}
case p if p.nodeName == "CreateDataSourceTableAsSelectCommand" =>
- val table = getPlanField[CatalogTable]("table", plan).qualifiedName
+ val table = getField[CatalogTable](plan, "table").qualifiedName
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case
(k, v) =>
k.withName(s"$table.${k.name}") -> v
}
@@ -230,7 +231,7 @@ trait LineageParser {
case p
if p.nodeName == "CreateHiveTableAsSelectCommand" ||
p.nodeName == "OptimizedCreateHiveTableAsSelectCommand" =>
- val table = getPlanField[CatalogTable]("tableDesc", plan).qualifiedName
+ val table = getField[CatalogTable](plan, "tableDesc").qualifiedName
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case
(k, v) =>
k.withName(s"$table.${k.name}") -> v
}
@@ -241,15 +242,15 @@ trait LineageParser {
val (table, namespace, catalog) =
if (isSparkVersionAtMost("3.2")) {
(
- getPlanField[Identifier]("tableName", plan).name,
- getPlanField[Identifier]("tableName",
plan).namespace.mkString("."),
- getPlanField[TableCatalog]("catalog", plan).name())
+ getField[Identifier](plan, "tableName").name,
+ getField[Identifier](plan, "tableName").namespace.mkString("."),
+ getField[TableCatalog](plan, "catalog").name())
} else {
(
- getPlanMethod[Identifier]("tableName", plan).name(),
- getPlanMethod[Identifier]("tableName",
plan).namespace().mkString("."),
- getCurrentPlanField[CatalogPlugin](
- getPlanMethod[LogicalPlan]("left", plan),
+ invokeAs[Identifier](plan, "tableName").name(),
+ invokeAs[Identifier](plan,
"tableName").namespace().mkString("."),
+ getField[CatalogPlugin](
+ invokeAs[LogicalPlan](plan, "left"),
"catalog").name())
}
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case
(k, v) =>
@@ -257,7 +258,7 @@ trait LineageParser {
}
case p if p.nodeName == "InsertIntoDataSourceCommand" =>
- val logicalRelation = getPlanField[LogicalRelation]("logicalRelation",
plan)
+ val logicalRelation = getField[LogicalRelation](plan,
"logicalRelation")
val table =
logicalRelation.catalogTable.map(_.qualifiedName).getOrElse("")
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map {
case (k, v) if table.nonEmpty =>
@@ -266,8 +267,8 @@ trait LineageParser {
case p if p.nodeName == "InsertIntoHadoopFsRelationCommand" =>
val table =
- getPlanField[Option[CatalogTable]]("catalogTable",
plan).map(_.qualifiedName).getOrElse(
- "")
+ getField[Option[CatalogTable]](plan,
"catalogTable").map(_.qualifiedName)
+ .getOrElse("")
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map {
case (k, v) if table.nonEmpty =>
k.withName(s"$table.${k.name}") -> v
@@ -277,15 +278,15 @@ trait LineageParser {
if p.nodeName == "InsertIntoDataSourceDirCommand" ||
p.nodeName == "InsertIntoHiveDirCommand" =>
val dir =
- getPlanField[CatalogStorageFormat]("storage",
plan).locationUri.map(_.toString).getOrElse(
- "")
+ getField[CatalogStorageFormat](plan,
"storage").locationUri.map(_.toString)
+ .getOrElse("")
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map {
case (k, v) if dir.nonEmpty =>
k.withName(s"`$dir`.${k.name}") -> v
}
case p if p.nodeName == "InsertIntoHiveTable" =>
- val table = getPlanField[CatalogTable]("table", plan).qualifiedName
+ val table = getField[CatalogTable](plan, "table").qualifiedName
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case
(k, v) =>
k.withName(s"$table.${k.name}") -> v
}
@@ -297,14 +298,14 @@ trait LineageParser {
if p.nodeName == "AppendData"
|| p.nodeName == "OverwriteByExpression"
|| p.nodeName == "OverwritePartitionsDynamic" =>
- val table = getPlanField[NamedRelation]("table", plan).name
+ val table = getField[NamedRelation](plan, "table").name
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case
(k, v) =>
k.withName(s"$table.${k.name}") -> v
}
case p if p.nodeName == "MergeIntoTable" =>
- val matchedActions = getPlanField[Seq[MergeAction]]("matchedActions",
plan)
- val notMatchedActions =
getPlanField[Seq[MergeAction]]("notMatchedActions", plan)
+ val matchedActions = getField[Seq[MergeAction]](plan, "matchedActions")
+ val notMatchedActions = getField[Seq[MergeAction]](plan,
"notMatchedActions")
val allAssignments = (matchedActions ++ notMatchedActions).collect {
case UpdateAction(_, assignments) => assignments
case InsertAction(_, assignments) => assignments
@@ -314,8 +315,8 @@ trait LineageParser {
assignment.key.asInstanceOf[Attribute],
assignment.value.references)
}: _*)
- val targetTable = getPlanField[LogicalPlan]("targetTable", plan)
- val sourceTable = getPlanField[LogicalPlan]("sourceTable", plan)
+ val targetTable = getField[LogicalPlan](plan, "targetTable")
+ val sourceTable = getField[LogicalPlan](plan, "sourceTable")
val targetColumnsLineage = extractColumnsLineage(
targetTable,
nextColumnsLlineage.map { case (k, _) => (k, AttributeSet(k)) })
@@ -474,47 +475,7 @@ trait LineageParser {
}
}
- private def getPlanField[T](field: String, plan: LogicalPlan): T = {
- getFieldVal[T](plan, field)
- }
-
- private def getCurrentPlanField[T](curPlan: LogicalPlan, field: String): T =
{
- getFieldVal[T](curPlan, field)
- }
-
- private def getPlanMethod[T](name: String, plan: LogicalPlan): T = {
- getMethod[T](plan, name)
- }
-
- private def getQuery(plan: LogicalPlan): LogicalPlan = {
- getPlanField[LogicalPlan]("query", plan)
- }
-
- private def getFieldVal[T](o: Any, name: String): T = {
- Try {
- val field = o.getClass.getDeclaredField(name)
- field.setAccessible(true)
- field.get(o)
- } match {
- case Success(value) => value.asInstanceOf[T]
- case Failure(e) =>
- val candidates =
o.getClass.getDeclaredFields.map(_.getName).mkString("[", ",", "]")
- throw new RuntimeException(s"$name not in $candidates", e)
- }
- }
-
- private def getMethod[T](o: Any, name: String): T = {
- Try {
- val method = o.getClass.getDeclaredMethod(name)
- method.invoke(o)
- } match {
- case Success(value) => value.asInstanceOf[T]
- case Failure(e) =>
- val candidates =
o.getClass.getDeclaredMethods.map(_.getName).mkString("[", ",", "]")
- throw new RuntimeException(s"$name not in $candidates", e)
- }
- }
-
+ private def getQuery(plan: LogicalPlan): LogicalPlan =
getField[LogicalPlan](plan, "query")
}
case class SparkSQLLineageParseHelper(sparkSession: SparkSession) extends
LineageParser
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/connection/ConnectionProvider.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/connection/ConnectionProvider.scala
index 0dea6a2c1..cb6e4b6c5 100644
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/connection/ConnectionProvider.scala
+++
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/connection/ConnectionProvider.scala
@@ -16,24 +16,25 @@
*/
package org.apache.kyuubi.engine.jdbc.connection
-import java.sql.{Connection, DriverManager}
+import java.sql.{Connection, Driver, DriverManager}
import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_CONNECTION_PROVIDER,
ENGINE_JDBC_CONNECTION_URL, ENGINE_JDBC_DRIVER_CLASS}
+import org.apache.kyuubi.util.reflect.DynClasses
import org.apache.kyuubi.util.reflect.ReflectUtils._
abstract class AbstractConnectionProvider extends Logging {
protected val providers = loadProviders()
def getProviderClass(kyuubiConf: KyuubiConf): String = {
- val specifiedDriverClass = kyuubiConf.get(ENGINE_JDBC_DRIVER_CLASS)
- specifiedDriverClass.foreach(Class.forName)
-
- specifiedDriverClass.getOrElse {
+ val driverClass: Class[_ <: Driver] = Option(
+ DynClasses.builder().impl(kyuubiConf.get(ENGINE_JDBC_DRIVER_CLASS).get)
+ .orNull().build[Driver]()).getOrElse {
val url = kyuubiConf.get(ENGINE_JDBC_CONNECTION_URL).get
- DriverManager.getDriver(url).getClass.getCanonicalName
+ DriverManager.getDriver(url).getClass
}
+ driverClass.getCanonicalName
}
def create(kyuubiConf: KyuubiConf): Connection = {
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineTab.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineTab.scala
index 14ef3d3fd..52edcf220 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineTab.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineTab.scala
@@ -26,7 +26,7 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.SparkSQLEngine
import org.apache.kyuubi.engine.spark.events.EngineEventsStore
import org.apache.kyuubi.service.ServiceState
-import org.apache.kyuubi.util.reflect.DynClasses
+import org.apache.kyuubi.util.reflect.{DynClasses, DynMethods}
/**
* Note that [[SparkUITab]] is private for Spark
@@ -68,30 +68,29 @@ case class EngineTab(
val sparkServletContextHandlerClz = DynClasses.builder()
.impl("org.sparkproject.jetty.servlet.ServletContextHandler")
.impl("org.eclipse.jetty.servlet.ServletContextHandler")
- .build()
- val attachHandlerMethod = Class.forName("org.apache.spark.ui.SparkUI")
- .getMethod("attachHandler", sparkServletContextHandlerClz)
- val createRedirectHandlerMethod =
Class.forName("org.apache.spark.ui.JettyUtils")
- .getMethod(
- "createRedirectHandler",
+ .buildChecked()
+ val attachHandlerMethod = DynMethods.builder("attachHandler")
+ .impl("org.apache.spark.ui.SparkUI", sparkServletContextHandlerClz)
+ .buildChecked(ui)
+ val createRedirectHandlerMethod =
DynMethods.builder("createRedirectHandler")
+ .impl(
+ "org.apache.spark.ui.JettyUtils",
classOf[String],
classOf[String],
- classOf[(HttpServletRequest) => Unit],
+ classOf[HttpServletRequest => Unit],
classOf[String],
classOf[Set[String]])
+ .buildStaticChecked()
attachHandlerMethod
.invoke(
- ui,
createRedirectHandlerMethod
- .invoke(null, "/kyuubi/stop", "/kyuubi", handleKillRequest _, "",
Set("GET", "POST")))
+ .invoke("/kyuubi/stop", "/kyuubi", handleKillRequest _, "",
Set("GET", "POST")))
attachHandlerMethod
.invoke(
- ui,
createRedirectHandlerMethod
.invoke(
- null,
"/kyuubi/gracefulstop",
"/kyuubi",
handleGracefulKillRequest _,
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSQLException.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSQLException.scala
index a9e486fb2..c9fd8203c 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSQLException.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSQLException.scala
@@ -26,6 +26,7 @@ import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift.{TStatus, TStatusCode}
import org.apache.kyuubi.Utils.stringifyException
+import org.apache.kyuubi.util.reflect.DynConstructors
/**
* @param reason a description of the exception
@@ -139,9 +140,10 @@ object KyuubiSQLException {
}
private def newInstance(className: String, message: String, cause:
Throwable): Throwable = {
try {
- Class.forName(className)
- .getConstructor(classOf[String], classOf[Throwable])
- .newInstance(message, cause).asInstanceOf[Throwable]
+ DynConstructors.builder()
+ .impl(className, classOf[String], classOf[Throwable])
+ .buildChecked[Throwable]()
+ .newInstance(message, cause)
} catch {
case _: Exception => new RuntimeException(className + ":" + message,
cause)
}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/EngineSecuritySecretProvider.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/EngineSecuritySecretProvider.scala
index 2bcfe9a67..3216a43be 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/EngineSecuritySecretProvider.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/EngineSecuritySecretProvider.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi.service.authentication
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.util.reflect.DynConstructors
trait EngineSecuritySecretProvider {
@@ -50,9 +51,10 @@ class SimpleEngineSecuritySecretProviderImpl extends
EngineSecuritySecretProvide
object EngineSecuritySecretProvider {
def create(conf: KyuubiConf): EngineSecuritySecretProvider = {
- val providerClass =
Class.forName(conf.get(ENGINE_SECURITY_SECRET_PROVIDER))
- val provider = providerClass.getConstructor().newInstance()
- .asInstanceOf[EngineSecuritySecretProvider]
+ val provider = DynConstructors.builder()
+ .impl(conf.get(ENGINE_SECURITY_SECRET_PROVIDER))
+ .buildChecked[EngineSecuritySecretProvider]()
+ .newInstance()
provider.initialize(conf)
provider
}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
index 28806e915..4959c845d 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
@@ -33,12 +33,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.util.reflect.ReflectUtils._
object KyuubiHadoopUtils extends Logging {
- private val tokenMapField = classOf[Credentials].getDeclaredField("tokenMap")
- tokenMapField.setAccessible(true)
-
def newHadoopConf(
conf: KyuubiConf,
loadDefaults: Boolean = true): Configuration = {
@@ -76,12 +74,8 @@ object KyuubiHadoopUtils extends Logging {
* Get [[Credentials#tokenMap]] by reflection as [[Credentials#getTokenMap]]
is not present before
* Hadoop 3.2.1.
*/
- def getTokenMap(credentials: Credentials): Map[Text, Token[_ <:
TokenIdentifier]] = {
- tokenMapField.get(credentials)
- .asInstanceOf[JMap[Text, Token[_ <: TokenIdentifier]]]
- .asScala
- .toMap
- }
+ def getTokenMap(credentials: Credentials): Map[Text, Token[_ <:
TokenIdentifier]] =
+ getField[JMap[Text, Token[_ <: TokenIdentifier]]](credentials,
"tokenMap").asScala.toMap
def getTokenIssueDate(token: Token[_ <: TokenIdentifier]): Option[Long] = {
token.decodeIdentifier() match {
diff --git
a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
index de849f83e..0a5db3b43 100644
---
a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
+++
b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
@@ -39,6 +39,7 @@ import
org.apache.kyuubi.shaded.curator.framework.CuratorFrameworkFactory
import org.apache.kyuubi.shaded.curator.retry.ExponentialBackoffRetry
import org.apache.kyuubi.shaded.zookeeper.ZooDefs
import org.apache.kyuubi.shaded.zookeeper.data.ACL
+import org.apache.kyuubi.util.reflect.DynFields
import org.apache.kyuubi.zookeeper.EmbeddedZookeeper
import org.apache.kyuubi.zookeeper.ZookeeperConf.ZK_CLIENT_PORT
@@ -156,12 +157,13 @@ abstract class ZookeeperDiscoveryClientSuite extends
DiscoveryClientTests
assert(service.getServiceState === ServiceState.STARTED)
stopZk()
- val isServerLostM =
discovery.getClass.getSuperclass.getDeclaredField("isServerLost")
- isServerLostM.setAccessible(true)
- val isServerLost = isServerLostM.get(discovery)
+ val isServerLost = DynFields.builder()
+ .hiddenImpl(discovery.getClass.getSuperclass, "isServerLost")
+ .buildChecked[AtomicBoolean]()
+ .get(discovery)
eventually(timeout(10.seconds), interval(100.millis)) {
- assert(isServerLost.asInstanceOf[AtomicBoolean].get())
+ assert(isServerLost.get())
assert(discovery.getServiceState === ServiceState.STOPPED)
assert(service.getServiceState === ServiceState.STOPPED)
}
diff --git a/kyuubi-hive-beeline/pom.xml b/kyuubi-hive-beeline/pom.xml
index beacba438..6c5f255dc 100644
--- a/kyuubi-hive-beeline/pom.xml
+++ b/kyuubi-hive-beeline/pom.xml
@@ -40,6 +40,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-beeline</artifactId>
diff --git
a/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/KyuubiBeeLine.java
b/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/KyuubiBeeLine.java
index 7ca767148..b3a2fa307 100644
---
a/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/KyuubiBeeLine.java
+++
b/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/KyuubiBeeLine.java
@@ -19,8 +19,6 @@ package org.apache.hive.beeline;
import java.io.IOException;
import java.io.InputStream;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
import java.sql.Driver;
import java.util.Arrays;
import java.util.Collections;
@@ -29,12 +27,15 @@ import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.kyuubi.util.reflect.DynConstructors;
+import org.apache.kyuubi.util.reflect.DynFields;
+import org.apache.kyuubi.util.reflect.DynMethods;
public class KyuubiBeeLine extends BeeLine {
public static final String KYUUBI_BEELINE_DEFAULT_JDBC_DRIVER =
"org.apache.kyuubi.jdbc.KyuubiHiveDriver";
protected KyuubiCommands commands = new KyuubiCommands(this);
- private Driver defaultDriver = null;
+ private Driver defaultDriver;
public KyuubiBeeLine() {
this(true);
@@ -44,20 +45,16 @@ public class KyuubiBeeLine extends BeeLine {
public KyuubiBeeLine(boolean isBeeLine) {
super(isBeeLine);
try {
- Field commandsField = BeeLine.class.getDeclaredField("commands");
- commandsField.setAccessible(true);
- commandsField.set(this, commands);
+ DynFields.builder().hiddenImpl(BeeLine.class,
"commands").buildChecked(this).set(commands);
} catch (Throwable t) {
throw new ExceptionInInitializerError("Failed to inject kyuubi
commands");
}
try {
defaultDriver =
- (Driver)
- Class.forName(
- KYUUBI_BEELINE_DEFAULT_JDBC_DRIVER,
- true,
- Thread.currentThread().getContextClassLoader())
- .newInstance();
+ DynConstructors.builder()
+ .impl(KYUUBI_BEELINE_DEFAULT_JDBC_DRIVER)
+ .<Driver>buildChecked()
+ .newInstance();
} catch (Throwable t) {
throw new ExceptionInInitializerError(KYUUBI_BEELINE_DEFAULT_JDBC_DRIVER
+ "-missing");
}
@@ -115,25 +112,26 @@ public class KyuubiBeeLine extends BeeLine {
BeelineParser beelineParser;
boolean connSuccessful;
boolean exit;
- Field exitField;
+ DynFields.BoundField<Boolean> exitField;
try {
- Field optionsField = BeeLine.class.getDeclaredField("options");
- optionsField.setAccessible(true);
- Options options = (Options) optionsField.get(this);
+ Options options =
+ DynFields.builder()
+ .hiddenImpl(BeeLine.class, "options")
+ .<Options>buildStaticChecked()
+ .get();
beelineParser = new BeelineParser();
cl = beelineParser.parse(options, args);
- Method connectUsingArgsMethod =
- BeeLine.class.getDeclaredMethod(
- "connectUsingArgs", BeelineParser.class, CommandLine.class);
- connectUsingArgsMethod.setAccessible(true);
- connSuccessful = (boolean) connectUsingArgsMethod.invoke(this,
beelineParser, cl);
+ connSuccessful =
+ DynMethods.builder("connectUsingArgs")
+ .hiddenImpl(BeeLine.class, BeelineParser.class,
CommandLine.class)
+ .buildChecked(this)
+ .invoke(beelineParser, cl);
- exitField = BeeLine.class.getDeclaredField("exit");
- exitField.setAccessible(true);
- exit = (boolean) exitField.get(this);
+ exitField = DynFields.builder().hiddenImpl(BeeLine.class,
"exit").buildChecked(this);
+ exit = exitField.get();
} catch (ParseException e1) {
output(e1.getMessage());
@@ -149,10 +147,11 @@ public class KyuubiBeeLine extends BeeLine {
// no-op if the file is not present
if (!connSuccessful && !exit) {
try {
- Method defaultBeelineConnectMethod =
- BeeLine.class.getDeclaredMethod("defaultBeelineConnect",
CommandLine.class);
- defaultBeelineConnectMethod.setAccessible(true);
- connSuccessful = (boolean) defaultBeelineConnectMethod.invoke(this,
cl);
+ connSuccessful =
+ DynMethods.builder("defaultBeelineConnect")
+ .hiddenImpl(BeeLine.class, CommandLine.class)
+ .buildChecked(this)
+ .invoke(cl);
} catch (Exception t) {
error(t.getMessage());
@@ -184,7 +183,7 @@ public class KyuubiBeeLine extends BeeLine {
}
try {
exit = true;
- exitField.set(this, exit);
+ exitField.set(exit);
} catch (Exception e) {
error(e.getMessage());
return 1;
diff --git a/kyuubi-hive-jdbc/pom.xml b/kyuubi-hive-jdbc/pom.xml
index d6889e365..36c27efe0 100644
--- a/kyuubi-hive-jdbc/pom.xml
+++ b/kyuubi-hive-jdbc/pom.xml
@@ -35,6 +35,11 @@
</properties>
<dependencies>
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
diff --git
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiSQLException.java
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiSQLException.java
index 1ac0adf04..7d26f8078 100644
---
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiSQLException.java
+++
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiSQLException.java
@@ -21,6 +21,7 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hive.service.rpc.thrift.TStatus;
+import org.apache.kyuubi.util.reflect.DynConstructors;
public class KyuubiSQLException extends SQLException {
@@ -186,7 +187,10 @@ public class KyuubiSQLException extends SQLException {
private static Throwable newInstance(String className, String message) {
try {
- return (Throwable)
Class.forName(className).getConstructor(String.class).newInstance(message);
+ return DynConstructors.builder()
+ .impl(className, String.class)
+ .<Throwable>buildChecked()
+ .newInstance(message);
} catch (Exception e) {
return new RuntimeException(className + ":" + message);
}
diff --git a/kyuubi-rest-client/pom.xml b/kyuubi-rest-client/pom.xml
index a9ceb9bb3..176051deb 100644
--- a/kyuubi-rest-client/pom.xml
+++ b/kyuubi-rest-client/pom.xml
@@ -77,6 +77,12 @@
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
diff --git
a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/auth/SpnegoAuthHeaderGenerator.java
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/auth/SpnegoAuthHeaderGenerator.java
index 435a85014..c66c6465e 100644
---
a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/auth/SpnegoAuthHeaderGenerator.java
+++
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/auth/SpnegoAuthHeaderGenerator.java
@@ -17,13 +17,13 @@
package org.apache.kyuubi.client.auth;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.util.Base64;
import javax.security.auth.Subject;
import org.apache.kyuubi.client.exception.KyuubiRestException;
+import org.apache.kyuubi.util.reflect.DynFields;
+import org.apache.kyuubi.util.reflect.DynMethods;
import org.ietf.jgss.GSSContext;
import org.ietf.jgss.GSSException;
import org.ietf.jgss.GSSManager;
@@ -61,13 +61,17 @@ public class SpnegoAuthHeaderGenerator implements
AuthHeaderGenerator {
private String generateToken(String server) throws Exception {
Subject subject;
try {
- Class<?> ugiClz = Class.forName(UGI_CLASS);
- Method ugiGetCurrentUserMethod =
ugiClz.getDeclaredMethod("getCurrentUser");
- Object ugiCurrentUser = ugiGetCurrentUserMethod.invoke(null);
+ Object ugiCurrentUser =
+ DynMethods.builder("getCurrentUser")
+ .hiddenImpl(Class.forName(UGI_CLASS))
+ .buildStaticChecked()
+ .invoke();
LOG.debug("The user credential is {}", ugiCurrentUser);
- Field ugiSubjectField =
ugiCurrentUser.getClass().getDeclaredField("subject");
- ugiSubjectField.setAccessible(true);
- subject = (Subject) ugiSubjectField.get(ugiCurrentUser);
+ subject =
+ DynFields.builder()
+ .hiddenImpl(ugiCurrentUser.getClass(), "subject")
+ .<Subject>buildChecked(ugiCurrentUser)
+ .get();
} catch (ClassNotFoundException e) {
// TODO do kerberos authentication using JDK class directly
LOG.error("Hadoop UGI class {} is required for SPNEGO authentication.",
UGI_CLASS);
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/plugin/PluginLoader.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/plugin/PluginLoader.scala
index 17ad69524..da4c8e4a9 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/plugin/PluginLoader.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/plugin/PluginLoader.scala
@@ -21,6 +21,7 @@ import scala.util.control.NonFatal
import org.apache.kyuubi.KyuubiException
import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.util.reflect.DynConstructors
private[kyuubi] object PluginLoader {
@@ -31,8 +32,7 @@ private[kyuubi] object PluginLoader {
}
try {
- Class.forName(advisorClass.get).getConstructor().newInstance()
- .asInstanceOf[SessionConfAdvisor]
+
DynConstructors.builder.impl(advisorClass.get).buildChecked[SessionConfAdvisor].newInstance()
} catch {
case _: ClassCastException =>
throw new KyuubiException(
@@ -45,8 +45,7 @@ private[kyuubi] object PluginLoader {
def loadGroupProvider(conf: KyuubiConf): GroupProvider = {
val groupProviderClass = conf.get(KyuubiConf.GROUP_PROVIDER)
try {
- Class.forName(groupProviderClass).getConstructor().newInstance()
- .asInstanceOf[GroupProvider]
+
DynConstructors.builder().impl(groupProviderClass).buildChecked[GroupProvider]().newInstance()
} catch {
case _: ClassCastException =>
throw new KyuubiException(
diff --git
a/kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/reflect/ReflectUtils.scala
b/kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/reflect/ReflectUtils.scala
index 6e306e371..5ded0af2c 100644
---
a/kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/reflect/ReflectUtils.scala
+++
b/kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/reflect/ReflectUtils.scala
@@ -21,7 +21,7 @@ import java.util.ServiceLoader
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
-import scala.util.{Failure, Success, Try}
+import scala.util.Try
object ReflectUtils {
/**
@@ -37,32 +37,56 @@ object ReflectUtils {
DynClasses.builder().loader(cl).impl(className).buildChecked()
}.isSuccess
- def getFieldVal[T](target: Any, fieldName: String): T =
- Try {
- DynFields.builder().hiddenImpl(target.getClass,
fieldName).build[T]().get(target)
- } match {
- case Success(value) => value
- case Failure(e) =>
- val candidates =
target.getClass.getDeclaredFields.map(_.getName).mkString("[", ",", "]")
- throw new RuntimeException(s"$fieldName not in ${target.getClass}
$candidates", e)
+ /**
+ * get the field value of the given object
+ * @param target the target object
+ * @param fieldName the field name from declared field names
+ * @tparam T the expected return class type
+ * @return
+ */
+ def getField[T](target: Any, fieldName: String): T = {
+ val targetClass = target.getClass
+ try {
+ DynFields.builder()
+ .hiddenImpl(targetClass, fieldName)
+ .buildChecked[T](target)
+ .get()
+ } catch {
+ case e: Exception =>
+ val candidates = targetClass.getDeclaredFields.map(_.getName).sorted
+ throw new RuntimeException(
+ s"Field $fieldName not in $targetClass
[${candidates.mkString(",")}]",
+ e)
}
+ }
- def getFieldValOpt[T](target: Any, name: String): Option[T] =
- Try(getFieldVal[T](target, name)).toOption
-
- def invoke(target: AnyRef, methodName: String, args: (Class[_], AnyRef)*):
AnyRef =
+ /**
+ * Invoke a method with the given name and arguments on the given target
object.
+ * @param target the target object
+ * @param methodName the method name from declared method names
+ * @param args pairs of class and values for the arguments
+ * @tparam T the expected return class type,
+ * returning type Nothing if it's not provided or inferable
+ * @return
+ */
+ def invokeAs[T](target: AnyRef, methodName: String, args: (Class[_],
AnyRef)*): T = {
+ val targetClass = target.getClass
+ val argClasses = args.map(_._1)
try {
- val (types, values) = args.unzip
- DynMethods.builder(methodName).hiddenImpl(target.getClass, types:
_*).build()
- .invoke(target, values: _*)
+ DynMethods.builder(methodName)
+ .hiddenImpl(targetClass, argClasses: _*)
+ .buildChecked(target)
+ .invoke[T](args.map(_._2): _*)
} catch {
- case e: NoSuchMethodException =>
- val candidates =
target.getClass.getMethods.map(_.getName).mkString("[", ",", "]")
- throw new RuntimeException(s"$methodName not in ${target.getClass}
$candidates", e)
+ case e: Exception =>
+ val candidates = targetClass.getDeclaredMethods.map(_.getName).sorted
+ val argClassesNames = argClasses.map(_.getName)
+ throw new RuntimeException(
+ s"Method $methodName (${argClassesNames.mkString(",")})" +
+ s" not found in $targetClass [${candidates.mkString(",")}]",
+ e)
}
-
- def invokeAs[T](target: AnyRef, methodName: String, args: (Class[_],
AnyRef)*): T =
- invoke(target, methodName, args: _*).asInstanceOf[T]
+ }
/**
* Creates a iterator for with a new service loader for the given service
type and class
diff --git
a/tools/spark-block-cleaner/src/test/scala/org.apache.kyuubi.tools/KubernetesSparkBlockCleanerSuite.scala
b/tools/spark-block-cleaner/src/test/scala/org.apache.kyuubi.tools/KubernetesSparkBlockCleanerSuite.scala
index dfaa1f412..ae4651fe2 100644
---
a/tools/spark-block-cleaner/src/test/scala/org.apache.kyuubi.tools/KubernetesSparkBlockCleanerSuite.scala
+++
b/tools/spark-block-cleaner/src/test/scala/org.apache.kyuubi.tools/KubernetesSparkBlockCleanerSuite.scala
@@ -19,9 +19,11 @@ package org.apache.kyuubi.tools
import java.io.File
import java.nio.file.Files
+import java.util.{Map => JMap}
import java.util.UUID
import org.apache.kyuubi.{KyuubiFunSuite, Utils}
+import org.apache.kyuubi.util.reflect.ReflectUtils._
class KubernetesSparkBlockCleanerSuite extends KyuubiFunSuite {
import KubernetesSparkBlockCleanerConstants._
@@ -83,10 +85,7 @@ class KubernetesSparkBlockCleanerSuite extends
KyuubiFunSuite {
}
private def updateEnv(name: String, value: String): Unit = {
- val env = System.getenv
- val field = env.getClass.getDeclaredField("m")
- field.setAccessible(true)
- field.get(env).asInstanceOf[java.util.Map[String, String]].put(name, value)
+ getField[JMap[String, String]](System.getenv, "m").put(name, value)
}
test("test clean") {