This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new dcbebce9eacb [SPARK-46202][CONNECT] Expose new ArtifactManager APIs to support custom target directories
dcbebce9eacb is described below
commit dcbebce9eacb201cc8dfac918318be04ada842a8
Author: vicennial <[email protected]>
AuthorDate: Tue Dec 12 14:10:41 2023 -0800
[SPARK-46202][CONNECT] Expose new ArtifactManager APIs to support custom target directories
### What changes were proposed in this pull request?
Adds new client APIs to the Spark Connect Scala Client:
- `def addArtifact(bytes: Array[Byte], target: String): Unit`
- `def addArtifact(source: String, target: String): Unit`
### Why are the changes needed?
Currently, without the use of a REPL/Class finder, there is no API to
support adding artifacts (file-based and in-memory)
with a custom target directory structure to the remote Spark Connect
session.
### Does this PR introduce _any_ user-facing change?
Yes.
Users can do the following for classfiles and jars:
```scala
addArtifact("/Users/dummyUser/files/foo/bar.class", "sub/directory/foo.class")
addArtifact(bytesBar, "bar.class")
```
This would preserve the directory structure in the remote server. In this
case, the file would be stored under the directory:
`$ROOT_SESSION_ARTIFACT_DIRECTORY/classes/bar.class`
`$ROOT_SESSION_ARTIFACT_DIRECTORY/classes/sub/directory/foo.class`
### How was this patch tested?
Unit test
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44109 from vicennial/SPARK-46202.
Authored-by: vicennial <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
---
.../scala/org/apache/spark/sql/SparkSession.scala | 41 ++++++++
.../spark/sql/connect/client/ArtifactSuite.scala | 50 ++++++++++
.../spark/sql/connect/client/ArtifactManager.scala | 108 +++++++++++++++++----
.../sql/connect/client/SparkConnectClient.scala | 37 +++++++
4 files changed, 219 insertions(+), 17 deletions(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index daa172e215ad..81c2ca11a7fb 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -590,6 +590,47 @@ class SparkSession private[sql] (
@Experimental
def addArtifact(uri: URI): Unit = client.addArtifact(uri)
+ /**
+ * Add a single in-memory artifact to the session while preserving the directory structure
+ * specified by `target` under the session's working directory of that particular file
+ * extension.
+ *
+ * Supported target file extensions are .jar and .class.
+ *
+ * ==Example==
+ * {{{
+ * addArtifact(bytesBar, "foo/bar.class")
+ * addArtifact(bytesFlat, "flat.class")
+ * // Directory structure of the session's working directory for class files would look like:
+ * // ${WORKING_DIR_FOR_CLASS_FILES}/flat.class
+ * // ${WORKING_DIR_FOR_CLASS_FILES}/foo/bar.class
+ * }}}
+ *
+ * @since 4.0.0
+ */
+ @Experimental
+ def addArtifact(bytes: Array[Byte], target: String): Unit = client.addArtifact(bytes, target)
+
+ /**
+ * Add a single artifact to the session while preserving the directory structure specified by
+ * `target` under the session's working directory of that particular file extension.
+ *
+ * Supported target file extensions are .jar and .class.
+ *
+ * ==Example==
+ * {{{
+ * addArtifact("/Users/dummyUser/files/foo/bar.class", "foo/bar.class")
+ * addArtifact("/Users/dummyUser/files/flat.class", "flat.class")
+ * // Directory structure of the session's working directory for class files would look like:
+ * // ${WORKING_DIR_FOR_CLASS_FILES}/flat.class
+ * // ${WORKING_DIR_FOR_CLASS_FILES}/foo/bar.class
+ * }}}
+ *
+ * @since 4.0.0
+ */
+ @Experimental
+ def addArtifact(source: String, target: String): Unit = client.addArtifact(source, target)
+
/**
* Add one or more artifacts to the session.
*
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
index f945313d2427..0c8ef8e599fb 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
@@ -284,4 +284,54 @@ class ArtifactSuite extends ConnectFunSuite with BeforeAndAfterEach {
}
}
+
+ test("artifact with custom target") {
+ val artifactPath = artifactFilePath.resolve("smallClassFile.class")
+ val target = "sub/package/smallClassFile.class"
+ artifactManager.addArtifact(artifactPath.toString, target)
+ val receivedRequests = service.getAndClearLatestAddArtifactRequests()
+ // Single `AddArtifactRequest`
+ assert(receivedRequests.size == 1)
+
+ val request = receivedRequests.head
+ assert(request.hasBatch)
+
+ val batch = request.getBatch
+ // Single artifact in batch
+ assert(batch.getArtifactsList.size() == 1)
+
+ val singleChunkArtifact = batch.getArtifacts(0)
+ assert(singleChunkArtifact.getName.equals(s"classes/$target"))
+ assertFileDataEquality(singleChunkArtifact.getData, artifactPath)
+ }
+
+ test("in-memory artifact with custom target") {
+ val artifactPath = artifactFilePath.resolve("smallClassFile.class")
+ val artifactBytes = Files.readAllBytes(artifactPath)
+ val target = "sub/package/smallClassFile.class"
+ artifactManager.addArtifact(artifactBytes, target)
+ val receivedRequests = service.getAndClearLatestAddArtifactRequests()
+ // Single `AddArtifactRequest`
+ assert(receivedRequests.size == 1)
+
+ val request = receivedRequests.head
+ assert(request.hasBatch)
+
+ val batch = request.getBatch
+ // Single artifact in batch
+ assert(batch.getArtifactsList.size() == 1)
+
+ val singleChunkArtifact = batch.getArtifacts(0)
+ assert(singleChunkArtifact.getName.equals(s"classes/$target"))
+ assert(singleChunkArtifact.getData.getData == ByteString.copyFrom(artifactBytes))
+ }
+
+ test(
+ "When both source and target paths are given, extension conditions are checked " +
+ "on target path") {
+ val artifactPath = artifactFilePath.resolve("smallClassFile.class")
+ assertThrows[UnsupportedOperationException] {
+ artifactManager.addArtifact(artifactPath.toString, "dummy.extension")
+ }
+ }
}
diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
index 3cd35803d1ec..36bc60c7d63a 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.connect.client
-import java.io.{ByteArrayInputStream, InputStream, PrintStream}
+import java.io.{ByteArrayInputStream, File, InputStream, PrintStream}
import java.net.URI
import java.nio.file.{Files, Path, Paths}
import java.util.Arrays
@@ -83,14 +83,10 @@ class ArtifactManager(
uri.getScheme match {
case "file" =>
val path = Paths.get(uri)
- val artifact = path.getFileName.toString match {
- case jar if jar.endsWith(".jar") =>
- newJarArtifact(path.getFileName, new LocalFile(path))
- case cf if cf.endsWith(".class") =>
- newClassArtifact(path.getFileName, new LocalFile(path))
- case other =>
- throw new UnsupportedOperationException(s"Unsupported file format: $other")
- }
+ val artifact = Artifact.newArtifactFromExtension(
+ path.getFileName.toString,
+ path.getFileName,
+ new LocalFile(path))
Seq[Artifact](artifact)
case "ivy" =>
@@ -108,6 +104,55 @@ class ArtifactManager(
*/
def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+ /**
+ * Add a single in-memory artifact to the session while preserving the directory structure
+ * specified by `target` under the session's working directory of that particular file
+ * extension.
+ *
+ * Supported target file extensions are .jar and .class.
+ *
+ * ==Example==
+ * {{{
+ * addArtifact(bytesBar, "foo/bar.class")
+ * addArtifact(bytesFlat, "flat.class")
+ * // Directory structure of the session's working directory for class files would look like:
+ * // ${WORKING_DIR_FOR_CLASS_FILES}/flat.class
+ * // ${WORKING_DIR_FOR_CLASS_FILES}/foo/bar.class
+ * }}}
+ */
+ def addArtifact(bytes: Array[Byte], target: String): Unit = {
+ val targetPath = Paths.get(target)
+ val artifact = Artifact.newArtifactFromExtension(
+ targetPath.getFileName.toString,
+ targetPath,
+ new InMemory(bytes))
+ addArtifacts(artifact :: Nil)
+ }
+
+ /**
+ * Add a single artifact to the session while preserving the directory structure specified by
+ * `target` under the session's working directory of that particular file extension.
+ *
+ * Supported target file extensions are .jar and .class.
+ *
+ * ==Example==
+ * {{{
+ * addArtifact("/Users/dummyUser/files/foo/bar.class", "foo/bar.class")
+ * addArtifact("/Users/dummyUser/files/flat.class", "flat.class")
+ * // Directory structure of the session's working directory for class files would look like:
+ * // ${WORKING_DIR_FOR_CLASS_FILES}/flat.class
+ * // ${WORKING_DIR_FOR_CLASS_FILES}/foo/bar.class
+ * }}}
+ */
+ def addArtifact(source: String, target: String): Unit = {
+ val targetPath = Paths.get(target)
+ val artifact = Artifact.newArtifactFromExtension(
+ targetPath.getFileName.toString,
+ targetPath,
+ new LocalFile(Paths.get(source)))
+ addArtifacts(artifact :: Nil)
+ }
+
/**
* Add multiple artifacts to the session.
*
@@ -366,12 +411,26 @@ object Artifact {
val JAR_PREFIX: Path = Paths.get("jars")
val CACHE_PREFIX: Path = Paths.get("cache")
- def newJarArtifact(fileName: Path, storage: LocalData): Artifact = {
- newArtifact(JAR_PREFIX, ".jar", fileName, storage)
+ def newArtifactFromExtension(
+ fileName: String,
+ targetFilePath: Path,
+ storage: LocalData): Artifact = {
+ fileName match {
+ case jar if jar.endsWith(".jar") =>
+ newJarArtifact(targetFilePath, storage)
+ case cf if cf.endsWith(".class") =>
+ newClassArtifact(targetFilePath, storage)
+ case other =>
+ throw new UnsupportedOperationException(s"Unsupported file format: $other")
+ }
+ }
+
+ def newJarArtifact(targetFilePath: Path, storage: LocalData): Artifact = {
+ newArtifact(JAR_PREFIX, ".jar", targetFilePath, storage)
}
- def newClassArtifact(fileName: Path, storage: LocalData): Artifact = {
- newArtifact(CLASS_PREFIX, ".class", fileName, storage)
+ def newClassArtifact(targetFilePath: Path, storage: LocalData): Artifact = {
+ newArtifact(CLASS_PREFIX, ".class", targetFilePath, storage)
}
def newCacheArtifact(id: String, storage: LocalData): Artifact = {
@@ -412,14 +471,29 @@ object Artifact {
jars.map(p => Paths.get(p)).map(path => newJarArtifact(path.getFileName, new LocalFile(path)))
}
+ private def concatenatePaths(basePath: Path, otherPath: Path): Path = {
+ // We avoid using the `.resolve()` method here to ensure that we're concatenating the two
+ // paths even if `otherPath` is absolute.
+ val concatenatedPath = Paths.get(basePath.toString, otherPath.toString)
+ // Note: The normalized resulting path may still reference parent directories if the
+ // `otherPath` contains sufficient number of parent operators (i.e "..").
+ // Example: `basePath` = "/base", `otherPath` = "subdir/../../file.txt"
+ // Then, `concatenatedPath` = "/base/subdir/../../file.txt"
+ // and `normalizedPath` = "/base/file.txt".
+ val normalizedPath = concatenatedPath.normalize()
+ // Verify that the prefix of the `normalizedPath` starts with `basePath/`.
+ require(
+ normalizedPath != basePath && normalizedPath.startsWith(s"$basePath${File.separator}"))
+ normalizedPath
+ }
+
private def newArtifact(
prefix: Path,
requiredSuffix: String,
- fileName: Path,
+ targetFilePath: Path,
storage: LocalData): Artifact = {
- require(!fileName.isAbsolute)
- require(fileName.toString.endsWith(requiredSuffix))
- new Artifact(prefix.resolve(fileName), storage)
+ require(targetFilePath.toString.endsWith(requiredSuffix))
+ new Artifact(concatenatePaths(prefix, targetFilePath), storage)
}
/**
diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index c2776e65392f..cd1dfbd2e734 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -301,6 +301,43 @@ private[sql] class SparkConnectClient(
*/
def addArtifact(uri: URI): Unit = artifactManager.addArtifact(uri)
+ /**
+ * Add a single in-memory artifact to the session while preserving the directory structure
+ * specified by `target` under the session's working directory of that particular file
+ * extension.
+ *
+ * Supported target file extensions are .jar and .class.
+ *
+ * ==Example==
+ * {{{
+ * addArtifact(bytesBar, "foo/bar.class")
+ * addArtifact(bytesFlat, "flat.class")
+ * // Directory structure of the session's working directory for class files would look like:
+ * // ${WORKING_DIR_FOR_CLASS_FILES}/flat.class
+ * // ${WORKING_DIR_FOR_CLASS_FILES}/foo/bar.class
+ * }}}
+ */
+ def addArtifact(bytes: Array[Byte], target: String): Unit =
+ artifactManager.addArtifact(bytes, target)
+
+ /**
+ * Add a single artifact to the session while preserving the directory structure specified by
+ * `target` under the session's working directory of that particular file extension.
+ *
+ * Supported target file extensions are .jar and .class.
+ *
+ * ==Example==
+ * {{{
+ * addArtifact("/Users/dummyUser/files/foo/bar.class", "foo/bar.class")
+ * addArtifact("/Users/dummyUser/files/flat.class", "flat.class")
+ * // Directory structure of the session's working directory for class files would look like:
+ * // ${WORKING_DIR_FOR_CLASS_FILES}/flat.class
+ * // ${WORKING_DIR_FOR_CLASS_FILES}/foo/bar.class
+ * }}}
+ */
+ def addArtifact(source: String, target: String): Unit =
+ artifactManager.addArtifact(source, target)
+
/**
* Add multiple artifacts to the session.
*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]