[
https://issues.apache.org/jira/browse/SPARK-38328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17498025#comment-17498025
]
Andrea Picasso Ratto commented on SPARK-38328:
----------------------------------------------
The problem was highlighted by the issues linked here, but it has never been
completely solved.
> SQLConf.get flaky causes NON-default spark session settings being lost
> ----------------------------------------------------------------------
>
> Key: SPARK-38328
> URL: https://issues.apache.org/jira/browse/SPARK-38328
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.1.2, 3.2.1
> Environment: We have used two configurations:
> 1 Spark 3.1.2
> * spark on k8s mode
> * delta 1.0.0 (for spark 3.1.2)
> * spark.sql.datetime.java8API.enabled=true (NON-default spark session setting)
> 2 Spark 3.2.1
> * spark on k8s mode
> * delta 1.1.0 (for spark 3.2.1)
> * spark.sql.datetime.java8API.enabled=true (NON-default spark session setting)
> Reporter: Andrea Picasso Ratto
> Priority: Major
> Labels: config, spark-sql
>
> We have been using two different setups and the bug is present in both:
> 1 Spark 3.1.2
> * spark on k8s mode
> * delta 1.0.0 (for spark 3.1.2)
> * spark.sql.datetime.java8API.enabled=true (NON-default spark session setting)
> 2 Spark 3.2.1
> * spark on k8s mode
> * delta 1.1.0 (for spark 3.2.1)
> * spark.sql.datetime.java8API.enabled=true (NON-default spark session setting)
> It happens that our delta merge calls randomly fail with the error
> "java.lang.RuntimeException: java.time.Instant is not a valid external type
> for schema of timestamp",
> as if the support for java.time.Instant were not enabled despite the specific
> spark.sql.datetime.java8API.enabled=true spark session setting.
> Sometimes, simply retrying the failed calls (with no change) solves the
> failures, but the frequency of the error is variable and so is the number of
> required retries.
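As a stopgap only, the retry behaviour described here can be wrapped in a small helper. This is a minimal sketch, not part of Spark or Delta: `RetrySketch` and `retry` are hypothetical names, and the operation passed in stands for whatever call is failing intermittently. It hides the symptom and does nothing about the underlying configuration loss.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object RetrySketch {
  /** Runs `op` up to `maxAttempts` times; returns the first success or the last failure. */
  @tailrec
  def retry[A](maxAttempts: Int)(op: () => A): Try[A] =
    Try(op()) match {
      case s @ Success(_)                => s
      case Failure(_) if maxAttempts > 1 => retry(maxAttempts - 1)(op) // try again, unchanged
      case f @ Failure(_)                => f                          // attempts exhausted
    }
}
```

A caller would pass the failing merge as the operation, for example `RetrySketch.retry(5)(() => runMerge())`, where `runMerge` is a placeholder for the application's own code.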
> The error stack points to spark libraries (not delta library, not our code),
> namely catalyst:
> ```
> 2022-02-03 11:22:53,695 WARN scheduler.TaskSetManager: Lost task 11.0 in
> stage 23.0 (TID 1542) (10.251.129.64 executor 1): java.lang.RuntimeException:
> Error while encoding: java.lang.RuntimeException: java.time.Instant is not a
> valid external type for schema of timestamp
> [...similar rows removed...]
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null
> else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$,
> TimestampType, fromJavaTimestamp,
> validateexternaltype(getexternalrowfield(assertnotnull(input[0,
> org.apache.spark.sql.Row, true]), 1, SRC_VALID_FROM_DTTM), TimestampType),
> true, false) AS SRC_VALID_FROM_DTTM#1749
> [...similar rows removed...]
> at
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:213)
> at
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:195)
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:512)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage17.processNext(Unknown
> Source)
> at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:265)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:131)
> at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: java.time.Instant is not a valid
> external type for schema of timestamp
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.StaticInvoke_1$(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:209)
> ... 19 more
> ```
> Tracking the error in the Spark sources led us to a piece of code which calls
> SQLConf.get.datetimeJava8ApiEnabled to select the proper datatype.
> [objects.scala](https://github.com/apache/spark/blob/v3.1.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala)
> ```
> private lazy val errMsg = s" is not a valid external type for schema of ${expected.simpleString}"
> ```
> [RowEncoder.scala](https://github.com/apache/spark/blob/v3.1.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala)
> ```
> def externalDataTypeFor(dt: DataType): DataType = dt match {
>   case _ if ScalaReflection.isNativeType(dt) => dt
>   case TimestampType =>
>     // The accepted JVM class depends on the flag as seen at this call site.
>     if (SQLConf.get.datetimeJava8ApiEnabled) {
>       ObjectType(classOf[java.time.Instant])
>     } else {
>       ObjectType(classOf[java.sql.Timestamp])
>     }
>   case DateType =>
>     if (SQLConf.get.datetimeJava8ApiEnabled) {
>       ObjectType(classOf[java.time.LocalDate])
>     } else {
>       ObjectType(classOf[java.sql.Date])
>     }
>   // ... remaining cases omitted in this excerpt
> }
> ```
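To make the failure mode concrete, the following is a simplified, Spark-free model of the two excerpts above. It is only a sketch: `ExternalTypeSketch`, `expectedTimestampClass` and `validate` are hypothetical names, and the real check lives in generated Catalyst code. It shows that a `java.time.Instant` value is rejected as soon as the flag is read as `false`, which is the message seen in the stack trace.

```scala
import java.sql.Timestamp
import java.time.Instant

object ExternalTypeSketch {
  // Stand-in for the TimestampType branch of externalDataTypeFor:
  // the flag alone decides which JVM class is accepted.
  def expectedTimestampClass(java8ApiEnabled: Boolean): Class[_] =
    if (java8ApiEnabled) classOf[Instant] else classOf[Timestamp]

  // Stand-in for the external type validation. Left carries a message in the
  // same shape as the error reported in this issue.
  def validate(value: AnyRef, java8ApiEnabled: Boolean): Either[String, AnyRef] =
    if (expectedTimestampClass(java8ApiEnabled).isInstance(value)) Right(value)
    else Left(s"${value.getClass.getName} is not a valid external type for schema of timestamp")
}
```

Under this model, the same row passes or fails depending only on which value of the flag the validating side happens to see.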
> The result of SQLConf.get depends on the existence of an active SparkSession,
> and this leads to non-default spark session settings sometimes being lost by
> executors, depending on when they are started.
> [SQLConf.get](https://github.com/apache/spark/blob/v3.1.2/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L149)
> ```
>  * Returns the active config object within the current scope. If there is an active SparkSession,
>  * the proper SQLConf associated with the thread's active session is used. If it's called from
>  * tasks in the executor side, a SQLConf will be created from job local properties, which are set
>  * and propagated from the driver side, unless a `SQLConf` has been set in the scope by
>  * `withExistingConf` as done for propagating SQLConf for operations performed on RDDs created
>  * from DataFrames.
> ```
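The lookup described in that scaladoc can be paraphrased as a small model. This is a sketch of one reading of the comment, not the actual `SQLConf.get` implementation: `ConfResolutionSketch`, `Conf`, `resolve` and all parameter names are hypothetical. It illustrates why a non-default setting disappears whenever the lookup ends up on a path that does not carry the override.

```scala
object ConfResolutionSketch {
  val Java8ApiKey = "spark.sql.datetime.java8API.enabled"

  // A conf is modelled as explicit overrides on top of built-in defaults.
  final case class Conf(overrides: Map[String, String]) {
    // The built-in default for this key is false.
    def java8ApiEnabled: Boolean = overrides.getOrElse(Java8ApiKey, "false").toBoolean
  }

  // Inside a task: use a conf set in scope if there is one, else build one from
  // the task's local properties. Outside a task: use the active session's conf,
  // else fall back to plain defaults.
  def resolve(
      existingConf: Option[Conf],
      taskLocalProperties: Option[Map[String, String]],
      activeSessionConf: Option[Conf]): Conf =
    taskLocalProperties match {
      case Some(props) => existingConf.getOrElse(Conf(props))
      case None        => activeSessionConf.getOrElse(Conf(Map.empty))
    }
}
```

In this model the override survives only if it is present in the session conf or in the task's local properties; an empty set of local properties, or no session at all, silently yields the default `false`.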
> We inferred that `SQLConf.get` (getConf) could be flaky, and this would be
> the reason for our random error above.
> In fact, using a custom Spark build where the above `if` on
> SQLConf.get.datetimeJava8ApiEnabled is short-circuited for Instant or,
> better, changing the default in SQLConf.scala solves the issue:
> [original
> SQLConf.scala](https://github.com/apache/spark/blob/v3.1.2/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L2613)
> our change:
> ```
> val DATETIME_JAVA8API_ENABLED =
>   buildConf("spark.sql.datetime.java8API.enabled")
>     .doc("If the configuration property is set to true, java.time.Instant and " +
>       "java.time.LocalDate classes of Java 8 API are used as external types for " +
>       "Catalyst's TimestampType and DateType. If it is set to false, java.sql.Timestamp " +
>       "and java.sql.Date are used for the same purpose.")
>     .version("3.0.0")
>     .booleanConf
>     .createWithDefault(true) // our test change
>     // .createWithDefault(false)
> ```
> Browsing spark jira, we found that there are a number of issues (some open,
> some closed) all pointing to similar problems in SQLConf.get.
> * [SPARK-23894](https://issues.apache.org/jira/browse/SPARK-23894) Flaky
> Test: BucketedWriteWithoutHiveSupportSuite
> ```
> _So how come sometimes its defined? Note that activeSession is an Inheritable
> thread local. Normally the executor threads are created before activeSession
> is defined, so they don't inherit anything. But a threadpool is free to
> create more threads at any time. And when they do, then suddenly the new
> executor threads will inherit the active session from their parent, a thread
> in the driver with the activeSession defined._
> ```
> * [SPARK-22938](https://issues.apache.org/jira/browse/SPARK-22938) Assert
> that SQLConf.get is accessed only on the driver.
> ```
> _Assert if code tries to access SQLConf.get on executor._
> _This can lead to hard to detect bugs, where the executor will read
> fallbackConf, falling back to default config values, ignoring potentially
> changed non-default configs._
> _If a config is to be passed to executor code, it needs to be read on the
> driver, and passed explicitly._
> ```
> * [SPARK-30798](https://issues.apache.org/jira/browse/SPARK-30798) Scope
> Session.active in QueryExecution
> ```
> _SparkSession.active is a thread local variable that points to the current
> thread's spark session. It is important to note that the SQLConf.get method
> depends on SparkSession.active. In the current implementation it is possible
> that SparkSession.active points to a different session which causes various
> problems. Most of these problems arise because part of the query processing
> is done using the configurations of a different session._
> ```
> * [SPARK-30223](https://issues.apache.org/jira/browse/SPARK-30223) queries in
> thrift server may read wrong SQL configs
> ```
> _The Spark thrift server creates many SparkSessions to serve requests, and
> the thrift server serves requests using a single thread. One thread can only
> have one active SparkSession, so SQLConf.get can't get the proper conf from
> the session that runs the query._
> ```
> * [SPARK-35252](https://issues.apache.org/jira/browse/SPARK-35252)
> PartitionReaderFactory's Implemention Class of DataSourceV2: sqlConf
> parameter is null
> ```
> _The driver construct a RDD instance(DataSourceRDD), the sqlConf parameter
> pass to the MyPartitionReaderFactory is not null. But when the executor
> deserialize the RDD, the sqlConf parameter is null._
> ```
> * [SPARK-35324](https://issues.apache.org/jira/browse/SPARK-35324) Spark SQL
> configs not respected in RDDs
> _I think the difference might have to do with the fact that in the RDD case
> the config isn't in the local properties of the TaskContext._
> _* Stepping through the debugger, I see that both RDD and Dataset decide on
> using or not using the legacy date formatter in DateFormatter.getFormatter._
> _* Then in SQLConf.get, both cases find a TaskContext and no existingConf. So
> they create a new ReadOnlySQLConf from the TaskContext object._
> _* RDD and Dataset code path differ in the local properties they find on the
> TaskContext here. The Dataset code path has spark.sql.legacy.timeParserPolicy
> in the local properties, but the RDD path doesn't. The ReadOnlySQLConf is
> created from the local properties, so in the RDD path the resulting config
> object doesn't have an override for spark.sql.legacy.timeParserPolicy._
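The thread-inheritance effect described in the SPARK-23894 quote can be reproduced with plain JVM classes. This is a minimal sketch that assumes nothing about Spark internals beyond what that quote states: `InheritedSessionSketch`, `activeSession` and `demo` are hypothetical names, and a string stands in for the session. A thread created before the value is set sees nothing, while a thread created afterwards inherits it from its parent.

```scala
import java.util.concurrent.atomic.AtomicReference

object InheritedSessionSketch {
  // Stand-in for an inheritable thread-local holding the active session.
  private val activeSession = new InheritableThreadLocal[String]

  // Starts a new thread, reads the thread-local inside it, and returns what it saw.
  private def readInNewThread(): Option[String] = {
    val seen = new AtomicReference[Option[String]](None)
    val t = new Thread(new Runnable {
      def run(): Unit = seen.set(Option(activeSession.get()))
    })
    t.start()
    t.join()
    seen.get()
  }

  def demo(): (Option[String], Option[String]) = {
    val before = readInNewThread()      // thread created before the session is set
    activeSession.set("driver-session")
    val after = readInNewThread()       // thread created afterwards inherits the value
    activeSession.remove()
    (before, after)
  }
}
```

Calling `InheritedSessionSketch.demo()` returns the pair of values observed by the two threads, which differ only because of when each thread was created. This is consistent with the observation that the failure depends on when executor threads are started.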
>
>