Copilot commented on code in PR #19187:
URL: https://github.com/apache/druid/pull/19187#discussion_r2988737172
##########
indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java:
##########
@@ -39,6 +39,15 @@ public class DeepStorageIntermediaryDataManager implements IntermediaryDataManag
public static final String SHUFFLE_DATA_DIR_PREFIX = "shuffle-data";
private final DataSegmentPusher dataSegmentPusher;
+ /**
+ * Deep storage path (relative to the extension root) of the directory that holds all shuffle intermediate files for
+ * {@code supervisorTaskId}, matching the prefix used by {@link #addSegment}.
+ */
+ public static String retrieveShuffleDataStoragePath(String supervisorTaskId)
+ {
+ return SHUFFLE_DATA_DIR_PREFIX + "/" + supervisorTaskId;
+ }
Review Comment:
The Javadoc for `retrieveShuffleDataStoragePath` says the returned path is
relative to the "extension root", but the actual root is the configured deep
storage root (e.g., `druid.storage.storageDirectory` / the pusher's base
directory). Consider updating the wording to match the configuration
terminology used elsewhere to avoid confusion.
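For example, the Javadoc could be reworded along these lines (a wording sketch grounded in the deep-storage configuration terminology; the exact property name should be checked against the pusher's docs):
```suggestion
  /**
   * Deep storage path (relative to the configured deep storage root, e.g.
   * {@code druid.storage.storageDirectory}) of the directory that holds all shuffle
   * intermediate files for {@code supervisorTaskId}, matching the prefix used by
   * {@link #addSegment}.
   */
```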
##########
extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentKiller.java:
##########
@@ -148,4 +148,58 @@ private void removeEmptyParentDirectories(final FileSystem fs, final Path segmen
log.makeAlert(e, "uncaught exception during segment killer").emit();
}
}
+
+ @Override
+ public void killRecursively(String relativePath) throws IOException
+ {
+ final Path dirToDelete = constructHdfsDeletePath(relativePath);
+ if (dirToDelete == null) {
+ return;
+ }
+ final FileSystem fs = dirToDelete.getFileSystem(config);
+ if (!fs.exists(dirToDelete)) {
+ return;
+ }
+ log.info("Deleting deep storage directory[%s]", dirToDelete);
+ if (!fs.delete(dirToDelete, true)) {
+ throw new IOException("Failed to delete deep storage directory.");
Review Comment:
In `killRecursively`, the thrown `IOException` message is too generic and loses important context (which path failed and why). Include the target directory in the message (and ideally wrap/propagate the underlying `IOException` if available) so operators can diagnose failures from logs and task reports.
```suggestion
throw new IOException("Failed to delete deep storage directory [" + dirToDelete + "].");
```
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java:
##########
@@ -510,6 +517,181 @@ public void testCompactionTaskDoesntCleanup() throws Exception
true
).cleanUp(toolbox, null);
+ EasyMock.verify(toolbox);
+ }
+
+ @Test
+ public void testCleanUpInvokesKillRecursivelyForIntermediates() throws Exception
+ {
+ final boolean appendToExisting = false;
+ final boolean forceGuaranteedRollup = true;
+ final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
+ new InlineInputSource("test"),
+ new JsonInputFormat(null, null, null, null, null),
+ appendToExisting,
+ null
+ );
+ final ParallelIndexTuningConfig tuningConfig = TuningConfigBuilder
+ .forParallelIndexTask()
+ .withMaxRowsInMemory(10)
+ .withMaxBytesInMemory(1000L)
+ .withPartitionsSpec(new HashedPartitionsSpec(null, 10, null))
+ .withIndexSpec(
+ IndexSpec.builder()
+ .withBitmapSerdeFactory(RoaringBitmapSerdeFactory.getInstance())
+ .withDimensionCompression(CompressionStrategy.UNCOMPRESSED)
+ .withMetricCompression(CompressionStrategy.LZF)
+ .withLongEncoding(LongEncodingStrategy.LONGS)
+ .build()
+ )
+ .withIndexSpecForIntermediatePersists(IndexSpec.getDefault())
+ .withMaxPendingPersists(1)
+ .withForceGuaranteedRollup(forceGuaranteedRollup)
+ .withReportParseExceptions(true)
+ .withPushTimeout(10000L)
+ .withSegmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+ .withMaxNumConcurrentSubTasks(10)
+ .withMaxRetry(100)
+ .withTaskStatusCheckPeriodMs(20L)
+ .withChatHandlerTimeout(new Duration(3600))
+ .withChatHandlerNumRetries(128)
+ .withLogParseExceptions(false)
+ .build();
+
+ final ParallelIndexIngestionSpec indexIngestionSpec = new ParallelIndexIngestionSpec(
+ DataSchema.builder()
+ .withDataSource("datasource")
+ .withTimestamp(TimestampSpec.DEFAULT)
+ .withDimensions(DimensionsSpec.EMPTY)
+ .build(),
+ ioConfig,
+ tuningConfig
+ );
+
+ final String supervisorTaskId = "index_parallel_cleanup_supervisor_id";
+ TaskToolbox toolbox = EasyMock.createMock(TaskToolbox.class);
+ final DataSegmentKiller killer = EasyMock.createStrictMock(DataSegmentKiller.class);
+ EasyMock.expect(toolbox.getDataSegmentKiller()).andReturn(killer);
+ killer.killRecursively(DeepStorageIntermediaryDataManager.retrieveShuffleDataStoragePath(supervisorTaskId));
+ EasyMock.expectLastCall();
+ EasyMock.replay(toolbox, killer);
+
+ new ParallelIndexSupervisorTask(
+ supervisorTaskId,
+ null,
+ null,
+ indexIngestionSpec,
+ null,
+ null,
+ true
+ ).cleanUp(toolbox, null);
+
+ EasyMock.verify(toolbox, killer);
+ }
+
+ @Test
+ public void testCleanUp_nonCompactionRunsAbstractTaskCleanUp() throws Exception
+ {
+ final ParallelIndexIngestionSpec indexIngestionSpec = buildParallelIngestionSpecForCleanUpTests();
+
+ final String supervisorTaskId = "index_parallel_ds_2024-01-01";
+ final TaskToolbox toolbox = EasyMock.createMock(TaskToolbox.class);
+ final TaskConfig taskConfig = EasyMock.createNiceMock(TaskConfig.class);
+ final DataSegmentKiller killer = EasyMock.createStrictMock(DataSegmentKiller.class);
+
+ EasyMock.expect(toolbox.getDataSegmentKiller()).andReturn(killer);
+ killer.killRecursively(DeepStorageIntermediaryDataManager.retrieveShuffleDataStoragePath(supervisorTaskId));
+ EasyMock.expectLastCall();
+ EasyMock.expect(toolbox.getConfig()).andReturn(taskConfig).anyTimes();
+ EasyMock.expect(taskConfig.isEncapsulatedTask()).andReturn(false).anyTimes();
+ EasyMock.replay(toolbox, taskConfig, killer);
Review Comment:
This test name implies it verifies that `super.cleanUp` is invoked for
non-compaction tasks, but the expectations on `toolbox.getConfig()` /
`taskConfig.isEncapsulatedTask()` are set to `anyTimes()`, so the test would
still pass even if `super.cleanUp` were not called. Tighten the expectations
(e.g., require at least one call) so the test actually guards the intended
behavior.
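One way to do that (a sketch using EasyMock's standard call-count setters such as `atLeastOnce()`; the exact counts depend on how often `super.cleanUp` touches the config):
```java
// Require at least one call so verify() fails if super.cleanUp never runs,
// instead of merely permitting the calls via anyTimes().
EasyMock.expect(toolbox.getConfig()).andReturn(taskConfig).atLeastOnce();
EasyMock.expect(taskConfig.isEncapsulatedTask()).andReturn(false).atLeastOnce();
EasyMock.replay(toolbox, taskConfig, killer);
```
Note that `atLeastOnce()` is only enforced at `verify()` time, so `taskConfig` must be included in the final `EasyMock.verify(...)` call for the constraint to take effect.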
##########
docs/configuration/index.md:
##########
@@ -1376,7 +1376,7 @@ Processing properties set on the Middle Manager are passed through to Peons.
|`druid.processing.numTimeoutThreads`|The number of processing threads to have available for handling per-segment query timeouts. Setting this value to `0` removes the ability to service per-segment timeouts, irrespective of `perSegmentTimeout` query context parameter. As these threads are just servicing timers, it's recommended to set this value to some small percent (e.g. 5%) of the total query processing cores available to the peon.|0|
|`druid.processing.fifo`|Enables the processing queue to treat tasks of equal priority in a FIFO manner.|`true`|
|`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
-|`druid.processing.intermediaryData.storage.type`|Storage type for intermediary segments of data shuffle between native parallel index tasks. <br />Set to `local` to store segment files in the local storage of the Middle Manager or Indexer. <br />Set to `deepstore` to use configured deep storage for better fault tolerance during rolling updates. When the storage type is `deepstore`, Druid stores the data in the `shuffle-data` directory under the configured deep storage path. Druid does not support automated cleanup for the `shuffle-data` directory. You can set up cloud storage lifecycle rules for automated cleanup of data at the `shuffle-data` prefix location.|`local`|
+|`druid.processing.intermediaryData.storage.type`|Storage type for intermediary segments of data shuffle between native parallel index tasks. <br />Set to `local` to store segment files in the local storage of the Middle Manager or Indexer. <br />Set to `deepstore` to use configured deep storage for better fault tolerance during rolling updates. When the storage type is `deepstore`, Druid stores the data in the `shuffle-data` directory under the configured deep storage path. Druid only supports automated cleanup for the directory with HDFS. For other deepstore types, you can set up cloud storage lifecycle rules for automated cleanup of data at the shuffle-data prefix location.|`local`|
Review Comment:
The sentence "Druid only supports automated cleanup for the directory with
HDFS" is ambiguous/grammatically unclear. Consider rephrasing to explicitly tie
the behavior to the deep storage type (e.g., automated cleanup is only
supported when deep storage is HDFS) and consistently format `shuffle-data` as
code.
```suggestion
|`druid.processing.intermediaryData.storage.type`|Storage type for intermediary segments of data shuffle between native parallel index tasks. <br />Set to `local` to store segment files in the local storage of the Middle Manager or Indexer. <br />Set to `deepstore` to use configured deep storage for better fault tolerance during rolling updates. When the storage type is `deepstore`, Druid stores the data in the `shuffle-data` directory under the configured deep storage path. Automated cleanup of the `shuffle-data` directory is only supported when deep storage is HDFS-based. For other deepstore types, you can set up cloud storage lifecycle rules for automated cleanup of data at the `shuffle-data` prefix location.|`local`|
```
##########
extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentKiller.java:
##########
@@ -148,4 +148,58 @@ private void removeEmptyParentDirectories(final FileSystem fs, final Path segmen
log.makeAlert(e, "uncaught exception during segment killer").emit();
}
}
+
+ @Override
+ public void killRecursively(String relativePath) throws IOException
+ {
+ final Path dirToDelete = constructHdfsDeletePath(relativePath);
+ if (dirToDelete == null) {
+ return;
+ }
+ final FileSystem fs = dirToDelete.getFileSystem(config);
+ if (!fs.exists(dirToDelete)) {
+ return;
+ }
+ log.info("Deleting deep storage directory[%s]", dirToDelete);
+ if (!fs.delete(dirToDelete, true)) {
+ throw new IOException("Failed to delete deep storage directory.");
+ }
+ }
+
+ /**
+ * Construct a path to delete from HDFS. Returns null if the path is invalid.
+ * Replicates how {@link HdfsDataSegmentPusher#pushToPath} handles ':', by replacing that with '_'.
+ */
+ @Nullable
+ private Path constructHdfsDeletePath(String relativePath)
+ {
+ if (Strings.isNullOrEmpty(relativePath)) {
+ log.warn("Skipping deep storage directory kill: relative path is empty");
+ return null;
+ }
+ if (relativePath.charAt(0) == '/') {
+ log.warn("Skipping deep storage directory kill: relative path must not be absolute, got [%s]", relativePath);
+ return null;
+ }
+ if (relativePath.indexOf('\\') >= 0) {
+ log.warn("Skipping deep storage directory kill: backslash not allowed in path [%s]", relativePath);
+ return null;
+ }
+ for (String segment : StringUtils.split(relativePath, '/')) {
+ if (segment.isEmpty() || "..".equals(segment)) {
+ log.warn("Skipping deep storage directory kill: invalid path[%s]", relativePath);
+ return null;
+ }
+ }
Review Comment:
constructHdfsDeletePath attempts to reject empty path segments via
`segment.isEmpty()`, but
`org.apache.commons.lang3.StringUtils.split(relativePath, '/')` discards empty
tokens, making this branch effectively unreachable and allowing inputs like
`a//b` to pass validation. If the intent is to forbid empty segments, consider
using a split method that preserves empty tokens or add an explicit check for
`"//"` (and similar) before splitting.
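To illustrate the point with plain-JDK splitting (a standalone sketch; `PathValidation` and its method are hypothetical names, not part of Druid):

```java
// Hypothetical sketch: unlike org.apache.commons.lang3.StringUtils.split,
// which silently drops empty tokens (so "a//b" splits to ["a", "b"]),
// java.lang.String.split with a negative limit preserves empty tokens,
// so the empty-segment check actually fires for inputs like "a//b".
public class PathValidation
{
  public static boolean isValidRelativePath(String relativePath)
  {
    if (relativePath == null || relativePath.isEmpty() || relativePath.startsWith("/")) {
      return false;
    }
    // limit -1 keeps interior and trailing empty tokens: "a//b" -> ["a", "", "b"]
    for (String segment : relativePath.split("/", -1)) {
      if (segment.isEmpty() || "..".equals(segment)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args)
  {
    System.out.println(isValidRelativePath("shuffle-data/task1")); // true
    System.out.println(isValidRelativePath("a//b"));               // false: empty segment detected
    System.out.println(isValidRelativePath("a/../b"));             // false: parent traversal
  }
}
```

With `StringUtils.split`, the first two inputs would both pass the loop, which is exactly the gap the comment describes.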
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]