This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch release/v1.2
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/release/v1.2 by this push:
     new c9d3a73205 fix(workflow-core): paginate S3 deleteDirectory deletions 
(#5569)
c9d3a73205 is described below

commit c9d3a732055bfa9d4ad0f28ff953ec63ccaea004
Author: Kunwoo (Chris) <[email protected]>
AuthorDate: Fri Jun 12 19:04:04 2026 +0000

    fix(workflow-core): paginate S3 deleteDirectory deletions (#5569)
    
    ### What changes were proposed in this PR?
    
    `S3StorageClient.deleteDirectory` listed objects with a single
    `listObjectsV2` call and issued one `deleteObjects` batch. Both S3 APIs
    cap at 1000 keys per call, so for any prefix holding more than 1000
    objects only the first 1000 were deleted and the rest remained, causing a storage
    leak. This affects dataset deletion (`DatasetResource`) and
    per-execution cleanup (`LargeBinaryManager`), either of which can exceed
    1000 objects under one prefix.
    
    This PR:
    - Lists via `listObjectsV2Paginator`, which follows the continuation
    token across all pages, and deletes in batches of at most 1000 keys.
    Keys are streamed so memory stays bounded to a single batch.
    - Collects the per-key errors from every `DeleteObjects` response and, once
    all batches have been attempted, throws if any key failed.
    
    ### Any related issues, documentation, discussions?
    
    Closes #5281
    
    ### How was this PR tested?
    
    1. Create more than 1000 files: `for i in {1..1100}; do printf 'x' >
    "file_$i.txt"; done`
    2. Upload them to a dataset. (The frontend runs into a memory issue when
    all 1100 files are uploaded at once, so upload them in smaller batches.)
    3. Delete the dataset.
    4. Check in the MinIO console that all the files have been removed. (Before
    this fix, some files remained.)
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    (backported from commit 227cbd73960afbcaa734b30f3ac108dc669324f3)
    
    Generated-by: Claude Code (Claude Opus 4.8)
---
 .../texera/service/util/LargeBinaryManager.scala   |   4 +-
 .../texera/service/util/S3StorageClient.scala      |  87 ++++++++++-------
 .../texera/service/util/S3StorageClientSpec.scala  | 108 ++++++++++++++++++++-
 3 files changed, 158 insertions(+), 41 deletions(-)

diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala
index 2fa4acb530..df61981252 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala
@@ -75,8 +75,8 @@ object LargeBinaryManager extends LazyLogging {
   }
 
   /**
-    * Deletes all large binaries for one execution. Uses deleteDirectory, 
which removes one
-    * listing page (<= 1000 objects) — enough for expected counts; more needs 
a paginated delete.
+    * Deletes all large binaries for one execution via deleteDirectory, which 
removes every object
+    * under the prefix.
     *
     * @param executionId the execution whose large binaries should be removed
     */
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
index 148205a681..956de75520 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
@@ -27,7 +27,6 @@ import software.amazon.awssdk.services.s3.{S3Client, 
S3Configuration}
 import software.amazon.awssdk.core.sync.RequestBody
 
 import java.io.InputStream
-import java.security.MessageDigest
 import scala.jdk.CollectionConverters._
 
 /**
@@ -40,6 +39,10 @@ object S3StorageClient {
   val MAXIMUM_NUM_OF_MULTIPART_S3_PARTS = 10_000
   //Keep on sync with LakeFS https://github.com/treeverse/lakeFS/pull/10180
   val PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS = 24
+  // S3 DeleteObjects accepts at most 1000 keys per request.
+  val MAX_KEYS_PER_DELETE_REQUEST = 1000
+  // Cap how many failed keys are listed in the error message.
+  private[util] val MAX_LISTED_DELETE_ERRORS = 10
 
   // Initialize MinIO-compatible S3 Client
   private lazy val s3Client: S3Client = {
@@ -97,50 +100,60 @@ object S3StorageClient {
   }
 
   /**
-    * Deletes a directory (all objects under a given prefix) from a bucket.
+    * Deletes every object whose key begins with `directoryPrefix`. S3 keys 
are a flat namespace
+    * with no real directories, so a "directory" is just a shared key prefix.
+    *
+    * A trailing `/` is added when missing so the prefix matches on a path 
boundary (`a/b` deletes
+    * `a/b/file` but not `a/bc/file`). An empty prefix would match the whole 
bucket and is rejected.
     *
     * @param bucketName Target S3/MinIO bucket.
-    * @param directoryPrefix The directory to delete (must end with `/`).
+    * @param directoryPrefix Non-empty key prefix to delete.
     */
   def deleteDirectory(bucketName: String, directoryPrefix: String): Unit = {
-    // Ensure the directory prefix ends with `/` to avoid accidental deletions
+    require(directoryPrefix.nonEmpty, "directoryPrefix must not be empty")
     val prefix = if (directoryPrefix.endsWith("/")) directoryPrefix else 
directoryPrefix + "/"
 
-    // List objects under the given prefix
-    val listRequest = ListObjectsV2Request
-      .builder()
-      .bucket(bucketName)
-      .prefix(prefix)
-      .build()
-
-    val listResponse = s3Client.listObjectsV2(listRequest)
-
-    // Extract object keys
-    val objectKeys = listResponse.contents().asScala.map(_.key())
-
-    if (objectKeys.nonEmpty) {
-      val objectsToDelete =
-        objectKeys.map(key => 
ObjectIdentifier.builder().key(key).build()).asJava
-
-      val deleteRequest = Delete
-        .builder()
-        .objects(objectsToDelete)
-        .build()
-
-      // Compute MD5 checksum for MinIO if required
-      val md5Hash = MessageDigest
-        .getInstance("MD5")
-        .digest(deleteRequest.toString.getBytes("UTF-8"))
+    val listRequest = 
ListObjectsV2Request.builder().bucket(bucketName).prefix(prefix).build()
+
+    // Delete in batches capped at the per-request key limit. Attempt every 
batch before raising,
+    // so one undeletable key can't strand the rest; `quiet(true)` keeps each 
response to just the
+    // failures.
+    val errors = s3Client
+      .listObjectsV2Paginator(listRequest)
+      .contents()
+      .asScala
+      .iterator
+      .map(obj => ObjectIdentifier.builder().key(obj.key()).build())
+      .grouped(MAX_KEYS_PER_DELETE_REQUEST)
+      .flatMap { batch =>
+        s3Client
+          .deleteObjects(
+            DeleteObjectsRequest
+              .builder()
+              .bucket(bucketName)
+              
.delete(Delete.builder().objects(batch.asJava).quiet(true).build())
+              .build()
+          )
+          .errors()
+          .asScala
+      }
+      .toList
 
-      // Convert object keys to S3 DeleteObjectsRequest format
-      val deleteObjectsRequest = DeleteObjectsRequest
-        .builder()
-        .bucket(bucketName)
-        .delete(deleteRequest)
-        .build()
+    throwOnDeleteErrors(prefix, errors)
+  }
 
-      // Perform batch deletion
-      s3Client.deleteObjects(deleteObjectsRequest)
+  /** Raise if any object failed to delete, listing up to 
`MAX_LISTED_DELETE_ERRORS` keys. */
+  private[util] def throwOnDeleteErrors(prefix: String, errors: Seq[S3Error]): 
Unit = {
+    if (errors.nonEmpty) {
+      val listed = errors.take(MAX_LISTED_DELETE_ERRORS).map(e => s"${e.key()} 
(${e.code()})")
+      val summary =
+        if (errors.size > MAX_LISTED_DELETE_ERRORS)
+          s" (and ${errors.size - MAX_LISTED_DELETE_ERRORS} more)"
+        else ""
+      throw new RuntimeException(
+        s"Failed to delete ${errors.size} object(s) under prefix '$prefix': " +
+          listed.mkString(", ") + summary
+      )
     }
   }
 
diff --git 
a/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala
 
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala
index a1662cf8c3..c98f3c589c 100644
--- 
a/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala
+++ 
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala
@@ -21,8 +21,12 @@ package org.apache.texera.service.util
 
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 import org.scalatest.funsuite.AnyFunSuite
+import software.amazon.awssdk.services.s3.model.S3Error
 
 import java.io.ByteArrayInputStream
+import java.util.concurrent.Executors
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.util.Random
 
 class S3StorageClientSpec
@@ -39,9 +43,9 @@ class S3StorageClientSpec
   }
 
   override def afterAll(): Unit = {
-    // Clean up test bucket
+    // Best-effort cleanup of the prefixes these tests use (deleteDirectory 
rejects an empty prefix).
     try {
-      S3StorageClient.deleteDirectory(testBucketName, "")
+      Seq("test", 
"delete-dir").foreach(S3StorageClient.deleteDirectory(testBucketName, _))
     } catch {
       case _: Exception => // Ignore cleanup errors
     }
@@ -334,4 +338,104 @@ class S3StorageClientSpec
 
     S3StorageClient.deleteObject(testBucketName, objectKey)
   }
+
+  // ========================================
+  // deleteDirectory Tests
+  // ========================================
+
+  test("deleteDirectory should delete all objects under a prefix") {
+    val prefix = "delete-dir/small"
+    val keys = (0 until 5).map(i => s"$prefix/object-$i.txt")
+    keys.foreach(key =>
+      S3StorageClient.uploadObject(testBucketName, key, 
createInputStream("data"))
+    )
+
+    assert(S3StorageClient.directoryExists(testBucketName, prefix))
+
+    S3StorageClient.deleteDirectory(testBucketName, prefix)
+
+    assert(!S3StorageClient.directoryExists(testBucketName, prefix))
+  }
+
+  test("deleteDirectory should not delete siblings that merely share the 
prefix string") {
+    // The trailing-slash guard: deleting "delete-dir/small" (→ 
"delete-dir/small/") must leave the
+    // sibling "delete-dir/small-sibling.txt" untouched.
+    val prefix = "delete-dir/small"
+    S3StorageClient.uploadObject(testBucketName, s"$prefix/object.txt", 
createInputStream("data"))
+    val sibling = "delete-dir/small-sibling.txt"
+    S3StorageClient.uploadObject(testBucketName, sibling, 
createInputStream("keep me"))
+
+    S3StorageClient.deleteDirectory(testBucketName, prefix)
+
+    assert(!S3StorageClient.directoryExists(testBucketName, prefix))
+    val survivor = S3StorageClient.downloadObject(testBucketName, sibling)
+    assert(new String(readInputStream(survivor)) == "keep me")
+    survivor.close()
+    S3StorageClient.deleteObject(testBucketName, sibling)
+  }
+
+  test("deleteDirectory should delete more than 1000 objects under a prefix") {
+    // >1000 objects exercises pagination and delete batching; without them 
the tail is orphaned.
+    val prefix = "delete-dir/large"
+    val objectCount = 1001
+
+    // Upload concurrently to keep the test reasonably fast.
+    val pool = Executors.newFixedThreadPool(16)
+    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
+    try {
+      val uploads = (0 until objectCount).map { i =>
+        Future {
+          S3StorageClient.uploadObject(
+            testBucketName,
+            f"$prefix/object-$i%05d.txt",
+            createInputStream("")
+          )
+        }
+      }
+      Await.result(Future.sequence(uploads), 5.minutes)
+    } finally {
+      pool.shutdown()
+    }
+
+    assert(S3StorageClient.directoryExists(testBucketName, prefix))
+
+    S3StorageClient.deleteDirectory(testBucketName, prefix)
+
+    assert(!S3StorageClient.directoryExists(testBucketName, prefix))
+  }
+
+  test("deleteDirectory should not throw for a prefix with no objects") {
+    // Empty listing: no DeleteObjects request is issued.
+    S3StorageClient.deleteDirectory(testBucketName, "delete-dir/non-existent")
+  }
+
+  // A real per-key failure needs object-lock setup, so test 
throwOnDeleteErrors directly.
+
+  test("throwOnDeleteErrors should raise on a per-key delete failure") {
+    val errors = 
Seq(S3Error.builder().key("delete-dir/locked.txt").code("AccessDenied").build())
+    val thrown = intercept[RuntimeException] {
+      S3StorageClient.throwOnDeleteErrors("delete-dir/", errors)
+    }
+    assert(thrown.getMessage.contains("delete-dir/locked.txt"))
+    assert(thrown.getMessage.contains("AccessDenied"))
+  }
+
+  test("throwOnDeleteErrors should not throw when there are no errors") {
+    S3StorageClient.throwOnDeleteErrors("delete-dir/", Seq.empty[S3Error])
+  }
+
+  test("throwOnDeleteErrors should report the true total but list at most the 
cap") {
+    val cap = S3StorageClient.MAX_LISTED_DELETE_ERRORS
+    val errorCount = cap + 5
+    val errors = (0 until errorCount).map(i =>
+      
S3Error.builder().key(f"delete-dir/locked-$i%02d.txt").code("AccessDenied").build()
+    )
+    val thrown = intercept[RuntimeException] {
+      S3StorageClient.throwOnDeleteErrors("delete-dir/", errors)
+    }
+    assert(thrown.getMessage.contains(s"$errorCount object(s)"))
+    assert(thrown.getMessage.contains("delete-dir/locked-00.txt")) // first 
key is listed
+    assert(!thrown.getMessage.contains(f"delete-dir/locked-$cap%02d.txt")) // 
capped key is not
+    assert(thrown.getMessage.contains(s"and ${errorCount - cap} more"))
+  }
 }

Reply via email to