Copilot commented on code in PR #2637: URL: https://github.com/apache/sedona/pull/2637#discussion_r2787047760
########## spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/LibPostalDataLoader.scala: ########## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.spark.sql.sedona_sql.expressions + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.sedona.sql.utils.HadoopFileSystemUtils +import org.apache.spark.deploy.SparkHadoopUtil +import org.slf4j.LoggerFactory + +import java.io.File +import java.net.URI +import java.nio.charset.StandardCharsets +import java.nio.file.Files +import java.nio.file.attribute.PosixFilePermissions +import java.security.MessageDigest +import java.util.concurrent.ConcurrentHashMap + +/** + * Resolves libpostal data directory paths. When the configured data directory points to a remote + * filesystem (HDFS, S3, GCS, ABFS, etc.), the data is copied to a local cache directory so that + * jpostal's native code can access it. + * + * The local cache uses a hash of the remote URI to avoid re-downloading on subsequent + * invocations. A marker file `.sedona_libpostal_complete` is written after a successful copy so + * that partially-copied directories are detected and re-copied. + */ +object LibPostalDataLoader { + + private val logger = LoggerFactory.getLogger(getClass) + + private val MARKER_FILE = ".sedona_libpostal_complete" + + /** Per-cache-key lock objects to prevent concurrent downloads to the same directory. */ + private val locks = new ConcurrentHashMap[String, AnyRef]() + + /** + * Resolve the data directory to a local filesystem path. If the configured path already points + * to the local filesystem, it is returned as-is. If it points to a remote filesystem, the + * directory is recursively copied to a local cache under `java.io.tmpdir`. + * + * @param configuredDir + * the data directory path from Sedona configuration (may be local or remote) + * @return + * a local filesystem path suitable for jpostal + */ + def resolveDataDir(configuredDir: String): String = { + if (isRemotePath(configuredDir)) { + copyRemoteToLocalCache(configuredDir) + } else { + normalizeLocalPath(configuredDir) + } + } + + /** + * Normalize a local path. Converts `file:` URIs (e.g. `file:///tmp/libpostal`) to plain + * filesystem paths (`/tmp/libpostal`) so that jpostal receives a path it can use directly. + * Non-URI paths are returned unchanged. + */ + private def normalizeLocalPath(path: String): String = { + try { + val uri = new URI(path) + if (uri.getScheme != null && uri.getScheme.equalsIgnoreCase("file")) { + new File(uri).getAbsolutePath + } else { + path + } + } catch { + case _: Exception => path + } + } + + /** + * Determine whether a path string refers to a remote (non-local) filesystem. + */ + def isRemotePath(path: String): Boolean = { + try { + val uri = new URI(path) + val scheme = uri.getScheme + scheme != null && scheme != "file" && scheme.length > 1 + } catch { + case _: Exception => false + } + } + + /** + * Copy a remote directory to a local cache and return the local path. Uses a hash-based cache + * key so that different remote paths get different local directories. A marker file is used to + * ensure completeness — if the marker is missing (e.g. from a previous interrupted copy), the + * directory is re-copied. + */ + private def copyRemoteToLocalCache(remotePath: String): String = { + val cacheKey = hashPath(remotePath) + val localCacheDir = + new File(System.getProperty("java.io.tmpdir"), s"sedona-libpostal-cache/$cacheKey") + val markerFile = new File(localCacheDir, MARKER_FILE) + + if (markerFile.exists()) { + logger.info( + "Libpostal data already cached at {}, skipping download from {}", + localCacheDir.getAbsolutePath: Any, + remotePath: Any) + return ensureTrailingSlash(localCacheDir.getAbsolutePath) + } + + // Synchronize on a canonical lock object per cache key so that concurrent + // threads on the same JVM don't race to download the same data. + val lock = locks.computeIfAbsent(cacheKey, _ => new AnyRef) + lock.synchronized { + // Double-check after acquiring lock + if (markerFile.exists()) { + return ensureTrailingSlash(localCacheDir.getAbsolutePath) + } + + logger.info( + "Copying libpostal data from {} to local cache at {}", + remotePath: Any, + localCacheDir.getAbsolutePath: Any) + + if (localCacheDir.exists()) { + if (!localCacheDir.isDirectory) { + throw new IllegalStateException( + s"Libpostal local cache path exists but is not a directory: ${localCacheDir.getAbsolutePath}") + } + } else if (!localCacheDir.mkdirs() && !localCacheDir.isDirectory) { + throw new IllegalStateException( + s"Failed to create libpostal local cache directory at ${localCacheDir.getAbsolutePath}") + } + + validateCacheDirectory(localCacheDir) + + val hadoopConf = + try { + SparkHadoopUtil.get.conf + } catch { + case _: Exception => new Configuration() + } + val remoteHadoopPath = new Path(remotePath) + + try { + HadoopFileSystemUtils.copyDirectoryToLocal(hadoopConf, remoteHadoopPath, localCacheDir) + + // Write marker file to indicate successful completion + markerFile.createNewFile() + + logger.info("Successfully cached libpostal data at {}", localCacheDir.getAbsolutePath) + } finally { + // Always remove the lock entry to avoid unbounded growth. + // Use value-based remove to avoid interfering with any updated mapping. + locks.remove(cacheKey, lock) + } Review Comment: If a previous attempt left a partially-copied cache directory (marker missing), this implementation copies *into the existing directory* without clearing it. That can leave stale/extra files from an older/failed download, producing a mixed model directory even though the new marker gets written. Prefer copying into a fresh temp directory and atomically renaming into place, or deleting/emptying `localCacheDir` when `markerFile` is missing before starting a new copy. ########## spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/LibPostalDataLoader.scala: ########## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.spark.sql.sedona_sql.expressions + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.sedona.sql.utils.HadoopFileSystemUtils +import org.apache.spark.deploy.SparkHadoopUtil +import org.slf4j.LoggerFactory + +import java.io.File +import java.net.URI +import java.nio.charset.StandardCharsets +import java.nio.file.Files +import java.nio.file.attribute.PosixFilePermissions +import java.security.MessageDigest +import java.util.concurrent.ConcurrentHashMap + +/** + * Resolves libpostal data directory paths. When the configured data directory points to a remote + * filesystem (HDFS, S3, GCS, ABFS, etc.), the data is copied to a local cache directory so that + * jpostal's native code can access it. + * + * The local cache uses a hash of the remote URI to avoid re-downloading on subsequent + * invocations. A marker file `.sedona_libpostal_complete` is written after a successful copy so + * that partially-copied directories are detected and re-copied. + */ +object LibPostalDataLoader { + + private val logger = LoggerFactory.getLogger(getClass) + + private val MARKER_FILE = ".sedona_libpostal_complete" + + /** Per-cache-key lock objects to prevent concurrent downloads to the same directory. */ + private val locks = new ConcurrentHashMap[String, AnyRef]() + + /** + * Resolve the data directory to a local filesystem path. If the configured path already points + * to the local filesystem, it is returned as-is. If it points to a remote filesystem, the + * directory is recursively copied to a local cache under `java.io.tmpdir`. + * + * @param configuredDir + * the data directory path from Sedona configuration (may be local or remote) + * @return + * a local filesystem path suitable for jpostal + */ + def resolveDataDir(configuredDir: String): String = { + if (isRemotePath(configuredDir)) { + copyRemoteToLocalCache(configuredDir) + } else { + normalizeLocalPath(configuredDir) + } + } + + /** + * Normalize a local path. Converts `file:` URIs (e.g. `file:///tmp/libpostal`) to plain + * filesystem paths (`/tmp/libpostal`) so that jpostal receives a path it can use directly. + * Non-URI paths are returned unchanged. + */ + private def normalizeLocalPath(path: String): String = { + try { + val uri = new URI(path) + if (uri.getScheme != null && uri.getScheme.equalsIgnoreCase("file")) { + new File(uri).getAbsolutePath + } else { + path + } + } catch { + case _: Exception => path + } + } + + /** + * Determine whether a path string refers to a remote (non-local) filesystem. + */ + def isRemotePath(path: String): Boolean = { + try { + val uri = new URI(path) + val scheme = uri.getScheme + scheme != null && scheme != "file" && scheme.length > 1 + } catch { + case _: Exception => false + } + } + + /** + * Copy a remote directory to a local cache and return the local path. Uses a hash-based cache + * key so that different remote paths get different local directories. A marker file is used to + * ensure completeness — if the marker is missing (e.g. from a previous interrupted copy), the + * directory is re-copied. + */ + private def copyRemoteToLocalCache(remotePath: String): String = { + val cacheKey = hashPath(remotePath) + val localCacheDir = + new File(System.getProperty("java.io.tmpdir"), s"sedona-libpostal-cache/$cacheKey") + val markerFile = new File(localCacheDir, MARKER_FILE) + + if (markerFile.exists()) { + logger.info( + "Libpostal data already cached at {}, skipping download from {}", + localCacheDir.getAbsolutePath: Any, + remotePath: Any) + return ensureTrailingSlash(localCacheDir.getAbsolutePath) + } + + // Synchronize on a canonical lock object per cache key so that concurrent + // threads on the same JVM don't race to download the same data. + val lock = locks.computeIfAbsent(cacheKey, _ => new AnyRef) + lock.synchronized { + // Double-check after acquiring lock + if (markerFile.exists()) { + return ensureTrailingSlash(localCacheDir.getAbsolutePath) + } + + logger.info( + "Copying libpostal data from {} to local cache at {}", + remotePath: Any, + localCacheDir.getAbsolutePath: Any) + + if (localCacheDir.exists()) { + if (!localCacheDir.isDirectory) { + throw new IllegalStateException( + s"Libpostal local cache path exists but is not a directory: ${localCacheDir.getAbsolutePath}") + } + } else if (!localCacheDir.mkdirs() && !localCacheDir.isDirectory) { + throw new IllegalStateException( + s"Failed to create libpostal local cache directory at ${localCacheDir.getAbsolutePath}") + } + + validateCacheDirectory(localCacheDir) + + val hadoopConf = + try { + SparkHadoopUtil.get.conf + } catch { + case _: Exception => new Configuration() + } + val remoteHadoopPath = new Path(remotePath) + + try { + HadoopFileSystemUtils.copyDirectoryToLocal(hadoopConf, remoteHadoopPath, localCacheDir) + + // Write marker file to indicate successful completion + markerFile.createNewFile() + + logger.info("Successfully cached libpostal data at {}", localCacheDir.getAbsolutePath) + } finally { + // Always remove the lock entry to avoid unbounded growth. + // Use value-based remove to avoid interfering with any updated mapping. + locks.remove(cacheKey, lock) + } + } + + ensureTrailingSlash(localCacheDir.getAbsolutePath) + } + + /** + * Validate that the cache directory is safe from symlink attacks and set restrictive + * permissions. A predictable path under java.io.tmpdir could be pre-created as a symlink by a + * malicious actor, so we verify that neither the base cache directory nor the hash-specific + * subdirectory is a symbolic link and that the canonical path stays within the expected base. + */ + private def validateCacheDirectory(cacheDir: File): Unit = { + val baseCacheDir = cacheDir.getParentFile + val cachePath = cacheDir.toPath + val basePath = baseCacheDir.toPath + + // Reject symlinks on both the base directory and the hash-specific subdirectory + if (Files.isSymbolicLink(basePath)) { + throw new SecurityException( + s"Libpostal cache base directory is a symbolic link: ${baseCacheDir.getAbsolutePath}") + } + if (Files.isSymbolicLink(cachePath)) { + throw new SecurityException( + s"Libpostal cache directory is a symbolic link: ${cacheDir.getAbsolutePath}") + } + + // Canonical-path check: ensure the resolved path stays under the expected base + val canonicalBase = baseCacheDir.getCanonicalPath + val canonicalCache = cacheDir.getCanonicalPath + if (!canonicalCache.startsWith( + canonicalBase + File.separator) && canonicalCache != canonicalBase) { + throw new SecurityException( + s"Libpostal cache directory escapes the expected base. " + + s"Expected base: $canonicalBase, actual: $canonicalCache") + } + + // Set restrictive permissions (owner-only) to mitigate cache poisoning. + // POSIX permissions are best-effort — on non-POSIX systems we fall back to + // java.io.File permission methods. Review Comment: `validateCacheDirectory` verifies the base dir is not a symlink, but permissions are only tightened on `cacheDir`, not on the parent `sedona-libpostal-cache` directory (which is the predictable shared entry point under `java.io.tmpdir`). To reduce cache poisoning risk, also apply restrictive permissions to `baseCacheDir` (and consider best-effort tightening on newly created subdirectories/files after the copy, since `copyDirectoryToLocal` will create content with default permissions/umask). ```suggestion // First tighten permissions on the predictable shared base directory, // then on the hash-specific cache directory itself. POSIX permissions are // best-effort — on non-POSIX systems we fall back to java.io.File methods. setRestrictivePermissions(baseCacheDir) ``` ########## spark/common/src/test/scala/org/apache/sedona/sql/LibPostalDataLoaderTest.scala: ########## @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.sql + +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.spark.sql.sedona_sql.expressions.LibPostalDataLoader +import org.scalatest.matchers.should.Matchers + +import java.io.File +import java.nio.file.Files +import java.util.concurrent.{CyclicBarrier, Executors, TimeUnit} +import scala.collection.mutable.ListBuffer + +class LibPostalDataLoaderTest extends TestBaseScala with Matchers { + + describe("LibPostalDataLoader") { + + describe("isRemotePath") { + it("should return false for local paths") { + LibPostalDataLoader.isRemotePath("/tmp/libpostal/") shouldBe false + } + + it("should return false for relative paths") { + LibPostalDataLoader.isRemotePath("data/libpostal/") shouldBe false + } + + it("should return false for file:// URIs") { + LibPostalDataLoader.isRemotePath("file:///tmp/libpostal/") shouldBe false + } + + it("should return true for hdfs:// URIs") { + LibPostalDataLoader.isRemotePath("hdfs:///data/libpostal/") shouldBe true + } + + it("should return true for hdfs:// URIs with host") { + LibPostalDataLoader.isRemotePath("hdfs://namenode:9000/data/libpostal/") shouldBe true + } + + it("should return true for s3a:// URIs") { + LibPostalDataLoader.isRemotePath("s3a://my-bucket/libpostal/") shouldBe true + } + + it("should return true for s3:// URIs") { + LibPostalDataLoader.isRemotePath("s3://my-bucket/libpostal/") shouldBe true + } + + it("should return true for gs:// URIs") { + LibPostalDataLoader.isRemotePath("gs://my-bucket/libpostal/") shouldBe true + } + + it("should return true for abfs:// URIs") { + LibPostalDataLoader.isRemotePath( + "abfs://[email protected]/libpostal/") shouldBe true + } + + it("should return true for wasb:// URIs") { + LibPostalDataLoader.isRemotePath( + "wasb://[email protected]/libpostal/") shouldBe true + } + + it("should return false for empty string") { + LibPostalDataLoader.isRemotePath("") shouldBe false + } + + it("should return false for Windows-like paths") { + // Single-letter scheme like C: should not be treated as remote + LibPostalDataLoader.isRemotePath("C:\\libpostal\\data\\") shouldBe false + } + } + + describe("resolveDataDir") { + it("should return local path unchanged") { + val tempDir = Files.createTempDirectory("sedona-libpostal-test").toFile + try { + val result = LibPostalDataLoader.resolveDataDir(tempDir.getAbsolutePath) + result shouldBe tempDir.getAbsolutePath + } finally { + tempDir.delete() + } + } + + it("should normalize file: URI to plain local path") { + val tempDir = Files.createTempDirectory("sedona-libpostal-test").toFile + try { + val fileUri = tempDir.toURI.toString + val result = LibPostalDataLoader.resolveDataDir(fileUri) + result should not startWith "file:" + result shouldBe tempDir.getAbsolutePath + } finally { + tempDir.delete() + } + } + + it("should normalize file: URI without trailing slash") { + val result = LibPostalDataLoader.resolveDataDir("file:///tmp/libpostal") + result should not startWith "file:" + result shouldBe "/tmp/libpostal" + } + + it("should copy remote directory to local cache and return local path") { + val (hdfsCluster, hdfsUri) = creatMiniHdfs() + try { + val hdfsConf = hdfsCluster.getConfiguration(0) + val fs = FileSystem.get(hdfsConf) + + // Create a mock libpostal data directory on HDFS with expected subdirs + val remotePath = hdfsUri + "libpostal-data/" + val basePath = new Path(remotePath) + fs.mkdirs(basePath) + + // Create the subdirectories that libpostal expects + val subdirs = + Seq( + "transliteration", + "numex", + "address_parser", + "address_expansions", + "language_classifier") + for (subdir <- subdirs) { + val subdirPath = new Path(basePath, subdir) + fs.mkdirs(subdirPath) + // Write a dummy file + val out = fs.create(new Path(subdirPath, "model.dat")) + out.writeBytes(s"data for $subdir") + out.close() Review Comment: Test code manually closes HDFS output streams; if `writeBytes` throws, the stream may leak and make failures harder to diagnose. Wrap stream usage in a `try/finally` (or equivalent resource-management helper) to guarantee `close()` runs. ```suggestion try { out.writeBytes(s"data for $subdir") } finally { out.close() } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
