This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new c5b7e59335 [CORE] Follow-up: switch to direct class references guarded
by component's runtime-compatibility validation (#10999)
c5b7e59335 is described below
commit c5b7e59335201f960cda49dff7edc315b36ed05e
Author: PHILO-HE <[email protected]>
AuthorDate: Tue Nov 4 21:35:37 2025 +0800
[CORE] Follow-up: switch to direct class references guarded by component's
runtime-compatibility validation (#10999)
---
.../apache/gluten/component/CHDeltaComponent.scala | 5 ++
.../gluten/component/CHIcebergComponent.scala | 9 +++-
.../gluten/component/VeloxDeltaComponent.scala | 8 +--
.../gluten/component/VeloxHudiComponent.scala | 8 +--
.../gluten/component/VeloxIcebergComponent.scala | 10 +---
.../gluten/component/VeloxPaimonComponent.scala | 10 +---
.../apache/spark/util/SparkReflectionUtil.scala | 9 ++--
.../apache/gluten/extension/OffloadDeltaScan.scala | 3 +-
.../gluten/execution/IcebergScanTransformer.scala | 2 +-
.../spark/source/GlutenIcebergSourceUtil.scala | 6 ++-
.../iceberg/spark/source/IcebergWriteUtil.scala | 57 ++++++++++++++--------
.../gluten/execution/PaimonScanTransformer.scala | 2 +-
12 files changed, 68 insertions(+), 61 deletions(-)
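For context, the shape that all six component changes below converge on: probe the classpath once in isRuntimeCompatible, then let the rest of the component reference the dependency's classes directly. A minimal, self-contained sketch of the pattern (the Component trait here is a reduced stand-in for Gluten's real API, and Class.forName stands in for the Spark Utils.classForName the real helper delegates to):

// Reduced stand-in for Gluten's Component trait (illustrative only).
trait Component {
  def name(): String
  // Optional components override this to probe the classpath.
  def isRuntimeCompatible: Boolean = true
}

// Same shape as the SparkReflectionUtil.isClassPresent introduced below.
object ClasspathProbe {
  def isClassPresent(className: String): Boolean =
    try {
      Class.forName(className)
      true
    } catch {
      case _: ClassNotFoundException => false
    }
}

class ExampleDeltaComponent extends Component {
  override def name(): String = "example-delta"
  // Usable only when Delta's session extension is on the classpath.
  override def isRuntimeCompatible: Boolean =
    ClasspathProbe.isClassPresent("io.delta.sql.DeltaSparkSessionExtension")
}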
diff --git a/backends-clickhouse/src-delta/main/scala/org/apache/gluten/component/CHDeltaComponent.scala b/backends-clickhouse/src-delta/main/scala/org/apache/gluten/component/CHDeltaComponent.scala
index c4d8c8d8cb..f9fd8e7245 100644
--- a/backends-clickhouse/src-delta/main/scala/org/apache/gluten/component/CHDeltaComponent.scala
+++ b/backends-clickhouse/src-delta/main/scala/org/apache/gluten/component/CHDeltaComponent.scala
@@ -29,6 +29,7 @@ import org.apache.gluten.sql.shims.DeltaShimLoader
import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.PluginContext
import org.apache.spark.sql.execution.{FilterExec, ProjectExec}
+import org.apache.spark.util.SparkReflectionUtil
class CHDeltaComponent extends Component {
override def name(): String = "ch-delta"
@@ -36,6 +37,10 @@ class CHDeltaComponent extends Component {
Component.BuildInfo("CHDelta", "N/A", "N/A", "N/A")
override def dependencies(): Seq[Class[_ <: Component]] = classOf[CHBackend] :: Nil
+ override def isRuntimeCompatible: Boolean = {
+ SparkReflectionUtil.isClassPresent("io.delta.sql.DeltaSparkSessionExtension")
+ }
+
override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit =
DeltaShimLoader.getDeltaShims.onDriverStart(sc, pc)
diff --git a/backends-clickhouse/src-iceberg/main/scala/org/apache/gluten/component/CHIcebergComponent.scala b/backends-clickhouse/src-iceberg/main/scala/org/apache/gluten/component/CHIcebergComponent.scala
index 7da4d1ec90..cbd5a09362 100644
--- a/backends-clickhouse/src-iceberg/main/scala/org/apache/gluten/component/CHIcebergComponent.scala
+++ b/backends-clickhouse/src-iceberg/main/scala/org/apache/gluten/component/CHIcebergComponent.scala
@@ -14,18 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.gluten.component
import org.apache.gluten.backendsapi.clickhouse.CHBackend
import org.apache.gluten.extension.OffloadIcebergScan
import org.apache.gluten.extension.injector.Injector
+import org.apache.spark.util.SparkReflectionUtil
+
class CHIcebergComponent extends Component {
override def name(): String = "clickhouse-iceberg"
override def buildInfo(): Component.BuildInfo =
Component.BuildInfo("ClickHouseIceberg", "N/A", "N/A", "N/A")
override def dependencies(): Seq[Class[_ <: Component]] = classOf[CHBackend] :: Nil
+
+ override def isRuntimeCompatible: Boolean = {
+ SparkReflectionUtil.isClassPresent(
+ "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
+ }
+
override def injectRules(injector: Injector): Unit = {
OffloadIcebergScan.inject(injector)
}
diff --git a/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala b/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
index 18f92919e7..b1da590246 100644
--- a/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
+++ b/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
@@ -34,13 +34,7 @@ class VeloxDeltaComponent extends Component {
override def dependencies(): Seq[Class[_ <: Component]] = classOf[VeloxBackend] :: Nil
override def isRuntimeCompatible: Boolean = {
- try {
- SparkReflectionUtil.classForName("io.delta.sql.DeltaSparkSessionExtension")
- true
- } catch {
- case _: ClassNotFoundException =>
- false
- }
+ SparkReflectionUtil.isClassPresent("io.delta.sql.DeltaSparkSessionExtension")
}
override def injectRules(injector: Injector): Unit = {
diff --git a/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala b/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala
index e31c7bb35b..b347955441 100644
--- a/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala
+++ b/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala
@@ -34,13 +34,7 @@ class VeloxHudiComponent extends Component {
override def dependencies(): Seq[Class[_ <: Component]] = classOf[VeloxBackend] :: Nil
override def isRuntimeCompatible: Boolean = {
- try {
- SparkReflectionUtil.classForName("org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
- true
- } catch {
- case _: ClassNotFoundException =>
- false
- }
+ SparkReflectionUtil.isClassPresent("org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
}
override def injectRules(injector: Injector): Unit = {
diff --git a/backends-velox/src-iceberg/main/scala/org/apache/gluten/component/VeloxIcebergComponent.scala b/backends-velox/src-iceberg/main/scala/org/apache/gluten/component/VeloxIcebergComponent.scala
index 4b49f0e3c8..6b5d75ef57 100644
--- a/backends-velox/src-iceberg/main/scala/org/apache/gluten/component/VeloxIcebergComponent.scala
+++ b/backends-velox/src-iceberg/main/scala/org/apache/gluten/component/VeloxIcebergComponent.scala
@@ -29,14 +29,8 @@ class VeloxIcebergComponent extends Component {
override def dependencies(): Seq[Class[_ <: Component]] = classOf[VeloxBackend] :: Nil
override def isRuntimeCompatible: Boolean = {
- try {
- SparkReflectionUtil.classForName(
- "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
- true
- } catch {
- case _: ClassNotFoundException =>
- false
- }
+ SparkReflectionUtil.isClassPresent(
+ "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
}
override def injectRules(injector: Injector): Unit = {
diff --git a/backends-velox/src-paimon/main/scala/org/apache/gluten/component/VeloxPaimonComponent.scala b/backends-velox/src-paimon/main/scala/org/apache/gluten/component/VeloxPaimonComponent.scala
index 625480c6b4..dbe1da3af4 100644
--- a/backends-velox/src-paimon/main/scala/org/apache/gluten/component/VeloxPaimonComponent.scala
+++ b/backends-velox/src-paimon/main/scala/org/apache/gluten/component/VeloxPaimonComponent.scala
@@ -34,14 +34,8 @@ class VeloxPaimonComponent extends Component {
override def dependencies(): Seq[Class[_ <: Component]] = classOf[VeloxBackend] :: Nil
override def isRuntimeCompatible: Boolean = {
- try {
- SparkReflectionUtil.classForName(
- "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
- true
- } catch {
- case _: ClassNotFoundException =>
- false
- }
+ SparkReflectionUtil.isClassPresent(
+ "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
}
override def injectRules(injector: Injector): Unit = {
diff --git a/gluten-core/src/main/scala/org/apache/spark/util/SparkReflectionUtil.scala b/gluten-core/src/main/scala/org/apache/spark/util/SparkReflectionUtil.scala
index 60a15c8be9..c1c8a85c85 100644
--- a/gluten-core/src/main/scala/org/apache/spark/util/SparkReflectionUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkReflectionUtil.scala
@@ -28,12 +28,13 @@ object SparkReflectionUtil {
Utils.classForName(className, initialize, noSparkClassLoader)
}
- def isInstanceOfClassName(obj: Any, className: String): Boolean = {
+ def isClassPresent(className: String): Boolean = {
try {
- val cls = classForName(className)
- cls.isInstance(obj)
+ classForName(className)
+ true
} catch {
- case _: ClassNotFoundException => false
+ case _: ClassNotFoundException =>
+ false
}
}
}
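The renamed helper centralizes the try/catch that each Velox component previously inlined. An equivalent formulation with scala.util.Try, shown only as a sketch of the same check (note Try is slightly broader, swallowing any non-fatal exception rather than ClassNotFoundException alone):

import scala.util.Try

def isClassPresent(className: String): Boolean =
  Try(Class.forName(className)).isSuccess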
diff --git a/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala b/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
index e7e91ae788..5fe1b4ba86 100644
--- a/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
+++ b/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
@@ -25,8 +25,7 @@ import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
case class OffloadDeltaScan() extends OffloadSingleNode {
override def offload(plan: SparkPlan): SparkPlan = plan match {
case scan: FileSourceScanExec
- if scan.relation.fileFormat.getClass.getName ==
- classOf[DeltaParquetFileFormat].getName =>
+ if scan.relation.fileFormat.getClass == classOf[DeltaParquetFileFormat] =>
DeltaScanTransformer(scan)
case other => other
}
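The old guard already linked DeltaParquetFileFormat but compared class names as strings; the new form compares the Class objects themselves: a single reference-equality check that cannot accidentally match an identically named class loaded by a different classloader. A small illustration with a hypothetical stand-in class:

final class DeltaParquetFileFormatStub // stand-in for the real format class

// Fragile: string comparison, re-evaluated on every call.
def matchesByName(fmt: AnyRef): Boolean =
  fmt.getClass.getName == "DeltaParquetFileFormatStub"

// Strict and cheap: reference equality against the linked Class object.
def matchesByClass(fmt: AnyRef): Boolean =
  fmt.getClass == classOf[DeltaParquetFileFormatStub]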
diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
index 7bfa522a48..a601ce70a9 100644
--- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
@@ -253,7 +253,7 @@ object IcebergScanTransformer {
}
def supportsBatchScan(scan: Scan): Boolean = {
- scan.getClass.getName == "org.apache.iceberg.spark.source.SparkBatchQueryScan"
+ scan.getClass == GlutenIcebergSourceUtil.getClassOfSparkBatchQueryScan
}
private def containsUuidOrFixedType(dataType: Type): Boolean = {
diff --git a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
index 2436166e92..bb551a4c83 100644
--- a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
@@ -30,13 +30,17 @@ import org.apache.spark.sql.types.StructType
import org.apache.iceberg._
import org.apache.iceberg.spark.SparkSchemaUtil
-import java.lang.{Long => JLong}
+import java.lang.{Class, Long => JLong}
import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap}
import scala.collection.JavaConverters._
object GlutenIcebergSourceUtil {
+ def getClassOfSparkBatchQueryScan(): Class[SparkBatchQueryScan] = {
+ classOf[SparkBatchQueryScan]
+ }
+
def genSplitInfo(
partition: SparkDataSourceRDDPartition,
readPartitionSchema: StructType): SplitInfo = {
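GlutenIcebergSourceUtil is compiled into org.apache.iceberg.spark.source, where Iceberg's SparkBatchQueryScan is package-private, which is presumably why the Class object is exported through an accessor rather than IcebergScanTransformer naming the type itself. The general shape of that trick, with hypothetical names:

// thirdparty/pkg/HiddenScan.java (hypothetical)
// class HiddenScan {} // package-private: no 'public' modifier

// thirdparty/pkg/Access.scala (hypothetical)
package thirdparty.pkg

object Access {
  // Legal here because Access lives in HiddenScan's package.
  def classOfHiddenScan: Class[_] = classOf[HiddenScan]
}

// From any other package:
// thirdparty.pkg.Access.classOfHiddenScan // OK: the Class object escapes
// classOf[thirdparty.pkg.HiddenScan]      // does not compile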
diff --git a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala
index 1d15259732..c7c4c4e672 100644
--- a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala
@@ -17,7 +17,6 @@
package org.apache.iceberg.spark.source
import org.apache.spark.sql.connector.write.{Write, WriterCommitMessage}
-import org.apache.spark.util.SparkReflectionUtil
import org.apache.iceberg._
import org.apache.iceberg.spark.SparkWriteConf
@@ -28,8 +27,38 @@ import org.apache.iceberg.types.Types.{ListType, MapType}
object IcebergWriteUtil {
+ private lazy val writeSchemaField = {
+ val field = classOf[SparkWrite].getDeclaredField("writeSchema")
+ field.setAccessible(true)
+ field
+ }
+
+ private lazy val writePropertiesField = {
+ val field = classOf[SparkWrite].getDeclaredField("writeProperties")
+ field.setAccessible(true)
+ field
+ }
+
+ private lazy val writeConfField = {
+ val field = classOf[SparkWrite].getDeclaredField("writeConf")
+ field.setAccessible(true)
+ field
+ }
+
+ private lazy val tableField = {
+ val field = classOf[SparkWrite].getDeclaredField("table")
+ field.setAccessible(true)
+ field
+ }
+
+ private lazy val fileFormatField = {
+ val field = classOf[SparkWrite].getDeclaredField("format")
+ field.setAccessible(true)
+ field
+ }
+
def supportsWrite(write: Write): Boolean = {
- SparkReflectionUtil.isInstanceOfClassName(write, "org.apache.iceberg.spark.source.SparkWrite")
+ write.isInstanceOf[SparkWrite]
}
def hasUnsupportedDataType(write: Write): Boolean = {
@@ -50,38 +79,26 @@ object IcebergWriteUtil {
def getWriteSchema(write: Write): Schema = {
assert(write.isInstanceOf[SparkWrite])
- val field = classOf[SparkWrite].getDeclaredField("writeSchema")
- field.setAccessible(true)
- field.get(write).asInstanceOf[Schema]
+ writeSchemaField.get(write).asInstanceOf[Schema]
}
def getWriteProperty(write: Write): java.util.Map[String, String] = {
- val field = classOf[SparkWrite].getDeclaredField("writeProperties")
- field.setAccessible(true)
- field.get(write).asInstanceOf[java.util.Map[String, String]]
+ writePropertiesField.get(write).asInstanceOf[java.util.Map[String, String]]
}
def getWriteConf(write: Write): SparkWriteConf = {
- val field = classOf[SparkWrite].getDeclaredField("writeConf")
- field.setAccessible(true)
- field.get(write).asInstanceOf[SparkWriteConf]
+ writeConfField.get(write).asInstanceOf[SparkWriteConf]
}
def getTable(write: Write): Table = {
- val field = classOf[SparkWrite].getDeclaredField("table")
- field.setAccessible(true)
- field.get(write).asInstanceOf[Table]
+ tableField.get(write).asInstanceOf[Table]
}
def getFileFormat(write: Write): FileFormat = {
- val field = classOf[SparkWrite].getDeclaredField("format")
- field.setAccessible(true)
- field.get(write).asInstanceOf[FileFormat]
+ fileFormatField.get(write).asInstanceOf[FileFormat]
}
def getDirectory(write: Write): String = {
- val field = classOf[SparkWrite].getDeclaredField("table")
- field.setAccessible(true)
val loc = getTable(write).locationProvider().newDataLocation("")
loc.substring(0, loc.length - 1)
}
@@ -91,8 +108,6 @@ object IcebergWriteUtil {
}
def getPartitionSpec(write: Write): PartitionSpec = {
- val field = classOf[SparkWrite].getDeclaredField("table")
- field.setAccessible(true)
getTable(write).specs().get(getWriteConf(write).outputSpecId())
}
diff --git a/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala b/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala
index a4911c8263..0141bf12ce 100644
--- a/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala
+++ b/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala
@@ -210,6 +210,6 @@ object PaimonScanTransformer {
}
def supportsBatchScan(scan: Scan): Boolean = {
- scan.getClass.getName == "org.apache.paimon.spark.PaimonScan"
+ scan.getClass == classOf[PaimonScan]
}
}