[
https://issues.apache.org/jira/browse/SPARK-51119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dongjoon Hyun reassigned SPARK-51119:
-------------------------------------
Assignee: Szehon Ho
> Readers on executors resolving EXISTS_DEFAULT should not call catalogs
> ----------------------------------------------------------------------
>
> Key: SPARK-51119
> URL: https://issues.apache.org/jira/browse/SPARK-51119
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 4.0.0
> Reporter: Szehon Ho
> Assignee: Szehon Ho
> Priority: Major
> Labels: pull-request-available
>
> Summary: Spark executors unnecessarily contact catalogs when resolving
> EXISTS_DEFAULT (the stored default value applied to existing data).
> Detailed explanation: The code path for default values first runs an
> analysis of the user-provided CURRENT_DEFAULT (to evaluate functions, etc.)
> and uses the resulting SQL as EXISTS_DEFAULT. EXISTS_DEFAULT is saved so
> that existing data files never need to be rewritten (backfilled) with the
> default value.
> When reading existing files, Spark then resolves the EXISTS_DEFAULT
> metadata and uses its value in place of the nulls it finds in that column.
> But this step redundantly re-runs all of the analyzer rules and
> finish-analysis rules, some of which contact the catalog unnecessarily.
> This may cause exceptions if the executors are not configured to reach the
> catalog, such as:
> {code}
> Caused by: org.apache.spark.SparkException: Failed during instantiating constructor for catalog 'spark_catalog': org.apache.spark.sql.delta.catalog.DeltaCatalog.
>   at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToInstantiateConstructorForCatalogError(QueryExecutionErrors.scala:2400)
>   at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:84)
>   at org.apache.spark.sql.connector.catalog.CatalogManager.loadV2SessionCatalog(CatalogManager.scala:72)
>   at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$2(CatalogManager.scala:94)
>   at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
>   at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$1(CatalogManager.scala:94)
>   at scala.Option.map(Option.scala:230)
>   at org.apache.spark.sql.connector.catalog.CatalogManager.v2SessionCatalog(CatalogManager.scala:93)
>   at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:55)
>   at org.apache.spark.sql.connector.catalog.CatalogManager.currentCatalog(CatalogManager.scala:130)
>   at org.apache.spark.sql.connector.catalog.CatalogManager.currentNamespace(CatalogManager.scala:101)
>   at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:172)
>   at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:169)
>   at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.$anonfun$apply$1(Optimizer.scala:502)
>   at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
>   at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
>   at scala.collection.immutable.List.foldLeft(List.scala:91)
>   at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.apply(Optimizer.scala:502)
>   at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.analyze(ResolveDefaultColumnsUtil.scala:301)
>   at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.analyze(ResolveDefaultColumnsUtil.scala:266)
>   at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$getExistenceDefaultValues$2(ResolveDefaultColumnsUtil.scala:427)
>   at scala.Option.map(Option.scala:230)
>   at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$getExistenceDefaultValues$1(ResolveDefaultColumnsUtil.scala:425)
>   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
>   at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>   at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:286)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
>   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
>   at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.getExistenceDefaultValues(ResolveDefaultColumnsUtil.scala:423)
>   at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$existenceDefaultValues$2(ResolveDefaultColumnsUtil.scala:498)
>   at scala.Option.getOrElse(Option.scala:189)
>   at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.existenceDefaultValues(ResolveDefaultColumnsUtil.scala:496)
>   at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.existenceDefaultValues(ResolveDefaultColumnsUtil.scala)
>   at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:350)
>   at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:373)
>   at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.$anonfun$apply$5(ParquetFileFormat.scala:441)
>   at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1561)
>   at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.apply(ParquetFileFormat.scala:428)
>   at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.apply(ParquetFileFormat.scala:258)
>   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:639)
>   ... 21 more
> Caused by: java.lang.IllegalStateException: No active or default Spark session found
> {code}
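Since the issue notes that EXISTS_DEFAULT is stored as the already-analyzed, constant-folded result of CURRENT_DEFAULT, the intended behavior can be sketched as a toy example. This is illustrative Java, not actual Spark code: `columnMetadata`, `resolveExistsDefault`, and the integer-only literal are hypothetical stand-ins. The point is that a reader on an executor should only need cheap literal parsing, never analyzer rules or a catalog lookup.

```java
import java.util.Map;

public class ExistsDefaultSketch {
    // Writer side (hypothetical): CURRENT_DEFAULT was analyzed once on the
    // driver, and its constant-folded result was stored as EXISTS_DEFAULT
    // in the column metadata.
    static final Map<String, String> columnMetadata =
        Map.of("EXISTS_DEFAULT", "42");

    // Reader side (hypothetical): because EXISTS_DEFAULT is already a folded
    // literal, an executor can parse it directly -- no analyzer rules, no
    // catalog access, no active Spark session required.
    static int resolveExistsDefault(String storedLiteral) {
        return Integer.parseInt(storedLiteral);
    }

    public static void main(String[] args) {
        int fillValue = resolveExistsDefault(columnMetadata.get("EXISTS_DEFAULT"));
        // Use fillValue wherever the column is null in existing files.
        System.out.println(fillValue);
    }
}
```

Under this assumption, running the full analyzer (and hence `CatalogManager`) on the reader path is pure overhead, which is what the stack trace above shows failing.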
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]