Andrea Picasso Ratto created SPARK-38328:
--------------------------------------------
Summary: SQLConf.get flaky causes NON-default spark session
settings being lost
Key: SPARK-38328
URL: https://issues.apache.org/jira/browse/SPARK-38328
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.2.1, 3.1.2
Environment: We have used two configurations:
1 Spark 3.1.2
* spark on k8s mode
* delta 1.0.0 (for spark 3.1.2)
* spark.sql.datetime.java8API.enabled=true (NON-default spark session setting)
2 Spark 3.2.1
* spark on k8s mode
* delta 1.1.0 (for spark 3.2.1)
* spark.sql.datetime.java8API.enabled=true (NON-default spark session setting)
Reporter: Andrea Picasso Ratto
We have been using two different setups and the bug is present in both:1 Spark
3.1.2
* spark on k8s mode
* delta 1.0.0 (for spark 3.1.2)
* spark.sql.datetime.java8API.enabled=true (NON-default spark session setting)
2 Spark 3.2.1
* spark on k8s mode
* delta 1.1.0 (for spark 3.2.1)
* spark.sql.datetime.java8API.enabled=true (NON-default spark session setting)
It happens that our delta merge calls randomly fail with the error
"java.lang.RuntimeException: java.time.Instant is not a valid external type for
schema of timestamp",
as if the support for java.time.Instant were not enabled despite the specific
spark.sql.datetime.java8API.enabled=true spark session setting.
Sometimes, simply retrying the failed calls (with no change) solves the
failures, but the frequency of the error is variable and so is the number of
required retries.
The error stack points to spark libraries (not delta library, not our code),
namely catalyst:
```
2022-02-03 11:22:53,695 WARN scheduler.TaskSetManager: Lost task 11.0 in stage
23.0 (TID 1542) (10.251.129.64 executor 1): java.lang.RuntimeException: Error
while encoding: java.lang.RuntimeException: java.time.Instant is not a valid
external type for schema of timestamp
[...similar rows removed...]
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else
staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$,
TimestampType, fromJavaTimestamp,
validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true]), 1, SRC_VALID_FROM_DTTM), TimestampType),
true, false) AS SRC_VALID_FROM_DTTM#1749
[...similar rows removed...]
at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:213)
at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:195)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:512)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage17.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at
org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:265)
at
org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.time.Instant is not a valid
external type for schema of timestamp
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.StaticInvoke_1$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
Source)
at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:209)
... 19 more
```
Tracking the error in spark sources lead us to a piece of code which calls
SQLConf.get.datetimeJava8ApiEnabled to select the proper datatype.
[objects.scala](https://github.com/apache/spark/blob/v3.1.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala)
```
private lazy val errMsg = s" is not a valid external type for schema of
${expected.simpleString}"
```
[RowEncoder.scala](https://github.com/apache/spark/blob/v3.1.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala)
```
def externalDataTypeFor(dt: DataType): DataType = dt match {
case _ if ScalaReflection.isNativeType(dt) => dt
case TimestampType =>
if (SQLConf.get.datetimeJava8ApiEnabled) {
ObjectType(classOf[java.time.Instant])
} else {
ObjectType(classOf[java.sql.Timestamp])
}
case DateType =>
if (SQLConf.get.datetimeJava8ApiEnabled) {
ObjectType(classOf[java.time.LocalDate])
} else {
ObjectType(classOf[java.sql.Date])
}
```
The result of SQLConf.get depends on the existence of an active SparkSession,
and this leads to non-default spark session settings sometimes being lost by
executors, depending on when they are started.
[SQLConf.get](https://github.com/apache/spark/blob/v3.1.2/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L149)
```
* Returns the active config object within the current scope. If there is an
active SparkSession,
* the proper SQLConf associated with the thread's active session is used. If
it's called from
* tasks in the executor side, a SQLConf will be created from job local
properties, which are set
* and propagated from the driver side, unless a `SQLConf` has been set in the
scope by
* `withExistingConf` as done for propagating SQLConf for operations performed
on RDDs created
* from DataFrames.
```
We inferred that `SQLConf.get` (getConf) could be flaky, and this would be the
reason for our random error above.
In fact, using a custom spark where the above `if` on
SQLConf.get.datetimeJava8ApiEnabled is short-circuited for Instant or, better,
changing the default in SQLConf.scala solves the issue:
[original
SQLConf.scala](https://github.com/apache/spark/blob/v3.1.2/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L2613)
our change:
```
val DATETIME_JAVA8API_ENABLED = buildConf("spark.sql.datetime.java8API.enabled")
.doc("If the configuration property is set to true, java.time.Instant and " +
"java.time.LocalDate classes of Java 8 API are used as external types for " +
"Catalyst's TimestampType and DateType. If it is set to false,
java.sql.Timestamp " +
"and java.sql.Date are used for the same purpose.")
.version("3.0.0")
.booleanConf
.createWithDefault(true) // our test change
// .createWithDefault(false)
```
Browsing spark jira, we found that there are a number of issues (some open,
some closed) all pointing to similar problems in SQLConf.get.
* [SPARK-23894](https://issues.apache.org/jira/browse/SPARK-23894) Flaky Test:
BucketedWriteWithoutHiveSupportSuite
```
_So how come sometimes its defined? Note that activeSession is an Inheritable
thread local. Normally the executor threads are created before activeSession is
defined, so they don't inherit anything. But a threadpool is free to create
more threads at any time. And when they do, then suddenly the new executor
threads will inherit the active session from their parent, a thread in the
driver with the activeSession defined._
```
* [SPARK-22938](https://issues.apache.org/jira/browse/SPARK-22938) Assert that
SQLConf.get is accessed only on the driver.
```
_Assert if code tries to access SQLConf.get on executor._
_This can lead to hard to detect bugs, where the executor will read
fallbackConf, falling back to default config values, ignoring potentially
changed non-default configs._
_If a config is to be passed to executor code, it needs to be read on the
driver, and passed explicitly._
```
* [SPARK-30798](https://issues.apache.org/jira/browse/SPARK-30798) Scope
Session.active in QueryExecution
```
_SparkSession.active is a thread local variable that points to the current
thread's spark session. It is important to note that the SQLConf.get method
depends on SparkSession.active. In the current implementation it is possible
that SparkSession.active points to a different session which causes various
problems. Most of these problems arise because part of the query processing is
done using the configurations of a different session._
```
* [SPARK-30223](https://issues.apache.org/jira/browse/SPARK-30223) queries in
thrift server may read wrong SQL configs
```
_The Spark thrift server creates many SparkSessions to serve requests, and the
thrift server serves requests using a single thread. One thread can only have
one active SparkSession, so SQLCong.get can't get the proper conf from the
session that runs the query._
```
* [SPARK-35252](https://issues.apache.org/jira/browse/SPARK-35252)
PartitionReaderFactory's Implemention Class of DataSourceV2: sqlConf parameter
is null
```
_The driver construct a RDD instance(DataSourceRDD), the sqlConf parameter pass
to the MyPartitionReaderFactory is not null.__{{But when the executor
deserialize the RDD, the sqlConf parameter is null.}}_
```
* [SPARK-35324](https://issues.apache.org/jira/browse/SPARK-35324) Spark SQL
configs not respected in RDDs
_I think the difference might have to do with the fact that in the RDD case the
config isn't in the local properties of the TaskContext._
_* Stepping through the debugger, I see that both RDD and Dataset decide on
using or not using the legacy date formatter in DateFormatter.getFormatter._
_* Then in SQLConf.get, both cases find a TaskContext and no existingConf. So
they create a new ReadOnlySQLConf from the TaskContext object._
_* RDD and Dataset code path differ in the local properties they find on the
TaskContext here. The Dataset code path has spark.sql.legacy.timeParserPolicy
in the local properties, but the RDD path doesn't. The ReadOnlySQLConf is
created from the local properties, so in the RDD path the resulting config
object doesn't have an override for spark.sql.legacy.timeParserPolicy._
--
This message was sent by Atlassian Jira
(v8.20.1#820001)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]