Copilot commented on code in PR #2637:
URL: https://github.com/apache/sedona/pull/2637#discussion_r2787047760


##########
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/LibPostalDataLoader.scala:
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.sedona_sql.expressions
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.sedona.sql.utils.HadoopFileSystemUtils
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.slf4j.LoggerFactory
+
+import java.io.File
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.nio.file.attribute.PosixFilePermissions
+import java.security.MessageDigest
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * Resolves libpostal data directory paths. When the configured data directory 
points to a remote
+ * filesystem (HDFS, S3, GCS, ABFS, etc.), the data is copied to a local cache 
directory so that
+ * jpostal's native code can access it.
+ *
+ * The local cache uses a hash of the remote URI to avoid re-downloading on 
subsequent
+ * invocations. A marker file `.sedona_libpostal_complete` is written after a 
successful copy so
+ * that partially-copied directories are detected and re-copied.
+ */
+object LibPostalDataLoader {
+
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  private val MARKER_FILE = ".sedona_libpostal_complete"
+
+  /** Per-cache-key lock objects to prevent concurrent downloads to the same 
directory. */
+  private val locks = new ConcurrentHashMap[String, AnyRef]()
+
+  /**
+   * Resolve the data directory to a local filesystem path. If the configured 
path already points
+   * to the local filesystem, it is returned as-is. If it points to a remote 
filesystem, the
+   * directory is recursively copied to a local cache under `java.io.tmpdir`.
+   *
+   * @param configuredDir
+   *   the data directory path from Sedona configuration (may be local or 
remote)
+   * @return
+   *   a local filesystem path suitable for jpostal
+   */
+  def resolveDataDir(configuredDir: String): String = {
+    if (isRemotePath(configuredDir)) {
+      copyRemoteToLocalCache(configuredDir)
+    } else {
+      normalizeLocalPath(configuredDir)
+    }
+  }
+
+  /**
+   * Normalize a local path. Converts `file:` URIs (e.g. 
`file:///tmp/libpostal`) to plain
+   * filesystem paths (`/tmp/libpostal`) so that jpostal receives a path it 
can use directly.
+   * Non-URI paths are returned unchanged.
+   */
+  private def normalizeLocalPath(path: String): String = {
+    try {
+      val uri = new URI(path)
+      if (uri.getScheme != null && uri.getScheme.equalsIgnoreCase("file")) {
+        new File(uri).getAbsolutePath
+      } else {
+        path
+      }
+    } catch {
+      case _: Exception => path
+    }
+  }
+
+  /**
+   * Determine whether a path string refers to a remote (non-local) filesystem.
+   */
+  def isRemotePath(path: String): Boolean = {
+    try {
+      val uri = new URI(path)
+      val scheme = uri.getScheme
+      scheme != null && scheme != "file" && scheme.length > 1
+    } catch {
+      case _: Exception => false
+    }
+  }
+
+  /**
+   * Copy a remote directory to a local cache and return the local path. Uses 
a hash-based cache
+   * key so that different remote paths get different local directories. A 
marker file is used to
+   * ensure completeness — if the marker is missing (e.g. from a previous 
interrupted copy), the
+   * directory is re-copied.
+   */
+  private def copyRemoteToLocalCache(remotePath: String): String = {
+    val cacheKey = hashPath(remotePath)
+    val localCacheDir =
+      new File(System.getProperty("java.io.tmpdir"), 
s"sedona-libpostal-cache/$cacheKey")
+    val markerFile = new File(localCacheDir, MARKER_FILE)
+
+    if (markerFile.exists()) {
+      logger.info(
+        "Libpostal data already cached at {}, skipping download from {}",
+        localCacheDir.getAbsolutePath: Any,
+        remotePath: Any)
+      return ensureTrailingSlash(localCacheDir.getAbsolutePath)
+    }
+
+    // Synchronize on a canonical lock object per cache key so that concurrent
+    // threads on the same JVM don't race to download the same data.
+    val lock = locks.computeIfAbsent(cacheKey, _ => new AnyRef)
+    lock.synchronized {
+      // Double-check after acquiring lock
+      if (markerFile.exists()) {
+        return ensureTrailingSlash(localCacheDir.getAbsolutePath)
+      }
+
+      logger.info(
+        "Copying libpostal data from {} to local cache at {}",
+        remotePath: Any,
+        localCacheDir.getAbsolutePath: Any)
+
+      if (localCacheDir.exists()) {
+        if (!localCacheDir.isDirectory) {
+          throw new IllegalStateException(
+            s"Libpostal local cache path exists but is not a directory: 
${localCacheDir.getAbsolutePath}")
+        }
+      } else if (!localCacheDir.mkdirs() && !localCacheDir.isDirectory) {
+        throw new IllegalStateException(
+          s"Failed to create libpostal local cache directory at 
${localCacheDir.getAbsolutePath}")
+      }
+
+      validateCacheDirectory(localCacheDir)
+
+      val hadoopConf =
+        try {
+          SparkHadoopUtil.get.conf
+        } catch {
+          case _: Exception => new Configuration()
+        }
+      val remoteHadoopPath = new Path(remotePath)
+
+      try {
+        HadoopFileSystemUtils.copyDirectoryToLocal(hadoopConf, 
remoteHadoopPath, localCacheDir)
+
+        // Write marker file to indicate successful completion
+        markerFile.createNewFile()
+
+        logger.info("Successfully cached libpostal data at {}", 
localCacheDir.getAbsolutePath)
+      } finally {
+        // Always remove the lock entry to avoid unbounded growth.
+        // Use value-based remove to avoid interfering with any updated 
mapping.
+        locks.remove(cacheKey, lock)
+      }

Review Comment:
   If a previous attempt left a partially-copied cache directory (marker 
missing), this implementation copies *into the existing directory* without 
clearing it. That can leave stale/extra files from an older/failed download, 
producing a mixed model directory even though the new marker gets written. 
Prefer copying into a fresh temp directory and atomically renaming into place, 
or deleting/emptying `localCacheDir` when `markerFile` is missing before 
starting a new copy.



##########
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/LibPostalDataLoader.scala:
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.sedona_sql.expressions
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.sedona.sql.utils.HadoopFileSystemUtils
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.slf4j.LoggerFactory
+
+import java.io.File
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.nio.file.attribute.PosixFilePermissions
+import java.security.MessageDigest
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * Resolves libpostal data directory paths. When the configured data directory 
points to a remote
+ * filesystem (HDFS, S3, GCS, ABFS, etc.), the data is copied to a local cache 
directory so that
+ * jpostal's native code can access it.
+ *
+ * The local cache uses a hash of the remote URI to avoid re-downloading on 
subsequent
+ * invocations. A marker file `.sedona_libpostal_complete` is written after a 
successful copy so
+ * that partially-copied directories are detected and re-copied.
+ */
+object LibPostalDataLoader {
+
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  private val MARKER_FILE = ".sedona_libpostal_complete"
+
+  /** Per-cache-key lock objects to prevent concurrent downloads to the same 
directory. */
+  private val locks = new ConcurrentHashMap[String, AnyRef]()
+
+  /**
+   * Resolve the data directory to a local filesystem path. If the configured 
path already points
+   * to the local filesystem, it is returned as-is. If it points to a remote 
filesystem, the
+   * directory is recursively copied to a local cache under `java.io.tmpdir`.
+   *
+   * @param configuredDir
+   *   the data directory path from Sedona configuration (may be local or 
remote)
+   * @return
+   *   a local filesystem path suitable for jpostal
+   */
+  def resolveDataDir(configuredDir: String): String = {
+    if (isRemotePath(configuredDir)) {
+      copyRemoteToLocalCache(configuredDir)
+    } else {
+      normalizeLocalPath(configuredDir)
+    }
+  }
+
+  /**
+   * Normalize a local path. Converts `file:` URIs (e.g. 
`file:///tmp/libpostal`) to plain
+   * filesystem paths (`/tmp/libpostal`) so that jpostal receives a path it 
can use directly.
+   * Non-URI paths are returned unchanged.
+   */
+  private def normalizeLocalPath(path: String): String = {
+    try {
+      val uri = new URI(path)
+      if (uri.getScheme != null && uri.getScheme.equalsIgnoreCase("file")) {
+        new File(uri).getAbsolutePath
+      } else {
+        path
+      }
+    } catch {
+      case _: Exception => path
+    }
+  }
+
+  /**
+   * Determine whether a path string refers to a remote (non-local) filesystem.
+   */
+  def isRemotePath(path: String): Boolean = {
+    try {
+      val uri = new URI(path)
+      val scheme = uri.getScheme
+      scheme != null && scheme != "file" && scheme.length > 1
+    } catch {
+      case _: Exception => false
+    }
+  }
+
+  /**
+   * Copy a remote directory to a local cache and return the local path. Uses 
a hash-based cache
+   * key so that different remote paths get different local directories. A 
marker file is used to
+   * ensure completeness — if the marker is missing (e.g. from a previous 
interrupted copy), the
+   * directory is re-copied.
+   */
+  private def copyRemoteToLocalCache(remotePath: String): String = {
+    val cacheKey = hashPath(remotePath)
+    val localCacheDir =
+      new File(System.getProperty("java.io.tmpdir"), 
s"sedona-libpostal-cache/$cacheKey")
+    val markerFile = new File(localCacheDir, MARKER_FILE)
+
+    if (markerFile.exists()) {
+      logger.info(
+        "Libpostal data already cached at {}, skipping download from {}",
+        localCacheDir.getAbsolutePath: Any,
+        remotePath: Any)
+      return ensureTrailingSlash(localCacheDir.getAbsolutePath)
+    }
+
+    // Synchronize on a canonical lock object per cache key so that concurrent
+    // threads on the same JVM don't race to download the same data.
+    val lock = locks.computeIfAbsent(cacheKey, _ => new AnyRef)
+    lock.synchronized {
+      // Double-check after acquiring lock
+      if (markerFile.exists()) {
+        return ensureTrailingSlash(localCacheDir.getAbsolutePath)
+      }
+
+      logger.info(
+        "Copying libpostal data from {} to local cache at {}",
+        remotePath: Any,
+        localCacheDir.getAbsolutePath: Any)
+
+      if (localCacheDir.exists()) {
+        if (!localCacheDir.isDirectory) {
+          throw new IllegalStateException(
+            s"Libpostal local cache path exists but is not a directory: 
${localCacheDir.getAbsolutePath}")
+        }
+      } else if (!localCacheDir.mkdirs() && !localCacheDir.isDirectory) {
+        throw new IllegalStateException(
+          s"Failed to create libpostal local cache directory at 
${localCacheDir.getAbsolutePath}")
+      }
+
+      validateCacheDirectory(localCacheDir)
+
+      val hadoopConf =
+        try {
+          SparkHadoopUtil.get.conf
+        } catch {
+          case _: Exception => new Configuration()
+        }
+      val remoteHadoopPath = new Path(remotePath)
+
+      try {
+        HadoopFileSystemUtils.copyDirectoryToLocal(hadoopConf, 
remoteHadoopPath, localCacheDir)
+
+        // Write marker file to indicate successful completion
+        markerFile.createNewFile()
+
+        logger.info("Successfully cached libpostal data at {}", 
localCacheDir.getAbsolutePath)
+      } finally {
+        // Always remove the lock entry to avoid unbounded growth.
+        // Use value-based remove to avoid interfering with any updated 
mapping.
+        locks.remove(cacheKey, lock)
+      }
+    }
+
+    ensureTrailingSlash(localCacheDir.getAbsolutePath)
+  }
+
+  /**
+   * Validate that the cache directory is safe from symlink attacks and set 
restrictive
+   * permissions. A predictable path under java.io.tmpdir could be pre-created 
as a symlink by a
+   * malicious actor, so we verify that neither the base cache directory nor 
the hash-specific
+   * subdirectory is a symbolic link and that the canonical path stays within 
the expected base.
+   */
+  private def validateCacheDirectory(cacheDir: File): Unit = {
+    val baseCacheDir = cacheDir.getParentFile
+    val cachePath = cacheDir.toPath
+    val basePath = baseCacheDir.toPath
+
+    // Reject symlinks on both the base directory and the hash-specific 
subdirectory
+    if (Files.isSymbolicLink(basePath)) {
+      throw new SecurityException(
+        s"Libpostal cache base directory is a symbolic link: 
${baseCacheDir.getAbsolutePath}")
+    }
+    if (Files.isSymbolicLink(cachePath)) {
+      throw new SecurityException(
+        s"Libpostal cache directory is a symbolic link: 
${cacheDir.getAbsolutePath}")
+    }
+
+    // Canonical-path check: ensure the resolved path stays under the expected 
base
+    val canonicalBase = baseCacheDir.getCanonicalPath
+    val canonicalCache = cacheDir.getCanonicalPath
+    if (!canonicalCache.startsWith(
+        canonicalBase + File.separator) && canonicalCache != canonicalBase) {
+      throw new SecurityException(
+        s"Libpostal cache directory escapes the expected base. " +
+          s"Expected base: $canonicalBase, actual: $canonicalCache")
+    }
+
+    // Set restrictive permissions (owner-only) to mitigate cache poisoning.
+    // POSIX permissions are best-effort — on non-POSIX systems we fall back to
+    // java.io.File permission methods.

Review Comment:
   `validateCacheDirectory` verifies the base dir is not a symlink, but 
permissions are only tightened on `cacheDir`, not on the parent 
`sedona-libpostal-cache` directory (which is the predictable shared entry point 
under `java.io.tmpdir`). To reduce cache poisoning risk, also apply restrictive 
permissions to `baseCacheDir` (and consider best-effort tightening on newly 
created subdirectories/files after the copy, since `copyDirectoryToLocal` will 
create content with default permissions/umask).
   ```suggestion
       // First tighten permissions on the predictable shared base directory,
       // then on the hash-specific cache directory itself. POSIX permissions are
       // best-effort — on non-POSIX systems we fall back to java.io.File methods.
       setRestrictivePermissions(baseCacheDir)
   ```



##########
spark/common/src/test/scala/org/apache/sedona/sql/LibPostalDataLoaderTest.scala:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.sql
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.spark.sql.sedona_sql.expressions.LibPostalDataLoader
+import org.scalatest.matchers.should.Matchers
+
+import java.io.File
+import java.nio.file.Files
+import java.util.concurrent.{CyclicBarrier, Executors, TimeUnit}
+import scala.collection.mutable.ListBuffer
+
+class LibPostalDataLoaderTest extends TestBaseScala with Matchers {
+
+  describe("LibPostalDataLoader") {
+
+    describe("isRemotePath") {
+      it("should return false for local paths") {
+        LibPostalDataLoader.isRemotePath("/tmp/libpostal/") shouldBe false
+      }
+
+      it("should return false for relative paths") {
+        LibPostalDataLoader.isRemotePath("data/libpostal/") shouldBe false
+      }
+
+      it("should return false for file:// URIs") {
+        LibPostalDataLoader.isRemotePath("file:///tmp/libpostal/") shouldBe 
false
+      }
+
+      it("should return true for hdfs:// URIs") {
+        LibPostalDataLoader.isRemotePath("hdfs:///data/libpostal/") shouldBe 
true
+      }
+
+      it("should return true for hdfs:// URIs with host") {
+        
LibPostalDataLoader.isRemotePath("hdfs://namenode:9000/data/libpostal/") 
shouldBe true
+      }
+
+      it("should return true for s3a:// URIs") {
+        LibPostalDataLoader.isRemotePath("s3a://my-bucket/libpostal/") 
shouldBe true
+      }
+
+      it("should return true for s3:// URIs") {
+        LibPostalDataLoader.isRemotePath("s3://my-bucket/libpostal/") shouldBe 
true
+      }
+
+      it("should return true for gs:// URIs") {
+        LibPostalDataLoader.isRemotePath("gs://my-bucket/libpostal/") shouldBe 
true
+      }
+
+      it("should return true for abfs:// URIs") {
+        LibPostalDataLoader.isRemotePath(
+          "abfs://[email protected]/libpostal/") shouldBe 
true
+      }
+
+      it("should return true for wasb:// URIs") {
+        LibPostalDataLoader.isRemotePath(
+          "wasb://[email protected]/libpostal/") 
shouldBe true
+      }
+
+      it("should return false for empty string") {
+        LibPostalDataLoader.isRemotePath("") shouldBe false
+      }
+
+      it("should return false for Windows-like paths") {
+        // Single-letter scheme like C: should not be treated as remote
+        LibPostalDataLoader.isRemotePath("C:\\libpostal\\data\\") shouldBe 
false
+      }
+    }
+
+    describe("resolveDataDir") {
+      it("should return local path unchanged") {
+        val tempDir = Files.createTempDirectory("sedona-libpostal-test").toFile
+        try {
+          val result = 
LibPostalDataLoader.resolveDataDir(tempDir.getAbsolutePath)
+          result shouldBe tempDir.getAbsolutePath
+        } finally {
+          tempDir.delete()
+        }
+      }
+
+      it("should normalize file: URI to plain local path") {
+        val tempDir = Files.createTempDirectory("sedona-libpostal-test").toFile
+        try {
+          val fileUri = tempDir.toURI.toString
+          val result = LibPostalDataLoader.resolveDataDir(fileUri)
+          result should not startWith "file:"
+          result shouldBe tempDir.getAbsolutePath
+        } finally {
+          tempDir.delete()
+        }
+      }
+
+      it("should normalize file: URI without trailing slash") {
+        val result = 
LibPostalDataLoader.resolveDataDir("file:///tmp/libpostal")
+        result should not startWith "file:"
+        result shouldBe "/tmp/libpostal"
+      }
+
+      it("should copy remote directory to local cache and return local path") {
+        val (hdfsCluster, hdfsUri) = creatMiniHdfs()
+        try {
+          val hdfsConf = hdfsCluster.getConfiguration(0)
+          val fs = FileSystem.get(hdfsConf)
+
+          // Create a mock libpostal data directory on HDFS with expected 
subdirs
+          val remotePath = hdfsUri + "libpostal-data/"
+          val basePath = new Path(remotePath)
+          fs.mkdirs(basePath)
+
+          // Create the subdirectories that libpostal expects
+          val subdirs =
+            Seq(
+              "transliteration",
+              "numex",
+              "address_parser",
+              "address_expansions",
+              "language_classifier")
+          for (subdir <- subdirs) {
+            val subdirPath = new Path(basePath, subdir)
+            fs.mkdirs(subdirPath)
+            // Write a dummy file
+            val out = fs.create(new Path(subdirPath, "model.dat"))
+            out.writeBytes(s"data for $subdir")
+            out.close()

Review Comment:
   Test code manually closes HDFS output streams; if `writeBytes` throws, the 
stream may leak and make failures harder to diagnose. Wrap stream usage in a 
`try/finally` (or equivalent resource-management helper) to guarantee `close()` 
runs.
   ```suggestion
               try {
                 out.writeBytes(s"data for $subdir")
               } finally {
                 out.close()
               }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to