This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new 8e9a696ae [SEDONA-495] Raster data source uses shared FileSystem
connections which lead to race condition (#1236)
8e9a696ae is described below
commit 8e9a696ae63ccc927de98fba2600acb8b73ad6ef
Author: Jia Yu <[email protected]>
AuthorDate: Thu Feb 15 00:07:09 2024 -0800
[SEDONA-495] Raster data source uses shared FileSystem connections which
lead to race condition (#1236)
---
.../sedona_sql/io/raster/RasterFileFormat.scala | 5 ++---
.../org/apache/sedona/sql/TestBaseScala.scala | 24 +++++++++++++---------
.../scala/org/apache/sedona/sql/rasterIOTest.scala | 9 +++++---
3 files changed, 22 insertions(+), 16 deletions(-)
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/RasterFileFormat.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/RasterFileFormat.scala
index d7851b11d..eaa3d0295 100644
--- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/RasterFileFormat.scala
+++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/RasterFileFormat.scala
@@ -20,7 +20,7 @@
package org.apache.spark.sql.sedona_sql.io.raster
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
@@ -29,7 +29,6 @@ import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType
import java.io.IOException
-import java.nio.file.Paths
import java.util.UUID
private[spark] class RasterFileFormat extends FileFormat with DataSourceRegister {
@@ -82,7 +81,7 @@ private class RasterFileWriter(savePath: String,
dataSchema: StructType,
context: TaskAttemptContext) extends OutputWriter {
- private val hfs = new Path(savePath).getFileSystem(context.getConfiguration)
+ private val hfs = FileSystem.newInstance(new Path(savePath).toUri, context.getConfiguration)
private val rasterFieldIndex = if (rasterOptions.rasterField.isEmpty) getRasterFieldIndex else dataSchema.fieldIndex(rasterOptions.rasterField.get)
private def getRasterFieldIndex: Int = {
diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala b/spark/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
index fec235696..4c50bc3c0 100644
--- a/spark/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
+++ b/spark/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
@@ -78,19 +78,9 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
val buildingDataLocation: String = resourceFolder + "813_buildings_test.csv"
val smallRasterDataLocation: String = resourceFolder + "raster/test1.tiff"
private val factory = new GeometryFactory()
- var hdfsURI: String = _
-
override def beforeAll(): Unit = {
SedonaContext.create(sparkSession)
- // Set up HDFS minicluster
- val baseDir = new File("./target/hdfs/").getAbsoluteFile
- FileUtil.fullyDelete(baseDir)
- val hdfsConf = new HdfsConfiguration
- hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath)
- val builder = new MiniDFSCluster.Builder(hdfsConf)
- val hdfsCluster = builder.build
- hdfsURI = "hdfs://127.0.0.1:" + hdfsCluster.getNameNodePort + "/"
}
override def afterAll(): Unit = {
@@ -237,4 +227,18 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
}).sum
}).sum
}
+
+ /**
+ * Create a mini HDFS cluster and return the HDFS instance and the URI.
+ * @return (MiniDFSCluster, HDFS URI)
+ */
+ def creatMiniHdfs(): (MiniDFSCluster, String) = {
+ val baseDir = new File("./target/hdfs/").getAbsoluteFile
+ FileUtil.fullyDelete(baseDir)
+ val hdfsConf = new HdfsConfiguration
+ hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath)
+ val builder = new MiniDFSCluster.Builder(hdfsConf)
+ val hdfsCluster = builder.build
+ (hdfsCluster, "hdfs://127.0.0.1:" + hdfsCluster.getNameNodePort + "/")
+ }
}
diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala b/spark/common/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala
index d5203e6a0..3875439d2 100644
--- a/spark/common/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala
+++ b/spark/common/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala
@@ -19,6 +19,7 @@
package org.apache.sedona.sql
import org.apache.commons.io.FileUtils
+import org.apache.hadoop.hdfs.MiniDFSCluster
import org.apache.spark.sql.SaveMode
import org.junit.Assert.assertEquals
import org.scalatest.{BeforeAndAfter, GivenWhenThen}
@@ -149,12 +150,14 @@ class rasterIOTest extends TestBaseScala with BeforeAndAfter with GivenWhenThen
}
it("should read geotiff using binary source and write geotiff back to hdfs using raster source") {
- var rasterDf = sparkSession.read.format("binaryFile").load(rasterdatalocation)
+ val miniHDFS: (MiniDFSCluster, String) = creatMiniHdfs()
+ var rasterDf = sparkSession.read.format("binaryFile").load(rasterdatalocation).repartition(3)
val rasterCount = rasterDf.count()
- rasterDf.write.format("raster").mode(SaveMode.Overwrite).save(hdfsURI + "/raster-written")
- rasterDf = sparkSession.read.format("binaryFile").load(hdfsURI + "/raster-written/*")
+ rasterDf.write.format("raster").mode(SaveMode.Overwrite).save(miniHDFS._2 + "/raster-written")
+ rasterDf = sparkSession.read.format("binaryFile").load(miniHDFS._2 + "/raster-written/*")
rasterDf = rasterDf.selectExpr("RS_FromGeoTiff(content)")
assert(rasterDf.count() == rasterCount)
+ miniHDFS._1.shutdown()
}
}