[ https://issues.apache.org/jira/browse/SEDONA-325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17743014#comment-17743014 ]
Kristin Cowalcijk commented on SEDONA-325:
------------------------------------------
Actually, we don't need any caches when reading GeoTiff images in
{{RS_FromGeoTiff}}, since Spark has already read the contents of the GeoTiff files
into byte arrays. The GridCoverage2D should be backed by a planar image that holds
no file handles, and everything should be pure in-memory blocks.
{{RS_FromArcInfoAsciiGrid}} may also be subject to this issue.
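A minimal sketch of that idea (hypothetical, not the actual Sedona implementation; it assumes GeoTools' {{GeoTiffReader}} accepts an {{ImageInputStream}} directly): wrap the byte array that Spark's binaryFile source already holds in a memory-backed ImageIO stream, so no file-backed cache is ever created.
{code:java}
// Hypothetical sketch, not the actual RS_FromGeoTiff code: build a GridCoverage2D
// from the byte[] that Spark's binaryFile reader already holds in memory, using a
// memory-backed ImageInputStream so no /tmp/imageioXXXX.tmp file is ever opened.
import java.io.ByteArrayInputStream;
import javax.imageio.stream.MemoryCacheImageInputStream;
import org.geotools.coverage.grid.GridCoverage2D;
import org.geotools.gce.geotiff.GeoTiffReader;

public class InMemoryGeoTiff {
    public static GridCoverage2D fromBytes(byte[] content) throws Exception {
        // MemoryCacheImageInputStream buffers in heap memory, so reading the GeoTiff
        // consumes no file descriptors beyond what the JVM already has open.
        MemoryCacheImageInputStream stream =
                new MemoryCacheImageInputStream(new ByteArrayInputStream(content));
        GeoTiffReader reader = new GeoTiffReader(stream);
        return reader.read(null);
    }
}
{code}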
> RS_FromGeoTiff is leaking file descriptors
> ------------------------------------------
>
> Key: SEDONA-325
> URL: https://issues.apache.org/jira/browse/SEDONA-325
> Project: Apache Sedona
> Issue Type: Bug
> Affects Versions: 1.4.1
> Reporter: Kristin Cowalcijk
> Priority: Major
>
> I tried loading a raster dataset composed of 20000+ GeoTiff images in a local
> Spark session using the following code:
> {code:python}
> df_binary = spark.read.format("binaryFile").option("pathGlobFilter", "*.tif").option("recursiveFileLookup", "true").load(DATA_ROOT_PATH + '/raster/EuroSAT_MS')
> df_geotiff = df_binary.withColumn("rast", expr("RS_FromGeoTiff(content)")).withColumn("name", expr("reverse(split(path, '/'))[0]")).select("name", "length", "rast")
> df_geotiff.where("name LIKE 'Forest_%.tif'").selectExpr("name", "RS_BandAsArray(rast, 3) as band").orderBy("name").show()
> {code}
> The Spark job failed with the following error messages:
> {code:java}
> Py4JJavaError: An error occurred while calling o70.showString.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 782 in stage 5.0 failed 1 times, most recent failure: Lost task 782.0 in stage 5.0 (TID 786) (kontinuation executor driver): java.io.FileNotFoundException: /home/kontinuation/documents/wherobots/notebooks/data/raster/EuroSAT_MS/Forest/Forest_2298.tif (Too many open files)
> It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
> 
>     at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:661)
>     at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:212)
>     at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:270)
>     at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>     at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:32)
>     at org.sparkproject.guava.collect.Ordering.leastOf(Ordering.java:664)
>     at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
>     at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2(RDD.scala:1539)
>     at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
>     at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:136)
>     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:750)
> {code}
> It says that the Spark job is opening too many files. If we run {{lsof}} to
> inspect the open files, we can see that most of them are temporary files prefixed
> with {{imageio}}:
> {code:java}
> java 3843951 kontinuation 1006u REG 252,1 107244 1204728 /tmp/imageio3709666550975207536.tmp
> java 3843951 kontinuation 1007u REG 252,1 107244 1204729 /tmp/imageio7503001112441146978.tmp
> java 3843951 kontinuation 1008u REG 252,1 107244 1204730 /tmp/imageio1035759556272836613.tmp
> java 3843951 kontinuation 1009u REG 252,1 107244 1204731 /tmp/imageio451679980601844202.tmp
> java 3843951 kontinuation 1010u REG 252,1 107244 1204732 /tmp/imageio2111699718021158223.tmp
> java 3843951 kontinuation 1011u REG 252,1 107244 1204733 /tmp/imageio8919853818666809481.tmp
> java 3843951 kontinuation 1012u REG 252,1 107244 1204734 /tmp/imageio6956257348066899899.tmp
> java 3843951 kontinuation 1013u REG 252,1 107244 1204735 /tmp/imageio3045964803135174263.tmp
> java 3843951 kontinuation 1014u REG 252,1 107244 1204736 /tmp/imageio8138794596381465904.tmp
> java 3843951 kontinuation 1015u REG 252,1 107244 1204737 /tmp/imageio6991404647914889791.tmp
> java 3843951 kontinuation 1016u REG 252,1 107244 1204738 /tmp/imageio3098287432603901322.tmp
> java 3843951 kontinuation 1017u REG 252,1 107244 1204739 /tmp/imageio599912999779858439.tmp
> java 3843951 kontinuation 1018u REG 252,1 107244 1204740 /tmp/imageio8841430021636925470.tmp
> java 3843951 kontinuation 1019u REG 252,1 107244 1204741 /tmp/imageio8981079233288315985.tmp
> java 3843951 kontinuation 1020u REG 252,1 107244 1204742 /tmp/imageio3673591736487787612.tmp
> java 3843951 kontinuation 1021u REG 252,1 107244 1204743 /tmp/imageio8805168727392534534.tmp
> java 3843951 kontinuation 1022u REG 252,1 107244 1204744 /tmp/imageio441228595459753924.tmp
> java 3843951 kontinuation 1023u REG 252,1 107244 1204753 /tmp/imageio6548224310964783498.tmp
> {code}
> My first attempt to fix the problem was to dispose the GridCoverage2D object
> after using it in {{RS_BandAsArray}}. However, that does not fix the problem.
> I've done further investigation and found another problem in the
> GeoTiffReader provided by GeoTools: it initializes a file-backed cache when
> reading a GeoTiff from an input stream, and it does not close that file-backed
> cache when the grid coverage object is disposed. The temporary files named
> {{imageioXXXX}} were created by this file-backed cache. If the number of rasters
> in the dataset exceeds the maximum number of open files, the job will fail
> and the Spark session won't properly respond to any future queries.
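A minimal sketch of the suspected mechanism (the exact GeoTools internals may differ; this only illustrates standard ImageIO behaviour): with ImageIO caching enabled, which is the JDK default, an {{ImageInputStream}} created from a plain {{InputStream}} is typically file-backed, and its {{/tmp/imageioXXXX.tmp}} file stays open until the stream itself is closed; disposing a coverage built on top of it is not enough.
{code:java}
// Illustration of default ImageIO behaviour (not Sedona or GeoTools code): with
// caching enabled, an ImageInputStream created from a plain InputStream is
// file-backed, and its temp file stays open until close() is called.
import java.io.ByteArrayInputStream;
import javax.imageio.ImageIO;
import javax.imageio.stream.ImageInputStream;

public class ImageIoCacheDemo {
    public static void main(String[] args) throws Exception {
        byte[] geoTiffBytes = new byte[1024]; // stand-in for real GeoTiff content
        ImageIO.setUseCache(true); // this is the JDK default
        ImageInputStream in =
                ImageIO.createImageInputStream(new ByteArrayInputStream(geoTiffBytes));
        // Typically prints javax.imageio.stream.FileCacheImageInputStream: a
        // /tmp/imageioXXXX.tmp file (and a file descriptor) now sits behind this stream.
        System.out.println(in.getClass().getName());
        // Only closing the stream itself releases the temp file and its descriptor.
        in.close();
    }
}
{code}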