This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ec02224371f [SPARK-42748][CONNECT] Server-side Artifact Management
ec02224371f is described below

commit ec02224371fbb4ff4437cc449c45e88adde482d7
Author: vicennial <venkata.gud...@databricks.com>
AuthorDate: Wed Mar 22 20:06:48 2023 -0400

    [SPARK-42748][CONNECT] Server-side Artifact Management
    
    ### What changes were proposed in this pull request?
    
    This PR adds server-side artifact management as a follow up to the 
client-side artifact transfer introduced in 
https://github.com/apache/spark/pull/40256.
    
    Note: The artifacts added on the server are visible to **all users** of the 
cluster. This is a limitation of the current spark architecture (unisolated 
classloaders).
    
    Apart from storing generic artifacts, we handle jars and classfiles in 
specific ways:
    
    - Jars:
      - Jars may be added but not removed or overwritten.
      - Added jars would be visible to **all** users/tasks/queries.
    - Classfiles:
      - Classfiles may not be explicitly removed but are allowed to be 
overwritten.
      - We piggyback on top of the REPL architecture to serve classfiles to the 
executors
        -  If a REPL is initialized, classfiles are stored in the existing 
`spark.repl.class.outputDir` and share the URI with `spark.repl.class.uri`.
        - If a REPL is not being used, we use a custom directory (root: 
`sparkContext. sparkConnectArtifactDirectory`) to store classfiles and point 
the `spark.repl.class.uri` towards it.
      - Class files are visible to **all** users/tasks/queries.
    
    ### Why are the changes needed?
    
    https://github.com/apache/spark/pull/40256 implements the client-side 
transfer of artifacts to the server but currently, the server does not process 
these requests.
    
    We need to implement a server-side management mechanism to handle the 
storage of these artifacts on the driver as well as perform further processing 
(such as adding jars and moving class files to the right directories).
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, a new experimental API but no behavioural changes.
    A new method called `sparkConnectArtifactDirectory` is accessible through 
SparkContext (the directory storing all artifacts from SparkConnect)
    
    ### How was this patch tested?
    
    New unit tests.
    
    Closes #40368 from vicennial/SPARK-42748.
    
    Lead-authored-by: vicennial <venkata.gud...@databricks.com>
    Co-authored-by: Venkata Sai Akhil Gudesa <venkata.gud...@databricks.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../apache/spark/sql/PlanGenerationTestSuite.scala |   2 +-
 .../spark/sql/connect/client/ArtifactSuite.scala   |   2 +-
 .../sql/connect/client/util/ConnectFunSuite.scala  |   3 +-
 .../src/test/resources/artifact-tests/Hello.class  | Bin 0 -> 5671 bytes
 .../test/resources/artifact-tests/crc/Hello.txt    |   1 +
 .../test/resources/artifact-tests/crc/README.md    |   0
 .../resources/artifact-tests/crc/junitLargeJar.txt |   0
 .../artifact-tests/crc/smallClassFile.txt          |   0
 .../artifact-tests/crc/smallClassFileDup.txt       |   0
 .../test/resources/artifact-tests/crc/smallJar.txt |   0
 .../resources/artifact-tests/junitLargeJar.jar     | Bin
 .../resources/artifact-tests/smallClassFile.class  | Bin
 .../artifact-tests/smallClassFileDup.class         | Bin
 .../src/test/resources/artifact-tests/smallJar.jar | Bin
 connector/connect/server/pom.xml                   |   7 +
 .../artifact/SparkConnectArtifactManager.scala     | 162 +++++++++++
 .../sql/connect/planner/SparkConnectPlanner.scala  |   5 +-
 .../service/SparkConnectAddArtifactsHandler.scala  | 239 ++++++++++++++++
 .../service/SparkConnectAnalyzeHandler.scala       |  22 +-
 .../sql/connect/service/SparkConnectService.scala  |  16 +-
 .../service/SparkConnectStreamHandler.scala        |   3 +-
 .../sql/connect/ProtoToParsedPlanTestSuite.scala   |  21 +-
 .../apache/spark/sql/connect/ResourceHelper.scala} |  24 +-
 .../connect/artifact/ArtifactManagerSuite.scala    | 110 +++++++
 .../connect/service/AddArtifactsHandlerSuite.scala | 316 +++++++++++++++++++++
 .../main/scala/org/apache/spark/SparkContext.scala |  23 +-
 26 files changed, 891 insertions(+), 65 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index cb62977d937..e53eed013e3 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -69,7 +69,7 @@ class PlanGenerationTestSuite
   // Borrowed from SparkFunSuite
   private val regenerateGoldenFiles: Boolean = 
System.getenv("SPARK_GENERATE_GOLDEN_FILES") == "1"
 
-  protected val queryFilePath: Path = commonResourcePath.resolve("queries")
+  protected val queryFilePath: Path = 
commonResourcePath.resolve("query-tests/queries")
 
   // A relative path to /connector/connect/server, used by 
`ProtoToParsedPlanTestSuite` to run
   // with the datasource.
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
index f3d2e5be954..09072b8d6eb 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
@@ -75,7 +75,7 @@ class ArtifactSuite extends ConnectFunSuite with 
BeforeAndAfterEach {
   }
 
   private val CHUNK_SIZE: Int = 32 * 1024
-  protected def artifactFilePath: Path = 
baseResourcePath.resolve("artifact-tests")
+  protected def artifactFilePath: Path = 
commonResourcePath.resolve("artifact-tests")
   protected def artifactCrcPath: Path = artifactFilePath.resolve("crc")
 
   private def getCrcValues(filePath: Path): Seq[Long] = {
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/ConnectFunSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/ConnectFunSuite.scala
index 1ece0838b1b..0a1e794c8e7 100755
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/ConnectFunSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/ConnectFunSuite.scala
@@ -52,7 +52,6 @@ trait ConnectFunSuite extends AnyFunSuite { // 
scalastyle:ignore funsuite
       "common",
       "src",
       "test",
-      "resources",
-      "query-tests").toAbsolutePath
+      "resources").toAbsolutePath
   }
 }
diff --git 
a/connector/connect/common/src/test/resources/artifact-tests/Hello.class 
b/connector/connect/common/src/test/resources/artifact-tests/Hello.class
new file mode 100644
index 00000000000..56725764de2
Binary files /dev/null and 
b/connector/connect/common/src/test/resources/artifact-tests/Hello.class differ
diff --git 
a/connector/connect/common/src/test/resources/artifact-tests/crc/Hello.txt 
b/connector/connect/common/src/test/resources/artifact-tests/crc/Hello.txt
new file mode 100644
index 00000000000..799ce78f11b
--- /dev/null
+++ b/connector/connect/common/src/test/resources/artifact-tests/crc/Hello.txt
@@ -0,0 +1 @@
+553633018
\ No newline at end of file
diff --git 
a/connector/connect/client/jvm/src/test/resources/artifact-tests/crc/README.md 
b/connector/connect/common/src/test/resources/artifact-tests/crc/README.md
similarity index 100%
rename from 
connector/connect/client/jvm/src/test/resources/artifact-tests/crc/README.md
rename to 
connector/connect/common/src/test/resources/artifact-tests/crc/README.md
diff --git 
a/connector/connect/client/jvm/src/test/resources/artifact-tests/crc/junitLargeJar.txt
 
b/connector/connect/common/src/test/resources/artifact-tests/crc/junitLargeJar.txt
similarity index 100%
rename from 
connector/connect/client/jvm/src/test/resources/artifact-tests/crc/junitLargeJar.txt
rename to 
connector/connect/common/src/test/resources/artifact-tests/crc/junitLargeJar.txt
diff --git 
a/connector/connect/client/jvm/src/test/resources/artifact-tests/crc/smallClassFile.txt
 
b/connector/connect/common/src/test/resources/artifact-tests/crc/smallClassFile.txt
similarity index 100%
rename from 
connector/connect/client/jvm/src/test/resources/artifact-tests/crc/smallClassFile.txt
rename to 
connector/connect/common/src/test/resources/artifact-tests/crc/smallClassFile.txt
diff --git 
a/connector/connect/client/jvm/src/test/resources/artifact-tests/crc/smallClassFileDup.txt
 
b/connector/connect/common/src/test/resources/artifact-tests/crc/smallClassFileDup.txt
similarity index 100%
rename from 
connector/connect/client/jvm/src/test/resources/artifact-tests/crc/smallClassFileDup.txt
rename to 
connector/connect/common/src/test/resources/artifact-tests/crc/smallClassFileDup.txt
diff --git 
a/connector/connect/client/jvm/src/test/resources/artifact-tests/crc/smallJar.txt
 b/connector/connect/common/src/test/resources/artifact-tests/crc/smallJar.txt
similarity index 100%
rename from 
connector/connect/client/jvm/src/test/resources/artifact-tests/crc/smallJar.txt
rename to 
connector/connect/common/src/test/resources/artifact-tests/crc/smallJar.txt
diff --git 
a/connector/connect/client/jvm/src/test/resources/artifact-tests/junitLargeJar.jar
 b/connector/connect/common/src/test/resources/artifact-tests/junitLargeJar.jar
similarity index 100%
rename from 
connector/connect/client/jvm/src/test/resources/artifact-tests/junitLargeJar.jar
rename to 
connector/connect/common/src/test/resources/artifact-tests/junitLargeJar.jar
diff --git 
a/connector/connect/client/jvm/src/test/resources/artifact-tests/smallClassFile.class
 
b/connector/connect/common/src/test/resources/artifact-tests/smallClassFile.class
similarity index 100%
rename from 
connector/connect/client/jvm/src/test/resources/artifact-tests/smallClassFile.class
rename to 
connector/connect/common/src/test/resources/artifact-tests/smallClassFile.class
diff --git 
a/connector/connect/client/jvm/src/test/resources/artifact-tests/smallClassFileDup.class
 
b/connector/connect/common/src/test/resources/artifact-tests/smallClassFileDup.class
similarity index 100%
rename from 
connector/connect/client/jvm/src/test/resources/artifact-tests/smallClassFileDup.class
rename to 
connector/connect/common/src/test/resources/artifact-tests/smallClassFileDup.class
diff --git 
a/connector/connect/client/jvm/src/test/resources/artifact-tests/smallJar.jar 
b/connector/connect/common/src/test/resources/artifact-tests/smallJar.jar
similarity index 100%
rename from 
connector/connect/client/jvm/src/test/resources/artifact-tests/smallJar.jar
rename to 
connector/connect/common/src/test/resources/artifact-tests/smallJar.jar
diff --git a/connector/connect/server/pom.xml b/connector/connect/server/pom.xml
index 4d8e082a2db..838d7bf2bd3 100644
--- a/connector/connect/server/pom.xml
+++ b/connector/connect/server/pom.xml
@@ -233,6 +233,13 @@
       <version>2.1.214</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-repl_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala
new file mode 100644
index 00000000000..9ed5fd945f2
--- /dev/null
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.artifact
+
+import java.net.{URL, URLClassLoader}
+import java.nio.file.{Files, Path, Paths, StandardCopyOption}
+import java.util.concurrent.CopyOnWriteArrayList
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.Utils
+
+/**
+ * The Artifact Manager for the [[SparkConnectService]].
+ *
+ * This class handles the storage of artifacts as well as preparing the 
artifacts for use.
+ * Currently, jars and classfile artifacts undergo additional processing:
+ *   - Jars are automatically added to the underlying [[SparkContext]] and are 
accessible by all
+ *     users of the cluster.
+ *   - Class files are moved into a common directory that is shared among all 
users of the
+ *     cluster. Note: Under a multi-user setup, class file conflicts may occur 
between user
+ *     classes as the class file directory is shared.
+ */
+class SparkConnectArtifactManager private[connect] {
+
+  // The base directory where all artifacts are stored.
+  // Note: If a REPL is attached to the cluster, class file artifacts are 
stored in the
+  // REPL's output directory.
+  private[connect] lazy val artifactRootPath = SparkContext.getActive match {
+    case Some(sc) =>
+      sc.sparkConnectArtifactDirectory.toPath
+    case None =>
+      throw new RuntimeException("SparkContext is uninitialized!")
+  }
+  private[connect] lazy val artifactRootURI = {
+    val fileServer = SparkEnv.get.rpcEnv.fileServer
+    fileServer.addDirectory("artifacts", artifactRootPath.toFile)
+  }
+
+  // The base directory where all class files are stored.
+  // Note: If a REPL is attached to the cluster, we piggyback on the existing 
REPL output
+  // directory to store class file artifacts.
+  private[connect] lazy val classArtifactDir = SparkEnv.get.conf
+    .getOption("spark.repl.class.outputDir")
+    .map(p => Paths.get(p))
+    .getOrElse(artifactRootPath.resolve("classes"))
+
+  private[connect] lazy val classArtifactUri: String =
+    SparkEnv.get.conf.getOption("spark.repl.class.uri") match {
+      case Some(uri) => uri
+      case None =>
+        throw new RuntimeException("Class artifact URI had not been 
initialised in SparkContext!")
+    }
+
+  private val jarsList = new CopyOnWriteArrayList[Path]
+
+  /**
+   * Get the URLs of all jar artifacts added through the 
[[SparkConnectService]].
+   *
+   * @return
+   */
+  def getSparkConnectAddedJars: Seq[URL] = 
jarsList.asScala.map(_.toUri.toURL).toSeq
+
+  /**
+   * Add and prepare a staged artifact (i.e an artifact that has been rebuilt 
locally from bytes
+   * over the wire) for use.
+   *
+   * @param session
+   * @param remoteRelativePath
+   * @param serverLocalStagingPath
+   */
+  private[connect] def addArtifact(
+      session: SparkSession,
+      remoteRelativePath: Path,
+      serverLocalStagingPath: Path): Unit = {
+    require(!remoteRelativePath.isAbsolute)
+    if (remoteRelativePath.startsWith("classes/")) {
+      // Move class files to common location (shared among all users)
+      val target = 
classArtifactDir.resolve(remoteRelativePath.toString.stripPrefix("classes/"))
+      Files.createDirectories(target.getParent)
+      // Allow overwriting class files to capture updates to classes.
+      Files.move(serverLocalStagingPath, target, 
StandardCopyOption.REPLACE_EXISTING)
+    } else {
+      val target = artifactRootPath.resolve(remoteRelativePath)
+      Files.createDirectories(target.getParent)
+      // Disallow overwriting jars because spark doesn't support removing jars 
that were
+      // previously added,
+      if (Files.exists(target)) {
+        throw new RuntimeException(
+          s"Duplicate Jar: $remoteRelativePath. " +
+            s"Jars cannot be overwritten.")
+      }
+      Files.move(serverLocalStagingPath, target)
+      if (remoteRelativePath.startsWith("jars")) {
+        // Adding Jars to the underlying spark context (visible to all users)
+        session.sessionState.resourceLoader.addJar(target.toString)
+        jarsList.add(target)
+      }
+    }
+  }
+}
+
+object SparkConnectArtifactManager {
+
+  private var _activeArtifactManager: SparkConnectArtifactManager = _
+
+  /**
+   * Obtain the active artifact manager or create a new artifact manager.
+   *
+   * @return
+   */
+  def getOrCreateArtifactManager: SparkConnectArtifactManager = {
+    if (_activeArtifactManager == null) {
+      _activeArtifactManager = new SparkConnectArtifactManager
+    }
+    _activeArtifactManager
+  }
+
+  private lazy val artifactManager = getOrCreateArtifactManager
+
+  /**
+   * Obtain a classloader that contains jar and classfile artifacts on the 
classpath.
+   *
+   * @return
+   */
+  def classLoaderWithArtifacts: ClassLoader = {
+    val urls = artifactManager.getSparkConnectAddedJars :+
+      artifactManager.classArtifactDir.toUri.toURL
+    new URLClassLoader(urls.toArray, Utils.getContextOrSparkClassLoader)
+  }
+
+  /**
+   * Run a segment of code utilising a classloader that contains jar and 
classfile artifacts on
+   * the classpath.
+   *
+   * @param thunk
+   * @tparam T
+   * @return
+   */
+  def withArtifactClassLoader[T](thunk: => T): T = {
+    Utils.withContextClassLoader(classLoaderWithArtifacts) {
+      thunk
+    }
+  }
+}
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 375002798c6..0faa2f74981 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, 
Inner, JoinType, L
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.{CollectMetrics, 
CommandResult, Deduplicate, Except, Intersect, LocalRelation, LogicalPlan, 
Project, Sample, Sort, SubqueryAlias, Union, Unpivot, UnresolvedHint}
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, 
CharVarcharUtils}
+import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
 import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, 
InvalidPlanInput, LiteralValueProtoConverter, UdfPacket}
 import 
org.apache.spark.sql.connect.config.Connect.CONNECT_GRPC_ARROW_MAX_BATCH_SIZE
 import org.apache.spark.sql.connect.plugin.SparkConnectPluginRegistry
@@ -988,7 +989,9 @@ class SparkConnectPlanner(val session: SparkSession) {
   private def transformScalarScalaUDF(fun: 
proto.CommonInlineUserDefinedFunction): ScalaUDF = {
     val udf = fun.getScalarScalaUdf
     val udfPacket =
-      Utils.deserialize[UdfPacket](udf.getPayload.toByteArray, 
Utils.getContextOrSparkClassLoader)
+      Utils.deserialize[UdfPacket](
+        udf.getPayload.toByteArray,
+        SparkConnectArtifactManager.classLoaderWithArtifacts)
     ScalaUDF(
       function = udfPacket.function,
       dataType = udfPacket.outputEncoder.dataType,
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala
new file mode 100644
index 00000000000..7f447c9672f
--- /dev/null
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.service
+
+import com.google.common.io.CountingOutputStream
+import io.grpc.stub.StreamObserver
+import java.nio.file.{Files, Path, Paths}
+import java.util.zip.{CheckedOutputStream, CRC32}
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{AddArtifactsRequest, 
AddArtifactsResponse}
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
+import org.apache.spark.util.Utils
+
+/**
+ * Handles [[AddArtifactsRequest]]s for the [[SparkConnectService]].
+ *
+ * @param responseObserver
+ */
+class SparkConnectAddArtifactsHandler(val responseObserver: 
StreamObserver[AddArtifactsResponse])
+    extends StreamObserver[AddArtifactsRequest] {
+
+  // Temporary directory where artifacts are rebuilt from the bytes sent over 
the wire.
+  protected val stagingDir: Path = Utils.createTempDir().toPath
+  protected val stagedArtifacts: mutable.Buffer[StagedArtifact] =
+    mutable.Buffer.empty[StagedArtifact]
+  // If not null, indicates the currently active chunked artifact that is 
being rebuilt from
+  // several [[AddArtifactsRequest]]s.
+  private var chunkedArtifact: StagedChunkedArtifact = _
+  private var holder: SessionHolder = _
+  private def artifactManager: SparkConnectArtifactManager =
+    SparkConnectArtifactManager.getOrCreateArtifactManager
+
+  override def onNext(req: AddArtifactsRequest): Unit = {
+    if (this.holder == null) {
+      this.holder = SparkConnectService.getOrCreateIsolatedSession(
+        req.getUserContext.getUserId,
+        req.getSessionId)
+    }
+
+    if (req.hasBeginChunk) {
+      // The beginning of a multi-chunk artifact.
+      require(chunkedArtifact == null)
+      chunkedArtifact = writeArtifactToFile(req.getBeginChunk)
+    } else if (req.hasChunk) {
+      // We are currently processing a multi-chunk artifact
+      require(chunkedArtifact != null && !chunkedArtifact.isFinished)
+      chunkedArtifact.write(req.getChunk)
+
+      if (chunkedArtifact.isFinished) {
+        chunkedArtifact.close()
+        // Unset the currently active chunked artifact.
+        chunkedArtifact = null
+      }
+    } else if (req.hasBatch) {
+      // Each artifact in the batch is single-chunked.
+      req.getBatch.getArtifactsList.forEach(artifact => 
writeArtifactToFile(artifact).close())
+    } else {
+      throw new UnsupportedOperationException(s"Unsupported data transfer 
request: $req")
+    }
+  }
+
+  override def onError(throwable: Throwable): Unit = {
+    Utils.deleteRecursively(stagingDir.toFile)
+    responseObserver.onError(throwable)
+  }
+
+  protected def addStagedArtifactToArtifactManager(artifact: StagedArtifact): 
Unit = {
+    artifactManager.addArtifact(holder.session, artifact.path, 
artifact.stagedPath)
+  }
+
+  /**
+   * Process all the staged artifacts built in this stream.
+   *
+   * @return
+   */
+  protected def flushStagedArtifacts(): Seq[ArtifactSummary] = {
+    // Non-lazy transformation when using Buffer.
+    stagedArtifacts.map { artifact =>
+      // We do not store artifacts that fail the CRC. The failure is reported 
in the artifact
+      // summary and it is up to the client to decide whether to retry sending 
the artifact.
+      if (artifact.getCrcStatus.contains(true)) {
+        addStagedArtifactToArtifactManager(artifact)
+      }
+      artifact.summary()
+    }.toSeq
+  }
+
+  protected def cleanUpStagedArtifacts(): Unit = 
Utils.deleteRecursively(stagingDir.toFile)
+
+  override def onCompleted(): Unit = {
+    val artifactSummaries = flushStagedArtifacts()
+    // Add the artifacts to the session and return the summaries to the client.
+    val builder = proto.AddArtifactsResponse.newBuilder()
+    artifactSummaries.foreach(summary => builder.addArtifacts(summary))
+    // Delete temp dir
+    cleanUpStagedArtifacts()
+
+    // Send the summaries and close
+    responseObserver.onNext(builder.build())
+    responseObserver.onCompleted()
+  }
+
+  /**
+   * Create a (temporary) file for a single-chunk artifact.
+   */
+  private def writeArtifactToFile(
+      artifact: proto.AddArtifactsRequest.SingleChunkArtifact): StagedArtifact 
= {
+    val stagedDep = new StagedArtifact(artifact.getName)
+    stagedArtifacts += stagedDep
+    stagedDep.write(artifact.getData)
+    stagedDep
+  }
+
+  /**
+   * Create a (temporary) file for the multi-chunk artifact and write the 
initial chunk. Further
+   * chunks can be appended to the file.
+   */
+  private def writeArtifactToFile(
+      artifact: proto.AddArtifactsRequest.BeginChunkedArtifact): 
StagedChunkedArtifact = {
+    val stagedChunkedArtifact =
+      new StagedChunkedArtifact(artifact.getName, artifact.getNumChunks, 
artifact.getTotalBytes)
+    stagedArtifacts += stagedChunkedArtifact
+    stagedChunkedArtifact.write(artifact.getInitialChunk)
+    stagedChunkedArtifact
+  }
+
+  /**
+   * Handles rebuilding an artifact from bytes sent over the wire.
+   */
+  class StagedArtifact(val name: String) {
+    val path: Path = Paths.get(name)
+    val stagedPath: Path = stagingDir.resolve(path)
+
+    Files.createDirectories(stagedPath.getParent)
+
+    private val fileOut = Files.newOutputStream(stagedPath)
+    private val countingOut = new CountingOutputStream(fileOut)
+    private val checksumOut = new CheckedOutputStream(countingOut, new CRC32)
+
+    private val builder = ArtifactSummary.newBuilder().setName(name)
+    private var artifactSummary: ArtifactSummary = _
+    protected var isCrcSuccess: Boolean = _
+
+    protected def updateCrc(isSuccess: Boolean): Unit = {
+      isCrcSuccess = isSuccess
+    }
+
+    def getCrcStatus: Option[Boolean] = Option(isCrcSuccess)
+
+    def write(dataChunk: proto.AddArtifactsRequest.ArtifactChunk): Unit = {
+      try dataChunk.getData.writeTo(checksumOut)
+      catch {
+        case NonFatal(e) =>
+          close()
+          throw e
+      }
+      updateCrc(checksumOut.getChecksum.getValue == dataChunk.getCrc)
+      checksumOut.getChecksum.reset()
+    }
+
+    def close(): Unit = {
+      if (artifactSummary == null) {
+        checksumOut.close()
+        artifactSummary = builder
+          .setName(name)
+          .setIsCrcSuccessful(getCrcStatus.getOrElse(false))
+          .build()
+      }
+    }
+
+    def summary(): ArtifactSummary = {
+      require(artifactSummary != null)
+      artifactSummary
+    }
+  }
+
+  /**
+   * Extends [[StagedArtifact]] to handle multi-chunk artifacts.
+   *
+   * @param name
+   * @param numChunks
+   * @param totalBytes
+   */
+  class StagedChunkedArtifact(name: String, numChunks: Long, totalBytes: Long)
+      extends StagedArtifact(name) {
+
+    private var remainingChunks = numChunks
+    private var totalBytesProcessed = 0L
+    private var isFirstCrcUpdate = true
+
+    def isFinished: Boolean = remainingChunks == 0
+
+    override protected def updateCrc(isSuccess: Boolean): Unit = {
+      // The overall artifact CRC is a success if and only if all the 
individual chunk CRCs match.
+      isCrcSuccess = isSuccess && (isCrcSuccess || isFirstCrcUpdate)
+      isFirstCrcUpdate = false
+    }
+
+    override def write(dataChunk: proto.AddArtifactsRequest.ArtifactChunk): 
Unit = {
+      if (remainingChunks == 0) {
+        throw new RuntimeException(
+          s"Excessive data chunks for artifact: $name, " +
+            s"expected $numChunks chunks in total. Processed 
$totalBytesProcessed bytes out of" +
+            s" $totalBytes bytes.")
+      }
+      super.write(dataChunk)
+      totalBytesProcessed += dataChunk.getData.size()
+      remainingChunks -= 1
+    }
+
+    override def close(): Unit = {
+      if (remainingChunks != 0 || totalBytesProcessed != totalBytes) {
+        throw new RuntimeException(
+          s"Missing data chunks for artifact: $name. Expected " +
+            s"$numChunks chunks and received ${numChunks - remainingChunks} 
chunks. Processed" +
+            s" $totalBytesProcessed bytes out of $totalBytes bytes.")
+      }
+      super.close()
+    }
+  }
+}
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
index a03b827b60e..671574353a1 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
@@ -24,6 +24,7 @@ import io.grpc.stub.StreamObserver
 import org.apache.spark.connect.proto
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
 import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, 
InvalidPlanInput, StorageLevelProtoConverter}
 import org.apache.spark.sql.connect.planner.SparkConnectPlanner
 import org.apache.spark.sql.execution.{CodegenMode, CostMode, ExtendedMode, 
FormattedMode, SimpleMode}
@@ -32,17 +33,18 @@ private[connect] class SparkConnectAnalyzeHandler(
     responseObserver: StreamObserver[proto.AnalyzePlanResponse])
     extends Logging {
 
-  def handle(request: proto.AnalyzePlanRequest): Unit = {
-    val session =
-      SparkConnectService
-        .getOrCreateIsolatedSession(request.getUserContext.getUserId, 
request.getSessionId)
-        .session
-    session.withActive {
-      val response = process(request, session)
-      responseObserver.onNext(response)
-      responseObserver.onCompleted()
+  def handle(request: proto.AnalyzePlanRequest): Unit =
+    SparkConnectArtifactManager.withArtifactClassLoader {
+      val session =
+        SparkConnectService
+          .getOrCreateIsolatedSession(request.getUserContext.getUserId, 
request.getSessionId)
+          .session
+      session.withActive {
+        val response = process(request, session)
+        responseObserver.onNext(response)
+        responseObserver.onCompleted()
+      }
     }
-  }
 
   def process(
       request: proto.AnalyzePlanRequest,
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
old mode 100755
new mode 100644
index a9442a8c92c..a4474ac64c1
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -188,20 +188,8 @@ class SparkConnectService(debug: Boolean)
    * @return
    */
   override def addArtifacts(responseObserver: 
StreamObserver[AddArtifactsResponse])
-      : StreamObserver[AddArtifactsRequest] = {
-    // TODO: Handle artifact files
-    // No-Op StreamObserver
-    new StreamObserver[AddArtifactsRequest] {
-      override def onNext(v: AddArtifactsRequest): Unit = {}
-
-      override def onError(throwable: Throwable): Unit = 
responseObserver.onError(throwable)
-
-      override def onCompleted(): Unit = {
-        
responseObserver.onNext(proto.AddArtifactsResponse.newBuilder().build())
-        responseObserver.onCompleted()
-      }
-    }
-  }
+      : StreamObserver[AddArtifactsRequest] = new 
SparkConnectAddArtifactsHandler(
+    responseObserver)
 }
 
 /**
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
index 335b871d499..74983eeecf3 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
@@ -28,6 +28,7 @@ import org.apache.spark.connect.proto.{ExecutePlanRequest, 
ExecutePlanResponse}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
 import org.apache.spark.sql.connect.common.DataTypeProtoConverter
 import 
org.apache.spark.sql.connect.common.LiteralValueProtoConverter.toLiteralProto
 import 
org.apache.spark.sql.connect.config.Connect.CONNECT_GRPC_ARROW_MAX_BATCH_SIZE
@@ -42,7 +43,7 @@ import org.apache.spark.util.ThreadUtils
 class SparkConnectStreamHandler(responseObserver: 
StreamObserver[ExecutePlanResponse])
     extends Logging {
 
-  def handle(v: ExecutePlanRequest): Unit = {
+  def handle(v: ExecutePlanRequest): Unit = 
SparkConnectArtifactManager.withArtifactClassLoader {
     val session =
       SparkConnectService
         .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId)
diff --git 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
index e20a6159cc8..7071e5300d8 100644
--- 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
+++ 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
@@ -58,7 +58,10 @@ import org.apache.spark.util.Utils
  * }}}
  */
 // scalastyle:on
-class ProtoToParsedPlanTestSuite extends SparkFunSuite with SharedSparkSession 
{
+class ProtoToParsedPlanTestSuite
+    extends SparkFunSuite
+    with SharedSparkSession
+    with ResourceHelper {
   val url = "jdbc:h2:mem:testdb0"
   var conn: java.sql.Connection = null
 
@@ -105,19 +108,9 @@ class ProtoToParsedPlanTestSuite extends SparkFunSuite 
with SharedSparkSession {
       .set(org.apache.spark.sql.internal.SQLConf.ANSI_ENABLED.key, 
false.toString)
   }
 
-  protected val baseResourcePath: Path = {
-    getWorkspaceFilePath(
-      "connector",
-      "connect",
-      "common",
-      "src",
-      "test",
-      "resources",
-      "query-tests").toAbsolutePath
-  }
-
-  protected val inputFilePath: Path = baseResourcePath.resolve("queries")
-  protected val goldenFilePath: Path = 
baseResourcePath.resolve("explain-results")
+  protected val suiteBaseResourcePath = 
commonResourcePath.resolve("query-tests")
+  protected val inputFilePath: Path = suiteBaseResourcePath.resolve("queries")
+  protected val goldenFilePath: Path = 
suiteBaseResourcePath.resolve("explain-results")
   private val emptyProps: util.Map[String, String] = 
util.Collections.emptyMap()
 
   private val analyzer = {
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/ConnectFunSuite.scala
 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ResourceHelper.scala
old mode 100755
new mode 100644
similarity index 60%
copy from 
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/ConnectFunSuite.scala
copy to 
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ResourceHelper.scala
index 1ece0838b1b..dad89e0b9b0
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/ConnectFunSuite.scala
+++ 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ResourceHelper.scala
@@ -14,32 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.connect.client.util
+package org.apache.spark.sql.connect
 
 import java.nio.file.Path
 
-import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite
+import org.apache.spark.SparkFunSuite
 
-/**
- * The basic testsuite the client tests should extend from.
- */
-trait ConnectFunSuite extends AnyFunSuite { // scalastyle:ignore funsuite
-
-  // Borrowed from SparkFunSuite
-  protected def getWorkspaceFilePath(first: String, more: String*): Path = {
-    if (!(sys.props.contains("spark.test.home") || 
sys.env.contains("SPARK_HOME"))) {
-      fail("spark.test.home or SPARK_HOME is not set.")
-    }
-    val sparkHome = sys.props.getOrElse("spark.test.home", 
sys.env("SPARK_HOME"))
-    java.nio.file.Paths.get(sparkHome, first +: more: _*)
-  }
+trait ResourceHelper extends SparkFunSuite {
 
   protected val baseResourcePath: Path = {
     getWorkspaceFilePath(
       "connector",
       "connect",
-      "client",
-      "jvm",
+      "server",
       "src",
       "test",
       "resources").toAbsolutePath
@@ -52,7 +39,6 @@ trait ConnectFunSuite extends AnyFunSuite { // 
scalastyle:ignore funsuite
       "common",
       "src",
       "test",
-      "resources",
-      "query-tests").toAbsolutePath
+      "resources").toAbsolutePath
   }
 }
diff --git 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/artifact/ArtifactManagerSuite.scala
 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/artifact/ArtifactManagerSuite.scala
new file mode 100644
index 00000000000..6c661cbe1bb
--- /dev/null
+++ 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/artifact/ArtifactManagerSuite.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.artifact
+
+import java.nio.file.Paths
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.connect.ResourceHelper
+import org.apache.spark.sql.connect.service.SparkConnectService
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.Utils
+
+class ArtifactManagerSuite extends SharedSparkSession with ResourceHelper {
+
+  override protected def sparkConf: SparkConf = {
+    val conf = super.sparkConf
+    conf.set("spark.plugins", 
"org.apache.spark.sql.connect.SparkConnectPlugin")
+  }
+
+  private val artifactPath = commonResourcePath.resolve("artifact-tests")
+  private lazy val artifactManager = 
SparkConnectArtifactManager.getOrCreateArtifactManager
+
+  test("Jar artifacts are added to spark session") {
+    val copyDir = Utils.createTempDir().toPath
+    FileUtils.copyDirectory(artifactPath.toFile, copyDir.toFile)
+    val stagingPath = copyDir.resolve("smallJar.jar")
+    val remotePath = Paths.get("jars/smallJar.jar")
+    artifactManager.addArtifact(spark, remotePath, stagingPath)
+
+    val jarList = spark.sparkContext.listJars()
+    assert(jarList.exists(_.contains(remotePath.toString)))
+  }
+
+  test("Class artifacts are added to the correct directory.") {
+    val copyDir = Utils.createTempDir().toPath
+    FileUtils.copyDirectory(artifactPath.toFile, copyDir.toFile)
+    val stagingPath = copyDir.resolve("smallClassFile.class")
+    val remotePath = Paths.get("classes/smallClassFile.class")
+    assert(stagingPath.toFile.exists())
+    artifactManager.addArtifact(spark, remotePath, stagingPath)
+
+    val classFileDirectory = artifactManager.classArtifactDir
+    val movedClassFile = 
classFileDirectory.resolve("smallClassFile.class").toFile
+    assert(movedClassFile.exists())
+  }
+
+  test("Class file artifacts are added to SC classloader") {
+    val copyDir = Utils.createTempDir().toPath
+    FileUtils.copyDirectory(artifactPath.toFile, copyDir.toFile)
+    val stagingPath = copyDir.resolve("Hello.class")
+    val remotePath = Paths.get("classes/Hello.class")
+    assert(stagingPath.toFile.exists())
+    artifactManager.addArtifact(spark, remotePath, stagingPath)
+
+    val classFileDirectory = artifactManager.classArtifactDir
+    val movedClassFile = classFileDirectory.resolve("Hello.class").toFile
+    assert(movedClassFile.exists())
+
+    val classLoader = SparkConnectArtifactManager.classLoaderWithArtifacts
+
+    val instance = classLoader
+      .loadClass("Hello")
+      .getDeclaredConstructor(classOf[String])
+      .newInstance("Talon")
+
+    val msg = instance.getClass.getMethod("msg").invoke(instance)
+    assert(msg == "Hello Talon! Nice to meet you!")
+  }
+
+  test("UDF can reference added class file") {
+    val copyDir = Utils.createTempDir().toPath
+    FileUtils.copyDirectory(artifactPath.toFile, copyDir.toFile)
+    val stagingPath = copyDir.resolve("Hello.class")
+    val remotePath = Paths.get("classes/Hello.class")
+    assert(stagingPath.toFile.exists())
+    artifactManager.addArtifact(spark, remotePath, stagingPath)
+
+    val classFileDirectory = artifactManager.classArtifactDir
+    val movedClassFile = classFileDirectory.resolve("Hello.class").toFile
+    assert(movedClassFile.exists())
+
+    val classLoader = SparkConnectArtifactManager.classLoaderWithArtifacts
+
+    val instance = classLoader
+      .loadClass("Hello")
+      .getDeclaredConstructor(classOf[String])
+      .newInstance("Talon")
+      .asInstanceOf[String => String]
+    val udf = org.apache.spark.sql.functions.udf(instance)
+    val session = SparkConnectService.getOrCreateIsolatedSession("c1", 
"session").session
+    session.range(10).select(udf(col("id").cast("string"))).collect()
+  }
+}
diff --git 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/AddArtifactsHandlerSuite.scala
 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/AddArtifactsHandlerSuite.scala
new file mode 100644
index 00000000000..4a4e00ad997
--- /dev/null
+++ 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/AddArtifactsHandlerSuite.scala
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.service
+
+import java.io.InputStream
+import java.nio.file.{Files, Path}
+
+import collection.JavaConverters._
+import com.google.protobuf.ByteString
+import io.grpc.stub.StreamObserver
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration._
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{AddArtifactsRequest, 
AddArtifactsResponse}
+import org.apache.spark.sql.connect.ResourceHelper
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.ThreadUtils
+
+class AddArtifactsHandlerSuite extends SharedSparkSession with ResourceHelper {
+
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  class DummyStreamObserver(p: Promise[AddArtifactsResponse])
+      extends StreamObserver[AddArtifactsResponse] {
+    override def onNext(v: AddArtifactsResponse): Unit = p.success(v)
+    override def onError(throwable: Throwable): Unit = throw throwable
+    override def onCompleted(): Unit = {}
+  }
+
+  class TestAddArtifactsHandler(responseObserver: 
StreamObserver[AddArtifactsResponse])
+      extends SparkConnectAddArtifactsHandler(responseObserver) {
+
+    // Stop the staged artifacts from being automatically deleted
+    override protected def cleanUpStagedArtifacts(): Unit = {}
+
+    private val finalArtifacts = mutable.Buffer.empty[String]
+
+    // Record the artifacts that are sent out for final processing.
+    override protected def addStagedArtifactToArtifactManager(artifact: 
StagedArtifact): Unit = {
+      finalArtifacts.append(artifact.name)
+    }
+
+    def getFinalArtifacts: Seq[String] = finalArtifacts.toSeq
+    def stagingDirectory: Path = this.stagingDir
+    def forceCleanUp(): Unit = super.cleanUpStagedArtifacts()
+  }
+
+  protected val inputFilePath: Path = 
commonResourcePath.resolve("artifact-tests")
+  protected val crcPath: Path = inputFilePath.resolve("crc")
+
+  private def readNextChunk(in: InputStream): ByteString = {
+    val buf = new Array[Byte](CHUNK_SIZE)
+    var bytesRead = 0
+    var count = 0
+    while (count != -1 && bytesRead < CHUNK_SIZE) {
+      count = in.read(buf, bytesRead, CHUNK_SIZE - bytesRead)
+      if (count != -1) {
+        bytesRead += count
+      }
+    }
+    if (bytesRead == 0) ByteString.empty()
+    else ByteString.copyFrom(buf, 0, bytesRead)
+  }
+
+  private def getDataChunks(filePath: Path): Seq[ByteString] = {
+    val in = Files.newInputStream(filePath)
+    var chunkData: ByteString = readNextChunk(in)
+    val dataChunks = mutable.ListBuffer.empty[ByteString]
+    while (chunkData != ByteString.empty()) {
+      dataChunks.append(chunkData)
+      chunkData = readNextChunk(in)
+    }
+    dataChunks.toSeq
+  }
+
+  private def getCrcValues(filePath: Path): Seq[Long] = {
+    val fileName = filePath.getFileName.toString
+    val crcFileName = fileName.split('.').head + ".txt"
+    Files
+      .readAllLines(crcPath.resolve(crcFileName))
+      .asScala
+      .map(_.toLong)
+      .toSeq
+  }
+
+  private def addSingleChunkArtifact(
+      handler: SparkConnectAddArtifactsHandler,
+      name: String,
+      artifactPath: Path): Unit = {
+    val dataChunks = getDataChunks(artifactPath)
+    assert(dataChunks.size == 1)
+    val bytes = dataChunks.head
+    val context = proto.UserContext
+      .newBuilder()
+      .setUserId("c1")
+      .build()
+    val fileNameNoExtension = artifactPath.getFileName.toString.split('.').head
+    val singleChunkArtifact = proto.AddArtifactsRequest.SingleChunkArtifact
+      .newBuilder()
+      .setName(name)
+      .setData(
+        proto.AddArtifactsRequest.ArtifactChunk
+          .newBuilder()
+          .setData(bytes)
+          .setCrc(getCrcValues(crcPath.resolve(fileNameNoExtension + 
".txt")).head)
+          .build())
+      .build()
+
+    val singleChunkArtifactRequest = AddArtifactsRequest
+      .newBuilder()
+      .setSessionId("abc")
+      .setUserContext(context)
+      .setBatch(
+        
proto.AddArtifactsRequest.Batch.newBuilder().addArtifacts(singleChunkArtifact).build())
+      .build()
+
+    handler.onNext(singleChunkArtifactRequest)
+  }
+
+  private def addSingleChunkArtifacts(
+      handler: SparkConnectAddArtifactsHandler,
+      names: Seq[String],
+      artifactPaths: Seq[Path]): Unit = {
+    names.zip(artifactPaths).foreach { case (name, path) =>
+      addSingleChunkArtifact(handler, name, path)
+    }
+  }
+
+  private def addChunkedArtifact(
+      handler: SparkConnectAddArtifactsHandler,
+      name: String,
+      artifactPath: Path): Unit = {
+    val dataChunks = getDataChunks(artifactPath)
+    val crcs = getCrcValues(artifactPath)
+    assert(dataChunks.size == crcs.size)
+    val artifactChunks = dataChunks.zip(crcs).map { case (chunk, crc) =>
+      
proto.AddArtifactsRequest.ArtifactChunk.newBuilder().setData(chunk).setCrc(crc).build()
+    }
+
+    val context = proto.UserContext
+      .newBuilder()
+      .setUserId("c1")
+      .build()
+    val beginChunkedArtifact = proto.AddArtifactsRequest.BeginChunkedArtifact
+      .newBuilder()
+      .setName(name)
+      .setNumChunks(artifactChunks.size)
+      .setTotalBytes(Files.size(artifactPath))
+      .setInitialChunk(artifactChunks.head)
+      .build()
+
+    val requestBuilder = AddArtifactsRequest
+      .newBuilder()
+      .setSessionId("abc")
+      .setUserContext(context)
+      .setBeginChunk(beginChunkedArtifact)
+
+    handler.onNext(requestBuilder.build())
+    requestBuilder.clearBeginChunk()
+    artifactChunks.drop(1).foreach { dataChunk =>
+      requestBuilder.setChunk(dataChunk)
+      handler.onNext(requestBuilder.build())
+    }
+  }
+
+  test("single chunk artifact") {
+    val promise = Promise[AddArtifactsResponse]
+    val handler = new TestAddArtifactsHandler(new DummyStreamObserver(promise))
+    try {
+      val name = "classes/smallClassFile.class"
+      val artifactPath = inputFilePath.resolve("smallClassFile.class")
+      addSingleChunkArtifact(handler, name, artifactPath)
+      handler.onCompleted()
+      val response = ThreadUtils.awaitResult(promise.future, 5.seconds)
+      val summaries = response.getArtifactsList.asScala.toSeq
+      assert(summaries.size == 1)
+      assert(summaries.head.getName == name)
+      assert(summaries.head.getIsCrcSuccessful)
+
+      val writtenFile = handler.stagingDirectory.resolve(name)
+      assert(writtenFile.toFile.exists())
+      val writtenBytes = ByteString.readFrom(Files.newInputStream(writtenFile))
+      val expectedBytes = 
ByteString.readFrom(Files.newInputStream(artifactPath))
+      assert(writtenBytes == expectedBytes)
+    } finally {
+      handler.forceCleanUp()
+    }
+  }
+
+  test("Multi chunk artifact") {
+    val promise = Promise[AddArtifactsResponse]
+    val handler = new TestAddArtifactsHandler(new DummyStreamObserver(promise))
+    try {
+      val name = "jars/junitLargeJar.jar"
+      val artifactPath = inputFilePath.resolve("junitLargeJar.jar")
+      addChunkedArtifact(handler, name, artifactPath)
+      handler.onCompleted()
+      val response = ThreadUtils.awaitResult(promise.future, 5.seconds)
+      val summaries = response.getArtifactsList.asScala.toSeq
+      assert(summaries.size == 1)
+      assert(summaries.head.getName == name)
+      assert(summaries.head.getIsCrcSuccessful)
+
+      val writtenFile = handler.stagingDirectory.resolve(name)
+      assert(writtenFile.toFile.exists())
+      val writtenBytes = ByteString.readFrom(Files.newInputStream(writtenFile))
+      val expectedByes = 
ByteString.readFrom(Files.newInputStream(artifactPath))
+      assert(writtenBytes == expectedByes)
+    } finally {
+      handler.forceCleanUp()
+    }
+  }
+
+  test("Mix of single-chunk and chunked artifacts") {
+    val promise = Promise[AddArtifactsResponse]
+    val handler = new TestAddArtifactsHandler(new DummyStreamObserver(promise))
+    try {
+      val names = Seq(
+        "classes/smallClassFile.class",
+        "jars/junitLargeJar.jar",
+        "classes/smallClassFileDup.class",
+        "jars/smallJar.jar")
+
+      val artifactPaths = Seq(
+        inputFilePath.resolve("smallClassFile.class"),
+        inputFilePath.resolve("junitLargeJar.jar"),
+        inputFilePath.resolve("smallClassFileDup.class"),
+        inputFilePath.resolve("smallJar.jar"))
+
+      addSingleChunkArtifact(handler, names.head, artifactPaths.head)
+      addChunkedArtifact(handler, names(1), artifactPaths(1))
+      addSingleChunkArtifacts(handler, names.drop(2), artifactPaths.drop(2))
+      handler.onCompleted()
+      val response = ThreadUtils.awaitResult(promise.future, 5.seconds)
+      val summaries = response.getArtifactsList.asScala.toSeq
+      assert(summaries.size == 4)
+      summaries.zip(names).foreach { case (summary, name) =>
+        assert(summary.getName == name)
+        assert(summary.getIsCrcSuccessful)
+      }
+
+      val writtenFiles = names.map(name => 
handler.stagingDirectory.resolve(name))
+      writtenFiles.zip(artifactPaths).foreach { case (writtenFile, 
artifactPath) =>
+        assert(writtenFile.toFile.exists())
+        val writtenBytes = 
ByteString.readFrom(Files.newInputStream(writtenFile))
+        val expectedByes = 
ByteString.readFrom(Files.newInputStream(artifactPath))
+        assert(writtenBytes == expectedByes)
+      }
+    } finally {
+      handler.forceCleanUp()
+    }
+  }
+
+  test("Artifacts that fail CRC are not added to the artifact manager") {
+    val promise = Promise[AddArtifactsResponse]
+    val handler = new TestAddArtifactsHandler(new DummyStreamObserver(promise))
+    try {
+      val name = "classes/smallClassFile.class"
+      val artifactPath = inputFilePath.resolve("smallClassFile.class")
+      val dataChunks = getDataChunks(artifactPath)
+      assert(dataChunks.size == 1)
+      val bytes = dataChunks.head
+      val context = proto.UserContext
+        .newBuilder()
+        .setUserId("c1")
+        .build()
+      val singleChunkArtifact = proto.AddArtifactsRequest.SingleChunkArtifact
+        .newBuilder()
+        .setName(name)
+        .setData(
+          proto.AddArtifactsRequest.ArtifactChunk
+            .newBuilder()
+            .setData(bytes)
+            // Set a dummy CRC value
+            .setCrc(12345)
+            .build())
+        .build()
+
+      val singleChunkArtifactRequest = AddArtifactsRequest
+        .newBuilder()
+        .setSessionId("abc")
+        .setUserContext(context)
+        .setBatch(
+          
proto.AddArtifactsRequest.Batch.newBuilder().addArtifacts(singleChunkArtifact).build())
+        .build()
+
+      handler.onNext(singleChunkArtifactRequest)
+      handler.onCompleted()
+      val response = ThreadUtils.awaitResult(promise.future, 5.seconds)
+      val summaries = response.getArtifactsList.asScala.toSeq
+      assert(summaries.size == 1)
+      assert(summaries.head.getName == name)
+      assert(!summaries.head.getIsCrcSuccessful)
+
+      assert(handler.getFinalArtifacts.isEmpty)
+    } finally {
+      handler.forceCleanUp()
+    }
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 43573894748..d119dae5c76 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -19,6 +19,7 @@ package org.apache.spark
 
 import java.io._
 import java.net.URI
+import java.nio.file.Files
 import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, 
AtomicReference}
@@ -41,7 +42,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => 
NewInputFormat, Job => NewHad
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => 
NewFileInputFormat}
 import org.apache.logging.log4j.Level
 
-import org.apache.spark.annotation.{DeveloperApi, Experimental}
+import org.apache.spark.annotation.{DeveloperApi, Experimental, Private}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.executor.{Executor, ExecutorMetrics, 
ExecutorMetricsSource}
@@ -386,6 +387,13 @@ class SparkContext(config: SparkConf) extends Logging {
     Utils.setLogLevel(Level.toLevel(upperCased))
   }
 
+  /**
+   * :: Private ::
+   * Returns the directory that stores artifacts transferred through Spark 
Connect.
+   */
+  @Private
+  private[spark] lazy val sparkConnectArtifactDirectory: File = 
Utils.createTempDir("artifacts")
+
   try {
     _conf = config.clone()
     _conf.validateSettings()
@@ -465,7 +473,18 @@ class SparkContext(config: SparkConf) extends Logging {
     SparkEnv.set(_env)
 
     // If running the REPL, register the repl's output dir with the file 
server.
-    _conf.getOption("spark.repl.class.outputDir").foreach { path =>
+    _conf.getOption("spark.repl.class.outputDir").orElse {
+      if 
(_conf.get(PLUGINS).contains("org.apache.spark.sql.connect.SparkConnectPlugin"))
 {
+        // For Spark Connect, we piggyback on the existing REPL integration to 
load class
+        // files on the executors.
+        // This is a temporary intermediate step due to unavailable 
classloader isolation.
+        val classDirectory = 
sparkConnectArtifactDirectory.toPath.resolve("classes")
+        Files.createDirectories(classDirectory)
+        Some(classDirectory.toString)
+      } else {
+        None
+      }
+    }.foreach { path =>
       val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new 
File(path))
       _conf.set("spark.repl.class.uri", replUri)
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to