This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8c3923aaa59 [SPARK-45486][CONNECT] Make add_artifact request idempotent
8c3923aaa59 is described below
commit 8c3923aaa59a36e91fe9d3cdde95a69d51c599ae
Author: Alice Sayutina <[email protected]>
AuthorDate: Mon Oct 16 09:21:09 2023 +0900
[SPARK-45486][CONNECT] Make add_artifact request idempotent
### What changes were proposed in this pull request?
1. Make the add_artifact request idempotent, i.e. subsequent requests with the same content succeed. This makes retrying safer (see the sketch after the error examples below).
2. Fix the existing error handling mechanism.
Before this change, the error looked like this:
```
>>> spark.addArtifact("tmp.py", pyfile=True)
>>> spark.addArtifact("tmp.py", pyfile=True) # fails
2023-10-09 15:55:30,352 82873 DEBUG __iter__ Will retry call after 60014.279746934524 ms sleep (error: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNKNOWN
details = ""
debug_error_string = "UNKNOWN:Error received from peer {grpc_message:"", grpc_status:2, created_time:"2023-10-09T15:55:30.351541+02:00"}"
>)
(this is also getting retried)
```
Now it looks like this:
```
>>> spark.addArtifact("abc.sh", file=True)
>>> spark.addArtifact("abc.sh", file=True) # passes
>>> # update file's content
>>> spark.addArtifact("abc.sh", file=True) # now fails
Traceback (most recent call last):
[...]
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.ALREADY_EXISTS
details = "Duplicate Artifact: files/abc.sh. Artifacts cannot be overwritten."
debug_error_string = "UNKNOWN:Error received from peer {grpc_message:"Duplicate Artifact: files/abc.sh. Artifacts cannot be overwritten.", grpc_status:6, created_time:"2023-10-10T01:25:38.231317+02:00"}"
>
```
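Conceptually, the server-side change boils down to the following check; this is a minimal, self-contained sketch of the logic in the diff further down (the object and method names here are illustrative, not the actual Spark Connect code):
```
import java.nio.file.{Files, Path}

import io.grpc.Status
import org.apache.commons.io.FileUtils

object ArtifactIdempotencySketch {
  // Move a staged upload into place: identical re-uploads become a no-op,
  // while conflicting content is rejected with a descriptive gRPC status.
  def moveArtifact(staged: Path, target: Path): Unit = {
    Files.createDirectories(target.getParent)
    if (Files.exists(target)) {
      // Byte-for-byte identical content: accept the (retried) request as a no-op.
      if (FileUtils.contentEquals(target.toFile, staged.toFile)) {
        return
      }
      // Different content: refuse to overwrite and surface ALREADY_EXISTS to the client.
      throw Status.ALREADY_EXISTS
        .withDescription(s"Duplicate Artifact: $target. Artifacts cannot be overwritten.")
        .asRuntimeException()
    }
    Files.move(staged, target)
  }
}
```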
### Why are the changes needed?
Makes retrying more robust and adds a user-friendly error (see above).
### Does this PR introduce _any_ user-facing change?
Mostly internal improvements.
### How was this patch tested?
Unit tests, plus testing against the server.
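As a rough illustration only (the real test is the new `ArtifactManagerSuite` case in the diff below), the same three-step pattern can be driven against the sketch above:
```
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import io.grpc.StatusRuntimeException

object ArtifactIdempotencyCheck extends App {
  val target = Files.createTempDirectory("artifacts").resolve("pyfiles").resolve("abc.zip")

  // Write content to a fresh staging file, mimicking an uploaded artifact.
  def staged(content: String) = {
    val p = Files.createTempFile("staged", ".zip")
    Files.write(p, content.getBytes(StandardCharsets.UTF_8))
    p
  }

  ArtifactIdempotencySketch.moveArtifact(staged("test"), target)    // first add succeeds
  ArtifactIdempotencySketch.moveArtifact(staged("test"), target)    // identical re-add is a no-op
  try {
    ArtifactIdempotencySketch.moveArtifact(staged("updated file"), target) // changed content
  } catch {
    case e: StatusRuntimeException => println(s"rejected as expected: ${e.getStatus.getCode}")
  }
}
```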
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43314 from cdkrot/SPARK-45485.
Authored-by: Alice Sayutina <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../artifact/SparkConnectArtifactManager.scala | 14 ++++++++---
.../service/SparkConnectAddArtifactsHandler.scala | 27 +++++++++++++---------
.../connect/artifact/ArtifactManagerSuite.scala | 23 ++++++++++++++++++
3 files changed, 50 insertions(+), 14 deletions(-)
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala
index 804c314ce67..ad551c4b0f5 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala
@@ -26,6 +26,7 @@ import javax.ws.rs.core.UriBuilder
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
+import io.grpc.Status
import org.apache.commons.io.{FilenameUtils, FileUtils}
import org.apache.hadoop.fs.{LocalFileSystem, Path => FSPath}
@@ -125,11 +126,18 @@ class SparkConnectArtifactManager(sessionHolder: SessionHolder) extends Logging
} else {
val target = ArtifactUtils.concatenatePaths(artifactPath, remoteRelativePath)
Files.createDirectories(target.getParent)
- // Disallow overwriting non-classfile artifacts
+
+ // Disallow overwriting with modified version
if (Files.exists(target)) {
- throw new RuntimeException(
- s"Duplicate Artifact: $remoteRelativePath. " +
+ // makes the query idempotent
+ if (FileUtils.contentEquals(target.toFile, serverLocalStagingPath.toFile)) {
+ return
+ }
+
+ throw Status.ALREADY_EXISTS
+ .withDescription(s"Duplicate Artifact: $remoteRelativePath. " +
"Artifacts cannot be overwritten.")
+ .asRuntimeException()
}
Files.move(serverLocalStagingPath, target)
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala
index e424331e761..d9de2a8094d 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable
import scala.util.control.NonFatal
import com.google.common.io.CountingOutputStream
+import io.grpc.StatusRuntimeException
import io.grpc.stub.StreamObserver
import org.apache.spark.connect.proto
@@ -80,7 +81,7 @@ class SparkConnectAddArtifactsHandler(val responseObserver: StreamObserver[AddAr
}
override def onError(throwable: Throwable): Unit = {
- Utils.deleteRecursively(stagingDir.toFile)
+ cleanUpStagedArtifacts()
responseObserver.onError(throwable)
}
@@ -114,16 +115,20 @@ class SparkConnectAddArtifactsHandler(val responseObserver: StreamObserver[AddAr
protected def cleanUpStagedArtifacts(): Unit =
Utils.deleteRecursively(stagingDir.toFile)
override def onCompleted(): Unit = {
- val artifactSummaries = flushStagedArtifacts()
- // Add the artifacts to the session and return the summaries to the client.
- val builder = proto.AddArtifactsResponse.newBuilder()
- artifactSummaries.foreach(summary => builder.addArtifacts(summary))
- // Delete temp dir
- cleanUpStagedArtifacts()
-
- // Send the summaries and close
- responseObserver.onNext(builder.build())
- responseObserver.onCompleted()
+ try {
+ val artifactSummaries = flushStagedArtifacts()
+ // Add the artifacts to the session and return the summaries to the client.
+ val builder = proto.AddArtifactsResponse.newBuilder()
+ artifactSummaries.foreach(summary => builder.addArtifacts(summary))
+ // Delete temp dir
+ cleanUpStagedArtifacts()
+
+ // Send the summaries and close
+ responseObserver.onNext(builder.build())
+ responseObserver.onCompleted()
+ } catch {
+ case e: StatusRuntimeException => onError(e)
+ }
}
/**
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/artifact/ArtifactManagerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/artifact/ArtifactManagerSuite.scala
index fa3b7d52379..f34c4f77088 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/artifact/ArtifactManagerSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/artifact/ArtifactManagerSuite.scala
@@ -20,6 +20,7 @@ import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import java.util.UUID
+import io.grpc.StatusRuntimeException
import org.apache.commons.io.FileUtils
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
@@ -150,6 +151,28 @@ class ArtifactManagerSuite extends SharedSparkSession with ResourceHelper {
}
}
+ test("Add artifact idempotency") {
+ val remotePath = Paths.get("pyfiles/abc.zip")
+
+ withTempPath { path =>
+ Files.write(path.toPath, "test".getBytes(StandardCharsets.UTF_8))
+ artifactManager.addArtifact(remotePath, path.toPath, None)
+ }
+
+ withTempPath { path =>
+ // subsequent call succeeds
+ Files.write(path.toPath, "test".getBytes(StandardCharsets.UTF_8))
+ artifactManager.addArtifact(remotePath, path.toPath, None)
+ }
+
+ withTempPath { path =>
+ Files.write(path.toPath, "updated file".getBytes(StandardCharsets.UTF_8))
+ assertThrows[StatusRuntimeException] {
+ artifactManager.addArtifact(remotePath, path.toPath, None)
+ }
+ }
+ }
+
test("SPARK-43790: Forward artifact file to cloud storage path") {
val copyDir = Utils.createTempDir().toPath
val destFSDir = Utils.createTempDir().toPath