This is an automated email from the ASF dual-hosted git repository.
gwphua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 3d1e0d03de3 feat: Clean up deep storage after index parallel merge
finishes (#19187)
3d1e0d03de3 is described below
commit 3d1e0d03de341b41027dfe3d493800274f5e895c
Author: Virushade <[email protected]>
AuthorDate: Mon Mar 30 14:07:33 2026 +0800
feat: Clean up deep storage after index parallel merge finishes (#19187)
* Shuffle solution
Add delete for datasegments
Unit Tests and Embedded tests
Cleanup
* Better task logs when killing fails.
* Docs
* Checkstyle
* Cleanup function
* non-volatile
* Clean up unnecessary code
* Test fixes
* HDFS Segment clean-up
* Finish up deepstore
* Fix HDFS pathing bug
* Fix forbidden apis
* Deep Store PR
* Tests
* Correct Docs
* Change DataSegmentKiller to not be aware of supervisor shuffle
* Test unhappy path
* Exception type
* Checkstyle
* Extra space in md file
* Forbidden API
* No need to check taskId as AbstractTask already did
* Rename Shuffle method
* Split method that preserves empty tokens
* Update docs/configuration/index.md
Co-authored-by: Copilot <[email protected]>
* Address Copilot comments
---------
Co-authored-by: Copilot <[email protected]>
---
docs/configuration/index.md | 2 +-
.../druid/storage/hdfs/HdfsDataSegmentKiller.java | 55 ++++++
.../storage/hdfs/HdfsDataSegmentKillerTest.java | 166 ++++++++++++++++++
.../parallel/ParallelIndexSupervisorTask.java | 10 ++
.../DeepStorageIntermediaryDataManager.java | 25 ++-
.../parallel/ParallelIndexSupervisorTaskTest.java | 187 ++++++++++++++++++++-
.../druid/segment/loading/DataSegmentKiller.java | 12 ++
.../segment/loading/OmniDataSegmentKiller.java | 22 +++
.../segment/loading/OmniDataSegmentKillerTest.java | 18 ++
9 files changed, 494 insertions(+), 3 deletions(-)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index c31cf0747cb..851db47e3a6 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1376,7 +1376,7 @@ Processing properties set on the Middle Manager are
passed through to Peons.
|`druid.processing.numTimeoutThreads`|The number of processing threads to have
available for handling per-segment query timeouts. Setting this value to `0`
removes the ability to service per-segment timeouts, irrespective of
`perSegmentTimeout` query context parameter. As these threads are just
servicing timers, it's recommended to set this value to some small percent
(e.g. 5%) of the total query processing cores available to the peon.|0|
|`druid.processing.fifo`|Enables the processing queue to treat tasks of equal
priority in a FIFO manner.|`true`|
|`druid.processing.tmpDir`|Path where temporary files created while processing
a query should be stored. If specified, this configuration takes priority over
the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
-|`druid.processing.intermediaryData.storage.type`|Storage type for
intermediary segments of data shuffle between native parallel index tasks. <br
/>Set to `local` to store segment files in the local storage of the Middle
Manager or Indexer. <br />Set to `deepstore` to use configured deep storage for
better fault tolerance during rolling updates. When the storage type is
`deepstore`, Druid stores the data in the `shuffle-data` directory under the
configured deep storage path. Druid does n [...]
+|`druid.processing.intermediaryData.storage.type`|Storage type for
intermediary segments of data shuffle between native parallel index tasks. <br
/>Set to `local` to store segment files in the local storage of the Middle
Manager or Indexer. <br />Set to `deepstore` to use configured deep storage for
better fault tolerance during rolling updates. When the storage type is
`deepstore`, Druid stores the data in the `shuffle-data` directory under the
configured deep storage path. Automated cl [...]
|`druid.processing.parallelPoolInit`|(EXPERIMENTAL) Allows all
merge/processing memory pools to be allocated in parallel on process launch.
This may significantly speed up Peon launch times if allocating several large
buffers.|`false`|
The amount of direct memory needed by Druid is at least
diff --git
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentKiller.java
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentKiller.java
index e135403973e..0f5822312b0 100644
---
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentKiller.java
+++
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentKiller.java
@@ -148,4 +148,59 @@ public class HdfsDataSegmentKiller implements
DataSegmentKiller
log.makeAlert(e, "uncaught exception during segment killer").emit();
}
}
+
+ @Override
+ public void killRecursively(String relativePath) throws IOException
+ {
+ final Path dirToDelete = constructHdfsDeletePath(relativePath);
+ if (dirToDelete == null) {
+ return;
+ }
+
+ final FileSystem fs = dirToDelete.getFileSystem(config);
+ if (!fs.exists(dirToDelete)) {
+ return;
+ }
+ log.info("Deleting deep storage directory[%s]", dirToDelete);
+ if (!fs.delete(dirToDelete, true)) {
+ throw new IOException("Failed to delete deep storage directory[" +
dirToDelete + "].");
+ }
+ }
+
+ /**
+ * Construct a path to delete from HDFS. Returns null if the path is invalid.
+ * Replicates how {@link HdfsDataSegmentPusher#pushToPath} handles ':', by
replacing that with '_'.
+ */
+ @Nullable
+ private Path constructHdfsDeletePath(String relativePath)
+ {
+ if (Strings.isNullOrEmpty(relativePath)) {
+ log.warn("Skipping deep storage directory kill: relative path is empty");
+ return null;
+ }
+ if (relativePath.charAt(0) == '/') {
+ log.warn("Skipping deep storage directory kill: relative path must not
be absolute, got [%s]", relativePath);
+ return null;
+ }
+ if (relativePath.indexOf('\\') >= 0) {
+ log.warn("Skipping deep storage directory kill: backslash not allowed in
path [%s]", relativePath);
+ return null;
+ }
+ for (String segment : StringUtils.splitPreserveAllTokens(relativePath,
'/')) {
+ if (segment.isEmpty() || "..".equals(segment)) {
+ log.warn("Skipping deep storage directory kill: invalid path[%s]",
relativePath);
+ return null;
+ }
+ }
+
+ if (storageDirectory == null) {
+ log.warn("Skipping deep storage directory kill: storage directory not
configured");
+ return null;
+ }
+
+ final String hdfsRelativePath = relativePath.replace(':', '_');
+ final String storageDirectoryString = storageDirectory.toString();
+ final String sep = storageDirectoryString.endsWith(Path.SEPARATOR) ? "" :
Path.SEPARATOR;
+ return new Path(storageDirectoryString + sep + hdfsRelativePath);
+ }
}
diff --git
a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentKillerTest.java
b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentKillerTest.java
index 746ebb54583..30335f808de 100644
---
a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentKillerTest.java
+++
b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentKillerTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.storage.hdfs;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.loading.SegmentLoadingException;
@@ -34,6 +35,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import java.io.File;
import java.io.IOException;
import java.util.UUID;
@@ -202,6 +204,170 @@ public class HdfsDataSegmentKillerTest
killer.kill(getSegmentWithPath(new Path("/xxx/", "index.zip").toString()));
}
+ @Test
+ public void testKillRecursive_forWhenConstructedPathReturnsNull() throws
Exception
+ {
+ final File testRoot = FileUtils.createTempDir();
+ final Configuration config = new Configuration();
+ final FileSystem fs = FileSystem.get(config);
+ try {
+ final HdfsDataSegmentKiller killerWithStorage = new
HdfsDataSegmentKiller(
+ config,
+ new HdfsDataSegmentPusherConfig()
+ {
+ @Override
+ public String getStorageDirectory()
+ {
+ return testRoot.getAbsolutePath();
+ }
+ }
+ );
+
+ final Path workspaceRoot = new Path(testRoot.getAbsolutePath(),
"workspace");
+ final Path nested = new Path(workspaceRoot, "evil");
+ Assert.assertTrue(fs.mkdirs(nested));
+ fs.createNewFile(new Path(nested, "probe"));
+
+ final Path stagingRun = new Path(new Path(testRoot.getAbsolutePath(),
"staging"), "some_run_id");
+ Assert.assertTrue(fs.mkdirs(stagingRun));
+
+ killerWithStorage.killRecursively(null);
+ killerWithStorage.killRecursively("");
+ killerWithStorage.killRecursively("/absolute/under/root");
+ killerWithStorage.killRecursively("path\\with\\backslashes");
+ killerWithStorage.killRecursively("workspace/../evil");
+ killerWithStorage.killRecursively("only/../dots");
+ killerWithStorage.killRecursively("..");
+ killerWithStorage.killRecursively("workspace//evil");
+ killerWithStorage.killRecursively("workspace/evil/");
+
+ Assert.assertTrue("workspace/evil should survive null
constructHdfsDeletePath cases", fs.exists(nested));
+ Assert.assertTrue(fs.exists(stagingRun));
+
+ final HdfsDataSegmentKiller killerNoStorage = new HdfsDataSegmentKiller(
+ config,
+ new HdfsDataSegmentPusherConfig()
+ {
+ @Override
+ public String getStorageDirectory()
+ {
+ return "";
+ }
+ }
+ );
+ killerNoStorage.killRecursively("staging/some_run_id");
+ Assert.assertTrue("paths must not be deleted when storage directory is
unset", fs.exists(stagingRun));
+ }
+ finally {
+ fs.delete(new Path(testRoot.getAbsolutePath()), true);
+ }
+ }
+
+ @Test
+ public void testKillRecursively_missingDirectoryIsNoOp() throws Exception
+ {
+ final File testRoot = FileUtils.createTempDir();
+ final Configuration config = new Configuration();
+ final HdfsDataSegmentKiller killer = new HdfsDataSegmentKiller(
+ config,
+ new HdfsDataSegmentPusherConfig()
+ {
+ @Override
+ public String getStorageDirectory()
+ {
+ return testRoot.getAbsolutePath();
+ }
+ }
+ );
+
+ final FileSystem fs = FileSystem.get(config);
+ try {
+ killer.killRecursively("staging/no_such_directory");
+ }
+ finally {
+ fs.delete(new Path(testRoot.getAbsolutePath()), true);
+ }
+ }
+
+ @Test
+ public void testKillRecursively() throws Exception
+ {
+ final File testRoot = FileUtils.createTempDir();
+ Configuration config = new Configuration();
+ HdfsDataSegmentKiller killer = new HdfsDataSegmentKiller(
+ config,
+ new HdfsDataSegmentPusherConfig()
+ {
+ @Override
+ public String getStorageDirectory()
+ {
+ return testRoot.getAbsolutePath();
+ }
+ }
+ );
+
+ final FileSystem fs = FileSystem.get(config);
+ try {
+ Path parentDir = new Path(testRoot.getAbsolutePath(), "export");
+ Path taskDir = new Path(new Path(parentDir, "run_a"), "leaf");
+ Assert.assertTrue(fs.mkdirs(taskDir.getParent()));
+ fs.createNewFile(taskDir);
+
+ killer.killRecursively("export/run_a");
+
+ Assert.assertFalse(fs.exists(new Path(parentDir, "run_a")));
+ Assert.assertTrue(fs.exists(parentDir));
+ Assert.assertTrue(fs.delete(parentDir, true));
+ }
+ finally {
+ fs.delete(new Path(testRoot.getAbsolutePath()), true);
+ }
+ }
+
+ /**
+ * {@link HdfsDataSegmentPusher#pushToPath} replaces {@code ':'} with {@code
'_'} in
+ * storage suffixes; cleanup applies the same normalization to the relative
directory path.
+ */
+ @Test
+ public void testKillRecursively_pathWithColonsMatchesHdfsPusherLayout()
throws Exception
+ {
+ final File testRoot = FileUtils.createTempDir();
+ Configuration config = new Configuration();
+ HdfsDataSegmentKiller killer = new HdfsDataSegmentKiller(
+ config,
+ new HdfsDataSegmentPusherConfig()
+ {
+ @Override
+ public String getStorageDirectory()
+ {
+ return testRoot.getAbsolutePath();
+ }
+ }
+ );
+
+ final FileSystem fs = FileSystem.get(config);
+ try {
+ final String relativePathWithColons =
+
"batch/index_parallel_opa_affiliate_ams_key_metric_hourly_ph_live_hflgnacd_2026-03-23T10:09:40.697Z";
+ final String onDiskRelativePath = relativePathWithColons.replace(':',
'_');
+ Path batchRoot = new Path(testRoot.getAbsolutePath(), "batch");
+ Path taskDir = new Path(
+ testRoot.getAbsolutePath() + Path.SEPARATOR + onDiskRelativePath +
Path.SEPARATOR + "leaf"
+ );
+ Assert.assertTrue(fs.mkdirs(taskDir.getParent()));
+ fs.createNewFile(taskDir);
+
+ killer.killRecursively(relativePathWithColons);
+
+ Assert.assertFalse(fs.exists(new Path(testRoot.getAbsolutePath() +
Path.SEPARATOR + onDiskRelativePath)));
+ Assert.assertTrue(fs.exists(batchRoot));
+ Assert.assertTrue(fs.delete(batchRoot, true));
+ }
+ finally {
+ fs.delete(new Path(testRoot.getAbsolutePath()), true);
+ }
+ }
+
@Test
public void testKillNonZipSegment() throws Exception
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 8d5e9a85e95..44147e24295 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -59,6 +59,7 @@ import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import
org.apache.druid.indexing.common.task.batch.MaxAllowedLocksExceededException;
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner.SubTaskSpecStatus;
+import
org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManager;
import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
@@ -1835,6 +1836,15 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask
@Override
public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus)
throws Exception
{
+ try {
+ toolbox.getDataSegmentKiller().killRecursively(
+
DeepStorageIntermediaryDataManager.retrieveShuffleDataStoragePath(getId())
+ );
+ }
+ catch (IOException e) {
+ LOG.warn(e, "Failed recursive deep storage cleanup for intermediary path
for task[%s]", getId());
+ }
+
if (!isCompactionTask) {
super.cleanUp(toolbox, taskStatus);
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java
index 34199fc9e23..336973cee82 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java
@@ -39,6 +39,16 @@ public class DeepStorageIntermediaryDataManager implements
IntermediaryDataManag
public static final String SHUFFLE_DATA_DIR_PREFIX = "shuffle-data";
private final DataSegmentPusher dataSegmentPusher;
+ /**
+ * Deep storage path to the directory that holds all shuffle intermediate
files for {@code supervisorTaskId},
+ * relative to the deep storage root configured for {@link
DataSegmentPusher} (for example
+ * {@code druid.storage.storageDirectory} for local and HDFS). Matches the
prefix used by {@link #addSegment}.
+ */
+ public static String retrieveShuffleDataStoragePath(String supervisorTaskId)
+ {
+ return SHUFFLE_DATA_DIR_PREFIX + "/" + supervisorTaskId;
+ }
+
@Inject
public DeepStorageIntermediaryDataManager(DataSegmentPusher
dataSegmentPusher)
{
@@ -95,9 +105,22 @@ public class DeepStorageIntermediaryDataManager implements
IntermediaryDataManag
throw new UnsupportedOperationException("Not supported, get partition file
using segment loadspec");
}
+ /**
+ * Not implemented for deep storage mode. Unlike {@link
LocalIntermediaryDataManager},
+ * which can walk the local filesystem to find and delete files by
supervisorTaskId,
+ * this manager has no way to discover what files were pushed: it has no
+ * {@link org.apache.druid.segment.loading.DataSegmentKiller}, does not
track pushed
+ * paths, and runs on short-lived peon processes whose state is lost on exit.
+ * <p>
+ * Deep storage shuffle cleanup is handled in {@link
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask#cleanUp}
+ * via {@link
org.apache.druid.segment.loading.DataSegmentKiller#killRecursively} on
+ * {@link #retrieveShuffleDataStoragePath(String)} (recursive delete of that
directory).
+ */
@Override
public void deletePartitions(String supervisorTaskId)
{
- throw new UnsupportedOperationException("Not supported");
+ throw new UnsupportedOperationException(
+ "Deep storage shuffle cleanup is handled by
ParallelIndexSupervisorTask, not by the data manager"
+ );
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
index 4652ecc5444..19dc225b6c9 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
@@ -35,7 +35,9 @@ import
org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexer.report.KillTaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.TuningConfigBuilder;
+import
org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManager;
import org.apache.druid.java.util.common.Intervals;
import
org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.rpc.HttpResponseException;
@@ -45,6 +47,7 @@ import
org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.loading.DataSegmentKiller;
import
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.DimensionRangeBucketShardSpec;
@@ -66,6 +69,7 @@ import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -497,8 +501,11 @@ public class ParallelIndexSupervisorTaskTest
tuningConfig
);
- // If shouldCleanup is false, cleanup should be a no-o, throw a
exception if toolbox is used
+ // Compaction skips super.cleanUp but still runs killRecursively for
intermediary deep-storage files.
TaskToolbox toolbox = EasyMock.createMock(TaskToolbox.class);
+ final DataSegmentKiller killer =
EasyMock.createNiceMock(DataSegmentKiller.class);
+
EasyMock.expect(toolbox.getDataSegmentKiller()).andReturn(killer).anyTimes();
+ EasyMock.replay(toolbox, killer);
new ParallelIndexSupervisorTask(
null,
@@ -510,6 +517,184 @@ public class ParallelIndexSupervisorTaskTest
true
).cleanUp(toolbox, null);
+ EasyMock.verify(toolbox);
+ }
+
+ @Test
+ public void testCleanUpInvokesKillRecursivelyForIntermediates() throws
Exception
+ {
+ final boolean appendToExisting = false;
+ final boolean forceGuaranteedRollup = true;
+ final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
+ new InlineInputSource("test"),
+ new JsonInputFormat(null, null, null, null, null),
+ appendToExisting,
+ null
+ );
+ final ParallelIndexTuningConfig tuningConfig = TuningConfigBuilder
+ .forParallelIndexTask()
+ .withMaxRowsInMemory(10)
+ .withMaxBytesInMemory(1000L)
+ .withPartitionsSpec(new HashedPartitionsSpec(null, 10, null))
+ .withIndexSpec(
+ IndexSpec.builder()
+
.withBitmapSerdeFactory(RoaringBitmapSerdeFactory.getInstance())
+
.withDimensionCompression(CompressionStrategy.UNCOMPRESSED)
+ .withMetricCompression(CompressionStrategy.LZF)
+ .withLongEncoding(LongEncodingStrategy.LONGS)
+ .build()
+ )
+ .withIndexSpecForIntermediatePersists(IndexSpec.getDefault())
+ .withMaxPendingPersists(1)
+ .withForceGuaranteedRollup(forceGuaranteedRollup)
+ .withReportParseExceptions(true)
+ .withPushTimeout(10000L)
+
.withSegmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+ .withMaxNumConcurrentSubTasks(10)
+ .withMaxRetry(100)
+ .withTaskStatusCheckPeriodMs(20L)
+ .withChatHandlerTimeout(new Duration(3600))
+ .withChatHandlerNumRetries(128)
+ .withLogParseExceptions(false)
+ .build();
+
+ final ParallelIndexIngestionSpec indexIngestionSpec = new
ParallelIndexIngestionSpec(
+ DataSchema.builder()
+ .withDataSource("datasource")
+ .withTimestamp(TimestampSpec.DEFAULT)
+ .withDimensions(DimensionsSpec.EMPTY)
+ .build(),
+ ioConfig,
+ tuningConfig
+ );
+
+ final String supervisorTaskId = "index_parallel_cleanup_supervisor_id";
+ TaskToolbox toolbox = EasyMock.createMock(TaskToolbox.class);
+ final DataSegmentKiller killer =
EasyMock.createStrictMock(DataSegmentKiller.class);
+ EasyMock.expect(toolbox.getDataSegmentKiller()).andReturn(killer);
+
killer.killRecursively(DeepStorageIntermediaryDataManager.retrieveShuffleDataStoragePath(supervisorTaskId));
+ EasyMock.expectLastCall();
+ EasyMock.replay(toolbox, killer);
+
+ new ParallelIndexSupervisorTask(
+ supervisorTaskId,
+ null,
+ null,
+ indexIngestionSpec,
+ null,
+ null,
+ true
+ ).cleanUp(toolbox, null);
+
+ EasyMock.verify(toolbox, killer);
+ }
+
+ @Test
+ public void testCleanUp_nonCompactionRunsAbstractTaskCleanUp() throws
Exception
+ {
+ final ParallelIndexIngestionSpec indexIngestionSpec =
buildParallelIngestionSpecForCleanUpTests();
+
+ final String supervisorTaskId = "index_parallel_ds_2024-01-01";
+ final TaskToolbox toolbox = EasyMock.createMock(TaskToolbox.class);
+ final TaskConfig taskConfig = EasyMock.createMock(TaskConfig.class);
+ final DataSegmentKiller killer =
EasyMock.createStrictMock(DataSegmentKiller.class);
+
+ EasyMock.expect(toolbox.getDataSegmentKiller()).andReturn(killer);
+
killer.killRecursively(DeepStorageIntermediaryDataManager.retrieveShuffleDataStoragePath(supervisorTaskId));
+ EasyMock.expectLastCall();
+ EasyMock.expect(toolbox.getConfig()).andReturn(taskConfig);
+ EasyMock.expect(taskConfig.isEncapsulatedTask()).andReturn(false);
+ EasyMock.replay(toolbox, taskConfig, killer);
+
+ new ParallelIndexSupervisorTask(
+ supervisorTaskId,
+ null,
+ null,
+ indexIngestionSpec,
+ null,
+ null,
+ false
+ ).cleanUp(toolbox, null);
+
+ EasyMock.verify(toolbox, taskConfig, killer);
+ }
+
+ @Test
+ public void testCleanUp_killRecursivelyFailureDoesNotAbortCleanUp() throws
Exception
+ {
+ final ParallelIndexIngestionSpec indexIngestionSpec =
buildParallelIngestionSpecForCleanUpTests();
+
+ final String supervisorTaskId =
"index_parallel_deep_storage_cleanup_fail";
+ final TaskToolbox toolbox = EasyMock.createMock(TaskToolbox.class);
+ final TaskConfig taskConfig = EasyMock.createMock(TaskConfig.class);
+ final DataSegmentKiller killer =
EasyMock.createStrictMock(DataSegmentKiller.class);
+
+ EasyMock.expect(toolbox.getDataSegmentKiller()).andReturn(killer);
+
killer.killRecursively(DeepStorageIntermediaryDataManager.retrieveShuffleDataStoragePath(supervisorTaskId));
+ EasyMock.expectLastCall().andThrow(new IOException("deep storage cleanup
failed"));
+ EasyMock.expect(toolbox.getConfig()).andReturn(taskConfig);
+ EasyMock.expect(taskConfig.isEncapsulatedTask()).andReturn(false);
+ EasyMock.replay(toolbox, taskConfig, killer);
+
+ new ParallelIndexSupervisorTask(
+ supervisorTaskId,
+ null,
+ null,
+ indexIngestionSpec,
+ null,
+ null,
+ false
+ ).cleanUp(toolbox, null);
+
+ EasyMock.verify(toolbox, taskConfig, killer);
+ }
+
+ private static ParallelIndexIngestionSpec
buildParallelIngestionSpecForCleanUpTests()
+ {
+ final boolean appendToExisting = false;
+ final boolean forceGuaranteedRollup = true;
+ final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
+ new InlineInputSource("test"),
+ new JsonInputFormat(null, null, null, null, null),
+ appendToExisting,
+ null
+ );
+ final ParallelIndexTuningConfig tuningConfig = TuningConfigBuilder
+ .forParallelIndexTask()
+ .withMaxRowsInMemory(10)
+ .withMaxBytesInMemory(1000L)
+ .withPartitionsSpec(new HashedPartitionsSpec(null, 10, null))
+ .withIndexSpec(
+ IndexSpec.builder()
+
.withBitmapSerdeFactory(RoaringBitmapSerdeFactory.getInstance())
+
.withDimensionCompression(CompressionStrategy.UNCOMPRESSED)
+ .withMetricCompression(CompressionStrategy.LZF)
+ .withLongEncoding(LongEncodingStrategy.LONGS)
+ .build()
+ )
+ .withIndexSpecForIntermediatePersists(IndexSpec.getDefault())
+ .withMaxPendingPersists(1)
+ .withForceGuaranteedRollup(forceGuaranteedRollup)
+ .withReportParseExceptions(true)
+ .withPushTimeout(10000L)
+
.withSegmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+ .withMaxNumConcurrentSubTasks(10)
+ .withMaxRetry(100)
+ .withTaskStatusCheckPeriodMs(20L)
+ .withChatHandlerTimeout(new Duration(3600))
+ .withChatHandlerNumRetries(128)
+ .withLogParseExceptions(false)
+ .build();
+
+ return new ParallelIndexIngestionSpec(
+ DataSchema.builder()
+ .withDataSource("datasource")
+ .withTimestamp(TimestampSpec.DEFAULT)
+ .withDimensions(DimensionsSpec.EMPTY)
+ .build(),
+ ioConfig,
+ tuningConfig
+ );
}
private PartitionStat createRangePartitionStat(Interval interval, int
bucketId)
diff --git
a/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentKiller.java
b/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentKiller.java
index cca09190481..29ae993d8e9 100644
---
a/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentKiller.java
+++
b/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentKiller.java
@@ -98,4 +98,16 @@ public interface DataSegmentKiller
* is only implemented by local and HDFS.
*/
void killAll() throws IOException;
+
+ /**
+ * Recursively removes a directory (or object-store prefix) under the
configured deep storage root. The path is
+ * relative to that root: no leading slash, no {@code ..} segments, no
backslashes. If the path does not exist, this
+ * is a no-op. The default implementation does nothing; extensions that
cannot recurse should keep the default.
+ * HDFS currently only implements this method.
+ *
+ * @throws IOException if deletion fails
+ */
+ default void killRecursively(String relativePath) throws IOException
+ {
+ }
}
diff --git
a/server/src/main/java/org/apache/druid/segment/loading/OmniDataSegmentKiller.java
b/server/src/main/java/org/apache/druid/segment/loading/OmniDataSegmentKiller.java
index a3017a5cfb0..f0afa5d1963 100644
---
a/server/src/main/java/org/apache/druid/segment/loading/OmniDataSegmentKiller.java
+++
b/server/src/main/java/org/apache/druid/segment/loading/OmniDataSegmentKiller.java
@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -99,6 +100,27 @@ public class OmniDataSegmentKiller implements
DataSegmentKiller
throw new UnsupportedOperationException("not implemented");
}
+ @Override
+ public void killRecursively(String relativePath) throws IOException
+ {
+ IOException firstFailure = null;
+ for (Supplier<DataSegmentKiller> supplier : killers.values()) {
+ try {
+ supplier.get().killRecursively(relativePath);
+ }
+ catch (IOException e) {
+ if (firstFailure == null) {
+ firstFailure = e;
+ } else {
+ firstFailure.addSuppressed(e);
+ }
+ }
+ }
+ if (firstFailure != null) {
+ throw firstFailure;
+ }
+ }
+
@VisibleForTesting
public Map<String, Supplier<DataSegmentKiller>> getKillers()
{
diff --git
a/server/src/test/java/org/apache/druid/segment/loading/OmniDataSegmentKillerTest.java
b/server/src/test/java/org/apache/druid/segment/loading/OmniDataSegmentKillerTest.java
index 2038f3a8d7e..97e9f402554 100644
---
a/server/src/test/java/org/apache/druid/segment/loading/OmniDataSegmentKillerTest.java
+++
b/server/src/test/java/org/apache/druid/segment/loading/OmniDataSegmentKillerTest.java
@@ -37,6 +37,7 @@ import org.mockito.Mockito;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -139,6 +140,23 @@ public class OmniDataSegmentKillerTest
segmentKiller.kill(tombstone);
}
+ @Test
+ public void testKillRecursively_delegatesToAllKillers() throws IOException
+ {
+ final DataSegmentKiller killerA = Mockito.mock(DataSegmentKiller.class);
+ final DataSegmentKiller killerB = Mockito.mock(DataSegmentKiller.class);
+ final Injector injector = createInjectorFromMap(
+ ImmutableMap.of("type_a", killerA, "type_b", killerB)
+ );
+ final OmniDataSegmentKiller segmentKiller =
injector.getInstance(OmniDataSegmentKiller.class);
+
+ final String relativePath = "intermediate/batch_1";
+ segmentKiller.killRecursively(relativePath);
+
+ Mockito.verify(killerA).killRecursively(relativePath);
+ Mockito.verify(killerB).killRecursively(relativePath);
+ }
+
@Test
public void testKillMultipleSegmentsWithType() throws SegmentLoadingException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]