yihua opened a new pull request, #18291:
URL: https://github.com/apache/hudi/pull/18291

   ### Describe the issue this Pull Request addresses
   
   When querying a Hudi table on Databricks Spark 3.4 Runtime (13.3 LTS, Scala 2.12, Spark 3.4.1), the following exception is thrown.
   <img width="645" height="433" alt="Screenshot 2026-03-06 at 22 00 58" src="https://github.com/user-attachments/assets/cd36ccf2-32cf-4052-a125-072a481dc56d" />
   ```
   spark.read.format("org.apache.hudi").load("s3://<table_path>").show()
   
   File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
        46 start = time.perf_counter()
        47 try:
   ---> 48     res = func(*args, **kwargs)
        49     logger.log_success(
        50         module_name, class_name, function_name, time.perf_counter() - start, signature
        51     )
        52     return res

   File /databricks/spark/python/pyspark/sql/dataframe.py:934, in DataFrame.show(self, n, truncate, vertical)
       928     raise PySparkTypeError(
       929         error_class="NOT_BOOL",
       930         message_parameters={"arg_name": "vertical", "arg_type": type(vertical).__name__},
       931     )
       933 if isinstance(truncate, bool) and truncate:
   --> 934     print(self._jdf.showString(n, 20, vertical))
       935 else:
       936     try:

   File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1355, in JavaMember.__call__(self, *args)
      1349 command = proto.CALL_COMMAND_NAME +\
      1350     self.command_header +\
      1351     args_command +\
      1352     proto.END_COMMAND_PART
      1354 answer = self.gateway_client.send_command(command)
   -> 1355 return_value = get_return_value(
      1356     answer, self.gateway_client, self.target_id, self.name)
      1358 for temp_arg in temp_args:
      1359     if hasattr(temp_arg, "_detach"):

   File /databricks/spark/python/pyspark/errors/exceptions/captured.py:188, in capture_sql_exception.<locals>.deco(*a, **kw)
       186 def deco(*a: Any, **kw: Any) -> Any:
       187     try:
   --> 188         return f(*a, **kw)
       189     except Py4JJavaError as e:
       190         converted = convert_exception(e.java_exception)

   File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
       324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
       325 if answer[1] == REFERENCE_TYPE:
   --> 326     raise Py4JJavaError(
       327         "An error occurred while calling {0}{1}{2}.\n".
       328         format(target_id, ".", name), value)
       329 else:
       330     raise Py4JError(
       331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
       332         format(target_id, ".", name, value))

   Py4JJavaError: An error occurred while calling o456.showString.
   : java.lang.ClassCastException: org.apache.hadoop.fs.FileStatus cannot be cast to org.apache.spark.sql.execution.datasources.FileStatusWithMetadata
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.sql.execution.SparkOrAetherFileSourceScanLike.$anonfun$setDriverMetrics$5(DataSourceScanExec.scala:1061)
        at org.apache.spark.sql.execution.SparkOrAetherFileSourceScanLike.$anonfun$setDriverMetrics$5$adapted(DataSourceScanExec.scala:1060)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at org.apache.spark.sql.execution.SparkOrAetherFileSourceScanLike.setDriverMetrics(DataSourceScanExec.scala:1060)
        at org.apache.spark.sql.execution.SparkOrAetherFileSourceScanLike.$anonfun$selectedPartitions$2(DataSourceScanExec.scala:691)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.execution.SparkOrAetherFileSourceScanLike.selectedPartitions(DataSourceScanExec.scala:696)
        at org.apache.spark.sql.execution.SparkOrAetherFileSourceScanLike.selectedPartitions$(DataSourceScanExec.scala:684)
        at org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions$lzycompute(DataSourceScanExec.scala:2146)
        at org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions(DataSourceScanExec.scala:2146)
        at org.apache.spark.sql.execution.SparkOrAetherFileSourceScanLike.$anonfun$dynamicallySelectedPartitions$3(DataSourceScanExec.scala:746)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.execution.SparkOrAetherFileSourceScanLike.$anonfun$dynamicallySelectedPartitions$1(DataSourceScanExec.scala:706)
        at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
        at org.apache.spark.sql.execution.SparkOrAetherFileSourceScanLike.dynamicallySelectedPartitions(DataSourceScanExec.scala:706)
        at org.apache.spark.sql.execution.SparkOrAetherFileSourceScanLike.dynamicallySelectedPartitions$(DataSourceScanExec.scala:701)
        at org.apache.spark.sql.execution.FileSourceScanExec.dynamicallySelectedPartitions$lzycompute(DataSourceScanExec.scala:2146)
        at org.apache.spark.sql.execution.FileSourceScanExec.dynamicallySelectedPartitions(DataSourceScanExec.scala:2146)
        at org.apache.spark.sql.execution.SparkOrAetherFileSourceScanLike.createPartitionsForNonBucketedRead(DataSourceScanExec.scala:1494)
        at org.apache.spark.sql.execution.SparkOrAetherFileSourceScanLike.createPartitionsForNonBucketedRead$(DataSourceScanExec.scala:1491)
        at org.apache.spark.sql.execution.FileSourceScanExec.createPartitionsForNonBucketedRead(DataSourceScanExec.scala:2146)
        at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:2182)
        at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:2163)
        at org.apache.spark.sql.execution.FileSourceScanExec.doExecuteColumnar(DataSourceScanExec.scala:2259)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:317)
        at org.apache.spark.sql.execution.SparkPlan$.org$apache$spark$sql$execution$SparkPlan$$withExecuteQueryLogging(SparkPlan.scala:122)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:347)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:343)
        at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:313)
        at org.apache.spark.sql.execution.InputAdapter.doExecuteColumnar(WholeStageCodegenExec.scala:526)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:317)
        at org.apache.spark.sql.execution.SparkPlan$.org$apache$spark$sql$execution$SparkPlan$$withExecuteQueryLogging(SparkPlan.scala:122)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:347)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:343)
        at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:313)
        at org.apache.spark.sql.execution.ColumnarToRowExec.inputRDDs(Columnar.scala:229)
        at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:55)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:754)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$2(SparkPlan.scala:289)
        at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:289)
        at org.apache.spark.sql.execution.SparkPlan$.org$apache$spark$sql$execution$SparkPlan$$withExecuteQueryLogging(SparkPlan.scala:122)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:347)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:343)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:284)
        at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:112)
        at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:124)
        at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:126)
        at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:114)
        at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:94)
        at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$computeResult$1(ResultCacheManager.scala:553)
        at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
        at org.apache.spark.sql.execution.qrc.ResultCacheManager.collectResult$1(ResultCacheManager.scala:545)
        at org.apache.spark.sql.execution.qrc.ResultCacheManager.computeResult(ResultCacheManager.scala:565)
        at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:426)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:419)
        at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:313)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollectResult$1(SparkPlan.scala:519)
        at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
        at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:516)
        at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3675)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4600)
        at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3383)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$3(Dataset.scala:4591)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1021)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4589)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$9(SQLExecution.scala:312)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:576)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:234)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1148)
        at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:161)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:525)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4589)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:3383)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:3606)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:321)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:356)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
        at py4j.Gateway.invoke(Gateway.java:306)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
        at java.lang.Thread.run(Thread.java:750)
   ```
   
   ### Summary and Changelog
   
   This PR fixes compatibility with the Databricks Spark 3.4 Runtime (13.3 LTS) 
for reading Hudi tables.
   
   The main difference between the Databricks Spark 3.4 Runtime and OSS Spark 3.4 is that the Databricks runtime backports `FileStatusWithMetadata` from Spark 3.5, so a `PartitionDirectory` stores `FileStatusWithMetadata` instances, whereas OSS Spark 3.4 stores plain `FileStatus`. During the file source read, the Databricks runtime expects `FileStatusWithMetadata` and throws a `ClassCastException` when `FileStatus` objects are provided instead.
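   
   For reference, the two shapes look roughly like this (a paraphrase of the OSS Spark 3.4 and 3.5 definitions, not Hudi code; the Databricks backport mirrors the Spark 3.5 form):
   ```scala
   import org.apache.hadoop.fs.FileStatus
   import org.apache.spark.sql.catalyst.InternalRow
   
   // OSS Spark 3.4: partition listings hold plain Hadoop file statuses.
   //   case class PartitionDirectory(values: InternalRow, files: Seq[FileStatus])
   
   // Spark 3.5, and Databricks Spark 3.4 Runtime via the backport: each file is
   // wrapped so it can carry extra per-file metadata, changing the element type.
   case class FileStatusWithMetadata(fileStatus: FileStatus, metadata: Map[String, Any] = Map.empty)
   case class PartitionDirectory(values: InternalRow, files: Seq[FileStatusWithMetadata])
   ```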
   
   To fix this incompatibility, the following changes are made:
   
   - Introduces `DatabricksRuntimeHelper`, which uses reflection to handle a `PartitionDirectory` that expects `Seq[FileStatusWithMetadata]` instead of `Seq[FileStatus]` (see the sketch after this list)
     - Constructs `FileStatusWithMetadata` instances via the companion object's `apply(FileStatus)` method
     - Constructs `PartitionDirectory(InternalRow, Seq[FileStatusWithMetadata])` via its reflected constructor
   - Updates `HoodieSpark34PartitionedFileUtils.newPartitionDirectory` to delegate to `DatabricksRuntimeHelper`
     - On the Databricks Spark 3.4 Runtime, `FileStatusWithMetadata` is available and the reflection-based logic is used;
     - On standard Spark (where `FileStatusWithMetadata` doesn't exist), it falls back to normal `PartitionDirectory` construction via a caller-provided fallback function
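   
   A minimal sketch of how the reflection-based path could look, assuming the backported companion object exposes the single-argument `apply(FileStatus)` overload described above (the helper name matches this PR, but the exact signatures here are illustrative, not the actual implementation):
   ```scala
   import org.apache.hadoop.fs.FileStatus
   import org.apache.spark.sql.catalyst.InternalRow
   import org.apache.spark.sql.execution.datasources.PartitionDirectory
   
   object DatabricksRuntimeHelper {
     // The backported class is present on Databricks Spark 3.4 Runtime but
     // absent on OSS Spark 3.4, so its companion object is looked up lazily.
     private lazy val fileStatusWithMetadataCompanion: Option[AnyRef] =
       try {
         val companionClass = Class.forName(
           "org.apache.spark.sql.execution.datasources.FileStatusWithMetadata$")
         Some(companionClass.getField("MODULE$").get(null))
       } catch {
         case _: ClassNotFoundException => None
       }
   
     def newPartitionDirectory(
         values: InternalRow,
         files: Seq[FileStatus],
         fallback: (InternalRow, Seq[FileStatus]) => PartitionDirectory): PartitionDirectory =
       fileStatusWithMetadataCompanion match {
         case Some(companion) =>
           // Wrap each FileStatus via the companion's apply(FileStatus) overload.
           val applyMethod = companion.getClass.getMethod("apply", classOf[FileStatus])
           val wrapped = files.map(f => applyMethod.invoke(companion, f))
           // Call the PartitionDirectory(InternalRow, Seq[FileStatusWithMetadata])
           // constructor reflectively; the Seq element type is erased at runtime.
           val ctor = classOf[PartitionDirectory]
             .getConstructor(classOf[InternalRow], classOf[Seq[_]])
           ctor.newInstance(values, wrapped).asInstanceOf[PartitionDirectory]
         case None =>
           // OSS Spark 3.4: construct PartitionDirectory directly.
           fallback(values, files)
       }
   }
   ```
   Because the fallback is supplied by the caller, OSS Spark keeps its existing direct construction, and the reflective path is only exercised when the backported class is actually on the classpath.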
   
   ### Impact
   
   Hudi tables are now readable on Databricks Spark 3.4 Runtime (13.3 LTS).
   
   ### Risk Level
   
   none
   
   ### Documentation Update
   
   Release docs will be updated to mention this compatibility fix.
   
   ### Contributor's checklist
   
   - [ ] Read through [contributor's 
guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [ ] Enough context is provided in the sections above
   - [ ] Adequate tests were added if applicable
   

