This is an automated email from the ASF dual-hosted git repository.
bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 4deb98cd4 [KYUUBI #4970] Unified reflection methods invokeAs and getField
4deb98cd4 is described below
commit 4deb98cd429ed33b469494d0edfb4ad4028835b8
Author: liangbowen <[email protected]>
AuthorDate: Fri Jun 16 20:08:42 2023 +0800
[KYUUBI #4970] Unified reflection methods invokeAs and getField
### _Why are the changes needed?_
- comment https://github.com/apache/kyuubi/pull/4963#discussion_r1230490326
- simplify reflection calls with the unified `invokeAs` / `getField` methods, covering declared, inherited, and static methods / fields (see the sketch below)
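As an illustration, here is a minimal sketch of the unified call forms, using the `ClassA` / `ClassB` / `ObjectA` fixtures added to `ReflectUtilsSuite` in this change:

```scala
import org.apache.kyuubi.util.reflect.ReflectUtils._

val obj = new ClassB // ClassB extends ClassA, see the test fixtures below
invokeAs[String](obj, "method1") // declared or inherited instance method
invokeAs[String]((classOf[ClassA], obj), "method2") // private method, resolved against an explicit class
invokeAs[String]("org.apache.kyuubi.util.reflect.ObjectA", "method5") // static method, addressed by class name
getField[String](obj, "field3") // declared instance field
getField[String]((classOf[ClassA], obj), "field2") // private field declared on a superclass
```

The target argument accepts a plain object, a `(Class[_], instance)` pair, a `Class[_]`, or a fully qualified class name, dispatched by the new private `getTargetClass` helper.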
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4970 from bowenliang123/unify-invokeas.
Closes #4970
592833459 [liangbowen] Revert "dedicate invokeStaticAs method"
ad45ff3fd [liangbowen] dedicate invokeStaticAs method
f08528c0f [liangbowen] nit
42aeb9fcf [liangbowen] add ut case
b5b384120 [liangbowen] nit
072add599 [liangbowen] add ut
8d019ab35 [liangbowen] unified invokeAs and getField
Authored-by: liangbowen <[email protected]>
Signed-off-by: liangbowen <[email protected]>
---
.../spark/authz/util/RangerConfigProvider.scala | 12 +--
.../kyuubi/engine/flink/FlinkEngineUtils.scala | 37 +++------
.../kyuubi/engine/flink/shim/FlinkResultSet.scala | 51 +-----------
.../engine/flink/shim/FlinkSessionManager.scala | 92 +++-------------------
.../engine/spark/util/SparkCatalogUtils.scala | 6 +-
.../spark/sql/kyuubi/SparkDatasetHelper.scala | 11 +--
.../operation/SparkArrowbasedOperationSuite.scala | 10 +--
.../operation/log/Log4j2DivertAppender.scala | 9 +--
.../zookeeper/ZookeeperDiscoveryClientSuite.scala | 8 +-
.../apache/kyuubi/util/reflect/ReflectUtils.scala | 54 +++++++++----
.../kyuubi/util/reflect/ReflectUtilsSuite.scala | 68 +++++++++++++++-
11 files changed, 146 insertions(+), 212 deletions(-)
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RangerConfigProvider.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RangerConfigProvider.scala
index 806914286..a61d94a8f 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RangerConfigProvider.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RangerConfigProvider.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.plugin.spark.authz.util
import org.apache.hadoop.conf.Configuration
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
-import org.apache.kyuubi.util.reflect.DynMethods
+import org.apache.kyuubi.util.reflect.ReflectUtils._
trait RangerConfigProvider {
@@ -37,16 +37,10 @@ trait RangerConfigProvider {
val getRangerConf: Configuration = {
if (isRanger21orGreater) {
// for Ranger 2.1+
- DynMethods.builder("getConfig")
- .impl("org.apache.ranger.plugin.service.RangerBasePlugin")
- .buildChecked(this)
- .invoke[Configuration]()
+ invokeAs(this, "getConfig")
} else {
// for Ranger 2.0 and below
- DynMethods.builder("getInstance")
- .impl("org.apache.ranger.authorization.hadoop.config.RangerConfiguration")
- .buildStaticChecked()
- .invoke[Configuration]()
+ invokeAs("org.apache.ranger.authorization.hadoop.config.RangerConfiguration", "getInstance")
}
}
}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
index 66d9ed2c5..81441ffdf 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
@@ -41,7 +41,8 @@ import org.apache.flink.util.JarUtils
import org.apache.kyuubi.{KyuubiException, Logging}
import org.apache.kyuubi.util.SemanticVersion
-import org.apache.kyuubi.util.reflect.{DynConstructors, DynFields, DynMethods}
+import org.apache.kyuubi.util.reflect._
+import org.apache.kyuubi.util.reflect.ReflectUtils._
object FlinkEngineUtils extends Logging {
@@ -127,43 +128,27 @@ object FlinkEngineUtils extends Logging {
.newInstance(flinkConf, commandLines)
.asInstanceOf[DefaultContext]
} else if (FlinkEngineUtils.isFlinkVersionEqualTo("1.17")) {
- DynMethods.builder("load")
- .impl(
- classOf[DefaultContext],
- classOf[Configuration],
- classOf[JList[URL]],
- classOf[Boolean],
- classOf[Boolean])
- .buildStatic()
- .invoke[DefaultContext](
- flinkConf,
- dependencies,
- new JBoolean(true),
- new JBoolean(false))
+ invokeAs[DefaultContext](
+ classOf[DefaultContext],
+ "load",
+ (classOf[Configuration], flinkConf),
+ (classOf[JList[URL]], dependencies),
+ (classOf[Boolean], JBoolean.TRUE),
+ (classOf[Boolean], JBoolean.FALSE))
} else {
throw new KyuubiException(
s"Flink version ${EnvironmentInformation.getVersion} are not supported
currently.")
}
}
- def getSessionContext(session: Session): SessionContext = {
- DynFields.builder()
- .hiddenImpl(classOf[Session], "sessionContext")
- .build()
- .get(session)
- .asInstanceOf[SessionContext]
- }
+ def getSessionContext(session: Session): SessionContext = getField(session, "sessionContext")
def getResultJobId(resultFetch: ResultFetcher): Option[JobID] = {
if (FlinkEngineUtils.isFlinkVersionAtMost("1.16")) {
return None
}
try {
- Option(DynFields.builder()
- .hiddenImpl(classOf[ResultFetcher], "jobID")
- .build()
- .get(resultFetch)
- .asInstanceOf[JobID])
+ Option(getField[JobID](resultFetch, "jobID"))
} catch {
case _: NullPointerException => None
case e: Throwable =>
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/shim/FlinkResultSet.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/shim/FlinkResultSet.scala
index a9c76082d..7fb05c844 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/shim/FlinkResultSet.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/shim/FlinkResultSet.scala
@@ -23,56 +23,13 @@ import java.util
import org.apache.flink.table.data.RowData
import org.apache.flink.table.gateway.api.results.ResultSet.ResultType
-import org.apache.kyuubi.engine.flink.FlinkEngineUtils
-import org.apache.kyuubi.util.reflect.DynMethods
+import org.apache.kyuubi.util.reflect.ReflectUtils._
class FlinkResultSet(resultSet: AnyRef) {
- def getData: util.List[RowData] = {
- if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
- DynMethods.builder("getData")
- .impl("org.apache.flink.table.gateway.api.results.ResultSet")
- .build()
- .invoke(resultSet)
- .asInstanceOf[util.List[RowData]]
- } else {
- DynMethods.builder("getData")
- .impl("org.apache.flink.table.gateway.api.results.ResultSetImpl")
- .build()
- .invoke(resultSet)
- .asInstanceOf[util.List[RowData]]
- }
- }
+ def getData: util.List[RowData] = invokeAs(resultSet, "getData")
- def getNextToken: JLong = {
- if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
- DynMethods.builder("getNextToken")
- .impl("org.apache.flink.table.gateway.api.results.ResultSet")
- .build()
- .invoke(resultSet)
- .asInstanceOf[JLong]
- } else {
- DynMethods.builder("getNextToken")
- .impl("org.apache.flink.table.gateway.api.results.ResultSetImpl")
- .build()
- .invoke(resultSet)
- .asInstanceOf[JLong]
- }
- }
+ def getNextToken: JLong = invokeAs(resultSet, "getNextToken")
- def getResultType: ResultType = {
- if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
- DynMethods.builder("getResultType")
- .impl("org.apache.flink.table.gateway.api.results.ResultSet")
- .build()
- .invoke(resultSet)
- .asInstanceOf[ResultType]
- } else {
- DynMethods.builder("getResultType")
- .impl("org.apache.flink.table.gateway.api.results.ResultSetImpl")
- .build()
- .invoke(resultSet)
- .asInstanceOf[ResultType]
- }
- }
+ def getResultType: ResultType = invokeAs(resultSet, "getResultType")
}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/shim/FlinkSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/shim/FlinkSessionManager.scala
index ce5bdfbd9..f1a6afed1 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/shim/FlinkSessionManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/shim/FlinkSessionManager.scala
@@ -22,7 +22,8 @@ import org.apache.flink.table.gateway.service.context.DefaultContext
import org.apache.flink.table.gateway.service.session.Session
import org.apache.kyuubi.engine.flink.FlinkEngineUtils
-import org.apache.kyuubi.util.reflect.{DynConstructors, DynMethods}
+import org.apache.kyuubi.util.reflect._
+import org.apache.kyuubi.util.reflect.ReflectUtils._
class FlinkSessionManager(engineContext: DefaultContext) {
@@ -42,89 +43,16 @@ class FlinkSessionManager(engineContext: DefaultContext) {
}
}
- def start(): Unit = {
- if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
- DynMethods.builder("start")
- .impl("org.apache.flink.table.gateway.service.session.SessionManager")
- .build()
- .invoke(sessionManager)
- } else {
- DynMethods.builder("start")
- .impl("org.apache.flink.table.gateway.service.session.SessionManagerImpl")
- .build()
- .invoke(sessionManager)
- }
- }
+ def start(): Unit = invokeAs(sessionManager, "start")
- def stop(): Unit = {
- if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
- DynMethods.builder("stop")
- .impl("org.apache.flink.table.gateway.service.session.SessionManager")
- .build()
- .invoke(sessionManager)
- } else {
- DynMethods.builder("stop")
- .impl("org.apache.flink.table.gateway.service.session.SessionManagerImpl")
- .build()
- .invoke(sessionManager)
- }
- }
+ def stop(): Unit = invokeAs(sessionManager, "stop")
- def getSession(sessionHandle: SessionHandle): Session = {
- if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
- DynMethods.builder("getSession")
- .impl(
- "org.apache.flink.table.gateway.service.session.SessionManager",
- classOf[SessionHandle])
- .build()
- .invoke(sessionManager, sessionHandle)
- .asInstanceOf[Session]
- } else {
- DynMethods.builder("getSession")
- .impl(
- "org.apache.flink.table.gateway.service.session.SessionManagerImpl",
- classOf[SessionHandle])
- .build()
- .invoke(sessionManager, sessionHandle)
- .asInstanceOf[Session]
- }
- }
+ def getSession(sessionHandle: SessionHandle): Session =
+ invokeAs(sessionManager, "getSession", (classOf[SessionHandle],
sessionHandle))
- def openSession(environment: SessionEnvironment): Session = {
- if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
- DynMethods.builder("openSession")
- .impl(
- "org.apache.flink.table.gateway.service.session.SessionManager",
- classOf[SessionEnvironment])
- .build()
- .invoke(sessionManager, environment)
- .asInstanceOf[Session]
- } else {
- DynMethods.builder("openSession")
- .impl(
- "org.apache.flink.table.gateway.service.session.SessionManagerImpl",
- classOf[SessionEnvironment])
- .build()
- .invoke(sessionManager, environment)
- .asInstanceOf[Session]
- }
- }
+ def openSession(environment: SessionEnvironment): Session =
+ invokeAs(sessionManager, "openSession", (classOf[SessionEnvironment],
environment))
- def closeSession(sessionHandle: SessionHandle): Unit = {
- if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
- DynMethods.builder("closeSession")
- .impl(
- "org.apache.flink.table.gateway.service.session.SessionManager",
- classOf[SessionHandle])
- .build()
- .invoke(sessionManager, sessionHandle)
- } else {
- DynMethods.builder("closeSession")
- .impl(
- "org.apache.flink.table.gateway.service.session.SessionManagerImpl",
- classOf[SessionHandle])
- .build()
- .invoke(sessionManager, sessionHandle)
- }
- }
+ def closeSession(sessionHandle: SessionHandle): Unit =
+ invokeAs(sessionManager, "closeSession", (classOf[SessionHandle],
sessionHandle))
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala
index 13b87665d..e05e8731d 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala
@@ -27,8 +27,7 @@ import org.apache.spark.sql.types.StructField
import org.apache.kyuubi.Logging
import org.apache.kyuubi.engine.spark.schema.SchemaHelper
-import org.apache.kyuubi.util.reflect.DynMethods
-import org.apache.kyuubi.util.reflect.ReflectUtils.{getField, invokeAs}
+import org.apache.kyuubi.util.reflect.ReflectUtils._
/**
* A shim that defines the interface interact with Spark's catalogs
@@ -56,8 +55,7 @@ object SparkCatalogUtils extends Logging {
val sessionCatalog = invokeAs[AnyRef](catalogMgr, "v2SessionCatalog")
val defaultCatalog = catalogMgr.currentCatalog
- val defaults = Seq(sessionCatalog, defaultCatalog).distinct.map(catalog =>
- DynMethods.builder("name").impl(catalog.getClass).buildChecked(catalog).invoke[String]())
+ val defaults = Seq(sessionCatalog, defaultCatalog).distinct.map(invokeAs[String](_, "name"))
val catalogs = getField[scala.collection.Map[String, _]](catalogMgr, "catalogs")
(catalogs.keys ++: defaults).distinct.map(Row(_))
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index 06eb9a144..170f108b2 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.types._
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil
import org.apache.kyuubi.engine.spark.schema.RowSet
import org.apache.kyuubi.util.reflect.DynMethods
+import org.apache.kyuubi.util.reflect.ReflectUtils._
object SparkDatasetHelper extends Logging {
@@ -236,15 +237,9 @@ object SparkDatasetHelper extends Logging {
private def withFinalPlanUpdate[T](
adaptiveSparkPlanExec: AdaptiveSparkPlanExec,
fun: SparkPlan => T): T = {
- val getFinalPhysicalPlan = DynMethods.builder("getFinalPhysicalPlan")
- .hiddenImpl(adaptiveSparkPlanExec.getClass)
- .build()
- val plan = getFinalPhysicalPlan.invoke[SparkPlan](adaptiveSparkPlanExec)
+ val plan = invokeAs[SparkPlan](adaptiveSparkPlanExec, "getFinalPhysicalPlan")
val result = fun(plan)
- val finalPlanUpdate = DynMethods.builder("finalPlanUpdate")
- .hiddenImpl(adaptiveSparkPlanExec.getClass)
- .build()
- finalPlanUpdate.invoke[Unit](adaptiveSparkPlanExec)
+ invokeAs[Unit](adaptiveSparkPlanExec, "finalPlanUpdate")
result
}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
index b8c64b5f2..8818ae7a9 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
@@ -43,6 +43,7 @@ import org.apache.kyuubi.engine.spark.{SparkSQLEngine, WithSparkSQLEngine}
import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
import org.apache.kyuubi.operation.SparkDataTypeTests
import org.apache.kyuubi.util.reflect.{DynFields, DynMethods}
+import org.apache.kyuubi.util.reflect.ReflectUtils._
class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTypeTests
with SparkMetricsTestUtils {
@@ -527,13 +528,8 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
* TODO: Once we drop support for Spark 3.1.x, we can directly call
* [[SQLConf.isStaticConfigKey()]].
*/
- private def isStaticConfigKey(key: String): Boolean = {
- val staticConfKeys = DynFields.builder()
- .hiddenImpl(SQLConf.getClass, "staticConfKeys")
- .build[JSet[String]](SQLConf)
- .get()
- staticConfKeys.contains(key)
- }
+ private def isStaticConfigKey(key: String): Boolean =
+ getField[JSet[String]]((SQLConf.getClass, SQLConf), "staticConfKeys").contains(key)
// the signature of function [[ArrowConverters.fromBatchIterator]] is changed in SPARK-43528
// (since Spark 3.5)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j2DivertAppender.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j2DivertAppender.scala
index 1e5684d4c..0daaeae48 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j2DivertAppender.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j2DivertAppender.scala
@@ -28,7 +28,7 @@ import org.apache.logging.log4j.core.appender.{AbstractWriterAppender, ConsoleAp
import org.apache.logging.log4j.core.filter.AbstractFilter
import org.apache.logging.log4j.core.layout.PatternLayout
-import org.apache.kyuubi.util.reflect.DynFields
+import org.apache.kyuubi.util.reflect.ReflectUtils._
class Log4j2DivertAppender(
name: String,
@@ -63,11 +63,8 @@ class Log4j2DivertAppender(
}
})
- private val writeLock = DynFields.builder()
- .hiddenImpl(classOf[AbstractWriterAppender[_]], "readWriteLock")
- .build[ReadWriteLock](this)
- .get()
- .writeLock
+ private val writeLock =
+ getField[ReadWriteLock]((classOf[AbstractWriterAppender[_]], this), "readWriteLock").writeLock
/**
* Overrides AbstractWriterAppender.append(), which does the real logging. No need
diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
index 0a5db3b43..dd78e1fb8 100644
--- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
+++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
@@ -39,7 +39,7 @@ import org.apache.kyuubi.shaded.curator.framework.CuratorFrameworkFactory
import org.apache.kyuubi.shaded.curator.retry.ExponentialBackoffRetry
import org.apache.kyuubi.shaded.zookeeper.ZooDefs
import org.apache.kyuubi.shaded.zookeeper.data.ACL
-import org.apache.kyuubi.util.reflect.DynFields
+import org.apache.kyuubi.util.reflect.ReflectUtils._
import org.apache.kyuubi.zookeeper.EmbeddedZookeeper
import org.apache.kyuubi.zookeeper.ZookeeperConf.ZK_CLIENT_PORT
@@ -157,10 +157,8 @@ abstract class ZookeeperDiscoveryClientSuite extends DiscoveryClientTests
assert(service.getServiceState === ServiceState.STARTED)
stopZk()
- val isServerLost = DynFields.builder()
- .hiddenImpl(discovery.getClass.getSuperclass, "isServerLost")
- .buildChecked[AtomicBoolean]()
- .get(discovery)
+ val isServerLost =
+ getField[AtomicBoolean]((discovery.getClass.getSuperclass, discovery), "isServerLost")
eventually(timeout(10.seconds), interval(100.millis)) {
assert(isServerLost.get())
diff --git a/kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/reflect/ReflectUtils.scala b/kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/reflect/ReflectUtils.scala
index 5ded0af2c..78b0e1df7 100644
--- a/kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/reflect/ReflectUtils.scala
+++ b/kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/reflect/ReflectUtils.scala
@@ -20,8 +20,10 @@ package org.apache.kyuubi.util.reflect
import java.util.ServiceLoader
import scala.collection.JavaConverters._
+import scala.language.existentials
import scala.reflect.ClassTag
import scala.util.Try
+
object ReflectUtils {
/**
@@ -44,18 +46,24 @@ object ReflectUtils {
* @tparam T the expected return class type
* @return
*/
- def getField[T](target: Any, fieldName: String): T = {
- val targetClass = target.getClass
+ def getField[T](target: AnyRef, fieldName: String): T = {
+ val (clz, obj) = getTargetClass(target)
try {
- DynFields.builder()
- .hiddenImpl(targetClass, fieldName)
- .buildChecked[T](target)
- .get()
+ val field = DynFields.builder
+ .hiddenImpl(clz, fieldName)
+ .impl(clz, fieldName)
+ .build[T]
+ if (field.isStatic) {
+ field.asStatic.get
+ } else {
+ field.bind(obj).get
+ }
} catch {
case e: Exception =>
- val candidates = targetClass.getDeclaredFields.map(_.getName).sorted
+ val candidates =
+ (clz.getDeclaredFields ++ clz.getFields).map(_.getName).distinct.sorted
throw new RuntimeException(
- s"Field $fieldName not in $targetClass
[${candidates.mkString(",")}]",
+ s"Field $fieldName not in $clz [${candidates.mkString(",")}]",
e)
}
}
@@ -70,20 +78,26 @@ object ReflectUtils {
* @return
*/
def invokeAs[T](target: AnyRef, methodName: String, args: (Class[_], AnyRef)*): T = {
- val targetClass = target.getClass
+ val (clz, obj) = getTargetClass(target)
val argClasses = args.map(_._1)
try {
- DynMethods.builder(methodName)
- .hiddenImpl(targetClass, argClasses: _*)
- .buildChecked(target)
- .invoke[T](args.map(_._2): _*)
+ val method = DynMethods.builder(methodName)
+ .hiddenImpl(clz, argClasses: _*)
+ .impl(clz, argClasses: _*)
+ .buildChecked
+ if (method.isStatic) {
+ method.asStatic.invoke[T](args.map(_._2): _*)
+ } else {
+ method.bind(obj).invoke[T](args.map(_._2): _*)
+ }
} catch {
case e: Exception =>
- val candidates = targetClass.getDeclaredMethods.map(_.getName).sorted
+ val candidates =
+ (clz.getDeclaredMethods ++ clz.getMethods).map(_.getName).distinct.sorted
val argClassesNames = argClasses.map(_.getClass.getName)
throw new RuntimeException(
- s"Method $methodName (${argClassesNames.mkString(",")})" +
- s" not found in $targetClass [${candidates.mkString(",")}]",
+ s"Method $methodName(${argClassesNames.mkString(",")})" +
+ s" not found in $clz [${candidates.mkString(",")}]",
e)
}
}
@@ -101,4 +115,12 @@ object ReflectUtils {
def loadFromServiceLoader[T](cl: ClassLoader = Thread.currentThread().getContextClassLoader)(
implicit ct: ClassTag[T]): Iterator[T] =
ServiceLoader.load(ct.runtimeClass, cl).iterator().asScala.map(_.asInstanceOf[T])
+
+ private def getTargetClass(target: AnyRef): (Class[_], AnyRef) = target match {
+ case clz: Class[_] => (clz, None)
+ case clzName: String => (DynClasses.builder.impl(clzName).buildChecked, None)
+ case (clz: Class[_], o: AnyRef) => (clz, o)
+ case (clzName: String, o: AnyRef) => (DynClasses.builder.impl(clzName).buildChecked, o)
+ case o => (o.getClass, o)
+ }
}
diff --git a/kyuubi-util-scala/src/test/java/org/apache/kyuubi/util/reflect/ReflectUtilsSuite.scala b/kyuubi-util-scala/src/test/java/org/apache/kyuubi/util/reflect/ReflectUtilsSuite.scala
index f28024f18..dbfd234de 100644
--- a/kyuubi-util-scala/src/test/java/org/apache/kyuubi/util/reflect/ReflectUtilsSuite.scala
+++ b/kyuubi-util-scala/src/test/java/org/apache/kyuubi/util/reflect/ReflectUtilsSuite.scala
@@ -16,14 +16,78 @@
*/
package org.apache.kyuubi.util.reflect
-import org.apache.kyuubi.util.reflect.ReflectUtils._
-// scalastyle:off
import org.scalatest.funsuite.AnyFunSuite
+
+// scalastyle:off
+import org.apache.kyuubi.util.reflect.ReflectUtils._
class ReflectUtilsSuite extends AnyFunSuite {
// scalastyle:on
+ private val obj1 = new ClassA
+ private val obj2 = new ClassB
+
test("check class loadable") {
assert(isClassLoadable(getClass.getName))
assert(!isClassLoadable("org.apache.kyuubi.NonExistClass"))
}
+
+ test("check invokeAs on base class") {
+ assertResult("method1")(invokeAs[String](obj1, "method1"))
+ assertResult("method2")(invokeAs[String](obj1, "method2"))
+ }
+ test("check invokeAs on sub class") {
+ assertResult("method1")(invokeAs[String](obj2, "method1"))
+ assertResult("method2")(invokeAs[String]((classOf[ClassA], obj2),
"method2"))
+ assertResult("method3")(invokeAs[String](obj2, "method3"))
+ assertResult("method4")(invokeAs[String](obj2, "method4"))
+ }
+
+ test("check invokeAs on object and static class") {
+ assertResult("method5")(invokeAs[String](ObjectA, "method5"))
+ assertResult("method6")(invokeAs[String](ObjectA, "method6"))
+ assertResult("method5")(invokeAs[String]("org.apache.kyuubi.util.reflect.ObjectA", "method5"))
+ }
+
+ test("check getField on base class") {
+ assertResult("field0")(getField[String](obj1, "field0"))
+ assertResult("field1")(getField[String](obj1, "field1"))
+ assertResult("field2")(getField[String](obj1, "field2"))
+ }
+
+ test("check getField on subclass") {
+ assertResult("field0")(getField[String]((classOf[ClassA], obj2), "field0"))
+ assertResult("field1")(getField[String]((classOf[ClassA], obj2), "field1"))
+ assertResult("field2")(getField[String]((classOf[ClassA], obj2), "field2"))
+ assertResult("field3")(getField[String](obj2, "field3"))
+ assertResult("field4")(getField[String](obj2, "field4"))
+ }
+
+ test("check getField on object and static class") {
+ assertResult("field5")(getField[String](ObjectA, "field5"))
+ assertResult("field6")(getField[String](ObjectA, "field6"))
+ }
+}
+
+class ClassA(val field0: String = "field0") {
+ val field1 = "field1"
+ private val field2 = "field2"
+
+ def method1(): String = "method1"
+ private def method2(): String = "method2"
+}
+
+class ClassB extends ClassA {
+ val field3 = "field3"
+ private val field4 = "field4"
+
+ def method3(): String = "method3"
+ private def method4(): String = "method4"
+}
+
+object ObjectA {
+ val field5 = "field5"
+ private val field6 = "field6"
+
+ def method5(): String = "method5"
+ private def method6(): String = "method6"
}