This is an automated email from the ASF dual-hosted git repository.

gwphua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d1e0d03de3 feat: Clean up deep storage after index parallel merge finishes  (#19187)
3d1e0d03de3 is described below

commit 3d1e0d03de341b41027dfe3d493800274f5e895c
Author: Virushade <[email protected]>
AuthorDate: Mon Mar 30 14:07:33 2026 +0800

    feat: Clean up deep storage after index parallel merge finishes  (#19187)
    
    * Shuffle solution
    
    Add delete for datasegments
    
    Unit Tests and Embedded tests
    
    Cleanup
    
    * Better task logs when fail killing.
    
    * Docs
    
    * Checkstyle
    
    * Cleanup function
    
    * non-volatile
    
    * Clean up unnecessary code
    
    * Test fixes
    
    * HDFS Segment clean-up
    
    * Finish up deepstore
    
    * Fix HDFS pathing bug
    
    * Fix forbidden apis
    
    * Deep Store PR
    
    * Tests
    
    * Correct Docs
    
    * Change DataSegmentKiller to not be aware of supervisor shuffle
    
    * Test unhappy path
    
    * Exception type
    
    * Checkstyle
    
    * Extra space in md file
    
    * Forbidden API
    
    * No need to check taskId as AbstractTask already did
    
    * Rename Shuffle method
    
    * Split method that preserves empty tokens
    
    * Update docs/configuration/index.md
    
    Co-authored-by: Copilot <[email protected]>
    
    * Address co-pilot comments
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 docs/configuration/index.md                        |   2 +-
 .../druid/storage/hdfs/HdfsDataSegmentKiller.java  |  55 ++++++
 .../storage/hdfs/HdfsDataSegmentKillerTest.java    | 166 ++++++++++++++++++
 .../parallel/ParallelIndexSupervisorTask.java      |  10 ++
 .../DeepStorageIntermediaryDataManager.java        |  25 ++-
 .../parallel/ParallelIndexSupervisorTaskTest.java  | 187 ++++++++++++++++++++-
 .../druid/segment/loading/DataSegmentKiller.java   |  12 ++
 .../segment/loading/OmniDataSegmentKiller.java     |  22 +++
 .../segment/loading/OmniDataSegmentKillerTest.java |  18 ++
 9 files changed, 494 insertions(+), 3 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index c31cf0747cb..851db47e3a6 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1376,7 +1376,7 @@ Processing properties set on the Middle Manager are passed through to Peons.
 |`druid.processing.numTimeoutThreads`|The number of processing threads to have available for handling per-segment query timeouts. Setting this value to `0` removes the ability to service per-segment timeouts, irrespective of `perSegmentTimeout` query context parameter. As these threads are just servicing timers, it's recommended to set this value to some small percent (e.g. 5%) of the total query processing cores available to the peon.|0|
 |`druid.processing.fifo`|Enables the processing queue to treat tasks of equal priority in a FIFO manner.|`true`|
 |`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
-|`druid.processing.intermediaryData.storage.type`|Storage type for intermediary segments of data shuffle between native parallel index tasks. <br />Set to `local` to store segment files in the local storage of the Middle Manager or Indexer. <br />Set to `deepstore` to use configured deep storage for better fault tolerance during rolling updates. When the storage type is `deepstore`, Druid stores the data in the `shuffle-data` directory under the configured deep storage path. Druid does n [...]
+|`druid.processing.intermediaryData.storage.type`|Storage type for intermediary segments of data shuffle between native parallel index tasks. <br />Set to `local` to store segment files in the local storage of the Middle Manager or Indexer. <br />Set to `deepstore` to use configured deep storage for better fault tolerance during rolling updates. When the storage type is `deepstore`, Druid stores the data in the `shuffle-data` directory under the configured deep storage path. Automated cl [...]
 |`druid.processing.parallelPoolInit`|(EXPERIMENTAL) Allows all merge/processing memory pools to be allocated in parallel on process launch. This may significantly speed up Peon launch times if allocating several large buffers.|`false`|
 
 The amount of direct memory needed by Druid is at least
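For reference, the property in the changed row above would be set in Middle Manager/Indexer runtime properties roughly like this (a sketch; values are illustrative):

```properties
# Store shuffle intermediaries in deep storage, under <deep storage path>/shuffle-data:
druid.processing.intermediaryData.storage.type=deepstore
# Or set to local (the alternative from the same table) to keep them on
# Middle Manager / Indexer local disk:
# druid.processing.intermediaryData.storage.type=local
```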
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentKiller.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentKiller.java
index e135403973e..0f5822312b0 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentKiller.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentKiller.java
@@ -148,4 +148,59 @@ public class HdfsDataSegmentKiller implements DataSegmentKiller
       log.makeAlert(e, "uncaught exception during segment killer").emit();
     }
   }
+
+  @Override
+  public void killRecursively(String relativePath) throws IOException
+  {
+    final Path dirToDelete = constructHdfsDeletePath(relativePath);
+    if (dirToDelete == null) {
+      return;
+    }
+
+    final FileSystem fs = dirToDelete.getFileSystem(config);
+    if (!fs.exists(dirToDelete)) {
+      return;
+    }
+    log.info("Deleting deep storage directory[%s]", dirToDelete);
+    if (!fs.delete(dirToDelete, true)) {
+      throw new IOException("Failed to delete deep storage directory[" + dirToDelete + "].");
+    }
+  }
+
+  /**
+   * Construct a path to delete from HDFS. Returns null if the path is invalid.
+   * Replicates how {@link HdfsDataSegmentPusher#pushToPath} handles ':', by replacing that with '_'.
+   */
+  @Nullable
+  private Path constructHdfsDeletePath(String relativePath)
+  {
+    if (Strings.isNullOrEmpty(relativePath)) {
+      log.warn("Skipping deep storage directory kill: relative path is empty");
+      return null;
+    }
+    if (relativePath.charAt(0) == '/') {
+      log.warn("Skipping deep storage directory kill: relative path must not be absolute, got [%s]", relativePath);
+      return null;
+    }
+    if (relativePath.indexOf('\\') >= 0) {
+      log.warn("Skipping deep storage directory kill: backslash not allowed in path [%s]", relativePath);
+      return null;
+    }
+    for (String segment : StringUtils.splitPreserveAllTokens(relativePath, '/')) {
+      if (segment.isEmpty() || "..".equals(segment)) {
+        log.warn("Skipping deep storage directory kill: invalid path[%s]", relativePath);
+        return null;
+      }
+    }
+
+    if (storageDirectory == null) {
+      log.warn("Skipping deep storage directory kill: storage directory not configured");
+      return null;
+    }
+
+    final String hdfsRelativePath = relativePath.replace(':', '_');
+    final String storageDirectoryString = storageDirectory.toString();
+    final String sep = storageDirectoryString.endsWith(Path.SEPARATOR) ? "" : Path.SEPARATOR;
+    return new Path(storageDirectoryString + sep + hdfsRelativePath);
+  }
 }
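The validation rules in `constructHdfsDeletePath` generalize beyond HDFS. Below is a minimal standalone sketch of the same checks using plain strings instead of Hadoop `Path`/`FileSystem`; the class and method names are illustrative, not part of the patch:

```java
// Sketch of the relative-path validation used before a recursive deep-storage delete.
// Hypothetical helper, not Druid API: returns the path to delete, or null if rejected.
public class ShufflePathValidator
{
  public static String toDeletePath(String storageDirectory, String relativePath)
  {
    if (relativePath == null || relativePath.isEmpty()) {
      return null; // nothing to delete
    }
    if (relativePath.charAt(0) == '/') {
      return null; // absolute paths could escape the storage root
    }
    if (relativePath.indexOf('\\') >= 0) {
      return null; // backslashes are not valid separators here
    }
    // split with limit -1 preserves empty tokens, so "a//b" and "a/b/" are rejected too
    for (String segment : relativePath.split("/", -1)) {
      if (segment.isEmpty() || "..".equals(segment)) {
        return null; // empty segments and ".." could traverse upward
      }
    }
    if (storageDirectory == null || storageDirectory.isEmpty()) {
      return null; // no configured root to resolve against
    }
    // Mirror the pusher's normalization: ':' is written as '_' on disk
    String normalized = relativePath.replace(':', '_');
    String sep = storageDirectory.endsWith("/") ? "" : "/";
    return storageDirectory + sep + normalized;
  }
}
```

The order of checks mirrors the patch: reject suspicious input first, then resolve against the storage root.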
diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentKillerTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentKillerTest.java
index 746ebb54583..30335f808de 100644
--- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentKillerTest.java
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentKillerTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.storage.hdfs;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.loading.SegmentLoadingException;
@@ -34,6 +35,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.UUID;
 
@@ -202,6 +204,170 @@ public class HdfsDataSegmentKillerTest
     killer.kill(getSegmentWithPath(new Path("/xxx/", "index.zip").toString()));
   }
 
+  @Test
+  public void testKillRecursive_forWhenConstructedPathReturnsNull() throws Exception
+  {
+    final File testRoot = FileUtils.createTempDir();
+    final Configuration config = new Configuration();
+    final FileSystem fs = FileSystem.get(config);
+    try {
+      final HdfsDataSegmentKiller killerWithStorage = new HdfsDataSegmentKiller(
+          config,
+          new HdfsDataSegmentPusherConfig()
+          {
+            @Override
+            public String getStorageDirectory()
+            {
+              return testRoot.getAbsolutePath();
+            }
+          }
+      );
+
+      final Path workspaceRoot = new Path(testRoot.getAbsolutePath(), "workspace");
+      final Path nested = new Path(workspaceRoot, "evil");
+      Assert.assertTrue(fs.mkdirs(nested));
+      fs.createNewFile(new Path(nested, "probe"));
+
+      final Path stagingRun = new Path(new Path(testRoot.getAbsolutePath(), "staging"), "some_run_id");
+      Assert.assertTrue(fs.mkdirs(stagingRun));
+
+      killerWithStorage.killRecursively(null);
+      killerWithStorage.killRecursively("");
+      killerWithStorage.killRecursively("/absolute/under/root");
+      killerWithStorage.killRecursively("path\\with\\backslashes");
+      killerWithStorage.killRecursively("workspace/../evil");
+      killerWithStorage.killRecursively("only/../dots");
+      killerWithStorage.killRecursively("..");
+      killerWithStorage.killRecursively("workspace//evil");
+      killerWithStorage.killRecursively("workspace/evil/");
+
+      Assert.assertTrue("workspace/evil should survive null constructHdfsDeletePath cases", fs.exists(nested));
+      Assert.assertTrue(fs.exists(stagingRun));
+
+      final HdfsDataSegmentKiller killerNoStorage = new HdfsDataSegmentKiller(
+          config,
+          new HdfsDataSegmentPusherConfig()
+          {
+            @Override
+            public String getStorageDirectory()
+            {
+              return "";
+            }
+          }
+      );
+      killerNoStorage.killRecursively("staging/some_run_id");
+      Assert.assertTrue("paths must not be deleted when storage directory is unset", fs.exists(stagingRun));
+    }
+    finally {
+      fs.delete(new Path(testRoot.getAbsolutePath()), true);
+    }
+  }
+
+  @Test
+  public void testKillRecursively_missingDirectoryIsNoOp() throws Exception
+  {
+    final File testRoot = FileUtils.createTempDir();
+    final Configuration config = new Configuration();
+    final HdfsDataSegmentKiller killer = new HdfsDataSegmentKiller(
+        config,
+        new HdfsDataSegmentPusherConfig()
+        {
+          @Override
+          public String getStorageDirectory()
+          {
+            return testRoot.getAbsolutePath();
+          }
+        }
+    );
+
+    final FileSystem fs = FileSystem.get(config);
+    try {
+      killer.killRecursively("staging/no_such_directory");
+    }
+    finally {
+      fs.delete(new Path(testRoot.getAbsolutePath()), true);
+    }
+  }
+
+  @Test
+  public void testKillRecursively() throws Exception
+  {
+    final File testRoot = FileUtils.createTempDir();
+    Configuration config = new Configuration();
+    HdfsDataSegmentKiller killer = new HdfsDataSegmentKiller(
+        config,
+        new HdfsDataSegmentPusherConfig()
+        {
+          @Override
+          public String getStorageDirectory()
+          {
+            return testRoot.getAbsolutePath();
+          }
+        }
+    );
+
+    final FileSystem fs = FileSystem.get(config);
+    try {
+      Path parentDir = new Path(testRoot.getAbsolutePath(), "export");
+      Path taskDir = new Path(new Path(parentDir, "run_a"), "leaf");
+      Assert.assertTrue(fs.mkdirs(taskDir.getParent()));
+      fs.createNewFile(taskDir);
+
+      killer.killRecursively("export/run_a");
+
+      Assert.assertFalse(fs.exists(new Path(parentDir, "run_a")));
+      Assert.assertTrue(fs.exists(parentDir));
+      Assert.assertTrue(fs.delete(parentDir, true));
+    }
+    finally {
+      fs.delete(new Path(testRoot.getAbsolutePath()), true);
+    }
+  }
+
+  /**
+   * {@link HdfsDataSegmentPusher#pushToPath} replaces {@code ':'} with {@code '_'} in
+   * storage suffixes; cleanup applies the same normalization to the relative directory path.
+   */
+  @Test
+  public void testKillRecursively_pathWithColonsMatchesHdfsPusherLayout() throws Exception
+  {
+    final File testRoot = FileUtils.createTempDir();
+    Configuration config = new Configuration();
+    HdfsDataSegmentKiller killer = new HdfsDataSegmentKiller(
+        config,
+        new HdfsDataSegmentPusherConfig()
+        {
+          @Override
+          public String getStorageDirectory()
+          {
+            return testRoot.getAbsolutePath();
+          }
+        }
+    );
+
+    final FileSystem fs = FileSystem.get(config);
+    try {
+      final String relativePathWithColons =
+          "batch/index_parallel_opa_affiliate_ams_key_metric_hourly_ph_live_hflgnacd_2026-03-23T10:09:40.697Z";
+      final String onDiskRelativePath = relativePathWithColons.replace(':', '_');
+      Path batchRoot = new Path(testRoot.getAbsolutePath(), "batch");
+      Path taskDir = new Path(
+          testRoot.getAbsolutePath() + Path.SEPARATOR + onDiskRelativePath + Path.SEPARATOR + "leaf"
+      );
+      Assert.assertTrue(fs.mkdirs(taskDir.getParent()));
+      fs.createNewFile(taskDir);
+
+      killer.killRecursively(relativePathWithColons);
+
+      Assert.assertFalse(fs.exists(new Path(testRoot.getAbsolutePath() + Path.SEPARATOR + onDiskRelativePath)));
+      Assert.assertTrue(fs.exists(batchRoot));
+      Assert.assertTrue(fs.delete(batchRoot, true));
+    }
+    finally {
+      fs.delete(new Path(testRoot.getAbsolutePath()), true);
+    }
+  }
+
   @Test
   public void testKillNonZipSegment() throws Exception
   {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 8d5e9a85e95..44147e24295 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -59,6 +59,7 @@ import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.indexing.common.task.batch.MaxAllowedLocksExceededException;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner.SubTaskSpecStatus;
+import org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManager;
 import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
@@ -1835,6 +1836,15 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask
   @Override
   public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus) throws Exception
   {
+    try {
+      toolbox.getDataSegmentKiller().killRecursively(
+          DeepStorageIntermediaryDataManager.retrieveShuffleDataStoragePath(getId())
+      );
+    }
+    catch (IOException e) {
+      LOG.warn(e, "Failed recursive deep storage cleanup for intermediary path for task[%s]", getId());
+    }
+
     if (!isCompactionTask) {
       super.cleanUp(toolbox, taskStatus);
     }
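The `cleanUp` hunk above follows a log-and-continue pattern: a failed intermediary delete is logged but must not fail an otherwise successful task. A simplified standalone sketch of that pattern (the interface and helper names are invented for illustration, and `shuffle-data/` is the prefix named in the docs change):

```java
import java.io.IOException;

// Sketch of best-effort cleanup: swallow the IOException so shuffle-file
// garbage can never turn a successful ingestion into a failed task.
public class BestEffortCleanup
{
  interface RecursiveKiller
  {
    void killRecursively(String relativePath) throws IOException;
  }

  /** Returns true if cleanup succeeded, false if it failed and was swallowed. */
  public static boolean tryCleanup(RecursiveKiller killer, String taskId)
  {
    try {
      killer.killRecursively("shuffle-data/" + taskId);
      return true;
    }
    catch (IOException e) {
      // Log-and-continue: intermediary shuffle files are garbage to reclaim,
      // not data the task's correctness depends on.
      System.err.println("Failed cleanup for task[" + taskId + "]: " + e.getMessage());
      return false;
    }
  }
}
```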
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java
index 34199fc9e23..336973cee82 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java
@@ -39,6 +39,16 @@ public class DeepStorageIntermediaryDataManager implements IntermediaryDataManager
   public static final String SHUFFLE_DATA_DIR_PREFIX = "shuffle-data";
   private final DataSegmentPusher dataSegmentPusher;
 
+  /**
+   * Deep storage path to the directory that holds all shuffle intermediate files for {@code supervisorTaskId},
+   * relative to the deep storage root configured for {@link DataSegmentPusher} (for example
+   * {@code druid.storage.storageDirectory} for local and HDFS). Matches the prefix used by {@link #addSegment}.
+   */
+  public static String retrieveShuffleDataStoragePath(String supervisorTaskId)
+  {
+    return SHUFFLE_DATA_DIR_PREFIX + "/" + supervisorTaskId;
+  }
+
+
   @Inject
   public DeepStorageIntermediaryDataManager(DataSegmentPusher dataSegmentPusher)
   {
@@ -95,9 +105,22 @@ public class DeepStorageIntermediaryDataManager implements IntermediaryDataManager
     throw new UnsupportedOperationException("Not supported, get partition file using segment loadspec");
   }
 
+  /**
+   * Not implemented for deep storage mode. Unlike {@link LocalIntermediaryDataManager},
+   * which can walk the local filesystem to find and delete files by supervisorTaskId,
+   * this manager has no way to discover what files were pushed: it has no
+   * {@link org.apache.druid.segment.loading.DataSegmentKiller}, does not track pushed
+   * paths, and runs on short-lived peon processes whose state is lost on exit.
+   * <p>
+   * Deep storage shuffle cleanup is handled in {@link org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask#cleanUp}
+   * via {@link org.apache.druid.segment.loading.DataSegmentKiller#killRecursively} on
+   * {@link #retrieveShuffleDataStoragePath(String)} (recursive delete of that directory).
+   */
   @Override
   public void deletePartitions(String supervisorTaskId)
   {
-    throw new UnsupportedOperationException("Not supported");
+    throw new UnsupportedOperationException(
+        "Deep storage shuffle cleanup is handled by ParallelIndexSupervisorTask, not by the data manager"
+    );
   }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
index 4652ecc5444..19dc225b6c9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
@@ -35,7 +35,9 @@ import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import org.apache.druid.indexer.report.KillTaskReport;
 import org.apache.druid.indexer.report.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.TuningConfigBuilder;
+import org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManager;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
 import org.apache.druid.rpc.HttpResponseException;
@@ -45,6 +47,7 @@ import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
 import org.apache.druid.segment.data.CompressionStrategy;
 import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
 import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.loading.DataSegmentKiller;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec;
 import org.apache.druid.timeline.partition.DimensionRangeBucketShardSpec;
@@ -66,6 +69,7 @@ import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -497,8 +501,11 @@ public class ParallelIndexSupervisorTaskTest
               tuningConfig
       );
 
-      // If shouldCleanup is false, cleanup should be a no-o, throw a exception if toolbox is used
+      // Compaction skips super.cleanUp but still runs killRecursively for intermediary deep-storage files.
       TaskToolbox toolbox = EasyMock.createMock(TaskToolbox.class);
+      final DataSegmentKiller killer = EasyMock.createNiceMock(DataSegmentKiller.class);
+      EasyMock.expect(toolbox.getDataSegmentKiller()).andReturn(killer).anyTimes();
+      EasyMock.replay(toolbox, killer);
 
       new ParallelIndexSupervisorTask(
               null,
@@ -510,6 +517,184 @@ public class ParallelIndexSupervisorTaskTest
               true
       ).cleanUp(toolbox, null);
 
+      EasyMock.verify(toolbox);
+    }
+
+    @Test
+    public void testCleanUpInvokesKillRecursivelyForIntermediates() throws Exception
+    {
+      final boolean appendToExisting = false;
+      final boolean forceGuaranteedRollup = true;
+      final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
+          new InlineInputSource("test"),
+          new JsonInputFormat(null, null, null, null, null),
+          appendToExisting,
+          null
+      );
+      final ParallelIndexTuningConfig tuningConfig = TuningConfigBuilder
+          .forParallelIndexTask()
+          .withMaxRowsInMemory(10)
+          .withMaxBytesInMemory(1000L)
+          .withPartitionsSpec(new HashedPartitionsSpec(null, 10, null))
+          .withIndexSpec(
+              IndexSpec.builder()
+                       .withBitmapSerdeFactory(RoaringBitmapSerdeFactory.getInstance())
+                       .withDimensionCompression(CompressionStrategy.UNCOMPRESSED)
+                       .withMetricCompression(CompressionStrategy.LZF)
+                       .withLongEncoding(LongEncodingStrategy.LONGS)
+                       .build()
+          )
+          .withIndexSpecForIntermediatePersists(IndexSpec.getDefault())
+          .withMaxPendingPersists(1)
+          .withForceGuaranteedRollup(forceGuaranteedRollup)
+          .withReportParseExceptions(true)
+          .withPushTimeout(10000L)
+          .withSegmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+          .withMaxNumConcurrentSubTasks(10)
+          .withMaxRetry(100)
+          .withTaskStatusCheckPeriodMs(20L)
+          .withChatHandlerTimeout(new Duration(3600))
+          .withChatHandlerNumRetries(128)
+          .withLogParseExceptions(false)
+          .build();
+
+      final ParallelIndexIngestionSpec indexIngestionSpec = new ParallelIndexIngestionSpec(
+          DataSchema.builder()
+                    .withDataSource("datasource")
+                    .withTimestamp(TimestampSpec.DEFAULT)
+                    .withDimensions(DimensionsSpec.EMPTY)
+                    .build(),
+          ioConfig,
+          tuningConfig
+      );
+
+      final String supervisorTaskId = "index_parallel_cleanup_supervisor_id";
+      TaskToolbox toolbox = EasyMock.createMock(TaskToolbox.class);
+      final DataSegmentKiller killer = EasyMock.createStrictMock(DataSegmentKiller.class);
+      EasyMock.expect(toolbox.getDataSegmentKiller()).andReturn(killer);
+      killer.killRecursively(DeepStorageIntermediaryDataManager.retrieveShuffleDataStoragePath(supervisorTaskId));
+      EasyMock.expectLastCall();
+      EasyMock.replay(toolbox, killer);
+
+      new ParallelIndexSupervisorTask(
+          supervisorTaskId,
+          null,
+          null,
+          indexIngestionSpec,
+          null,
+          null,
+          true
+      ).cleanUp(toolbox, null);
+
+      EasyMock.verify(toolbox, killer);
+    }
+
+    @Test
+    public void testCleanUp_nonCompactionRunsAbstractTaskCleanUp() throws Exception
+    {
+      final ParallelIndexIngestionSpec indexIngestionSpec = buildParallelIngestionSpecForCleanUpTests();
+
+      final String supervisorTaskId = "index_parallel_ds_2024-01-01";
+      final TaskToolbox toolbox = EasyMock.createMock(TaskToolbox.class);
+      final TaskConfig taskConfig = EasyMock.createMock(TaskConfig.class);
+      final DataSegmentKiller killer = EasyMock.createStrictMock(DataSegmentKiller.class);
+
+      EasyMock.expect(toolbox.getDataSegmentKiller()).andReturn(killer);
+      killer.killRecursively(DeepStorageIntermediaryDataManager.retrieveShuffleDataStoragePath(supervisorTaskId));
+      EasyMock.expectLastCall();
+      EasyMock.expect(toolbox.getConfig()).andReturn(taskConfig);
+      EasyMock.expect(taskConfig.isEncapsulatedTask()).andReturn(false);
+      EasyMock.replay(toolbox, taskConfig, killer);
+
+      new ParallelIndexSupervisorTask(
+          supervisorTaskId,
+          null,
+          null,
+          indexIngestionSpec,
+          null,
+          null,
+          false
+      ).cleanUp(toolbox, null);
+
+      EasyMock.verify(toolbox, taskConfig, killer);
+    }
+
+    @Test
+    public void testCleanUp_killRecursivelyFailureDoesNotAbortCleanUp() throws Exception
+    {
+      final ParallelIndexIngestionSpec indexIngestionSpec = buildParallelIngestionSpecForCleanUpTests();
+
+      final String supervisorTaskId = "index_parallel_deep_storage_cleanup_fail";
+      final TaskToolbox toolbox = EasyMock.createMock(TaskToolbox.class);
+      final TaskConfig taskConfig = EasyMock.createMock(TaskConfig.class);
+      final DataSegmentKiller killer = EasyMock.createStrictMock(DataSegmentKiller.class);
+
+      EasyMock.expect(toolbox.getDataSegmentKiller()).andReturn(killer);
+      killer.killRecursively(DeepStorageIntermediaryDataManager.retrieveShuffleDataStoragePath(supervisorTaskId));
+      EasyMock.expectLastCall().andThrow(new IOException("deep storage cleanup failed"));
+      EasyMock.expect(toolbox.getConfig()).andReturn(taskConfig);
+      EasyMock.expect(taskConfig.isEncapsulatedTask()).andReturn(false);
+      EasyMock.replay(toolbox, taskConfig, killer);
+
+      new ParallelIndexSupervisorTask(
+          supervisorTaskId,
+          null,
+          null,
+          indexIngestionSpec,
+          null,
+          null,
+          false
+      ).cleanUp(toolbox, null);
+
+      EasyMock.verify(toolbox, taskConfig, killer);
+    }
+
+    private static ParallelIndexIngestionSpec buildParallelIngestionSpecForCleanUpTests()
+    {
+      final boolean appendToExisting = false;
+      final boolean forceGuaranteedRollup = true;
+      final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
+          new InlineInputSource("test"),
+          new JsonInputFormat(null, null, null, null, null),
+          appendToExisting,
+          null
+      );
+      final ParallelIndexTuningConfig tuningConfig = TuningConfigBuilder
+          .forParallelIndexTask()
+          .withMaxRowsInMemory(10)
+          .withMaxBytesInMemory(1000L)
+          .withPartitionsSpec(new HashedPartitionsSpec(null, 10, null))
+          .withIndexSpec(
+              IndexSpec.builder()
+                       .withBitmapSerdeFactory(RoaringBitmapSerdeFactory.getInstance())
+                       .withDimensionCompression(CompressionStrategy.UNCOMPRESSED)
+                       .withMetricCompression(CompressionStrategy.LZF)
+                       .withLongEncoding(LongEncodingStrategy.LONGS)
+                       .build()
+          )
+          .withIndexSpecForIntermediatePersists(IndexSpec.getDefault())
+          .withMaxPendingPersists(1)
+          .withForceGuaranteedRollup(forceGuaranteedRollup)
+          .withReportParseExceptions(true)
+          .withPushTimeout(10000L)
+          .withSegmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+          .withMaxNumConcurrentSubTasks(10)
+          .withMaxRetry(100)
+          .withTaskStatusCheckPeriodMs(20L)
+          .withChatHandlerTimeout(new Duration(3600))
+          .withChatHandlerNumRetries(128)
+          .withLogParseExceptions(false)
+          .build();
+
+      return new ParallelIndexIngestionSpec(
+          DataSchema.builder()
+                    .withDataSource("datasource")
+                    .withTimestamp(TimestampSpec.DEFAULT)
+                    .withDimensions(DimensionsSpec.EMPTY)
+                    .build(),
+          ioConfig,
+          tuningConfig
+      );
     }
 
     private PartitionStat createRangePartitionStat(Interval interval, int bucketId)
diff --git a/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentKiller.java b/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentKiller.java
index cca09190481..29ae993d8e9 100644
--- a/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentKiller.java
+++ b/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentKiller.java
@@ -98,4 +98,16 @@ public interface DataSegmentKiller
    * is only implemented by local and HDFS.
    */
   void killAll() throws IOException;
+
+  /**
+   * Recursively removes a directory (or object-store prefix) under the configured deep storage root. The path is
+   * relative to that root: no leading slash, no {@code ..} segments, no backslashes. If the path does not exist, this
+   * is a no-op. The default implementation does nothing; extensions that cannot recurse should keep the default.
+   * Currently only HDFS implements this method.
+   *
+   * @throws IOException if deletion fails
+   */
+  default void killRecursively(String relativePath) throws IOException
+  {
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/segment/loading/OmniDataSegmentKiller.java b/server/src/main/java/org/apache/druid/segment/loading/OmniDataSegmentKiller.java
index a3017a5cfb0..f0afa5d1963 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/OmniDataSegmentKiller.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/OmniDataSegmentKiller.java
@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.MapUtils;
 import org.apache.druid.timeline.DataSegment;
 
 import javax.annotation.Nullable;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -99,6 +100,27 @@ public class OmniDataSegmentKiller implements DataSegmentKiller
     throw new UnsupportedOperationException("not implemented");
   }
 
+  @Override
+  public void killRecursively(String relativePath) throws IOException
+  {
+    IOException firstFailure = null;
+    for (Supplier<DataSegmentKiller> supplier : killers.values()) {
+      try {
+        supplier.get().killRecursively(relativePath);
+      }
+      catch (IOException e) {
+        if (firstFailure == null) {
+          firstFailure = e;
+        } else {
+          firstFailure.addSuppressed(e);
+        }
+      }
+    }
+    if (firstFailure != null) {
+      throw firstFailure;
+    }
+  }
+
   @VisibleForTesting
   public Map<String, Supplier<DataSegmentKiller>> getKillers()
   {
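The `killRecursively` override above aggregates delegate failures: every configured killer is attempted, the first `IOException` is rethrown, and later ones ride along via `addSuppressed`. A standalone sketch of that pattern with simplified, hypothetical types:

```java
import java.io.IOException;
import java.util.List;

// Sketch of first-failure aggregation: try every delegate even after one fails,
// then surface a single exception carrying the rest as suppressed.
public class AggregatingKiller
{
  interface Killer
  {
    void killRecursively(String relativePath) throws IOException;
  }

  public static void killRecursively(List<Killer> killers, String relativePath) throws IOException
  {
    IOException firstFailure = null;
    for (Killer killer : killers) {
      try {
        killer.killRecursively(relativePath); // every delegate still runs
      }
      catch (IOException e) {
        if (firstFailure == null) {
          firstFailure = e;
        } else {
          firstFailure.addSuppressed(e); // later failures attach to the first
        }
      }
    }
    if (firstFailure != null) {
      throw firstFailure; // caller sees one exception describing all failures
    }
  }
}
```

This keeps a broken delegate from shadowing cleanup work the other delegates can still do, while losing no error detail.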
diff --git a/server/src/test/java/org/apache/druid/segment/loading/OmniDataSegmentKillerTest.java b/server/src/test/java/org/apache/druid/segment/loading/OmniDataSegmentKillerTest.java
index 2038f3a8d7e..97e9f402554 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/OmniDataSegmentKillerTest.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/OmniDataSegmentKillerTest.java
@@ -37,6 +37,7 @@ import org.mockito.Mockito;
 
 import javax.annotation.Nullable;
 import javax.validation.constraints.NotNull;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
@@ -139,6 +140,23 @@ public class OmniDataSegmentKillerTest
     segmentKiller.kill(tombstone);
   }
 
+  @Test
+  public void testKillRecursively_delegatesToAllKillers() throws IOException
+  {
+    final DataSegmentKiller killerA = Mockito.mock(DataSegmentKiller.class);
+    final DataSegmentKiller killerB = Mockito.mock(DataSegmentKiller.class);
+    final Injector injector = createInjectorFromMap(
+        ImmutableMap.of("type_a", killerA, "type_b", killerB)
+    );
+    final OmniDataSegmentKiller segmentKiller = injector.getInstance(OmniDataSegmentKiller.class);
+
+    final String relativePath = "intermediate/batch_1";
+    segmentKiller.killRecursively(relativePath);
+
+    Mockito.verify(killerA).killRecursively(relativePath);
+    Mockito.verify(killerB).killRecursively(relativePath);
+  }
+
   @Test
   public void testKillMultipleSegmentsWithType() throws SegmentLoadingException
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
