This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 051ea2b17989 [SPARK-56756][SQL] Add error class for
recursiveFileLookup conflict with partitioned data source
051ea2b17989 is described below
commit 051ea2b17989a2f22c287d4de22af0537e033cff
Author: Mark Jarvin <[email protected]>
AuthorDate: Fri May 15 01:35:59 2026 +0800
[SPARK-56756][SQL] Add error class for recursiveFileLookup conflict with
partitioned data source
### What changes were proposed in this pull request?
`PartitioningAwareFileIndex.listFiles` rejects the combination of
`recursiveFileLookup=true` and a non-empty `partitionSpec().partitionColumns`
by throwing a raw `java.lang.IllegalArgumentException` with the message
"Datasource with partition do not allow recursive file loading."
This PR replaces that with a tagged `AnalysisException` using a new error
class:
- New error class
`RECURSIVE_FILE_LOOKUP_NOT_SUPPORTED_FOR_PARTITIONED_DATA_SOURCE` (`sqlState
0A000`) in `error-conditions.json`.
- New helper
`QueryCompilationErrors.recursiveFileLookupNotSupportedForPartitionedDataSourceError()`.
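- Throw site in `PartitioningAwareFileIndex.scala` updated to use the
helper.
- `DataSource.scala` now throws the same error up front when
`recursiveFileLookup=true` is set and the relation would be backed by a
`CatalogFileIndex` (a catalog table that tracks partitions in the catalog).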
### Why are the changes needed?
The raw `IllegalArgumentException` is unclassified and does not surface as
a user-facing error with a clear message. Replacing it with an
`AnalysisException` using a proper error class ensures it is correctly
classified as a user error with an actionable message.
### Does this PR introduce _any_ user-facing change?
Yes. Users who hit this error will now see a clearer message:
> Recursive file loading is not supported when the data source has explicit
partition columns. Either remove the option "recursiveFileLookup", or read the
data without supplying partition columns (for example, do not read a
partitioned table).
Previously the error was a raw `IllegalArgumentException` with the message
"Datasource with partition do not allow recursive file loading."
### How was this patch tested?
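Added `"recursiveFileLookup with a partitioned catalog table is rejected"`
in `FileBasedDataSourceSuite`, which creates a partitioned Parquet catalog
table, then asserts that reading it with `recursiveFileLookup=true` throws an
`AnalysisException` with condition
`RECURSIVE_FILE_LOOKUP_NOT_SUPPORTED_FOR_PARTITIONED_DATA_SOURCE`.
Also added `"recursiveFileLookup with a user-specified partition spec is
rejected"` in `FileIndexSuite`, which builds an `InMemoryFileIndex` with
`recursiveFileLookup=true` and a user-specified partition spec, then asserts
that `listFiles` throws an `AnalysisException` with the same condition.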
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude (claude-sonnet-4-6)
Closes #55721 from markj-db/recursive-file-lookup-error-class.
Authored-by: Mark Jarvin <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 6 ++++++
.../spark/sql/errors/QueryCompilationErrors.scala | 6 ++++++
.../sql/execution/datasources/DataSource.scala | 5 +++++
.../datasources/PartitioningAwareFileIndex.scala | 4 ++--
.../spark/sql/FileBasedDataSourceSuite.scala | 23 ++++++++++++++++++++
.../sql/execution/datasources/FileIndexSuite.scala | 25 +++++++++++++++++++++-
6 files changed, 66 insertions(+), 3 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index bbe5c3ced6f5..1d4f4317461f 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -6039,6 +6039,12 @@
],
"sqlState" : "42836"
},
+ "RECURSIVE_FILE_LOOKUP_NOT_SUPPORTED_FOR_PARTITIONED_DATA_SOURCE" : {
+ "message" : [
+ "Recursive file loading is not supported when the data source has
explicit partition columns. Either remove the option \"recursiveFileLookup\",
or read the data without supplying partition columns (for example, do not read
a partitioned table)."
+ ],
+ "sqlState" : "0A000"
+ },
"RECURSIVE_PROTOBUF_SCHEMA" : {
"message" : [
"Found recursive reference in Protobuf schema, which can not be
processed by Spark by default: <fieldDescriptor>. try setting the option
`recursive.fields.max.depth` 1 to 10. Going beyond 10 levels of recursion is
not allowed."
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 9cd94e984cae..9b899867a9e3 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3432,6 +3432,12 @@ private[sql] object QueryCompilationErrors extends
QueryErrorsBase with Compilat
"newPath" -> newPath.map(toSQLId).mkString(" -> ")))
}
+ def recursiveFileLookupNotSupportedForPartitionedDataSourceError():
Throwable = {
+ new AnalysisException(
+ errorClass =
"RECURSIVE_FILE_LOOKUP_NOT_SUPPORTED_FOR_PARTITIONED_DATA_SOURCE",
+ messageParameters = Map.empty)
+ }
+
def notAllowedToCreatePermanentViewWithoutAssigningAliasForExpressionError(
viewNameParts: Seq[String],
attr: Attribute): Throwable = {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 9b51d3763abb..4a95f681fb6e 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -420,6 +420,11 @@ case class DataSource(
catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog
&&
catalogTable.get.partitionColumnNames.nonEmpty
val (fileCatalog, dataSchema, partitionSchema) = if
(useCatalogFileIndex) {
+ if (caseInsensitiveOptions.getOrElse(
+ FileIndexOptions.RECURSIVE_FILE_LOOKUP, "false").toBoolean) {
+ throw QueryCompilationErrors
+ .recursiveFileLookupNotSupportedForPartitionedDataSourceError()
+ }
val defaultTableSize = conf.defaultSizeInBytes
val index = new CatalogFileIndex(
sparkSession,
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 1bf0d2f0301f..8cea2c95e694 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.ArrayImplicits._
@@ -89,8 +90,7 @@ abstract class PartitioningAwareFileIndex(
PartitionDirectory(InternalRow.empty,
allFiles().toArray.filter(isNonEmptyFile))) :: Nil
} else {
if (recursiveFileLookup) {
- throw new IllegalArgumentException(
- "Datasource with partition do not allow recursive file loading.")
+ throw
QueryCompilationErrors.recursiveFileLookupNotSupportedForPartitionedDataSourceError()
}
prunePartitions(partitionFilters, partitionSpec()).map {
case PartitionPath(values, path) =>
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 8aa6f5a5d0e6..1fc45e9703f9 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -878,6 +878,29 @@ class FileBasedDataSourceSuite extends SharedSparkSession
assert(fileList.toSet === expectedFileList.toSet)
}
+ test("recursiveFileLookup with a partitioned catalog table is rejected") {
+ withTable("part_tbl") {
+ sql(
+ """
+ |CREATE TABLE part_tbl (id INT, value STRING)
+ |USING parquet
+ |PARTITIONED BY (year INT)
+ |""".stripMargin)
+ sql("INSERT INTO part_tbl PARTITION (year = 2024) VALUES (1, 'a')")
+ sql("INSERT INTO part_tbl PARTITION (year = 2025) VALUES (2, 'b')")
+ checkError(
+ exception = intercept[AnalysisException] {
+ spark.read
+ .option("recursiveFileLookup", "true")
+ .table("part_tbl")
+ .collect()
+ },
+ condition =
"RECURSIVE_FILE_LOOKUP_NOT_SUPPORTED_FOR_PARTITIONED_DATA_SOURCE",
+ parameters = Map.empty[String, String]
+ )
+ }
+ }
+
test("Return correct results when data columns overlap with partition
columns") {
Seq("parquet", "orc", "json").foreach { format =>
withTempPath { path =>
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 1150f6163b97..f4de8a52810e 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -31,7 +31,8 @@ import org.mockito.Mockito.{mock, when}
import org.apache.spark.{SparkException, SparkRuntimeException}
import org.apache.spark.metrics.source.HiveCatalogMetrics
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
@@ -657,6 +658,28 @@ class FileIndexSuite extends SharedSparkSession {
assert(FileIndexOptions.isValidOption("pathglobfilter"))
}
+ test("recursiveFileLookup with a user-specified partition spec is rejected")
{
+ withTempDir { dir =>
+ val partitionSchema = StructType(Seq(StructField("year", IntegerType,
nullable = true)))
+ val partitionSpec = PartitionSpec(
+ partitionSchema,
+ Seq(PartitionPath(InternalRow(2024), new Path(dir.getCanonicalPath))))
+ val fileIndex = new InMemoryFileIndex(
+ spark,
+ rootPathsSpecified = Seq(new Path(dir.getCanonicalPath)),
+ parameters = Map("recursiveFileLookup" -> "true"),
+ userSpecifiedSchema = None,
+ userSpecifiedPartitionSpec = Some(partitionSpec))
+ checkError(
+ exception = intercept[AnalysisException] {
+ fileIndex.listFiles(Nil, Nil)
+ },
+ condition =
"RECURSIVE_FILE_LOOKUP_NOT_SUPPORTED_FOR_PARTITIONED_DATA_SOURCE",
+ parameters = Map.empty[String, String]
+ )
+ }
+ }
+
test("SPARK-52339: Correctly compare root paths") {
withTempDir { dir =>
val file1 = new File(dir, "text1.txt")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]