Kontinuation commented on code in PR #2297:
URL: https://github.com/apache/sedona/pull/2297#discussion_r2285729418
##########
spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetFileFormat.scala:
##########
@@ -179,29 +184,29 @@ class GeoParquetFileFormat(val spatialFilter: Option[GeoParquetSpatialFilter])
hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
hadoopConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, requiredSchema.json)
hadoopConf.set(ParquetWriteSupport.SPARK_ROW_SCHEMA, requiredSchema.json)
- hadoopConf.set(
- SQLConf.SESSION_LOCAL_TIMEZONE.key,
- sparkSession.sessionState.conf.sessionLocalTimeZone)
+ val conf = new PortableSQLConf(sparkSession.sessionState.conf)
+ hadoopConf.set(PortableSQLConf.SESSION_LOCAL_TIMEZONE.key, conf.sessionLocalTimeZone)
hadoopConf.setBoolean(
- SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
- sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
- hadoopConf.setBoolean(
- SQLConf.CASE_SENSITIVE.key,
- sparkSession.sessionState.conf.caseSensitiveAnalysis)
+ PortableSQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
+ conf.nestedSchemaPruningEnabled)
+ hadoopConf.setBoolean(PortableSQLConf.CASE_SENSITIVE.key, conf.caseSensitiveAnalysis)
// Sets flags for `ParquetToSparkSchemaConverter`
hadoopConf.setBoolean(
- SQLConf.PARQUET_BINARY_AS_STRING.key,
- sparkSession.sessionState.conf.isParquetBinaryAsString)
+ PortableSQLConf.PARQUET_BINARY_AS_STRING.key,
+ conf.isParquetBinaryAsString)
hadoopConf.setBoolean(
- SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
- sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+ PortableSQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+ conf.isParquetINT96AsTimestamp)
hadoopConf.setBoolean(
- SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
- sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
+ PortableSQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
+ conf.parquetInferTimestampNTZEnabled)
hadoopConf.setBoolean(
- SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
- sparkSession.sessionState.conf.legacyParquetNanosAsLong)
+ PortableSQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+ conf.legacyParquetNanosAsLong)
+
+ // Workaround "The file might have been updated during query execution" on Databricks
+ hadoopConf.setBoolean("spark.databricks.scan.modTimeCheck.enabled", false)
Review Comment:
Databricks Runtime is weirder than I thought. Stripping off the dependencies on Spark's ParquetFileFormat results in another weird problem. When reading GeoParquet files, the following error occurs:
```
Job aborted due to stage failure: Task 0 in stage 58.0 failed 4 times, most recent failure: Lost task 0.3 in stage 58.0 (TID 204) (ip-10-24-152-151.us-west-2.compute.internal executor driver): org.apache.spark.SparkException: Exception thrown in awaitResult: The file might have been updated during query execution. Ensure that no pipeline updates existing files during query execution and try again.
    at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:51)
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:519)
    at org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:604)
    at org.apache.spark.sql.execution.datasources.geoparquet.internal.ParquetFileFormat$.readParquetFootersInParallel(ParquetFileFormat.scala:434)
    at org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetFileFormat$.$anonfun$mergeSchemasInParallel$1(GeoParquetFileFormat.scala:385)
    at org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetFileFormat$.$anonfun$mergeSchemasInParallel$1$adapted(GeoParquetFileFormat.scala:383)
    at org.apache.spark.sql.execution.datasources.geoparquet.internal.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$4(SchemaMergeUtils.scala:117)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:910)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:910)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
    at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:420)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:417)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:384)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$2(ResultTask.scala:76)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:76)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:227)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:204)
    at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:166)
    at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
    at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
    at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
    at scala.util.Using$.resource(Using.scala:269)
    at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:160)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:105)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$11(Executor.scala:1224)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:112)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:1228)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:1080)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: com.databricks.common.filesystem.InconsistentReadException: The file might have been updated during query execution. Ensure that no pipeline updates existing files during query execution and try again.
    at com.databricks.common.filesystem.LokiS3AInputStream.withExceptionRewrites(LokiS3FS.scala:247)
    at com.databricks.common.filesystem.LokiS3AInputStream.read(LokiS3FS.scala:251)
    at java.base/java.io.FilterInputStream.read(FilterInputStream.java:82)
    at com.databricks.spark.metrics.FSInputStreamWithMetrics.$anonfun$read$2(FileSystemWithMetrics.scala:83)
    at com.databricks.spark.metrics.FSInputStreamWithMetrics.withTimeAndBytesReadMetric(FileSystemWithMetrics.scala:67)
    at com.databricks.spark.metrics.FSInputStreamWithMetrics.read(FileSystemWithMetrics.scala:82)
    at java.base/java.io.FilterInputStream.read(FilterInputStream.java:82)
    at org.apache.parquet.io.DelegatingSeekableInputStream.read(DelegatingSeekableInputStream.java:61)
    at org.apache.parquet.bytes.BytesUtils.readIntLittleEndian(BytesUtils.java:83)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:570)
    at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:828)
    at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:682)
    at org.apache.spark.sql.execution.datasources.geoparquet.internal.ParquetFooterReader.readFooter(ParquetFooterReader.java:89)
    at org.apache.spark.sql.execution.datasources.geoparquet.internal.ParquetFooterReader.readFooter(ParquetFooterReader.java:78)
    at org.apache.spark.sql.execution.datasources.geoparquet.internal.ParquetFileFormat$.$anonfun$readParquetFootersInParallel$1(ParquetFileFormat.scala:442)
    at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$3(ThreadUtils.scala:601)
    at com.databricks.util.LexicalThreadLocal$Handle.runWith(LexicalThreadLocal.scala:63)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.$anonfun$runWithCaptured$6(SparkThreadLocalForwardingThreadPoolExecutor.scala:119)
    at com.databricks.sql.transaction.tahoe.mst.MSTThreadHelper$.runWithMstTxnId(MSTThreadHelper.scala:57)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.$anonfun$runWithCaptured$5(SparkThreadLocalForwardingThreadPoolExecutor.scala:118)
    at com.databricks.spark.util.IdentityClaim$.withClaim(IdentityClaim.scala:48)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.$anonfun$runWithCaptured$4(SparkThreadLocalForwardingThreadPoolExecutor.scala:117)
    at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:116)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured$(SparkThreadLocalForwardingThreadPoolExecutor.scala:93)
    at org.apache.spark.util.threads.CapturedSparkThreadLocals.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:141)
    at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:601)
    at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
    at scala.util.Success.$anonfun$map$1(Try.scala:255)
    at scala.util.Success.map(Try.scala:213)
    at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1395)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1193)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1666)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1633)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: shaded.databricks.org.apache.hadoop.fs.s3a.RemoteFileChangedException: open `s3a://databricks-workspace-stack-9216f-bucket/oregon-prod/1030888365966037/FileStore/geoparquet_test/part-00000-tid-6314889273487124623-1c5646d6-0432-4113-b7dd-a6a5c1bc3864-87-1-c000.snappy.parquet': Change reported by S3 during open at position 1002. File s3a://databricks-workspace-stack-9216f-bucket/oregon-prod/1030888365966037/FileStore/geoparquet_test/part-00000-tid-6314889273487124623-1c5646d6-0432-4113-b7dd-a6a5c1bc3864-87-1-c000.snappy.parquet at given modTime (1000) was unavailable, null
    at shaded.databricks.org.apache.hadoop.fs.s3a.impl.ChangeTracker.processResponse(ChangeTracker.java:210)
    at shaded.databricks.org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:307)
    at shaded.databricks.org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$2(S3AInputStream.java:469)
    at shaded.databricks.org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$3(Invoker.java:247)
    at shaded.databricks.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:134)
    at shaded.databricks.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:128)
    at shaded.databricks.org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$5(Invoker.java:371)
    at shaded.databricks.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:435)
    at shaded.databricks.org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:367)
    at shaded.databricks.org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:245)
    at shaded.databricks.org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:289)
    at shaded.databricks.org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:462)
    at shaded.databricks.org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:495)
    at java.base/java.io.FilterInputStream.read(FilterInputStream.java:82)
    at com.databricks.common.filesystem.LokiS3AInputStream.$anonfun$read$1(LokiS3FS.scala:251)
    at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.java:23)
    at com.databricks.common.filesystem.LokiS3AInputStream.withExceptionRewrites(LokiS3FS.scala:244)
    ... 39 more
```
Setting `spark.databricks.scan.modTimeCheck.enabled` to `false` gets rid of the above error. This looks very hacky, but I have not found a better way to resolve it.
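
For illustration only, here is a minimal sketch of how the workaround could be scoped so it only takes effect on Databricks. It assumes the `DATABRICKS_RUNTIME_VERSION` environment variable is a reliable indicator of running on Databricks Runtime, and the helper object/name is hypothetical, not part of this PR:
```scala
import org.apache.hadoop.conf.Configuration

object DatabricksModTimeCheckWorkaround {
  // Hypothetical helper: only disable the Databricks mod-time check when we
  // appear to be running on Databricks Runtime (assumed to be detectable via
  // the DATABRICKS_RUNTIME_VERSION env var), so the setting is a no-op on
  // open-source Spark.
  def apply(hadoopConf: Configuration): Unit = {
    if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
      // Works around "The file might have been updated during query execution"
      // thrown while reading Parquet footers on Databricks.
      hadoopConf.setBoolean("spark.databricks.scan.modTimeCheck.enabled", false)
    }
  }
}
```
Whether guarding the setting this way is worth it, or whether setting it unconditionally (as the PR does) is fine because non-Databricks Spark simply ignores the key, is a judgment call.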
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]