[ 
https://issues.apache.org/jira/browse/SPARK-38328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17498025#comment-17498025
 ] 

Andrea Picasso Ratto commented on SPARK-38328:
----------------------------------------------

The problem was highlighted by the issues linked here, but it has never been 
completely solved.

> SQLConf.get flaky causes NON-default spark session settings being lost
> ----------------------------------------------------------------------
>
>                 Key: SPARK-38328
>                 URL: https://issues.apache.org/jira/browse/SPARK-38328
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.2, 3.2.1
>         Environment: We have used two configurations:
> 1 Spark 3.1.2
> * spark on k8s mode
> * delta 1.0.0 (for spark 3.1.2)
> * spark.sql.datetime.java8API.enabled=true (NON-default spark session setting)
> 2 Spark 3.2.1
> * spark on k8s mode
> * delta 1.1.0 (for spark 3.2.1)
> * spark.sql.datetime.java8API.enabled=true (NON-default spark session setting)
>            Reporter: Andrea Picasso Ratto
>            Priority: Major
>              Labels: config, spark-sql
>
> We have been using two different setups and the bug is present in both:
> 1 Spark 3.1.2
> * spark on k8s mode
> * delta 1.0.0 (for spark 3.1.2)
> * spark.sql.datetime.java8API.enabled=true (NON-default spark session setting)
> 2 Spark 3.2.1
> * spark on k8s mode
> * delta 1.1.0 (for spark 3.2.1)
> * spark.sql.datetime.java8API.enabled=true (NON-default spark session setting)
> Our Delta merge calls fail at random with the error 
> "java.lang.RuntimeException: java.time.Instant is not a valid external type 
> for schema of timestamp",
> as if support for java.time.Instant were not enabled, even though the Spark 
> session explicitly sets spark.sql.datetime.java8API.enabled=true.
> Sometimes, simply retrying the failed calls (with no change) solves the 
> failures, but the frequency of the error is variable and so is the number of 
> required retries.
> The error stack points to spark libraries (not delta library, not our code), 
> namely catalyst:
> ```
> 2022-02-03 11:22:53,695 WARN scheduler.TaskSetManager: Lost task 11.0 in stage 23.0 (TID 1542) (10.251.129.64 executor 1): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.time.Instant is not a valid external type for schema of timestamp
> [...similar rows removed...]
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, SRC_VALID_FROM_DTTM), TimestampType), true, false) AS SRC_VALID_FROM_DTTM#1749
> [...similar rows removed...]
> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:213)
> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:195)
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:512)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage17.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:265)
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:131)
> at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: java.time.Instant is not a valid external type for schema of timestamp
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.StaticInvoke_1$(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:209)
> ... 19 more
> ```
> Tracking the error in the Spark sources led us to a piece of code that calls 
> SQLConf.get.datetimeJava8ApiEnabled to select the proper datatype.
> [objects.scala](https://github.com/apache/spark/blob/v3.1.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala)
> ```
> private lazy val errMsg = s" is not a valid external type for schema of ${expected.simpleString}"
> ```
> [RowEncoder.scala](https://github.com/apache/spark/blob/v3.1.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala)
> ```
> def externalDataTypeFor(dt: DataType): DataType = dt match {
>   case _ if ScalaReflection.isNativeType(dt) => dt
>   case TimestampType =>
>     if (SQLConf.get.datetimeJava8ApiEnabled) {
>       ObjectType(classOf[java.time.Instant])
>     } else {
>       ObjectType(classOf[java.sql.Timestamp])
>     }
>   case DateType =>
>     if (SQLConf.get.datetimeJava8ApiEnabled) {
>       ObjectType(classOf[java.time.LocalDate])
>     } else {
>       ObjectType(classOf[java.sql.Date])
>     }
> ```
> The result of SQLConf.get depends on the existence of an active SparkSession, 
> and this leads to non-default spark session settings sometimes being lost by 
> executors, depending on when they are started.
> [SQLConf.get](https://github.com/apache/spark/blob/v3.1.2/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L149)
> ```
> * Returns the active config object within the current scope. If there is an active SparkSession,
> * the proper SQLConf associated with the thread's active session is used. If it's called from
> * tasks in the executor side, a SQLConf will be created from job local properties, which are set
> * and propagated from the driver side, unless a `SQLConf` has been set in the scope by
> * `withExistingConf` as done for propagating SQLConf for operations performed on RDDs created
> * from DataFrames.
> ```
> We inferred that `SQLConf.get` (getConf) could be flaky, and this would be 
> the reason for our random error above.
> In fact, using a custom spark where the above `if` on 
> SQLConf.get.datetimeJava8ApiEnabled is short-circuited for Instant or, 
> better, changing the default in SQLConf.scala solves the issue:
> [original 
> SQLConf.scala](https://github.com/apache/spark/blob/v3.1.2/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L2613)
> our change:
> ```
> val DATETIME_JAVA8API_ENABLED = buildConf("spark.sql.datetime.java8API.enabled")
>   .doc("If the configuration property is set to true, java.time.Instant and " +
>     "java.time.LocalDate classes of Java 8 API are used as external types for " +
>     "Catalyst's TimestampType and DateType. If it is set to false, java.sql.Timestamp " +
>     "and java.sql.Date are used for the same purpose.")
>   .version("3.0.0")
>   .booleanConf
>   .createWithDefault(true) // our test change
>   // .createWithDefault(false)
> ```
> Browsing spark jira, we found that there are a number of issues (some open, 
> some closed) all pointing to similar problems in SQLConf.get.
> * [SPARK-23894](https://issues.apache.org/jira/browse/SPARK-23894) Flaky 
> Test: BucketedWriteWithoutHiveSupportSuite
> ```
> _So how come sometimes its defined? Note that activeSession is an Inheritable 
> thread local. Normally the executor threads are created before activeSession 
> is defined, so they don't inherit anything. But a threadpool is free to 
> create more threads at any time. And when they do, then suddenly the new 
> executor threads will inherit the active session from their parent, a thread 
> in the driver with the activeSession defined._
> ```
> * [SPARK-22938](https://issues.apache.org/jira/browse/SPARK-22938) Assert 
> that SQLConf.get is accessed only on the driver.
> ```
> _Assert if code tries to access SQLConf.get on executor._
> _This can lead to hard to detect bugs, where the executor will read 
> fallbackConf, falling back to default config values, ignoring potentially 
> changed non-default configs._
> _If a config is to be passed to executor code, it needs to be read on the 
> driver, and passed explicitly._
> ```
> * [SPARK-30798](https://issues.apache.org/jira/browse/SPARK-30798) Scope 
> Session.active in QueryExecution
> ```
> _SparkSession.active is a thread local variable that points to the current 
> thread's spark session. It is important to note that the SQLConf.get method 
> depends on SparkSession.active. In the current implementation it is possible 
> that SparkSession.active points to a different session which causes various 
> problems. Most of these problems arise because part of the query processing 
> is done using the configurations of a different session._
> ```
> * [SPARK-30223](https://issues.apache.org/jira/browse/SPARK-30223) queries in 
> thrift server may read wrong SQL configs
> ```
> _The Spark thrift server creates many SparkSessions to serve requests, and 
> the thrift server serves requests using a single thread. One thread can only 
> have one active SparkSession, so SQLConf.get can't get the proper conf from 
> the session that runs the query._
> ```
> * [SPARK-35252](https://issues.apache.org/jira/browse/SPARK-35252) 
> PartitionReaderFactory's Implemention Class of DataSourceV2: sqlConf 
> parameter is null
> ```
> _The driver constructs an RDD instance (DataSourceRDD); the sqlConf parameter 
> passed to MyPartitionReaderFactory is not null. But when the executor 
> deserializes the RDD, the sqlConf parameter is null._
> ```
> * [SPARK-35324](https://issues.apache.org/jira/browse/SPARK-35324) Spark SQL 
> configs not respected in RDDs
> _I think the difference might have to do with the fact that in the RDD case 
> the config isn't in the local properties of the TaskContext._
> _* Stepping through the debugger, I see that both RDD and Dataset decide on 
> using or not using the legacy date formatter in DateFormatter.getFormatter._
> _* Then in SQLConf.get, both cases find a TaskContext and no existingConf. So 
> they create a new ReadOnlySQLConf from the TaskContext object._
> _* RDD and Dataset code path differ in the local properties they find on the 
> TaskContext here. The Dataset code path has spark.sql.legacy.timeParserPolicy 
> in the local properties, but the RDD path doesn't. The ReadOnlySQLConf is 
> created from the local properties, so in the RDD path the resulting config 
> object doesn't have an override for spark.sql.legacy.timeParserPolicy._
>  
>  
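
The thread-inheritance behaviour described in the SPARK-23894 quote, and the "read on the driver, pass explicitly" advice from SPARK-22938, can be illustrated without Spark. The following is only a minimal sketch using plain JVM classes: `InheritedConfDemo`, `activeConf`, `Result` and the string `"java8API.enabled=true"` are hypothetical stand-ins and are not Spark's actual implementation. A worker thread created before the value is set sees nothing, a worker created afterwards inherits it, and a value captured explicitly reaches either worker.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors}

object InheritedConfDemo {
  // Hypothetical stand-in for an "active session" holder.
  // None models "no session visible, defaults would be used".
  val activeConf: InheritableThreadLocal[Option[String]] =
    new InheritableThreadLocal[Option[String]] {
      override def initialValue(): Option[String] = None
    }

  final case class Result(
      earlyWorker: Option[String],
      lateWorker: Option[String],
      explicit: Option[String])

  // A task that reads whatever the worker thread itself can see.
  private val readConf: Callable[Option[String]] =
    new Callable[Option[String]] {
      override def call(): Option[String] = activeConf.get()
    }

  def run(): Result = {
    activeConf.remove() // start from a clean state on the calling thread
    val early: ExecutorService = Executors.newSingleThreadExecutor()
    val late: ExecutorService = Executors.newSingleThreadExecutor()
    try {
      // Force the early worker thread to exist BEFORE the value is set.
      early.submit(readConf).get()

      // The "driver" thread now sets a non-default value.
      activeConf.set(Some("java8API.enabled=true"))

      // The early worker was created too soon and inherited nothing.
      val seenByEarly = early.submit(readConf).get()

      // The late worker is created now and copies the parent's value.
      val seenByLate = late.submit(readConf).get()

      // Explicit passing: read on the calling thread, capture in the task.
      val captured = activeConf.get()
      val explicitTask = new Callable[Option[String]] {
        override def call(): Option[String] = captured
      }
      val seenExplicitly = early.submit(explicitTask).get()

      Result(seenByEarly, seenByLate, seenExplicitly)
    } finally {
      early.shutdown()
      late.shutdown()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Whether a given worker sees the value depends only on when its thread was created, which would be consistent with the retry-dependent failures reported above; capturing the value explicitly removes that dependence in this toy model.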


