This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new dcbebce9eacb [SPARK-46202][CONNECT] Expose new ArtifactManager APIs to support custom target directories dcbebce9eacb is described below commit dcbebce9eacb201cc8dfac918318be04ada842a8 Author: vicennial <venkata.gud...@databricks.com> AuthorDate: Tue Dec 12 14:10:41 2023 -0800 [SPARK-46202][CONNECT] Expose new ArtifactManager APIs to support custom target directories ### What changes were proposed in this pull request? Adds new client APIs to the Spark Connect Scala Client: - `def addArtifact(bytes: Array[Byte], target: String): Unit` - `def addArtifact(source: String, target: String): Unit` ### Why are the changes needed? Currently, without the use of a REPL/Class finder, there is no API to support adding artifacts (file-based and in-memory) with a custom target directory structure to the remote Spark Connect session. ### Does this PR introduce _any_ user-facing change? Yes. Users can do the following for classfiles and jars: ```scala addArtifact("/Users/dummyUser/files/foo/bar.class", "sub/directory/foo.class") addArtifact(bytesBar, "bar.class") ``` This would preserve the directory structure on the remote server. In this case, the files would be stored at the following locations: `$ROOT_SESSION_ARTIFACT_DIRECTORY/classes/bar.class` `$ROOT_SESSION_ARTIFACT_DIRECTORY/classes/sub/directory/foo.class` ### How was this patch tested? Unit test ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44109 from vicennial/SPARK-46202. 
Authored-by: vicennial <venkata.gud...@databricks.com> Signed-off-by: Herman van Hovell <her...@databricks.com> --- .../scala/org/apache/spark/sql/SparkSession.scala | 41 ++++++++ .../spark/sql/connect/client/ArtifactSuite.scala | 50 ++++++++++ .../spark/sql/connect/client/ArtifactManager.scala | 108 +++++++++++++++++---- .../sql/connect/client/SparkConnectClient.scala | 37 +++++++ 4 files changed, 219 insertions(+), 17 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala index daa172e215ad..81c2ca11a7fb 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -590,6 +590,47 @@ class SparkSession private[sql] ( @Experimental def addArtifact(uri: URI): Unit = client.addArtifact(uri) + /** + * Add a single in-memory artifact to the session while preserving the directory structure + * specified by `target` under the session's working directory of that particular file + * extension. + * + * Supported target file extensions are .jar and .class. + * + * ==Example== + * {{{ + * addArtifact(bytesBar, "foo/bar.class") + * addArtifact(bytesFlat, "flat.class") + * // Directory structure of the session's working directory for class files would look like: + * // ${WORKING_DIR_FOR_CLASS_FILES}/flat.class + * // ${WORKING_DIR_FOR_CLASS_FILES}/foo/bar.class + * }}} + * + * @since 4.0.0 + */ + @Experimental + def addArtifact(bytes: Array[Byte], target: String): Unit = client.addArtifact(bytes, target) + + /** + * Add a single artifact to the session while preserving the directory structure specified by + * `target` under the session's working directory of that particular file extension. + * + * Supported target file extensions are .jar and .class. 
+ * + * ==Example== + * {{{ + * addArtifact("/Users/dummyUser/files/foo/bar.class", "foo/bar.class") + * addArtifact("/Users/dummyUser/files/flat.class", "flat.class") + * // Directory structure of the session's working directory for class files would look like: + * // ${WORKING_DIR_FOR_CLASS_FILES}/flat.class + * // ${WORKING_DIR_FOR_CLASS_FILES}/foo/bar.class + * }}} + * + * @since 4.0.0 + */ + @Experimental + def addArtifact(source: String, target: String): Unit = client.addArtifact(source, target) + /** * Add one or more artifacts to the session. * diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala index f945313d2427..0c8ef8e599fb 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala @@ -284,4 +284,54 @@ class ArtifactSuite extends ConnectFunSuite with BeforeAndAfterEach { } } + + test("artifact with custom target") { + val artifactPath = artifactFilePath.resolve("smallClassFile.class") + val target = "sub/package/smallClassFile.class" + artifactManager.addArtifact(artifactPath.toString, target) + val receivedRequests = service.getAndClearLatestAddArtifactRequests() + // Single `AddArtifactRequest` + assert(receivedRequests.size == 1) + + val request = receivedRequests.head + assert(request.hasBatch) + + val batch = request.getBatch + // Single artifact in batch + assert(batch.getArtifactsList.size() == 1) + + val singleChunkArtifact = batch.getArtifacts(0) + assert(singleChunkArtifact.getName.equals(s"classes/$target")) + assertFileDataEquality(singleChunkArtifact.getData, artifactPath) + } + + test("in-memory artifact with custom target") { + val artifactPath = artifactFilePath.resolve("smallClassFile.class") + val artifactBytes = 
Files.readAllBytes(artifactPath) + val target = "sub/package/smallClassFile.class" + artifactManager.addArtifact(artifactBytes, target) + val receivedRequests = service.getAndClearLatestAddArtifactRequests() + // Single `AddArtifactRequest` + assert(receivedRequests.size == 1) + + val request = receivedRequests.head + assert(request.hasBatch) + + val batch = request.getBatch + // Single artifact in batch + assert(batch.getArtifactsList.size() == 1) + + val singleChunkArtifact = batch.getArtifacts(0) + assert(singleChunkArtifact.getName.equals(s"classes/$target")) + assert(singleChunkArtifact.getData.getData == ByteString.copyFrom(artifactBytes)) + } + + test( + "When both source and target paths are given, extension conditions are checked " + + "on target path") { + val artifactPath = artifactFilePath.resolve("smallClassFile.class") + assertThrows[UnsupportedOperationException] { + artifactManager.addArtifact(artifactPath.toString, "dummy.extension") + } + } } diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala index 3cd35803d1ec..36bc60c7d63a 100644 --- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.sql.connect.client -import java.io.{ByteArrayInputStream, InputStream, PrintStream} +import java.io.{ByteArrayInputStream, File, InputStream, PrintStream} import java.net.URI import java.nio.file.{Files, Path, Paths} import java.util.Arrays @@ -83,14 +83,10 @@ class ArtifactManager( uri.getScheme match { case "file" => val path = Paths.get(uri) - val artifact = path.getFileName.toString match { - case jar if jar.endsWith(".jar") => - newJarArtifact(path.getFileName, new LocalFile(path)) - case cf if 
cf.endsWith(".class") => - newClassArtifact(path.getFileName, new LocalFile(path)) - case other => - throw new UnsupportedOperationException(s"Unsupported file format: $other") - } + val artifact = Artifact.newArtifactFromExtension( + path.getFileName.toString, + path.getFileName, + new LocalFile(path)) Seq[Artifact](artifact) case "ivy" => @@ -108,6 +104,55 @@ class ArtifactManager( */ def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri)) + /** + * Add a single in-memory artifact to the session while preserving the directory structure + * specified by `target` under the session's working directory of that particular file + * extension. + * + * Supported target file extensions are .jar and .class. + * + * ==Example== + * {{{ + * addArtifact(bytesBar, "foo/bar.class") + * addArtifact(bytesFlat, "flat.class") + * // Directory structure of the session's working directory for class files would look like: + * // ${WORKING_DIR_FOR_CLASS_FILES}/flat.class + * // ${WORKING_DIR_FOR_CLASS_FILES}/foo/bar.class + * }}} + */ + def addArtifact(bytes: Array[Byte], target: String): Unit = { + val targetPath = Paths.get(target) + val artifact = Artifact.newArtifactFromExtension( + targetPath.getFileName.toString, + targetPath, + new InMemory(bytes)) + addArtifacts(artifact :: Nil) + } + + /** + * Add a single artifact to the session while preserving the directory structure specified by + * `target` under the session's working directory of that particular file extension. + * + * Supported target file extensions are .jar and .class. 
+ * + * ==Example== + * {{{ + * addArtifact("/Users/dummyUser/files/foo/bar.class", "foo/bar.class") + * addArtifact("/Users/dummyUser/files/flat.class", "flat.class") + * // Directory structure of the session's working directory for class files would look like: + * // ${WORKING_DIR_FOR_CLASS_FILES}/flat.class + * // ${WORKING_DIR_FOR_CLASS_FILES}/foo/bar.class + * }}} + */ + def addArtifact(source: String, target: String): Unit = { + val targetPath = Paths.get(target) + val artifact = Artifact.newArtifactFromExtension( + targetPath.getFileName.toString, + targetPath, + new LocalFile(Paths.get(source))) + addArtifacts(artifact :: Nil) + } + /** * Add multiple artifacts to the session. * @@ -366,12 +411,26 @@ object Artifact { val JAR_PREFIX: Path = Paths.get("jars") val CACHE_PREFIX: Path = Paths.get("cache") - def newJarArtifact(fileName: Path, storage: LocalData): Artifact = { - newArtifact(JAR_PREFIX, ".jar", fileName, storage) + def newArtifactFromExtension( + fileName: String, + targetFilePath: Path, + storage: LocalData): Artifact = { + fileName match { + case jar if jar.endsWith(".jar") => + newJarArtifact(targetFilePath, storage) + case cf if cf.endsWith(".class") => + newClassArtifact(targetFilePath, storage) + case other => + throw new UnsupportedOperationException(s"Unsupported file format: $other") + } + } + + def newJarArtifact(targetFilePath: Path, storage: LocalData): Artifact = { + newArtifact(JAR_PREFIX, ".jar", targetFilePath, storage) } - def newClassArtifact(fileName: Path, storage: LocalData): Artifact = { - newArtifact(CLASS_PREFIX, ".class", fileName, storage) + def newClassArtifact(targetFilePath: Path, storage: LocalData): Artifact = { + newArtifact(CLASS_PREFIX, ".class", targetFilePath, storage) } def newCacheArtifact(id: String, storage: LocalData): Artifact = { @@ -412,14 +471,29 @@ object Artifact { jars.map(p => Paths.get(p)).map(path => newJarArtifact(path.getFileName, new LocalFile(path))) } + private def concatenatePaths(basePath: 
Path, otherPath: Path): Path = { + // We avoid using the `.resolve()` method here to ensure that we're concatenating the two + // paths even if `otherPath` is absolute. + val concatenatedPath = Paths.get(basePath.toString, otherPath.toString) + // Note: The normalized resulting path may still reference parent directories if the + // `otherPath` contains sufficient number of parent operators (i.e ".."). + // Example: `basePath` = "/base", `otherPath` = "subdir/../../file.txt" + // Then, `concatenatedPath` = "/base/subdir/../../file.txt" + // and `normalizedPath` = "/base/file.txt". + val normalizedPath = concatenatedPath.normalize() + // Verify that the prefix of the `normalizedPath` starts with `basePath/`. + require( + normalizedPath != basePath && normalizedPath.startsWith(s"$basePath${File.separator}")) + normalizedPath + } + private def newArtifact( prefix: Path, requiredSuffix: String, - fileName: Path, + targetFilePath: Path, storage: LocalData): Artifact = { - require(!fileName.isAbsolute) - require(fileName.toString.endsWith(requiredSuffix)) - new Artifact(prefix.resolve(fileName), storage) + require(targetFilePath.toString.endsWith(requiredSuffix)) + new Artifact(concatenatePaths(prefix, targetFilePath), storage) } /** diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala index c2776e65392f..cd1dfbd2e734 100644 --- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala @@ -301,6 +301,43 @@ private[sql] class SparkConnectClient( */ def addArtifact(uri: URI): Unit = artifactManager.addArtifact(uri) + /** + * Add a single in-memory artifact to the session while preserving the directory structure + * specified by `target` under the session's 
working directory of that particular file + * extension. + * + * Supported target file extensions are .jar and .class. + * + * ==Example== + * {{{ + * addArtifact(bytesBar, "foo/bar.class") + * addArtifact(bytesFlat, "flat.class") + * // Directory structure of the session's working directory for class files would look like: + * // ${WORKING_DIR_FOR_CLASS_FILES}/flat.class + * // ${WORKING_DIR_FOR_CLASS_FILES}/foo/bar.class + * }}} + */ + def addArtifact(bytes: Array[Byte], target: String): Unit = + artifactManager.addArtifact(bytes, target) + + /** + * Add a single artifact to the session while preserving the directory structure specified by + * `target` under the session's working directory of that particular file extension. + * + * Supported target file extensions are .jar and .class. + * + * ==Example== + * {{{ + * addArtifact("/Users/dummyUser/files/foo/bar.class", "foo/bar.class") + * addArtifact("/Users/dummyUser/files/flat.class", "flat.class") + * // Directory structure of the session's working directory for class files would look like: + * // ${WORKING_DIR_FOR_CLASS_FILES}/flat.class + * // ${WORKING_DIR_FOR_CLASS_FILES}/foo/bar.class + * }}} + */ + def addArtifact(source: String, target: String): Unit = + artifactManager.addArtifact(source, target) + /** * Add multiple artifacts to the session. * --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org