yihua opened a new pull request, #18291:
URL: https://github.com/apache/hudi/pull/18291
### Describe the issue this Pull Request addresses

When querying a Hudi table on Databricks Spark 3.4 Runtime (13.3 LTS, Scala 2.12, Spark 3.4.1), the following exception is thrown.

<img width="645" height="433" alt="Screenshot 2026-03-06 at 22 00 58" src="https://github.com/user-attachments/assets/cd36ccf2-32cf-4052-a125-072a481dc56d" />

```
spark.read.format("org.apache.hudi").load("s3://<table_path>").show()

File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     46 start = time.perf_counter()
     47 try:
---> 48     res = func(*args, **kwargs)
     49     logger.log_success(
     50         module_name, class_name, function_name, time.perf_counter() - start, signature
     51     )
     52     return res

File /databricks/spark/python/pyspark/sql/dataframe.py:934, in DataFrame.show(self, n, truncate, vertical)
    928     raise PySparkTypeError(
    929         error_class="NOT_BOOL",
    930         message_parameters={"arg_name": "vertical", "arg_type": type(vertical).__name__},
    931     )
    933 if isinstance(truncate, bool) and truncate:
--> 934     print(self._jdf.showString(n, 20, vertical))
    935 else:
    936     try:

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1355, in JavaMember.__call__(self, *args)
   1349 command = proto.CALL_COMMAND_NAME +\
   1350     self.command_header +\
   1351     args_command +\
   1352     proto.END_COMMAND_PART
   1354 answer = self.gateway_client.send_command(command)
-> 1355 return_value = get_return_value(
   1356     answer, self.gateway_client, self.target_id, self.name)
   1358 for temp_arg in temp_args:
   1359     if hasattr(temp_arg, "_detach"):

File /databricks/spark/python/pyspark/errors/exceptions/captured.py:188, in capture_sql_exception.<locals>.deco(*a, **kw)
    186 def deco(*a: Any, **kw: Any) -> Any:
    187     try:
--> 188         return f(*a, **kw)
    189     except Py4JJavaError as e:
    190         converted = convert_exception(e.java_exception)

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o456.showString.
: java.lang.ClassCastException: org.apache.hadoop.fs.FileStatus cannot be cast to org.apache.spark.sql.execution.datasources.FileStatusWithMetadata
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.sql.execution.SparkOrAetherFileSourceScanLike.$anonfun$setDriverMetrics$5(DataSourceScanExec.scala:1061)
    at org.apache.spark.sql.execution.SparkOrAetherFileSourceScanLike.$anonfun$setDriverMetrics$5$adapted(DataSourceScanExec.scala:1060)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.sql.execution.SparkOrAetherFileSourceScanLike.setDriverMetrics(DataSourceScanExec.scala:1060)
    at org.apache.spark.sql.execution.SparkOrAetherFileSourceScanLike.$anonfun$selectedPartitions$2(DataSourceScanExec.scala:691)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.SparkOrAetherFileSourceScanLike.selectedPartitions(DataSourceScanExec.scala:696)
    at org.apache.spark.sql.execution.SparkOrAetherFileSourceScanLike.selectedPartitions$(DataSourceScanExec.scala:684)
    at org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions$lzycompute(DataSourceScanExec.scala:2146)
    at org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions(DataSourceScanExec.scala:2146)
    at org.apache.spark.sql.execution.SparkOrAetherFileSourceScanLike.$anonfun$dynamicallySelectedPartitions$3(DataSourceScanExec.scala:746)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.SparkOrAetherFileSourceScanLike.$anonfun$dynamicallySelectedPartitions$1(DataSourceScanExec.scala:706)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.sql.execution.SparkOrAetherFileSourceScanLike.dynamicallySelectedPartitions(DataSourceScanExec.scala:706)
    at org.apache.spark.sql.execution.SparkOrAetherFileSourceScanLike.dynamicallySelectedPartitions$(DataSourceScanExec.scala:701)
    at org.apache.spark.sql.execution.FileSourceScanExec.dynamicallySelectedPartitions$lzycompute(DataSourceScanExec.scala:2146)
    at org.apache.spark.sql.execution.FileSourceScanExec.dynamicallySelectedPartitions(DataSourceScanExec.scala:2146)
    at org.apache.spark.sql.execution.SparkOrAetherFileSourceScanLike.createPartitionsForNonBucketedRead(DataSourceScanExec.scala:1494)
    at org.apache.spark.sql.execution.SparkOrAetherFileSourceScanLike.createPartitionsForNonBucketedRead$(DataSourceScanExec.scala:1491)
    at org.apache.spark.sql.execution.FileSourceScanExec.createPartitionsForNonBucketedRead(DataSourceScanExec.scala:2146)
    at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:2182)
    at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:2163)
    at org.apache.spark.sql.execution.FileSourceScanExec.doExecuteColumnar(DataSourceScanExec.scala:2259)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:317)
    at org.apache.spark.sql.execution.SparkPlan$.org$apache$spark$sql$execution$SparkPlan$$withExecuteQueryLogging(SparkPlan.scala:122)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:347)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:343)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:313)
    at org.apache.spark.sql.execution.InputAdapter.doExecuteColumnar(WholeStageCodegenExec.scala:526)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:317)
    at org.apache.spark.sql.execution.SparkPlan$.org$apache$spark$sql$execution$SparkPlan$$withExecuteQueryLogging(SparkPlan.scala:122)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:347)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:343)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:313)
    at org.apache.spark.sql.execution.ColumnarToRowExec.inputRDDs(Columnar.scala:229)
    at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:55)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:754)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$2(SparkPlan.scala:289)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:289)
    at org.apache.spark.sql.execution.SparkPlan$.org$apache$spark$sql$execution$SparkPlan$$withExecuteQueryLogging(SparkPlan.scala:122)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:347)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:343)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:284)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:112)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:124)
    at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:126)
    at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:114)
    at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:94)
    at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$computeResult$1(ResultCacheManager.scala:553)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.sql.execution.qrc.ResultCacheManager.collectResult$1(ResultCacheManager.scala:545)
    at org.apache.spark.sql.execution.qrc.ResultCacheManager.computeResult(ResultCacheManager.scala:565)
    at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:426)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:419)
    at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:313)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollectResult$1(SparkPlan.scala:519)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:516)
    at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3675)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4600)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3383)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$3(Dataset.scala:4591)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1021)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4589)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$9(SQLExecution.scala:312)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:576)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:234)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1148)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:161)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:525)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4589)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:3383)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:3606)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:321)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:356)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
    at py4j.Gateway.invoke(Gateway.java:306)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
    at java.lang.Thread.run(Thread.java:750)
```

### Summary and Changelog

This PR fixes reading Hudi tables on the Databricks Spark 3.4 Runtime (13.3 LTS). The main difference between the Databricks Spark 3.4 Runtime and OSS Spark 3.4 is that the Databricks runtime backports `FileStatusWithMetadata` from Spark 3.5, so a `PartitionDirectory` stores `FileStatusWithMetadata` objects, whereas OSS Spark 3.4 uses `FileStatus`. During the file-source read, the Databricks Spark 3.4 Runtime expects `FileStatusWithMetadata` and fails with a `ClassCastException` when `FileStatus` objects are provided. To fix the incompatibility, this PR makes the following changes (see the illustrative sketch at the end of this description):

- Introduces `DatabricksRuntimeHelper`, which uses reflection to handle a `PartitionDirectory` that expects `Seq[FileStatusWithMetadata]` instead of `Seq[FileStatus]`:
  - Constructs `FileStatusWithMetadata` instances via the companion object's `apply(FileStatus)` method
  - Constructs `PartitionDirectory(InternalRow, Seq[FileStatusWithMetadata])` via its reflected constructor
- Updates `HoodieSpark34PartitionedFileUtils.newPartitionDirectory` to delegate to `DatabricksRuntimeHelper`:
  - On the Databricks Spark 3.4 Runtime, `FileStatusWithMetadata` is available and the reflection-based logic is used;
  - On standard Spark, where `FileStatusWithMetadata` doesn't exist, it falls back to normal `PartitionDirectory` construction via a caller-provided fallback function

### Impact

Hudi tables are now readable on the Databricks Spark 3.4 Runtime (13.3 LTS).

### Risk Level

None

### Documentation Update

Release doc update

### Contributor's checklist

- [ ] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
- [ ] Enough context is provided in the sections above
- [ ] Adequate tests were added if applicable
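For reference, below is a minimal sketch of the reflection-based path described above. It is illustrative only, not the PR's actual code: the Databricks backport is closed source, so the single-argument `apply(FileStatus)` lookup follows the changelog description, and the helper's name and internal structure (`DatabricksRuntimeHelperSketch`, `fswmClass`) are assumptions for the example.

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionDirectory

// Hypothetical sketch of the compatibility helper; not the PR's exact code.
object DatabricksRuntimeHelperSketch {

  private val fswmClassName =
    "org.apache.spark.sql.execution.datasources.FileStatusWithMetadata"

  // FileStatusWithMetadata exists on Spark 3.5+ and on Databricks Runtime
  // 13.3 LTS (Spark 3.4 with the Spark 3.5 class backported), but not on
  // OSS Spark 3.4, so its presence identifies the Databricks runtime.
  private lazy val fswmClass: Option[Class[_]] =
    try Some(Class.forName(fswmClassName))
    catch { case _: ClassNotFoundException => None }

  def newPartitionDirectory(
      partitionValues: InternalRow,
      files: Seq[FileStatus],
      fallback: (InternalRow, Seq[FileStatus]) => PartitionDirectory
  ): PartitionDirectory = fswmClass match {
    case Some(_) =>
      // Wrap each FileStatus via the companion object's apply(FileStatus),
      // per the changelog above (the backported signature is an assumption).
      val companion =
        Class.forName(fswmClassName + "$").getField("MODULE$").get(null)
      val apply = companion.getClass.getMethod("apply", classOf[FileStatus])
      val wrapped = files.map(f => apply.invoke(companion, f))
      // Invoke PartitionDirectory(InternalRow, Seq[FileStatusWithMetadata]).
      // The Seq parameter erases to scala.collection.Seq, so the same
      // constructor lookup resolves against the Databricks signature.
      val ctor = classOf[PartitionDirectory]
        .getConstructor(classOf[InternalRow], classOf[Seq[_]])
      ctor.newInstance(partitionValues, wrapped)
    case None =>
      // Standard Spark 3.4: PartitionDirectory stores FileStatus directly.
      fallback(partitionValues, files)
  }
}
```

With this shape, the caller-side utility (e.g. `HoodieSpark34PartitionedFileUtils.newPartitionDirectory`) can pass `PartitionDirectory(_, _)` as the fallback, keeping the OSS code path a direct, non-reflective construction.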
