This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c3923aaa59 [SPARK-45486][CONNECT] Make add_artifact request idempotent
8c3923aaa59 is described below

commit 8c3923aaa59a36e91fe9d3cdde95a69d51c599ae
Author: Alice Sayutina <[email protected]>
AuthorDate: Mon Oct 16 09:21:09 2023 +0900

    [SPARK-45486][CONNECT] Make add_artifact request idempotent
    
    ### What changes were proposed in this pull request?
    
    1. Make add_artifact request idempotent i.e. subsequent requests will 
succeed if the same content is provided. This makes retrying more safe.
    2. Fix existing error handling mechanism:
    
    Before the update the error looks like that
    ```
    >>> spark.addArtifact("tmp.py", pyfile=True)
    >>> spark.addArtifact("tmp.py", pyfile=True) # fails
    2023-10-09 15:55:30,352 82873 DEBUG __iter__ Will retry call after 
60014.279746934524 ms sleep (error: <_InactiveRpcError of RPC that terminated 
with:
            status = StatusCode.UNKNOWN
            details = ""
            debug_error_string = "UNKNOWN:Error received from peer  
{grpc_message:"", grpc_status:2, 
created_time:"2023-10-09T15:55:30.351541+02:00"}"
    >)
    (this is also getting retried)
    ```
    
    Now it looks:
    
    ```
    >>> spark.addArtifact("abc.sh", file=True)
    >>> spark.addArtifact("abc.sh", file=True) # passes
    >>> # update file's content
    >>> spark.addArtifact("abc.sh", file=True) # now fails
    Traceback (most recent call last):
    [...]
    grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated 
with:
            status = StatusCode.ALREADY_EXISTS
            details = "Duplicate Artifact: files/abc.sh. Artifacts cannot be 
overwritten."
            debug_error_string = "UNKNOWN:Error received from peer  
{grpc_message:"Duplicate Artifact: files/abc.sh. Artifacts cannot be 
overwritten.", grpc_status:6, created_time:"2023-10-10T01:25:38.231317+02:00"}"
    >
    
    ```
    
    ### Why are the changes needed?
    
    Makes retrying more robust, adds user-friendly error (see above).
    
    ### Does this PR introduce _any_ user-facing change?
    
    Mostly internal improvements
    
    ### How was this patch tested?
    Unit testing, testing against server
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #43314 from cdkrot/SPARK-45485.
    
    Authored-by: Alice Sayutina <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../artifact/SparkConnectArtifactManager.scala     | 14 ++++++++---
 .../service/SparkConnectAddArtifactsHandler.scala  | 27 +++++++++++++---------
 .../connect/artifact/ArtifactManagerSuite.scala    | 23 ++++++++++++++++++
 3 files changed, 50 insertions(+), 14 deletions(-)

diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala
index 804c314ce67..ad551c4b0f5 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala
@@ -26,6 +26,7 @@ import javax.ws.rs.core.UriBuilder
 import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
 
+import io.grpc.Status
 import org.apache.commons.io.{FilenameUtils, FileUtils}
 import org.apache.hadoop.fs.{LocalFileSystem, Path => FSPath}
 
@@ -125,11 +126,18 @@ class SparkConnectArtifactManager(sessionHolder: 
SessionHolder) extends Logging
     } else {
       val target = ArtifactUtils.concatenatePaths(artifactPath, 
remoteRelativePath)
       Files.createDirectories(target.getParent)
-      // Disallow overwriting non-classfile artifacts
+
+      // Disallow overwriting with modified version
       if (Files.exists(target)) {
-        throw new RuntimeException(
-          s"Duplicate Artifact: $remoteRelativePath. " +
+        // makes the query idempotent
+        if (FileUtils.contentEquals(target.toFile, 
serverLocalStagingPath.toFile)) {
+          return
+        }
+
+        throw Status.ALREADY_EXISTS
+          .withDescription(s"Duplicate Artifact: $remoteRelativePath. " +
             "Artifacts cannot be overwritten.")
+          .asRuntimeException()
       }
       Files.move(serverLocalStagingPath, target)
 
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala
index e424331e761..d9de2a8094d 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable
 import scala.util.control.NonFatal
 
 import com.google.common.io.CountingOutputStream
+import io.grpc.StatusRuntimeException
 import io.grpc.stub.StreamObserver
 
 import org.apache.spark.connect.proto
@@ -80,7 +81,7 @@ class SparkConnectAddArtifactsHandler(val responseObserver: 
StreamObserver[AddAr
   }
 
   override def onError(throwable: Throwable): Unit = {
-    Utils.deleteRecursively(stagingDir.toFile)
+    cleanUpStagedArtifacts()
     responseObserver.onError(throwable)
   }
 
@@ -114,16 +115,20 @@ class SparkConnectAddArtifactsHandler(val 
responseObserver: StreamObserver[AddAr
   protected def cleanUpStagedArtifacts(): Unit = 
Utils.deleteRecursively(stagingDir.toFile)
 
   override def onCompleted(): Unit = {
-    val artifactSummaries = flushStagedArtifacts()
-    // Add the artifacts to the session and return the summaries to the client.
-    val builder = proto.AddArtifactsResponse.newBuilder()
-    artifactSummaries.foreach(summary => builder.addArtifacts(summary))
-    // Delete temp dir
-    cleanUpStagedArtifacts()
-
-    // Send the summaries and close
-    responseObserver.onNext(builder.build())
-    responseObserver.onCompleted()
+    try {
+      val artifactSummaries = flushStagedArtifacts()
+      // Add the artifacts to the session and return the summaries to the 
client.
+      val builder = proto.AddArtifactsResponse.newBuilder()
+      artifactSummaries.foreach(summary => builder.addArtifacts(summary))
+      // Delete temp dir
+      cleanUpStagedArtifacts()
+
+      // Send the summaries and close
+      responseObserver.onNext(builder.build())
+      responseObserver.onCompleted()
+    } catch {
+      case e: StatusRuntimeException => onError(e)
+    }
   }
 
   /**
diff --git 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/artifact/ArtifactManagerSuite.scala
 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/artifact/ArtifactManagerSuite.scala
index fa3b7d52379..f34c4f77088 100644
--- 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/artifact/ArtifactManagerSuite.scala
+++ 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/artifact/ArtifactManagerSuite.scala
@@ -20,6 +20,7 @@ import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Paths}
 import java.util.UUID
 
+import io.grpc.StatusRuntimeException
 import org.apache.commons.io.FileUtils
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, 
SparkException, SparkFunSuite}
@@ -150,6 +151,28 @@ class ArtifactManagerSuite extends SharedSparkSession with 
ResourceHelper {
     }
   }
 
+  test("Add artifact idempotency") {
+    val remotePath = Paths.get("pyfiles/abc.zip")
+
+    withTempPath { path =>
+      Files.write(path.toPath, "test".getBytes(StandardCharsets.UTF_8))
+      artifactManager.addArtifact(remotePath, path.toPath, None)
+    }
+
+    withTempPath { path =>
+      // subsequent call succeeds
+      Files.write(path.toPath, "test".getBytes(StandardCharsets.UTF_8))
+      artifactManager.addArtifact(remotePath, path.toPath, None)
+    }
+
+    withTempPath { path =>
+      Files.write(path.toPath, "updated file".getBytes(StandardCharsets.UTF_8))
+      assertThrows[StatusRuntimeException] {
+        artifactManager.addArtifact(remotePath, path.toPath, None)
+      }
+    }
+  }
+
   test("SPARK-43790: Forward artifact file to cloud storage path") {
     val copyDir = Utils.createTempDir().toPath
     val destFSDir = Utils.createTempDir().toPath


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to