Szehon Ho created SPARK-51119:
---------------------------------
Summary: Readers on executors should not read catalogs resolving
EXISTS_DEFAULT
Key: SPARK-51119
URL: https://issues.apache.org/jira/browse/SPARK-51119
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 4.0.0
Reporter: Szehon Ho
Summary: Spark executors unnecessarily contact catalogs when resolving
EXISTS_DEFAULT (used for default values of existing data)
Detailed explanation: The code path for default values first runs an analysis
of the user-provided CURRENT_DEFAULT (to evaluate functions, etc.) and uses the
resulting SQL as EXISTS_DEFAULT. EXISTS_DEFAULT is saved in order to avoid
having to rewrite existing data via backfill to fill this value in the files.
When reading existing files, Spark then attempts to resolve the EXISTS_DEFAULT
metadata and use its value for null values it finds in that column. But this
step redundantly re-runs all the analyzer rules and finish-analysis rules,
some of which contact the catalog unnecessarily.
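Since EXISTS_DEFAULT is only stored after CURRENT_DEFAULT has been analyzed and
constant-folded on the driver, the string executors see should already be a plain
constant literal. A minimal sketch of the idea (illustrative Python, not Spark's
actual API; the function name is hypothetical) showing that resolving it needs
only a literal parser, with no analyzer rules and no catalog access:

```python
import re

def resolve_exists_default(sql_text):
    """Hypothetical literal-only resolver for an EXISTS_DEFAULT string.

    Because EXISTS_DEFAULT is produced by analyzing and constant-folding
    CURRENT_DEFAULT on the driver, executors should only ever see a
    constant literal here, so no catalog lookups are required.
    """
    text = sql_text.strip()
    if text.upper() == "NULL":
        return None
    if text.upper() in ("TRUE", "FALSE"):
        return text.upper() == "TRUE"
    # String literal: 'abc', with '' as an escaped single quote
    if len(text) >= 2 and text[0] == "'" and text[-1] == "'":
        return text[1:-1].replace("''", "'")
    # Integer literal
    if re.fullmatch(r"[+-]?\d+", text):
        return int(text)
    # Decimal / floating-point literal
    if re.fullmatch(r"[+-]?\d*\.\d+([eE][+-]?\d+)?", text):
        return float(text)
    raise ValueError(f"EXISTS_DEFAULT is not a constant literal: {sql_text!r}")

# e.g. a column added with DEFAULT 41 + 1 would store EXISTS_DEFAULT "42"
print(resolve_exists_default("42"))        # 42
print(resolve_exists_default("'active'"))  # active
print(resolve_exists_default("NULL"))      # None
```

A fix along these lines would let the reader path skip the analyzer entirely,
avoiding the catalog instantiation shown in the stack trace below.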
This may cause exceptions if the executors are not configured properly to reach
the catalog, such as:
```
Caused by: org.apache.spark.SparkException: Failed during instantiating constructor for catalog 'spark_catalog': org.apache.spark.sql.delta.catalog.DeltaCatalog.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToInstantiateConstructorForCatalogError(QueryExecutionErrors.scala:2400)
    at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:84)
    at org.apache.spark.sql.connector.catalog.CatalogManager.loadV2SessionCatalog(CatalogManager.scala:72)
    at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$2(CatalogManager.scala:94)
    at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
    at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$1(CatalogManager.scala:94)
    at scala.Option.map(Option.scala:230)
    at org.apache.spark.sql.connector.catalog.CatalogManager.v2SessionCatalog(CatalogManager.scala:93)
    at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:55)
    at org.apache.spark.sql.connector.catalog.CatalogManager.currentCatalog(CatalogManager.scala:130)
    at org.apache.spark.sql.connector.catalog.CatalogManager.currentNamespace(CatalogManager.scala:101)
    at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:172)
    at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:169)
    at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.$anonfun$apply$1(Optimizer.scala:502)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.apply(Optimizer.scala:502)
    at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.analyze(ResolveDefaultColumnsUtil.scala:301)
    at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.analyze(ResolveDefaultColumnsUtil.scala:266)
    at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$getExistenceDefaultValues$2(ResolveDefaultColumnsUtil.scala:427)
    at scala.Option.map(Option.scala:230)
    at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$getExistenceDefaultValues$1(ResolveDefaultColumnsUtil.scala:425)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
    at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.getExistenceDefaultValues(ResolveDefaultColumnsUtil.scala:423)
    at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$existenceDefaultValues$2(ResolveDefaultColumnsUtil.scala:498)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.existenceDefaultValues(ResolveDefaultColumnsUtil.scala:496)
    at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.existenceDefaultValues(ResolveDefaultColumnsUtil.scala)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:350)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:373)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.$anonfun$apply$5(ParquetFileFormat.scala:441)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1561)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.apply(ParquetFileFormat.scala:428)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.apply(ParquetFileFormat.scala:258)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:639)
    ... 21 more
Caused by: java.lang.IllegalStateException: No active or default Spark session found
```
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]