This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch feature/2360-libpostal-remote-datadir
in repository https://gitbox.apache.org/repos/asf/sedona.git

commit 7d39f232f0408537d3f80a0e3a7388314f9f6b4f
Author: Jia Yu <[email protected]>
AuthorDate: Tue Feb 10 00:46:18 2026 -0800

    [SEDONA-2360] Support fetching libpostal model data from HDFS/object store
    
    - Add shared HadoopFileSystemUtils with copyFileToLocal and copyDirectoryToLocal
    - Refactor GeoPackage FileSystemUtils to delegate to shared utility
    - Add LibPostalDataLoader to resolve remote dataDir (HDFS, S3, GCS, ABFS) to local cache
    - Update LibPostalUtils to use LibPostalDataLoader for remote path resolution
    - Disable jpostal auto-download when data is fetched from remote store
    - Add tests for HadoopFileSystemUtils and LibPostalDataLoader using MiniDFSCluster
    - Update docs for ExpandAddress and ParseAddress with remote dataDir instructions
---
 docs/api/sql/Function.md                           |  10 +-
 .../geopackage/connection/FileSystemUtils.scala    |  18 +-
 .../sedona/sql/utils/HadoopFileSystemUtils.scala   | 130 +++++++++++
 .../expressions/LibPostalDataLoader.scala          | 151 +++++++++++++
 .../sedona_sql/expressions/LibPostalUtils.scala    |  11 +-
 .../sedona/sql/HadoopFileSystemUtilsTest.scala     | 208 +++++++++++++++++
 .../sedona/sql/LibPostalDataLoaderTest.scala       | 249 +++++++++++++++++++++
 7 files changed, 758 insertions(+), 19 deletions(-)

diff --git a/docs/api/sql/Function.md b/docs/api/sql/Function.md
index ad1895524a..2d051241c8 100644
--- a/docs/api/sql/Function.md
+++ b/docs/api/sql/Function.md
@@ -22,11 +22,14 @@
 Introduction: Returns an array of expanded forms of the input address string. This is backed by the [libpostal](https://github.com/openvenues/libpostal) library's address expanding functionality.
 
 !!!Note
-    Jpostal requires at least 2 GB of free disk space to store the data files used for address parsing and expanding. The data files are downloaded automatically when the function is called for the first time.
+    Jpostal requires at least 2 GB of free disk space to store the data files used for address parsing and expanding. The data files are downloaded automatically when the function is called for the first time, unless the data directory is configured to point to a pre-populated remote or local path.
 
 !!!Note
     The version of jpostal installed with this package only supports Linux and MacOS. If you are using Windows, you will need to install libjpostal and libpostal manually and ensure that they are available in your `java.library.path`.
 
+!!!Note
+    The data directory can be configured via `spark.sedona.libpostal.dataDir`. You can point it to a remote filesystem path (HDFS, S3, GCS, ABFS, etc.) such as `hdfs:///data/libpostal/` or `s3a://my-bucket/libpostal/`. When a remote path is used, Sedona will automatically copy the data to a local cache directory on each executor before initializing jpostal. The automatic internet download is disabled in this mode, so the remote directory must already contain the libpostal model files.
+
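+For example, a minimal session setup that reads the model data from HDFS (a sketch; the `hdfs:///data/libpostal/` path is illustrative):
+
+```scala
+import org.apache.sedona.spark.SedonaContext
+
+// Illustrative sketch: the dataDir value is an example; any supported remote scheme works.
+val config = SedonaContext.builder()
+  .config("spark.sedona.libpostal.dataDir", "hdfs:///data/libpostal/")
+  .getOrCreate()
+val sedona = SedonaContext.create(config)
+sedona.sql("SELECT ExpandAddress('781 Franklin Ave Crown Heights Brooklyn NY')").show()
+```
+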
 Format: `ExpandAddress (address: String)`
 
 Since: `v1.8.0`
@@ -48,11 +51,14 @@ Output:
 Introduction: Returns an array of the components (e.g. street, postal code) of the input address string. This is backed by the [libpostal](https://github.com/openvenues/libpostal) library's address parsing functionality.
 
 !!!Note
-    Jpostal requires at least 2 GB of free disk space to store the data files used for address parsing and expanding. The data files are downloaded automatically when the library is initialized.
+    Jpostal requires at least 2 GB of free disk space to store the data files used for address parsing and expanding. The data files are downloaded automatically when the library is initialized, unless the data directory is configured to point to a pre-populated remote or local path.
 
 !!!Note
     The version of jpostal installed with this package only supports Linux and MacOS. If you are using Windows, you will need to install libjpostal and libpostal manually and ensure that they are available in your `java.library.path`.
 
+!!!Note
+    The data directory can be configured via `spark.sedona.libpostal.dataDir`. You can point it to a remote filesystem path (HDFS, S3, GCS, ABFS, etc.) such as `hdfs:///data/libpostal/` or `s3a://my-bucket/libpostal/`. When a remote path is used, Sedona will automatically copy the data to a local cache directory on each executor before initializing jpostal. The automatic internet download is disabled in this mode, so the remote directory must already contain the libpostal model files.
+
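+For example, with the data directory already configured at session creation (a sketch; the session setup mirrors the ExpandAddress example above):
+
+```scala
+// Assumes `sedona` is a Sedona-enabled SparkSession with
+// spark.sedona.libpostal.dataDir pointing at a pre-populated directory.
+sedona.sql("SELECT ParseAddress('781 Franklin Ave Crown Heights Brooklyn NY')").show(truncate = false)
+```
+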
 Format: `ParseAddress (address: String)`
 
 Since: `v1.8.0`
diff --git a/spark/common/src/main/scala/org/apache/sedona/sql/datasources/geopackage/connection/FileSystemUtils.scala b/spark/common/src/main/scala/org/apache/sedona/sql/datasources/geopackage/connection/FileSystemUtils.scala
index be47cd35cd..56aa4fd070 100644
--- a/spark/common/src/main/scala/org/apache/sedona/sql/datasources/geopackage/connection/FileSystemUtils.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/sql/datasources/geopackage/connection/FileSystemUtils.scala
@@ -19,27 +19,15 @@
 package org.apache.sedona.sql.datasources.geopackage.connection
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
+import org.apache.sedona.sql.utils.HadoopFileSystemUtils
 
 import java.io.File
 
 object FileSystemUtils {
 
   def copyToLocal(options: Configuration, file: Path): (File, Boolean) = {
-    if (isLocalFileSystem(options, file)) {
-      return (new File(file.toUri.getPath), false)
-    }
-
-    val fs = file.getFileSystem(options)
-    val tempFile = File.createTempFile(java.util.UUID.randomUUID.toString, ".gpkg")
-
-    fs.copyToLocalFile(file, new Path(tempFile.getAbsolutePath))
-
-    (tempFile, true)
-  }
-
-  private def isLocalFileSystem(conf: Configuration, path: Path): Boolean = {
-    FileSystem.get(path.toUri, conf).isInstanceOf[org.apache.hadoop.fs.LocalFileSystem]
+    HadoopFileSystemUtils.copyFileToLocal(options, file, ".gpkg")
   }
 
 }
diff --git a/spark/common/src/main/scala/org/apache/sedona/sql/utils/HadoopFileSystemUtils.scala b/spark/common/src/main/scala/org/apache/sedona/sql/utils/HadoopFileSystemUtils.scala
new file mode 100644
index 0000000000..68102c9913
--- /dev/null
+++ b/spark/common/src/main/scala/org/apache/sedona/sql/utils/HadoopFileSystemUtils.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.sql.utils
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, LocalFileSystem, Path}
+import org.slf4j.LoggerFactory
+
+import java.io.File
+
+/**
+ * Shared utilities for copying files from Hadoop-compatible file systems (HDFS, S3, GCS, ABFS,
+ * etc.) to the local filesystem. Used by GeoPackage readers and libpostal data loading.
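+ *
+ * A usage sketch (paths are illustrative):
+ * {{{
+ * val conf = new Configuration()
+ * val (localFile, wasCopied) =
+ *   HadoopFileSystemUtils.copyFileToLocal(conf, new Path("hdfs:///data/sample.gpkg"), ".gpkg")
+ * if (wasCopied) localFile.deleteOnExit() // temp copy; the caller owns cleanup
+ * }}}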
+ */
+object HadoopFileSystemUtils {
+
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  /**
+   * Check whether the given path resides on the local filesystem.
+   *
+   * @param conf
+   *   Hadoop configuration
+   * @param path
+   *   the path to check
+   * @return
+   *   true if the path is on a local filesystem
+   */
+  def isLocalFileSystem(conf: Configuration, path: Path): Boolean = {
+    FileSystem.get(path.toUri, conf).isInstanceOf[LocalFileSystem]
+  }
+
+  /**
+   * Copy a single remote file to a local temporary file. If the file is already on the local
+   * filesystem, a reference to it is returned directly (no copy).
+   *
+   * @param conf
+   *   Hadoop configuration
+   * @param file
+   *   the remote (or local) file path
+   * @param suffix
+   *   file extension for the temp file (e.g. ".gpkg")
+   * @return
+   *   a tuple of (localFile, wasCopied) – `wasCopied` is false when the file was already local
+   */
+  def copyFileToLocal(conf: Configuration, file: Path, suffix: String): (File, Boolean) = {
+    if (isLocalFileSystem(conf, file)) {
+      return (new File(file.toUri.getPath), false)
+    }
+
+    val fs = file.getFileSystem(conf)
+    val tempFile = File.createTempFile(java.util.UUID.randomUUID.toString, suffix)
+
+    logger.info(
+      "Copying remote file {} to local temp file {}",
+      file: Any,
+      tempFile.getAbsolutePath: Any)
+    fs.copyToLocalFile(file, new Path(tempFile.getAbsolutePath))
+
+    (tempFile, true)
+  }
+
+  /**
+   * Recursively copy a remote directory to a local directory. Existing files in the local
+   * directory are not removed, but will be overwritten if they have the same relative path.
+   *
+   * @param conf
+   *   Hadoop configuration
+   * @param remoteDir
+   *   the remote directory path
+   * @param localDir
+   *   the local directory to copy into
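+   *
+   * A usage sketch (the remote path is illustrative):
+   * {{{
+   * copyDirectoryToLocal(conf, new Path("s3a://bucket/models/"), new File("/tmp/models"))
+   * }}}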
+   */
+  def copyDirectoryToLocal(conf: Configuration, remoteDir: Path, localDir: File): Unit = {
+    val fs = remoteDir.getFileSystem(conf)
+
+    if (!fs.exists(remoteDir)) {
+      throw new IllegalArgumentException(s"Remote directory does not exist: $remoteDir")
+    }
+
+    if (!fs.getFileStatus(remoteDir).isDirectory) {
+      throw new IllegalArgumentException(s"Remote path is not a directory: $remoteDir")
+    }
+
+    if (!localDir.exists()) {
+      localDir.mkdirs()
+    }
+
+    logger.info(
+      "Copying remote directory {} to local directory {}",
+      remoteDir: Any,
+      localDir.getAbsolutePath: Any)
+
+    val statuses = fs.listStatus(remoteDir)
+    for (status <- statuses) {
+      val name = status.getPath.getName
+      val localTarget = new File(localDir, name)
+
+      if (status.isDirectory) {
+        // Recurse into subdirectories
+        localTarget.mkdirs()
+        copyDirectoryToLocal(conf, status.getPath, localTarget)
+      } else {
+        // Copy individual file
+        fs.copyToLocalFile(false, status.getPath, new Path(localTarget.getAbsolutePath), true)
+      }
+    }
+
+    logger.info(
+      "Finished copying remote directory {} to {}",
+      remoteDir: Any,
+      localDir.getAbsolutePath: Any)
+  }
+}
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/LibPostalDataLoader.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/LibPostalDataLoader.scala
new file mode 100644
index 0000000000..9214d4eb17
--- /dev/null
+++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/LibPostalDataLoader.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.sedona_sql.expressions
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.sedona.sql.utils.HadoopFileSystemUtils
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.slf4j.LoggerFactory
+
+import java.io.File
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.security.MessageDigest
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * Resolves libpostal data directory paths. When the configured data directory points to a remote
+ * filesystem (HDFS, S3, GCS, ABFS, etc.), the data is copied to a local cache directory so that
+ * jpostal's native code can access it.
+ *
+ * The local cache uses a hash of the remote URI to avoid re-downloading on subsequent
+ * invocations. A marker file `.sedona_libpostal_complete` is written after a successful copy so
+ * that partially-copied directories are detected and re-copied.
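+ *
+ * A usage sketch (the remote path is illustrative):
+ * {{{
+ * val localDir = LibPostalDataLoader.resolveDataDir("hdfs:///data/libpostal/")
+ * // localDir is a local path ending in a separator, ready to pass to jpostal's Config
+ * }}}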
+ */
+object LibPostalDataLoader {
+
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  private val MARKER_FILE = ".sedona_libpostal_complete"
+
+  /** Per-cache-key lock objects to prevent concurrent downloads to the same directory. */
+  private val locks = new ConcurrentHashMap[String, AnyRef]()
+
+  /**
+   * Resolve the data directory to a local filesystem path. If the configured path already points
+   * to the local filesystem, it is returned as-is. If it points to a remote filesystem, the
+   * directory is recursively copied to a local cache under `java.io.tmpdir`.
+   *
+   * @param configuredDir
+   *   the data directory path from Sedona configuration (may be local or remote)
+   * @return
+   *   a local filesystem path suitable for jpostal
+   */
+  def resolveDataDir(configuredDir: String): String = {
+    if (isRemotePath(configuredDir)) {
+      copyRemoteToLocalCache(configuredDir)
+    } else {
+      configuredDir
+    }
+  }
+
+  /**
+   * Determine whether a path string refers to a remote (non-local) filesystem.
+   */
+  def isRemotePath(path: String): Boolean = {
+    try {
+      val uri = new URI(path)
+      val scheme = uri.getScheme
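+      // A single-character "scheme" (e.g. "C" from a Windows drive path) is not
+      // treated as remote; hence the length check below.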
+      scheme != null && scheme != "file" && scheme.length > 1
+    } catch {
+      case _: Exception => false
+    }
+  }
+
+  /**
+   * Copy a remote directory to a local cache and return the local path. Uses a hash-based cache
+   * key so that different remote paths get different local directories. A marker file is used to
+   * ensure completeness — if the marker is missing (e.g. from a previous interrupted copy), the
+   * directory is re-copied.
+   */
+  private def copyRemoteToLocalCache(remotePath: String): String = {
+    val cacheKey = hashPath(remotePath)
+    val localCacheDir =
+      new File(System.getProperty("java.io.tmpdir"), s"sedona-libpostal-cache/$cacheKey")
+    val markerFile = new File(localCacheDir, MARKER_FILE)
+
+    if (markerFile.exists()) {
+      logger.info(
+        "Libpostal data already cached at {}, skipping download from {}",
+        localCacheDir.getAbsolutePath: Any,
+        remotePath: Any)
+      return ensureTrailingSlash(localCacheDir.getAbsolutePath)
+    }
+
+    // Synchronize on a canonical lock object per cache key so that concurrent
+    // threads on the same JVM don't race to download the same data.
+    val lock = locks.computeIfAbsent(cacheKey, _ => new AnyRef)
+    lock.synchronized {
+      // Double-check after acquiring lock
+      if (markerFile.exists()) {
+        return ensureTrailingSlash(localCacheDir.getAbsolutePath)
+      }
+
+      logger.info(
+        "Copying libpostal data from {} to local cache at {}",
+        remotePath: Any,
+        localCacheDir.getAbsolutePath: Any)
+
+      localCacheDir.mkdirs()
+
+      val hadoopConf =
+        try {
+          SparkHadoopUtil.get.conf
+        } catch {
+          case _: Exception => new Configuration()
+        }
+      val remoteHadoopPath = new Path(remotePath)
+
+      HadoopFileSystemUtils.copyDirectoryToLocal(hadoopConf, remoteHadoopPath, localCacheDir)
+
+      // Write marker file to indicate successful completion
+      markerFile.createNewFile()
+
+      // Remove the lock entry now that the cache is populated — future callers
+      // will return early via the markerFile.exists() fast path.
+      locks.remove(cacheKey)
+
+      logger.info("Successfully cached libpostal data at {}", 
localCacheDir.getAbsolutePath)
+    }
+
+    ensureTrailingSlash(localCacheDir.getAbsolutePath)
+  }
+
+  private def hashPath(path: String): String = {
+    val digest = MessageDigest.getInstance("SHA-256")
+    val hash = digest.digest(path.getBytes(StandardCharsets.UTF_8))
+    hash.take(16).map("%02x".format(_)).mkString
+  }
+
+  private def ensureTrailingSlash(path: String): String = {
+    if (path.endsWith("/") || path.endsWith(File.separator)) path
+    else path + File.separator
+  }
+}
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/LibPostalUtils.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/LibPostalUtils.scala
index fe23cab722..5859a26054 100644
--- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/LibPostalUtils.scala
+++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/LibPostalUtils.scala
@@ -23,10 +23,17 @@ import com.mapzen.jpostal.{AddressExpander, AddressParser, Config}
 object LibPostalUtils {
 
   private def getConfig(dataDir: String, useSenzing: Boolean): Config = {
+    // Resolve the data directory: if it points to a remote filesystem (HDFS, S3, etc.),
+    // copy the data locally first since jpostal requires local filesystem access.
+    val resolvedDir = LibPostalDataLoader.resolveDataDir(dataDir)
+    val isRemote = LibPostalDataLoader.isRemotePath(dataDir)
+
     Config
       .builder()
-      .dataDir(dataDir)
-      .downloadDataIfNeeded(true)
+      .dataDir(resolvedDir)
+      // When data was fetched from a remote store, it is already fully populated;
+      // disable jpostal's built-in internet download to avoid network access on the cluster.
+      .downloadDataIfNeeded(!isRemote)
       .senzing(useSenzing)
       .build()
   }
diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/HadoopFileSystemUtilsTest.scala b/spark/common/src/test/scala/org/apache/sedona/sql/HadoopFileSystemUtilsTest.scala
new file mode 100644
index 0000000000..5ac56d7ade
--- /dev/null
+++ b/spark/common/src/test/scala/org/apache/sedona/sql/HadoopFileSystemUtilsTest.scala
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.sql
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.sedona.sql.utils.HadoopFileSystemUtils
+import org.scalatest.matchers.should.Matchers
+
+import java.io.File
+import java.nio.file.Files
+
+class HadoopFileSystemUtilsTest extends TestBaseScala with Matchers {
+
+  describe("HadoopFileSystemUtils") {
+
+    describe("isLocalFileSystem") {
+      it("should return true for local file paths") {
+        val conf = new Configuration()
+        val tempDir = Files.createTempDirectory("sedona-test").toFile
+        try {
+          HadoopFileSystemUtils.isLocalFileSystem(
+            conf,
+            new Path(tempDir.getAbsolutePath)) shouldBe true
+        } finally {
+          tempDir.delete()
+        }
+      }
+
+      it("should return true for file:// URIs") {
+        val conf = new Configuration()
+        val tempDir = Files.createTempDirectory("sedona-test").toFile
+        try {
+          HadoopFileSystemUtils.isLocalFileSystem(
+            conf,
+            new Path("file://" + tempDir.getAbsolutePath)) shouldBe true
+        } finally {
+          tempDir.delete()
+        }
+      }
+    }
+
+    describe("copyFileToLocal") {
+      it("should return the original file when path is local") {
+        val conf = new Configuration()
+        val tempFile = File.createTempFile("sedona-test", ".txt")
+        try {
+          Files.write(tempFile.toPath, "test content".getBytes)
+          val (localFile, wasCopied) =
+            HadoopFileSystemUtils.copyFileToLocal(
+              conf,
+              new Path(tempFile.getAbsolutePath),
+              ".txt")
+          wasCopied shouldBe false
+          localFile.getAbsolutePath shouldBe tempFile.getAbsolutePath
+        } finally {
+          tempFile.delete()
+        }
+      }
+
+      it("should copy a file from HDFS to local") {
+        val (hdfsCluster, hdfsUri) = creatMiniHdfs()
+        try {
+          val hdfsConf = hdfsCluster.getConfiguration(0)
+          val fs = FileSystem.get(hdfsConf)
+
+          // Create a test file on HDFS
+          val hdfsPath = new Path(hdfsUri + "test-file.txt")
+          val out = fs.create(hdfsPath)
+          out.writeBytes("hello from hdfs")
+          out.close()
+
+          // Copy to local
+          val (localFile, wasCopied) =
+            HadoopFileSystemUtils.copyFileToLocal(hdfsConf, hdfsPath, ".txt")
+          wasCopied shouldBe true
+          localFile.exists() shouldBe true
+
+          val content = new String(Files.readAllBytes(localFile.toPath))
+          content shouldBe "hello from hdfs"
+
+          // Clean up
+          localFile.delete()
+        } finally {
+          hdfsCluster.shutdown()
+        }
+      }
+    }
+
+    describe("copyDirectoryToLocal") {
+      it("should copy a directory tree from HDFS to local") {
+        val (hdfsCluster, hdfsUri) = creatMiniHdfs()
+        try {
+          val hdfsConf = hdfsCluster.getConfiguration(0)
+          val fs = FileSystem.get(hdfsConf)
+
+          // Create a directory structure on HDFS
+          val basePath = new Path(hdfsUri + "test-dir")
+          fs.mkdirs(basePath)
+          fs.mkdirs(new Path(basePath, "subdir1"))
+          fs.mkdirs(new Path(basePath, "subdir2"))
+
+          // Write files
+          val out1 = fs.create(new Path(basePath, "file1.txt"))
+          out1.writeBytes("content1")
+          out1.close()
+
+          val out2 = fs.create(new Path(basePath, "subdir1/file2.txt"))
+          out2.writeBytes("content2")
+          out2.close()
+
+          val out3 = fs.create(new Path(basePath, "subdir2/file3.txt"))
+          out3.writeBytes("content3")
+          out3.close()
+
+          // Copy to local
+          val localDir = Files.createTempDirectory("sedona-hdfs-test").toFile
+          HadoopFileSystemUtils.copyDirectoryToLocal(hdfsConf, basePath, localDir)
+
+          // Verify structure
+          new File(localDir, "file1.txt").exists() shouldBe true
+          new File(localDir, "subdir1/file2.txt").exists() shouldBe true
+          new File(localDir, "subdir2/file3.txt").exists() shouldBe true
+
+          // Verify content
+          new String(
+            Files.readAllBytes(new File(localDir, "file1.txt").toPath)) shouldBe "content1"
+          new String(
+            Files.readAllBytes(
+              new File(localDir, "subdir1/file2.txt").toPath)) shouldBe 
"content2"
+          new String(
+            Files.readAllBytes(
+              new File(localDir, "subdir2/file3.txt").toPath)) shouldBe 
"content3"
+
+          // Clean up
+          deleteDirectory(localDir)
+        } finally {
+          hdfsCluster.shutdown()
+        }
+      }
+
+      it("should throw for non-existent remote directory") {
+        val (hdfsCluster, hdfsUri) = creatMiniHdfs()
+        try {
+          val hdfsConf = hdfsCluster.getConfiguration(0)
+          val localDir = Files.createTempDirectory("sedona-hdfs-test").toFile
+
+          an[IllegalArgumentException] should be thrownBy {
+            HadoopFileSystemUtils.copyDirectoryToLocal(
+              hdfsConf,
+              new Path(hdfsUri + "nonexistent"),
+              localDir)
+          }
+
+          deleteDirectory(localDir)
+        } finally {
+          hdfsCluster.shutdown()
+        }
+      }
+
+      it("should throw when remote path is a file not a directory") {
+        val (hdfsCluster, hdfsUri) = creatMiniHdfs()
+        try {
+          val hdfsConf = hdfsCluster.getConfiguration(0)
+          val fs = FileSystem.get(hdfsConf)
+
+          val filePath = new Path(hdfsUri + "just-a-file.txt")
+          val out = fs.create(filePath)
+          out.writeBytes("not a dir")
+          out.close()
+
+          val localDir = Files.createTempDirectory("sedona-hdfs-test").toFile
+
+          an[IllegalArgumentException] should be thrownBy {
+            HadoopFileSystemUtils.copyDirectoryToLocal(hdfsConf, filePath, localDir)
+          }
+
+          deleteDirectory(localDir)
+        } finally {
+          hdfsCluster.shutdown()
+        }
+      }
+    }
+  }
+
+  private def deleteDirectory(dir: File): Unit = {
+    if (dir.isDirectory) {
+      dir.listFiles().foreach(deleteDirectory)
+    }
+    dir.delete()
+  }
+}
diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/LibPostalDataLoaderTest.scala b/spark/common/src/test/scala/org/apache/sedona/sql/LibPostalDataLoaderTest.scala
new file mode 100644
index 0000000000..3c7d46b908
--- /dev/null
+++ b/spark/common/src/test/scala/org/apache/sedona/sql/LibPostalDataLoaderTest.scala
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.sql
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.spark.sql.sedona_sql.expressions.LibPostalDataLoader
+import org.scalatest.matchers.should.Matchers
+
+import java.io.File
+import java.nio.file.Files
+import java.util.concurrent.{CyclicBarrier, Executors, TimeUnit}
+import scala.collection.mutable.ListBuffer
+
+class LibPostalDataLoaderTest extends TestBaseScala with Matchers {
+
+  describe("LibPostalDataLoader") {
+
+    describe("isRemotePath") {
+      it("should return false for local paths") {
+        LibPostalDataLoader.isRemotePath("/tmp/libpostal/") shouldBe false
+      }
+
+      it("should return false for relative paths") {
+        LibPostalDataLoader.isRemotePath("data/libpostal/") shouldBe false
+      }
+
+      it("should return false for file:// URIs") {
+        LibPostalDataLoader.isRemotePath("file:///tmp/libpostal/") shouldBe 
false
+      }
+
+      it("should return true for hdfs:// URIs") {
+        LibPostalDataLoader.isRemotePath("hdfs:///data/libpostal/") shouldBe 
true
+      }
+
+      it("should return true for hdfs:// URIs with host") {
+        LibPostalDataLoader.isRemotePath("hdfs://namenode:9000/data/libpostal/") shouldBe true
+      }
+
+      it("should return true for s3a:// URIs") {
+        LibPostalDataLoader.isRemotePath("s3a://my-bucket/libpostal/") 
shouldBe true
+      }
+
+      it("should return true for s3:// URIs") {
+        LibPostalDataLoader.isRemotePath("s3://my-bucket/libpostal/") shouldBe 
true
+      }
+
+      it("should return true for gs:// URIs") {
+        LibPostalDataLoader.isRemotePath("gs://my-bucket/libpostal/") shouldBe 
true
+      }
+
+      it("should return true for abfs:// URIs") {
+        LibPostalDataLoader.isRemotePath(
+          "abfs://[email protected]/libpostal/") shouldBe 
true
+      }
+
+      it("should return true for wasb:// URIs") {
+        LibPostalDataLoader.isRemotePath(
+          "wasb://[email protected]/libpostal/") 
shouldBe true
+      }
+
+      it("should return false for empty string") {
+        LibPostalDataLoader.isRemotePath("") shouldBe false
+      }
+
+      it("should return false for Windows-like paths") {
+        // Single-letter scheme like C: should not be treated as remote
+        LibPostalDataLoader.isRemotePath("C:\\libpostal\\data\\") shouldBe 
false
+      }
+    }
+
+    describe("resolveDataDir") {
+      it("should return local path unchanged") {
+        val tempDir = Files.createTempDirectory("sedona-libpostal-test").toFile
+        try {
+          val result = LibPostalDataLoader.resolveDataDir(tempDir.getAbsolutePath)
+          result shouldBe tempDir.getAbsolutePath
+        } finally {
+          tempDir.delete()
+        }
+      }
+
+      it("should copy remote directory to local cache and return local path") {
+        val (hdfsCluster, hdfsUri) = creatMiniHdfs()
+        try {
+          val hdfsConf = hdfsCluster.getConfiguration(0)
+          val fs = FileSystem.get(hdfsConf)
+
+          // Create a mock libpostal data directory on HDFS with expected subdirs
+          val remotePath = hdfsUri + "libpostal-data/"
+          val basePath = new Path(remotePath)
+          fs.mkdirs(basePath)
+
+          // Create the subdirectories that libpostal expects
+          val subdirs =
+            Seq(
+              "transliteration",
+              "numex",
+              "address_parser",
+              "address_expansions",
+              "language_classifier")
+          for (subdir <- subdirs) {
+            val subdirPath = new Path(basePath, subdir)
+            fs.mkdirs(subdirPath)
+            // Write a dummy file
+            val out = fs.create(new Path(subdirPath, "model.dat"))
+            out.writeBytes(s"data for $subdir")
+            out.close()
+          }
+
+          // Resolve the remote path
+          val localPath = LibPostalDataLoader.resolveDataDir(remotePath)
+
+          // Verify the result is a local path
+          localPath should not startWith "hdfs://"
+          new File(localPath).exists() shouldBe true
+
+          // Verify all subdirs were copied
+          for (subdir <- subdirs) {
+            val localSubdir = new File(localPath, subdir)
+            localSubdir.exists() shouldBe true
+            localSubdir.isDirectory shouldBe true
+            new File(localSubdir, "model.dat").exists() shouldBe true
+          }
+
+          // Verify the marker file exists
+          new File(localPath, ".sedona_libpostal_complete").exists() shouldBe 
true
+
+          // Verify trailing separator
+          localPath should endWith(File.separator)
+
+          // Call again — should use cache (no re-copy)
+          val localPath2 = LibPostalDataLoader.resolveDataDir(remotePath)
+          localPath2 shouldBe localPath
+
+          // Clean up local cache
+          deleteDirectory(new File(localPath))
+        } finally {
+          hdfsCluster.shutdown()
+        }
+      }
+
+      it("should handle concurrent access from multiple threads safely") {
+        val (hdfsCluster, hdfsUri) = creatMiniHdfs()
+        try {
+          val hdfsConf = hdfsCluster.getConfiguration(0)
+          val fs = FileSystem.get(hdfsConf)
+
+          // Create a mock libpostal data directory on HDFS
+          val remotePath = hdfsUri + "libpostal-concurrent/"
+          val basePath = new Path(remotePath)
+          fs.mkdirs(basePath)
+
+          val subdirs =
+            Seq(
+              "transliteration",
+              "numex",
+              "address_parser",
+              "address_expansions",
+              "language_classifier")
+          for (subdir <- subdirs) {
+            val subdirPath = new Path(basePath, subdir)
+            fs.mkdirs(subdirPath)
+            val out = fs.create(new Path(subdirPath, "model.dat"))
+            out.writeBytes(s"data for $subdir")
+            out.close()
+          }
+
+          val numThreads = 8
+          val barrier = new CyclicBarrier(numThreads)
+          val executor = Executors.newFixedThreadPool(numThreads)
+          val results = new ListBuffer[String]()
+          val errors = new ListBuffer[Throwable]()
+          val resultsLock = new AnyRef
+
+          val futures = (1 to numThreads).map { _ =>
+            executor.submit(new Runnable {
+              override def run(): Unit = {
+                try {
+                  // All threads wait here until all are ready, then start simultaneously
+                  barrier.await(30, TimeUnit.SECONDS)
+                  val localPath = LibPostalDataLoader.resolveDataDir(remotePath)
+                  resultsLock.synchronized {
+                    results += localPath
+                  }
+                } catch {
+                  case e: Throwable =>
+                    resultsLock.synchronized {
+                      errors += e
+                    }
+                }
+              }
+            })
+          }
+
+          // Wait for all threads to complete
+          futures.foreach(_.get(60, TimeUnit.SECONDS))
+          executor.shutdown()
+
+          // No errors should have occurred
+          errors shouldBe empty
+
+          // All threads should have resolved to the same local path
+          results.size shouldBe numThreads
+          results.distinct.size shouldBe 1
+
+          val localPath = results.head
+
+          // Verify the data is intact
+          for (subdir <- subdirs) {
+            val localSubdir = new File(localPath, subdir)
+            localSubdir.exists() shouldBe true
+            new File(localSubdir, "model.dat").exists() shouldBe true
+          }
+
+          // Exactly one marker file should exist
+          new File(localPath, ".sedona_libpostal_complete").exists() shouldBe 
true
+
+          // Clean up
+          deleteDirectory(new File(localPath))
+        } finally {
+          hdfsCluster.shutdown()
+        }
+      }
+    }
+  }
+
+  private def deleteDirectory(dir: File): Unit = {
+    if (dir.isDirectory) {
+      dir.listFiles().foreach(deleteDirectory)
+    }
+    dir.delete()
+  }
+}

