This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 70691d0bf Use thread context classloader for Iceberg class loading 
(#3738)
70691d0bf is described below

commit 70691d0bf466aacf5e4fb68bad3953b867a65ea8
Author: Karuppayya <[email protected]>
AuthorDate: Fri Mar 20 06:20:29 2026 -0700

    Use thread context classloader for Iceberg class loading (#3738)
---
 .../apache/comet/iceberg/IcebergReflection.scala   | 35 ++++++++++++++++------
 .../serde/operator/CometIcebergNativeScan.scala    | 31 +++++++++----------
 2 files changed, 40 insertions(+), 26 deletions(-)

diff --git 
a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala 
b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
index d77821239..62710c28d 100644
--- a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
+++ b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
@@ -77,6 +77,29 @@ object IcebergReflection extends Logging {
     val UNKNOWN = "unknown"
   }
 
+  /**
+   * Loads a class using the thread context classloader first, then falls back 
to the system
+   * classloader.
+   *
+   * @param className
+   *   Fully qualified class name to load
+   * @return
+   *   The loaded Class object
+   */
+  def loadClass(className: String): Class[_] = {
+    val classLoader = Thread.currentThread().getContextClassLoader
+    if (classLoader != null) {
+      // scalastyle:off classforname
+      Class.forName(className, true, classLoader)
+      // scalastyle:on classforname
+    } else {
+      // Fallback to default classloader if context classloader is null
+      // scalastyle:off classforname
+      Class.forName(className)
+      // scalastyle:on classforname
+    }
+  }
+
   /**
    * Searches through class hierarchy to find a method (including protected 
methods).
    */
@@ -124,9 +147,7 @@ object IcebergReflection extends Logging {
    */
   def extractFileLocation(file: Any): Option[String] = {
     try {
-      // scalastyle:off classforname
-      val contentFileClass = Class.forName(ClassNames.CONTENT_FILE)
-      // scalastyle:on classforname
+      val contentFileClass = loadClass(ClassNames.CONTENT_FILE)
       extractFileLocation(contentFileClass, file)
     } catch {
       case _: Exception => None
@@ -387,9 +408,7 @@ object IcebergReflection extends Logging {
    */
   def getEqualityFieldIds(deleteFile: Any): java.util.List[_] = {
     try {
-      // scalastyle:off classforname
-      val deleteFileClass = Class.forName(ClassNames.DELETE_FILE)
-      // scalastyle:on classforname
+      val deleteFileClass = loadClass(ClassNames.DELETE_FILE)
       val equalityFieldIdsMethod = 
deleteFileClass.getMethod("equalityFieldIds")
       val ids = 
equalityFieldIdsMethod.invoke(deleteFile).asInstanceOf[java.util.List[_]]
       if (ids == null) new java.util.ArrayList[Any]() else ids
@@ -515,9 +534,7 @@ object IcebergReflection extends Logging {
     val fieldsMethod = partitionSpec.getClass.getMethod("fields")
     val fields = 
fieldsMethod.invoke(partitionSpec).asInstanceOf[java.util.List[_]]
 
-    // scalastyle:off classforname
-    val partitionFieldClass = Class.forName(ClassNames.PARTITION_FIELD)
-    // scalastyle:on classforname
+    val partitionFieldClass = loadClass(ClassNames.PARTITION_FIELD)
     val sourceIdMethod = partitionFieldClass.getMethod("sourceId")
     val findFieldMethod = schema.getClass.getMethod("findField", classOf[Int])
 
diff --git 
a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
 
b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
index 458bc52fb..3f240b11f 100644
--- 
a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
+++ 
b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
@@ -227,9 +227,7 @@ object CometIcebergNativeScan extends 
CometOperatorSerde[CometBatchScanExec] wit
       fileScanTaskClass: Class[_],
       fileIO: Option[Any]): Seq[OperatorOuterClass.IcebergDeleteFile] = {
     try {
-      // scalastyle:off classforname
-      val deleteFileClass = 
Class.forName(IcebergReflection.ClassNames.DELETE_FILE)
-      // scalastyle:on classforname
+      val deleteFileClass = 
IcebergReflection.loadClass(IcebergReflection.ClassNames.DELETE_FILE)
 
       val deletes = IcebergReflection.getDeleteFilesFromTask(task, 
fileScanTaskClass)
 
@@ -336,13 +334,11 @@ object CometIcebergNativeScan extends 
CometOperatorSerde[CometBatchScanExec] wit
       if (spec != null) {
         // Deduplicate partition spec
         try {
-          // scalastyle:off classforname
           val partitionSpecParserClass =
-            Class.forName(IcebergReflection.ClassNames.PARTITION_SPEC_PARSER)
+            
IcebergReflection.loadClass(IcebergReflection.ClassNames.PARTITION_SPEC_PARSER)
           val toJsonMethod = partitionSpecParserClass.getMethod(
             "toJson",
-            Class.forName(IcebergReflection.ClassNames.PARTITION_SPEC))
-          // scalastyle:on classforname
+            
IcebergReflection.loadClass(IcebergReflection.ClassNames.PARTITION_SPEC))
           val partitionSpecJson = toJsonMethod
             .invoke(null, spec)
             .asInstanceOf[String]
@@ -685,9 +681,7 @@ object CometIcebergNativeScan extends 
CometOperatorSerde[CometBatchScanExec] wit
    */
   private def convertIcebergLiteral(icebergLiteral: Any, sparkType: DataType): 
Literal = {
     // Load Literal interface to get value() method (use interface to avoid 
package-private issues)
-    // scalastyle:off classforname
-    val literalClass = Class.forName(IcebergReflection.ClassNames.LITERAL)
-    // scalastyle:on classforname
+    val literalClass = 
IcebergReflection.loadClass(IcebergReflection.ClassNames.LITERAL)
     val valueMethod = literalClass.getMethod("value")
     val value = valueMethod.invoke(icebergLiteral)
 
@@ -790,13 +784,16 @@ object CometIcebergNativeScan extends 
CometOperatorSerde[CometBatchScanExec] wit
     }
 
     // Load Iceberg classes once (avoid repeated class loading in loop)
-    // scalastyle:off classforname
-    val contentScanTaskClass = 
Class.forName(IcebergReflection.ClassNames.CONTENT_SCAN_TASK)
-    val fileScanTaskClass = 
Class.forName(IcebergReflection.ClassNames.FILE_SCAN_TASK)
-    val contentFileClass = 
Class.forName(IcebergReflection.ClassNames.CONTENT_FILE)
-    val schemaParserClass = 
Class.forName(IcebergReflection.ClassNames.SCHEMA_PARSER)
-    val schemaClass = Class.forName(IcebergReflection.ClassNames.SCHEMA)
-    // scalastyle:on classforname
+    val contentScanTaskClass =
+      
IcebergReflection.loadClass(IcebergReflection.ClassNames.CONTENT_SCAN_TASK)
+    val fileScanTaskClass =
+      IcebergReflection.loadClass(IcebergReflection.ClassNames.FILE_SCAN_TASK)
+    val contentFileClass =
+      IcebergReflection.loadClass(IcebergReflection.ClassNames.CONTENT_FILE)
+    val schemaParserClass =
+      IcebergReflection.loadClass(IcebergReflection.ClassNames.SCHEMA_PARSER)
+    val schemaClass =
+      IcebergReflection.loadClass(IcebergReflection.ClassNames.SCHEMA)
 
     // Cache method lookups (avoid repeated getMethod in loop)
     val fileMethod = contentScanTaskClass.getMethod("file")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to