This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch release/v1.2
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/release/v1.2 by this push:
new c9d3a73205 fix(workflow-core): paginate S3 deleteDirectory deletions
(#5569)
c9d3a73205 is described below
commit c9d3a732055bfa9d4ad0f28ff953ec63ccaea004
Author: Kunwoo (Chris) <[email protected]>
AuthorDate: Fri Jun 12 19:04:04 2026 +0000
fix(workflow-core): paginate S3 deleteDirectory deletions (#5569)
### What changes were proposed in this PR?
`S3StorageClient.deleteDirectory` listed objects with a single
`listObjectsV2` call and issued one `deleteObjects` batch. Both S3 APIs
cap at 1000 keys per call, so for any prefix holding more than 1000
objects only the first 1000 were deleted and the rest remained, causing a storage
leak. This affects dataset deletion (`DatasetResource`) and
per-execution cleanup (`LargeBinaryManager`), either of which can exceed
1000 objects under one prefix.
This PR:
- Lists via `listObjectsV2Paginator`, which follows the continuation
token across all pages, and deletes in batches of at most 1000 keys.
Keys are streamed so memory stays bounded to a single batch.
- Inspects each `DeleteObjects` response and throws if any key failed.
### Any related issues, documentation, discussions?
Closes #5281
### How was this PR tested?
1. Create more than 1000 files: `for i in {1..1100}; do printf 'x' >
"file_$i.txt"; done`
2. Upload them to a dataset. (There is a frontend memory issue when you
upload all 1100 files at the same time, so upload them batch by batch.)
3. Delete the dataset.
4. Check in the MinIO console that all the files have been removed. (Before this
fix, some files remained.)
### Was this PR authored or co-authored using generative AI tooling?
(backported from commit 227cbd73960afbcaa734b30f3ac108dc669324f3)
Generated-by: Claude Code (Claude Opus 4.8)
---
.../texera/service/util/LargeBinaryManager.scala | 4 +-
.../texera/service/util/S3StorageClient.scala | 87 ++++++++++-------
.../texera/service/util/S3StorageClientSpec.scala | 108 ++++++++++++++++++++-
3 files changed, 158 insertions(+), 41 deletions(-)
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala
index 2fa4acb530..df61981252 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala
@@ -75,8 +75,8 @@ object LargeBinaryManager extends LazyLogging {
}
/**
- * Deletes all large binaries for one execution. Uses deleteDirectory,
which removes one
- * listing page (<= 1000 objects) — enough for expected counts; more needs
a paginated delete.
+ * Deletes all large binaries for one execution via deleteDirectory, which
removes every object
+ * under the prefix.
*
* @param executionId the execution whose large binaries should be removed
*/
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
index 148205a681..956de75520 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
@@ -27,7 +27,6 @@ import software.amazon.awssdk.services.s3.{S3Client,
S3Configuration}
import software.amazon.awssdk.core.sync.RequestBody
import java.io.InputStream
-import java.security.MessageDigest
import scala.jdk.CollectionConverters._
/**
@@ -40,6 +39,10 @@ object S3StorageClient {
val MAXIMUM_NUM_OF_MULTIPART_S3_PARTS = 10_000
//Keep on sync with LakeFS https://github.com/treeverse/lakeFS/pull/10180
val PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS = 24
+ // S3 DeleteObjects accepts at most 1000 keys per request.
+ val MAX_KEYS_PER_DELETE_REQUEST = 1000
+ // Cap how many failed keys are listed in the error message.
+ private[util] val MAX_LISTED_DELETE_ERRORS = 10
// Initialize MinIO-compatible S3 Client
private lazy val s3Client: S3Client = {
@@ -97,50 +100,60 @@ object S3StorageClient {
}
/**
- * Deletes a directory (all objects under a given prefix) from a bucket.
+ * Deletes every object whose key begins with `directoryPrefix`. S3 keys
are a flat namespace
+ * with no real directories, so a "directory" is just a shared key prefix.
+ *
+ * A trailing `/` is added when missing so the prefix matches on a path
boundary (`a/b` deletes
+ * `a/b/file` but not `a/bc/file`). An empty prefix would match the whole
bucket and is rejected.
*
* @param bucketName Target S3/MinIO bucket.
- * @param directoryPrefix The directory to delete (must end with `/`).
+ * @param directoryPrefix Non-empty key prefix to delete.
*/
def deleteDirectory(bucketName: String, directoryPrefix: String): Unit = {
- // Ensure the directory prefix ends with `/` to avoid accidental deletions
+ require(directoryPrefix.nonEmpty, "directoryPrefix must not be empty")
val prefix = if (directoryPrefix.endsWith("/")) directoryPrefix else
directoryPrefix + "/"
- // List objects under the given prefix
- val listRequest = ListObjectsV2Request
- .builder()
- .bucket(bucketName)
- .prefix(prefix)
- .build()
-
- val listResponse = s3Client.listObjectsV2(listRequest)
-
- // Extract object keys
- val objectKeys = listResponse.contents().asScala.map(_.key())
-
- if (objectKeys.nonEmpty) {
- val objectsToDelete =
- objectKeys.map(key =>
ObjectIdentifier.builder().key(key).build()).asJava
-
- val deleteRequest = Delete
- .builder()
- .objects(objectsToDelete)
- .build()
-
- // Compute MD5 checksum for MinIO if required
- val md5Hash = MessageDigest
- .getInstance("MD5")
- .digest(deleteRequest.toString.getBytes("UTF-8"))
+ val listRequest =
ListObjectsV2Request.builder().bucket(bucketName).prefix(prefix).build()
+
+ // Delete in batches capped at the per-request key limit. Attempt every
batch before raising,
+ // so one undeletable key can't strand the rest; `quiet(true)` keeps each
response to just the
+ // failures.
+ val errors = s3Client
+ .listObjectsV2Paginator(listRequest)
+ .contents()
+ .asScala
+ .iterator
+ .map(obj => ObjectIdentifier.builder().key(obj.key()).build())
+ .grouped(MAX_KEYS_PER_DELETE_REQUEST)
+ .flatMap { batch =>
+ s3Client
+ .deleteObjects(
+ DeleteObjectsRequest
+ .builder()
+ .bucket(bucketName)
+
.delete(Delete.builder().objects(batch.asJava).quiet(true).build())
+ .build()
+ )
+ .errors()
+ .asScala
+ }
+ .toList
- // Convert object keys to S3 DeleteObjectsRequest format
- val deleteObjectsRequest = DeleteObjectsRequest
- .builder()
- .bucket(bucketName)
- .delete(deleteRequest)
- .build()
+ throwOnDeleteErrors(prefix, errors)
+ }
- // Perform batch deletion
- s3Client.deleteObjects(deleteObjectsRequest)
+ /** Raise if any object failed to delete, listing up to
`MAX_LISTED_DELETE_ERRORS` keys. */
+ private[util] def throwOnDeleteErrors(prefix: String, errors: Seq[S3Error]):
Unit = {
+ if (errors.nonEmpty) {
+ val listed = errors.take(MAX_LISTED_DELETE_ERRORS).map(e => s"${e.key()}
(${e.code()})")
+ val summary =
+ if (errors.size > MAX_LISTED_DELETE_ERRORS)
+ s" (and ${errors.size - MAX_LISTED_DELETE_ERRORS} more)"
+ else ""
+ throw new RuntimeException(
+ s"Failed to delete ${errors.size} object(s) under prefix '$prefix': " +
+ listed.mkString(", ") + summary
+ )
}
}
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala
index a1662cf8c3..c98f3c589c 100644
---
a/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala
+++
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala
@@ -21,8 +21,12 @@ package org.apache.texera.service.util
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.funsuite.AnyFunSuite
+import software.amazon.awssdk.services.s3.model.S3Error
import java.io.ByteArrayInputStream
+import java.util.concurrent.Executors
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.Random
class S3StorageClientSpec
@@ -39,9 +43,9 @@ class S3StorageClientSpec
}
override def afterAll(): Unit = {
- // Clean up test bucket
+ // Best-effort cleanup of the prefixes these tests use (deleteDirectory
rejects an empty prefix).
try {
- S3StorageClient.deleteDirectory(testBucketName, "")
+ Seq("test",
"delete-dir").foreach(S3StorageClient.deleteDirectory(testBucketName, _))
} catch {
case _: Exception => // Ignore cleanup errors
}
@@ -334,4 +338,104 @@ class S3StorageClientSpec
S3StorageClient.deleteObject(testBucketName, objectKey)
}
+
+ // ========================================
+ // deleteDirectory Tests
+ // ========================================
+
+ test("deleteDirectory should delete all objects under a prefix") {
+ val prefix = "delete-dir/small"
+ val keys = (0 until 5).map(i => s"$prefix/object-$i.txt")
+ keys.foreach(key =>
+ S3StorageClient.uploadObject(testBucketName, key,
createInputStream("data"))
+ )
+
+ assert(S3StorageClient.directoryExists(testBucketName, prefix))
+
+ S3StorageClient.deleteDirectory(testBucketName, prefix)
+
+ assert(!S3StorageClient.directoryExists(testBucketName, prefix))
+ }
+
+ test("deleteDirectory should not delete siblings that merely share the
prefix string") {
+ // The trailing-slash guard: deleting "delete-dir/small" (→
"delete-dir/small/") must leave the
+ // sibling "delete-dir/small-sibling.txt" untouched.
+ val prefix = "delete-dir/small"
+ S3StorageClient.uploadObject(testBucketName, s"$prefix/object.txt",
createInputStream("data"))
+ val sibling = "delete-dir/small-sibling.txt"
+ S3StorageClient.uploadObject(testBucketName, sibling,
createInputStream("keep me"))
+
+ S3StorageClient.deleteDirectory(testBucketName, prefix)
+
+ assert(!S3StorageClient.directoryExists(testBucketName, prefix))
+ val survivor = S3StorageClient.downloadObject(testBucketName, sibling)
+ assert(new String(readInputStream(survivor)) == "keep me")
+ survivor.close()
+ S3StorageClient.deleteObject(testBucketName, sibling)
+ }
+
+ test("deleteDirectory should delete more than 1000 objects under a prefix") {
+ // >1000 objects exercises pagination and delete batching; without them
the tail is orphaned.
+ val prefix = "delete-dir/large"
+ val objectCount = 1001
+
+ // Upload concurrently to keep the test reasonably fast.
+ val pool = Executors.newFixedThreadPool(16)
+ implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
+ try {
+ val uploads = (0 until objectCount).map { i =>
+ Future {
+ S3StorageClient.uploadObject(
+ testBucketName,
+ f"$prefix/object-$i%05d.txt",
+ createInputStream("")
+ )
+ }
+ }
+ Await.result(Future.sequence(uploads), 5.minutes)
+ } finally {
+ pool.shutdown()
+ }
+
+ assert(S3StorageClient.directoryExists(testBucketName, prefix))
+
+ S3StorageClient.deleteDirectory(testBucketName, prefix)
+
+ assert(!S3StorageClient.directoryExists(testBucketName, prefix))
+ }
+
+ test("deleteDirectory should not throw for a prefix with no objects") {
+ // Empty listing: no DeleteObjects request is issued.
+ S3StorageClient.deleteDirectory(testBucketName, "delete-dir/non-existent")
+ }
+
+ // A real per-key failure needs object-lock setup, so test
throwOnDeleteErrors directly.
+
+ test("throwOnDeleteErrors should raise on a per-key delete failure") {
+ val errors =
Seq(S3Error.builder().key("delete-dir/locked.txt").code("AccessDenied").build())
+ val thrown = intercept[RuntimeException] {
+ S3StorageClient.throwOnDeleteErrors("delete-dir/", errors)
+ }
+ assert(thrown.getMessage.contains("delete-dir/locked.txt"))
+ assert(thrown.getMessage.contains("AccessDenied"))
+ }
+
+ test("throwOnDeleteErrors should not throw when there are no errors") {
+ S3StorageClient.throwOnDeleteErrors("delete-dir/", Seq.empty[S3Error])
+ }
+
+ test("throwOnDeleteErrors should report the true total but list at most the
cap") {
+ val cap = S3StorageClient.MAX_LISTED_DELETE_ERRORS
+ val errorCount = cap + 5
+ val errors = (0 until errorCount).map(i =>
+
S3Error.builder().key(f"delete-dir/locked-$i%02d.txt").code("AccessDenied").build()
+ )
+ val thrown = intercept[RuntimeException] {
+ S3StorageClient.throwOnDeleteErrors("delete-dir/", errors)
+ }
+ assert(thrown.getMessage.contains(s"$errorCount object(s)"))
+ assert(thrown.getMessage.contains("delete-dir/locked-00.txt")) // first
key is listed
+ assert(!thrown.getMessage.contains(f"delete-dir/locked-$cap%02d.txt")) //
capped key is not
+ assert(thrown.getMessage.contains(s"and ${errorCount - cap} more"))
+ }
}