Copilot commented on code in PR #2637:
URL: https://github.com/apache/sedona/pull/2637#discussion_r2786886477


##########
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/LibPostalDataLoader.scala:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.sedona_sql.expressions
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.sedona.sql.utils.HadoopFileSystemUtils
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.slf4j.LoggerFactory
+
+import java.io.File
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.security.MessageDigest
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * Resolves libpostal data directory paths. When the configured data directory points to a remote
+ * filesystem (HDFS, S3, GCS, ABFS, etc.), the data is copied to a local cache directory so that
+ * jpostal's native code can access it.
+ *
+ * The local cache uses a hash of the remote URI to avoid re-downloading on subsequent
+ * invocations. A marker file `.sedona_libpostal_complete` is written after a successful copy so
+ * that partially-copied directories are detected and re-copied.
+ */
+object LibPostalDataLoader {
+
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  private val MARKER_FILE = ".sedona_libpostal_complete"
+
+  /** Per-cache-key lock objects to prevent concurrent downloads to the same directory. */
+  private val locks = new ConcurrentHashMap[String, AnyRef]()
+
+  /**
+   * Resolve the data directory to a local filesystem path. If the configured path already points
+   * to the local filesystem, it is returned as-is. If it points to a remote filesystem, the
+   * directory is recursively copied to a local cache under `java.io.tmpdir`.
+   *
+   * @param configuredDir
+   *   the data directory path from Sedona configuration (may be local or remote)
+   * @return
+   *   a local filesystem path suitable for jpostal
+   */
+  def resolveDataDir(configuredDir: String): String = {
+    if (isRemotePath(configuredDir)) {
+      copyRemoteToLocalCache(configuredDir)
+    } else {
+      configuredDir
+    }
+  }
+
+  /**
+   * Determine whether a path string refers to a remote (non-local) filesystem.
+   */
+  def isRemotePath(path: String): Boolean = {
+    try {
+      val uri = new URI(path)
+      val scheme = uri.getScheme
+      scheme != null && scheme != "file" && scheme.length > 1
+    } catch {
+      case _: Exception => false
+    }
+  }

Review Comment:
   `file://...` URIs are treated as local (good), but `resolveDataDir` returns 
them unchanged, which can break jpostal if it expects a plain filesystem path 
(e.g., `/tmp/libpostal`) rather than a URI string (`file:///tmp/libpostal`). 
Consider normalizing `file:` URIs in `resolveDataDir` (or in a dedicated
helper) by converting them to a local path (e.g., via
`new File(uri).getAbsolutePath`) before returning.
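   A minimal sketch of that normalization, assuming the `java.io.File` and
`java.net.URI` imports already present in this file (the helper name is
illustrative, not part of the PR):
   ```scala
   import java.io.File
   import java.net.URI

   // Hypothetical helper: collapse file: URIs to plain filesystem paths so that
   // jpostal's native code never sees a URI string.
   private def normalizeFileUri(configuredDir: String): String = {
     try {
       val uri = new URI(configuredDir)
       if (uri.getScheme == "file") new File(uri).getAbsolutePath
       else configuredDir
     } catch {
       case _: Exception => configuredDir // not a valid URI; treat as a plain path
     }
   }
   ```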



##########
spark/common/src/main/scala/org/apache/sedona/sql/utils/HadoopFileSystemUtils.scala:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.sql.utils
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, LocalFileSystem, Path}
+import org.slf4j.LoggerFactory
+
+import java.io.File
+
+/**
+ * Shared utilities for copying files from Hadoop-compatible file systems (HDFS, S3, GCS, ABFS,
+ * etc.) to the local filesystem. Used by GeoPackage readers and libpostal data loading.
+ */
+object HadoopFileSystemUtils {
+
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  /**
+   * Check whether the given path resides on the local filesystem.
+   *
+   * @param conf
+   *   Hadoop configuration
+   * @param path
+   *   the path to check
+   * @return
+   *   true if the path is on a local filesystem
+   */
+  def isLocalFileSystem(conf: Configuration, path: Path): Boolean = {
+    FileSystem.get(path.toUri, conf).isInstanceOf[LocalFileSystem]
+  }
+
+  /**
+   * Copy a single remote file to a local temporary file. If the file is already on the local
+   * filesystem, a reference to it is returned directly (no copy).
+   *
+   * @param conf
+   *   Hadoop configuration
+   * @param file
+   *   the remote (or local) file path
+   * @param suffix
+   *   file extension for the temp file (e.g. ".gpkg")
+   * @return
+   *   a tuple of (localFile, wasCopied) – `wasCopied` is false when the file was already local
+   */
+  def copyFileToLocal(conf: Configuration, file: Path, suffix: String): (File, Boolean) = {
+    if (isLocalFileSystem(conf, file)) {
+      return (new File(file.toUri.getPath), false)
+    }
+
+    val fs = file.getFileSystem(conf)
+    val tempFile = File.createTempFile(java.util.UUID.randomUUID.toString, suffix)
+
+    logger.info(
+      "Copying remote file {} to local temp file {}",
+      file: Any,
+      tempFile.getAbsolutePath: Any)
+    fs.copyToLocalFile(file, new Path(tempFile.getAbsolutePath))
+
+    (tempFile, true)
+  }
+
+  /**
+   * Recursively copy a remote directory to a local directory. Existing files in the local
+   * directory are not removed, but will be overwritten if they have the same relative path.
+   *
+   * @param conf
+   *   Hadoop configuration
+   * @param remoteDir
+   *   the remote directory path
+   * @param localDir
+   *   the local directory to copy into
+   */
+  def copyDirectoryToLocal(conf: Configuration, remoteDir: Path, localDir: File): Unit = {
+    val fs = remoteDir.getFileSystem(conf)
+
+    if (!fs.exists(remoteDir)) {
+      throw new IllegalArgumentException(s"Remote directory does not exist: 
$remoteDir")
+    }
+
+    if (!fs.getFileStatus(remoteDir).isDirectory) {
+      throw new IllegalArgumentException(s"Remote path is not a directory: 
$remoteDir")
+    }
+
+    if (!localDir.exists()) {
+      localDir.mkdirs()
+    }

Review Comment:
   `mkdirs()` result is ignored here. If directory creation fails (permissions, 
disk issues, etc.), subsequent copies will fail with less actionable errors. 
Recommend validating `localDir` is a directory and throwing a clear exception 
when `mkdirs()` returns false (and `localDir.isDirectory` is still false). Also 
consider handling the case where `localDir` exists but is not a directory.
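   A hedged sketch of that validation, reusing the `localDir` parameter from
`copyDirectoryToLocal` (the exact wording and exception types are illustrative):
   ```scala
   if (localDir.exists() && !localDir.isDirectory) {
     throw new IllegalArgumentException(
       s"Local path exists but is not a directory: ${localDir.getAbsolutePath}")
   }
   // mkdirs() may return false when another thread created the directory first,
   // so re-check isDirectory before treating the result as a failure.
   if (!localDir.exists() && !localDir.mkdirs() && !localDir.isDirectory) {
     throw new IllegalStateException(
       s"Failed to create local directory: ${localDir.getAbsolutePath}")
   }
   ```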



##########
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/LibPostalDataLoader.scala:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.sedona_sql.expressions
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.sedona.sql.utils.HadoopFileSystemUtils
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.slf4j.LoggerFactory
+
+import java.io.File
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.security.MessageDigest
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * Resolves libpostal data directory paths. When the configured data directory points to a remote
+ * filesystem (HDFS, S3, GCS, ABFS, etc.), the data is copied to a local cache directory so that
+ * jpostal's native code can access it.
+ *
+ * The local cache uses a hash of the remote URI to avoid re-downloading on subsequent
+ * invocations. A marker file `.sedona_libpostal_complete` is written after a successful copy so
+ * that partially-copied directories are detected and re-copied.
+ */
+object LibPostalDataLoader {
+
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  private val MARKER_FILE = ".sedona_libpostal_complete"
+
+  /** Per-cache-key lock objects to prevent concurrent downloads to the same directory. */
+  private val locks = new ConcurrentHashMap[String, AnyRef]()
+
+  /**
+   * Resolve the data directory to a local filesystem path. If the configured path already points
+   * to the local filesystem, it is returned as-is. If it points to a remote filesystem, the
+   * directory is recursively copied to a local cache under `java.io.tmpdir`.
+   *
+   * @param configuredDir
+   *   the data directory path from Sedona configuration (may be local or remote)
+   * @return
+   *   a local filesystem path suitable for jpostal
+   */
+  def resolveDataDir(configuredDir: String): String = {
+    if (isRemotePath(configuredDir)) {
+      copyRemoteToLocalCache(configuredDir)
+    } else {
+      configuredDir
+    }
+  }
+
+  /**
+   * Determine whether a path string refers to a remote (non-local) filesystem.
+   */
+  def isRemotePath(path: String): Boolean = {
+    try {
+      val uri = new URI(path)
+      val scheme = uri.getScheme
+      scheme != null && scheme != "file" && scheme.length > 1
+    } catch {
+      case _: Exception => false
+    }
+  }
+
+  /**
+   * Copy a remote directory to a local cache and return the local path. Uses a hash-based cache
+   * key so that different remote paths get different local directories. A marker file is used to
+   * ensure completeness — if the marker is missing (e.g. from a previous interrupted copy), the
+   * directory is re-copied.
+   */
+  private def copyRemoteToLocalCache(remotePath: String): String = {
+    val cacheKey = hashPath(remotePath)
+    val localCacheDir =
+      new File(System.getProperty("java.io.tmpdir"), s"sedona-libpostal-cache/$cacheKey")
+    val markerFile = new File(localCacheDir, MARKER_FILE)
+
+    if (markerFile.exists()) {
+      logger.info(
+        "Libpostal data already cached at {}, skipping download from {}",
+        localCacheDir.getAbsolutePath: Any,
+        remotePath: Any)
+      return ensureTrailingSlash(localCacheDir.getAbsolutePath)
+    }
+
+    // Synchronize on a canonical lock object per cache key so that concurrent
+    // threads on the same JVM don't race to download the same data.
+    val lock = locks.computeIfAbsent(cacheKey, _ => new AnyRef)
+    lock.synchronized {
+      // Double-check after acquiring lock
+      if (markerFile.exists()) {
+        return ensureTrailingSlash(localCacheDir.getAbsolutePath)
+      }
+
+      logger.info(
+        "Copying libpostal data from {} to local cache at {}",
+        remotePath: Any,
+        localCacheDir.getAbsolutePath: Any)
+
+      if (localCacheDir.exists()) {
+        if (!localCacheDir.isDirectory) {
+          throw new IllegalStateException(
+            s"Libpostal local cache path exists but is not a directory: 
${localCacheDir.getAbsolutePath}")
+        }
+      } else if (!localCacheDir.mkdirs() && !localCacheDir.isDirectory) {
+        throw new IllegalStateException(
+          s"Failed to create libpostal local cache directory at 
${localCacheDir.getAbsolutePath}")
+      }

Review Comment:
   The cache directory is a predictable path under `java.io.tmpdir` derived 
from a hash of the remote URI. On multi-tenant systems, this predictability can 
enable symlink/hardlink attacks or cache poisoning if other users can write to 
the same temp directory. Consider hardening by (a) validating `localCacheDir` 
is not a symlink (and ideally that all path components are not symlinks), (b) 
checking canonical paths remain under the intended base cache directory, and 
(c) setting restrictive permissions on the cache directory/marker file where 
possible.
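   A rough sketch of checks (a)–(c) using `java.nio.file` (placement and
messages are illustrative, not the PR's implementation; `cacheKey` is the hash
already computed above):
   ```scala
   import java.nio.file.{Files, Paths}
   import java.nio.file.attribute.PosixFilePermissions

   val basePath = Paths.get(System.getProperty("java.io.tmpdir"), "sedona-libpostal-cache")
   val cachePath = basePath.resolve(cacheKey)
   Files.createDirectories(cachePath)

   // (a) refuse to use a cache directory that is itself a symlink
   if (Files.isSymbolicLink(cachePath)) {
     throw new IllegalStateException(s"Cache directory is a symlink: $cachePath")
   }
   // (b) the canonical path must stay under the intended base cache directory
   if (!cachePath.toRealPath().startsWith(basePath.toRealPath())) {
     throw new IllegalStateException(s"Cache directory escapes base directory: $cachePath")
   }
   // (c) owner-only permissions, where the filesystem supports POSIX attributes
   try Files.setPosixFilePermissions(cachePath, PosixFilePermissions.fromString("rwx------"))
   catch { case _: UnsupportedOperationException => () } // e.g. Windows: skip
   ```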



##########
spark/common/src/test/scala/org/apache/sedona/sql/HadoopFileSystemUtilsTest.scala:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.sql
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.sedona.sql.utils.HadoopFileSystemUtils
+import org.scalatest.matchers.should.Matchers
+
+import java.io.File
+import java.nio.file.Files
+
+class HadoopFileSystemUtilsTest extends TestBaseScala with Matchers {
+
+  describe("HadoopFileSystemUtils") {
+
+    describe("isLocalFileSystem") {
+      it("should return true for local file paths") {
+        val conf = new Configuration()
+        val tempDir = Files.createTempDirectory("sedona-test").toFile
+        try {
+          HadoopFileSystemUtils.isLocalFileSystem(
+            conf,
+            new Path(tempDir.getAbsolutePath)) shouldBe true
+        } finally {
+          tempDir.delete()
+        }
+      }
+
+      it("should return true for file:// URIs") {
+        val conf = new Configuration()
+        val tempDir = Files.createTempDirectory("sedona-test").toFile
+        try {
+          HadoopFileSystemUtils.isLocalFileSystem(
+            conf,
+            new Path("file://" + tempDir.getAbsolutePath)) shouldBe true
+        } finally {
+          tempDir.delete()
+        }
+      }
+    }
+
+    describe("copyFileToLocal") {
+      it("should return the original file when path is local") {
+        val conf = new Configuration()
+        val tempFile = File.createTempFile("sedona-test", ".txt")
+        try {
+          Files.write(tempFile.toPath, "test content".getBytes)
+          val (localFile, wasCopied) =
+            HadoopFileSystemUtils.copyFileToLocal(
+              conf,
+              new Path(tempFile.getAbsolutePath),
+              ".txt")
+          wasCopied shouldBe false
+          localFile.getAbsolutePath shouldBe tempFile.getAbsolutePath
+        } finally {
+          tempFile.delete()
+        }
+      }
+
+      it("should copy a file from HDFS to local") {
+        val (hdfsCluster, hdfsUri) = creatMiniHdfs()
+        try {
+          val hdfsConf = hdfsCluster.getConfiguration(0)
+          val fs = FileSystem.get(hdfsConf)
+
+          // Create a test file on HDFS
+          val hdfsPath = new Path(hdfsUri + "test-file.txt")
+          val out = fs.create(hdfsPath)
+          out.writeBytes("hello from hdfs")
+          out.close()
+
+          // Copy to local
+          val (localFile, wasCopied) =
+            HadoopFileSystemUtils.copyFileToLocal(hdfsConf, hdfsPath, ".txt")
+          wasCopied shouldBe true
+          localFile.exists() shouldBe true
+
+          val content = new String(Files.readAllBytes(localFile.toPath))
+          content shouldBe "hello from hdfs"
+
+          // Clean up
+          localFile.delete()
+        } finally {
+          hdfsCluster.shutdown()
+        }
+      }
+    }
+
+    describe("copyDirectoryToLocal") {
+      it("should copy a directory tree from HDFS to local") {
+        val (hdfsCluster, hdfsUri) = creatMiniHdfs()
+        try {
+          val hdfsConf = hdfsCluster.getConfiguration(0)
+          val fs = FileSystem.get(hdfsConf)
+
+          // Create a directory structure on HDFS
+          val basePath = new Path(hdfsUri + "test-dir")
+          fs.mkdirs(basePath)
+          fs.mkdirs(new Path(basePath, "subdir1"))
+          fs.mkdirs(new Path(basePath, "subdir2"))
+
+          // Write files
+          val out1 = fs.create(new Path(basePath, "file1.txt"))
+          out1.writeBytes("content1")
+          out1.close()
+
+          val out2 = fs.create(new Path(basePath, "subdir1/file2.txt"))
+          out2.writeBytes("content2")
+          out2.close()
+
+          val out3 = fs.create(new Path(basePath, "subdir2/file3.txt"))
+          out3.writeBytes("content3")
+          out3.close()
+
+          // Copy to local
+          val localDir = Files.createTempDirectory("sedona-hdfs-test").toFile
+          HadoopFileSystemUtils.copyDirectoryToLocal(hdfsConf, basePath, localDir)
+
+          // Verify structure
+          new File(localDir, "file1.txt").exists() shouldBe true
+          new File(localDir, "subdir1/file2.txt").exists() shouldBe true
+          new File(localDir, "subdir2/file3.txt").exists() shouldBe true
+
+          // Verify content
+          new String(
+            Files.readAllBytes(new File(localDir, "file1.txt").toPath)) shouldBe "content1"
+          new String(
+            Files.readAllBytes(
+              new File(localDir, "subdir1/file2.txt").toPath)) shouldBe "content2"
+          new String(
+            Files.readAllBytes(
+              new File(localDir, "subdir2/file3.txt").toPath)) shouldBe "content3"
+
+          // Clean up
+          deleteDirectory(localDir)
+        } finally {
+          hdfsCluster.shutdown()
+        }
+      }
+
+      it("should throw for non-existent remote directory") {
+        val (hdfsCluster, hdfsUri) = creatMiniHdfs()
+        try {
+          val hdfsConf = hdfsCluster.getConfiguration(0)
+          val localDir = Files.createTempDirectory("sedona-hdfs-test").toFile
+
+          an[IllegalArgumentException] should be thrownBy {
+            HadoopFileSystemUtils.copyDirectoryToLocal(
+              hdfsConf,
+              new Path(hdfsUri + "nonexistent"),
+              localDir)
+          }
+
+          deleteDirectory(localDir)
+        } finally {
+          hdfsCluster.shutdown()
+        }
+      }
+
+      it("should throw when remote path is a file not a directory") {
+        val (hdfsCluster, hdfsUri) = creatMiniHdfs()
+        try {
+          val hdfsConf = hdfsCluster.getConfiguration(0)
+          val fs = FileSystem.get(hdfsConf)
+
+          val filePath = new Path(hdfsUri + "just-a-file.txt")
+          val out = fs.create(filePath)
+          out.writeBytes("not a dir")
+          out.close()
+
+          val localDir = Files.createTempDirectory("sedona-hdfs-test").toFile
+
+          an[IllegalArgumentException] should be thrownBy {
+            HadoopFileSystemUtils.copyDirectoryToLocal(hdfsConf, filePath, localDir)
+          }
+
+          deleteDirectory(localDir)
+        } finally {
+          hdfsCluster.shutdown()
+        }
+      }
+    }
+  }
+
+  private def deleteDirectory(dir: File): Unit = {
+    if (dir.isDirectory) {
+      dir.listFiles().foreach(deleteDirectory)

Review Comment:
   `dir.listFiles()` can return `null` (I/O error, permissions), which would 
throw an NPE in the test cleanup path and potentially mask the real failure. 
Consider null-guarding (e.g., 
`Option(dir.listFiles()).getOrElse(Array.empty).foreach(...)`) similar to the 
approach used in `LibPostalDataLoaderTest`.
   ```suggestion
         Option(dir.listFiles()).getOrElse(Array.empty).foreach(deleteDirectory)
   ```



##########
spark/common/src/test/scala/org/apache/sedona/sql/LibPostalDataLoaderTest.scala:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.sql
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.spark.sql.sedona_sql.expressions.LibPostalDataLoader
+import org.scalatest.matchers.should.Matchers
+
+import java.io.File
+import java.nio.file.Files
+import java.util.concurrent.{CyclicBarrier, Executors, TimeUnit}
+import scala.collection.mutable.ListBuffer
+
+class LibPostalDataLoaderTest extends TestBaseScala with Matchers {
+
+  describe("LibPostalDataLoader") {
+
+    describe("isRemotePath") {
+      it("should return false for local paths") {
+        LibPostalDataLoader.isRemotePath("/tmp/libpostal/") shouldBe false
+      }
+
+      it("should return false for relative paths") {
+        LibPostalDataLoader.isRemotePath("data/libpostal/") shouldBe false
+      }
+
+      it("should return false for file:// URIs") {
+        LibPostalDataLoader.isRemotePath("file:///tmp/libpostal/") shouldBe 
false
+      }
+
+      it("should return true for hdfs:// URIs") {
+        LibPostalDataLoader.isRemotePath("hdfs:///data/libpostal/") shouldBe 
true
+      }
+
+      it("should return true for hdfs:// URIs with host") {
+        LibPostalDataLoader.isRemotePath("hdfs://namenode:9000/data/libpostal/") shouldBe true
+      }
+
+      it("should return true for s3a:// URIs") {
+        LibPostalDataLoader.isRemotePath("s3a://my-bucket/libpostal/") 
shouldBe true
+      }
+
+      it("should return true for s3:// URIs") {
+        LibPostalDataLoader.isRemotePath("s3://my-bucket/libpostal/") shouldBe 
true
+      }
+
+      it("should return true for gs:// URIs") {
+        LibPostalDataLoader.isRemotePath("gs://my-bucket/libpostal/") shouldBe 
true
+      }
+
+      it("should return true for abfs:// URIs") {
+        LibPostalDataLoader.isRemotePath(
+          "abfs://[email protected]/libpostal/") shouldBe 
true
+      }
+
+      it("should return true for wasb:// URIs") {
+        LibPostalDataLoader.isRemotePath(
+          "wasb://[email protected]/libpostal/") 
shouldBe true
+      }
+
+      it("should return false for empty string") {
+        LibPostalDataLoader.isRemotePath("") shouldBe false
+      }
+
+      it("should return false for Windows-like paths") {
+        // Single-letter scheme like C: should not be treated as remote
+        LibPostalDataLoader.isRemotePath("C:\\libpostal\\data\\") shouldBe 
false
+      }
+    }
+
+    describe("resolveDataDir") {
+      it("should return local path unchanged") {
+        val tempDir = Files.createTempDirectory("sedona-libpostal-test").toFile
+        try {
+          val result = LibPostalDataLoader.resolveDataDir(tempDir.getAbsolutePath)
+          result shouldBe tempDir.getAbsolutePath
+        } finally {
+          tempDir.delete()
+        }
+      }
+
+      it("should copy remote directory to local cache and return local path") {
+        val (hdfsCluster, hdfsUri) = creatMiniHdfs()
+        try {
+          val hdfsConf = hdfsCluster.getConfiguration(0)
+          val fs = FileSystem.get(hdfsConf)
+
+          // Create a mock libpostal data directory on HDFS with expected subdirs
+          val remotePath = hdfsUri + "libpostal-data/"
+          val basePath = new Path(remotePath)
+          fs.mkdirs(basePath)
+
+          // Create the subdirectories that libpostal expects
+          val subdirs =
+            Seq(
+              "transliteration",
+              "numex",
+              "address_parser",
+              "address_expansions",
+              "language_classifier")
+          for (subdir <- subdirs) {
+            val subdirPath = new Path(basePath, subdir)
+            fs.mkdirs(subdirPath)
+            // Write a dummy file
+            val out = fs.create(new Path(subdirPath, "model.dat"))
+            out.writeBytes(s"data for $subdir")
+            out.close()
+          }
+
+          // Resolve the remote path
+          val localPath = LibPostalDataLoader.resolveDataDir(remotePath)
+
+          // Verify the result is a local path
+          localPath should not startWith "hdfs://"
+          new File(localPath).exists() shouldBe true
+
+          // Verify all subdirs were copied
+          for (subdir <- subdirs) {
+            val localSubdir = new File(localPath, subdir)
+            localSubdir.exists() shouldBe true
+            localSubdir.isDirectory shouldBe true
+            new File(localSubdir, "model.dat").exists() shouldBe true
+          }
+
+          // Verify the marker file exists
+          new File(localPath, ".sedona_libpostal_complete").exists() shouldBe true
+
+          // Verify trailing separator
+          localPath should endWith(File.separator)
+
+          // Call again — should use cache (no re-copy)
+          val localPath2 = LibPostalDataLoader.resolveDataDir(remotePath)
+          localPath2 shouldBe localPath
+
+          // Clean up local cache
+          deleteDirectory(new File(localPath))
+        } finally {
+          hdfsCluster.shutdown()
+        }
+      }
+
+      it("should handle concurrent access from multiple threads safely") {
+        val (hdfsCluster, hdfsUri) = creatMiniHdfs()
+        try {
+          val hdfsConf = hdfsCluster.getConfiguration(0)
+          val fs = FileSystem.get(hdfsConf)
+
+          // Create a mock libpostal data directory on HDFS
+          val remotePath = hdfsUri + "libpostal-concurrent/"
+          val basePath = new Path(remotePath)
+          fs.mkdirs(basePath)
+
+          val subdirs =
+            Seq(
+              "transliteration",
+              "numex",
+              "address_parser",
+              "address_expansions",
+              "language_classifier")
+          for (subdir <- subdirs) {
+            val subdirPath = new Path(basePath, subdir)
+            fs.mkdirs(subdirPath)
+            val out = fs.create(new Path(subdirPath, "model.dat"))
+            out.writeBytes(s"data for $subdir")
+            out.close()
+          }
+
+          val numThreads = 8
+          val barrier = new CyclicBarrier(numThreads)
+          val executor = Executors.newFixedThreadPool(numThreads)
+          val results = new ListBuffer[String]()
+          val errors = new ListBuffer[Throwable]()
+          val resultsLock = new AnyRef
+
+          val futures = (1 to numThreads).map { _ =>
+            executor.submit(new Runnable {
+              override def run(): Unit = {
+                try {
+                  // All threads wait here until all are ready, then start simultaneously
+                  barrier.await(30, TimeUnit.SECONDS)
+                  val localPath = LibPostalDataLoader.resolveDataDir(remotePath)
+                  resultsLock.synchronized {
+                    results += localPath
+                  }
+                } catch {
+                  case e: Throwable =>
+                    resultsLock.synchronized {
+                      errors += e
+                    }
+                }
+              }
+            })
+          }
+
+          // Wait for all threads to complete
+          futures.foreach(_.get(60, TimeUnit.SECONDS))
+          executor.shutdown()
+
+          // No errors should have occurred
+          errors shouldBe empty
+
+          // All threads should have resolved to the same local path
+          results.size shouldBe numThreads
+          results.distinct.size shouldBe 1
+
+          val localPath = results.head
+
+          // Verify the data is intact
+          for (subdir <- subdirs) {
+            val localSubdir = new File(localPath, subdir)
+            localSubdir.exists() shouldBe true
+            new File(localSubdir, "model.dat").exists() shouldBe true
+          }
+
+          // Exactly one marker file should exist
+          new File(localPath, ".sedona_libpostal_complete").exists() shouldBe true
+
+          // Clean up
+          deleteDirectory(new File(localPath))

Review Comment:
   If the test throws before `executor.shutdown()` (e.g., a `get()` timeout or 
unexpected exception), the thread pool can leak and destabilize subsequent 
tests. Wrap the executor lifecycle in a `try/finally` that always calls 
`shutdownNow()` (or `shutdown()` + `awaitTermination`) to ensure cleanup even 
on failure.
   ```suggestion
             try {
               val futures = (1 to numThreads).map { _ =>
                 executor.submit(new Runnable {
                   override def run(): Unit = {
                     try {
                      // All threads wait here until all are ready, then start simultaneously
                       barrier.await(30, TimeUnit.SECONDS)
                      val localPath = LibPostalDataLoader.resolveDataDir(remotePath)
                       resultsLock.synchronized {
                         results += localPath
                       }
                     } catch {
                       case e: Throwable =>
                         resultsLock.synchronized {
                           errors += e
                         }
                     }
                   }
                 })
               }
   
               // Wait for all threads to complete
               futures.foreach(_.get(60, TimeUnit.SECONDS))
               executor.shutdown()
   
               // No errors should have occurred
               errors shouldBe empty
   
               // All threads should have resolved to the same local path
               results.size shouldBe numThreads
               results.distinct.size shouldBe 1
   
               val localPath = results.head
   
               // Verify the data is intact
               for (subdir <- subdirs) {
                 val localSubdir = new File(localPath, subdir)
                 localSubdir.exists() shouldBe true
                 new File(localSubdir, "model.dat").exists() shouldBe true
               }
   
               // Exactly one marker file should exist
              new File(localPath, ".sedona_libpostal_complete").exists() shouldBe true
   
               // Clean up
               deleteDirectory(new File(localPath))
             } finally {
               executor.shutdownNow()
             }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

