This is an automated email from the ASF dual-hosted git repository.
capistrant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 7ef444c1cc0 bugfix: Create tombstones when needed while doing REPLACE mode with range partitioning plus parallel indexing (#18938)
7ef444c1cc0 is described below
commit 7ef444c1cc075e8935bfd9372729befd9c2d1bf6
Author: Lucas Capistrant <[email protected]>
AuthorDate: Wed Feb 4 11:24:18 2026 -0600
bugfix: Create tombstones when needed while doing REPLACE mode with range partitioning plus parallel indexing (#18938)
* Create tombstones for range and hashed partitioning when everything has been filtered out
* MSQ compaction doesn't support hash partitioning
* cleanup test file
* Cleanup verbose comments in test code
* Hashed partitioning doesn't actually need the special handling
* fix checkstyle
* test coverage
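
In short, the fix makes ParallelIndexSupervisorTask publish its (empty) segment set when range partitioning finds no valid rows and the task is running in REPLACE mode, so the replace publish path can still add tombstones where needed instead of returning early without publishing anything. A condensed sketch of the new branch (surrounding method context omitted; the full change is in the ParallelIndexSupervisorTask hunk below):

    // Sketch only: condensed from the range-partitioning hunk further down.
    if (intervalToPartitions.isEmpty()) {
      String msg = "No valid rows for range partitioning."
                   + " All rows may have invalid timestamps or multiple dimension values.";
      if (getIngestionMode() == IngestionMode.REPLACE) {
        // Publishing even with no new segments lets the REPLACE path
        // create tombstones for the compacted intervals when called for.
        publishSegments(toolbox, Collections.emptyMap());
        TaskStatus taskStatus = TaskStatus.success(getId(), msg);
        updateAndWriteCompletionReports(taskStatus);
        return taskStatus;
      } else {
        LOG.warn(msg);
        return TaskStatus.success(getId(), msg);
      }
    }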
---
.../embedded/compact/CompactionSupervisorTest.java | 140 +++++++++++++++++++++
.../parallel/ParallelIndexSupervisorTask.java | 14 ++-
.../common/task/CompactionTaskParallelRunTest.java | 102 +++++++++++++++
3 files changed, 254 insertions(+), 2 deletions(-)
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
index 5355f1601ae..9aba7f75bac 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
@@ -24,6 +24,7 @@ import org.apache.druid.catalog.guice.CatalogCoordinatorModule;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
import org.apache.druid.indexing.overlord.Segments;
@@ -31,14 +32,18 @@ import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.query.filter.NotDimFilter;
+import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.rpc.UpdateResponse;
import org.apache.druid.segment.metadata.DefaultIndexingStateFingerprintMapper;
import org.apache.druid.segment.metadata.IndexingStateCache;
import org.apache.druid.segment.metadata.IndexingStateFingerprintMapper;
+import org.apache.druid.segment.transform.CompactionTransformSpec;
import org.apache.druid.server.coordinator.ClusterCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
@@ -56,6 +61,7 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -268,6 +274,121 @@ public class CompactionSupervisorTest extends EmbeddedClusterTestBase
verifySegmentsHaveNullLastCompactionStateAndNonNullFingerprint();
}
+
+ /**
+ * Tests that when a compaction task filters out all rows using a transform spec,
+ * tombstones are created to properly drop the old segments. This test covers both
+ * hash and range partitioning strategies.
+ *
+ * This regression test addresses a bug where compaction with transforms that filter
+ * all rows would succeed but not create tombstones, leaving old segments visible
+ * and causing indefinite compaction retries.
+ */
+ @MethodSource("getEngineAndPartitionType")
+ @ParameterizedTest(name = "compactionEngine={0}, partitionType={1}")
+ public void test_compactionWithTransformFilteringAllRows_createsTombstones(
+ CompactionEngine compactionEngine,
+ String partitionType
+ )
+ {
+ configureCompaction(compactionEngine);
+
+ runIngestionAtGranularity(
+ "DAY",
+ "2025-06-01T00:00:00.000Z,hat,105"
+ + "\n2025-06-02T00:00:00.000Z,shirt,210"
+ + "\n2025-06-03T00:00:00.000Z,shirt,150"
+ );
+
+ int initialSegmentCount = getNumSegmentsWith(Granularities.DAY);
+ Assertions.assertEquals(3, initialSegmentCount, "Should have 3 initial segments");
+
+ InlineSchemaDataSourceCompactionConfig.Builder builder = InlineSchemaDataSourceCompactionConfig
+ .builder()
+ .forDataSource(dataSource)
+ .withSkipOffsetFromLatest(Period.seconds(0))
+ .withGranularitySpec(
+ new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null)
+ )
+ .withTransformSpec(
+ // This filter drops all rows where item = 'shirt', leaving those day intervals empty
+ new CompactionTransformSpec(
+ new NotDimFilter(new SelectorDimFilter("item", "shirt", null))
+ )
+ );
+
+ if (compactionEngine == CompactionEngine.NATIVE) {
+ builder = builder.withIoConfig(
+ // Enable REPLACE mode to create tombstones when no segments are produced
+ new UserCompactionTaskIOConfig(true)
+ );
+ }
+
+ // Add partitioning spec based on test parameter
+ if ("range".equals(partitionType)) {
+ builder.withTuningConfig(
+ new UserCompactionTaskQueryTuningConfig(
+ null,
+ null,
+ null,
+ null,
+ null,
+ new DimensionRangePartitionsSpec(null, 5000, List.of("item"), false),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ )
+ );
+ } else {
+ // Hash partitioning
+ builder.withTuningConfig(
+ new UserCompactionTaskQueryTuningConfig(
+ null,
+ null,
+ null,
+ null,
+ null,
+ new HashedPartitionsSpec(null, null, null),
+ null,
+ null,
+ null,
+ null,
+ null,
+ 2,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ )
+ );
+ }
+
+ InlineSchemaDataSourceCompactionConfig compactionConfig = builder.build();
+
+ runCompactionWithSpec(compactionConfig);
+ waitForAllCompactionTasksToFinish();
+
+ int finalSegmentCount = getNumSegmentsWith(Granularities.DAY);
+ Assertions.assertEquals(
+ 1,
+ finalSegmentCount,
+ "2 of 3 segments should be dropped via tombstones when transform
filters all rows where item = 'shirt'"
+ );
+ }
+
private void verifySegmentsHaveNullLastCompactionStateAndNonNullFingerprint()
{
overlord
@@ -352,6 +473,7 @@ public class CompactionSupervisorTest extends EmbeddedClusterTestBase
.segmentsMetadataStorage()
.retrieveAllUsedSegments(dataSource, Segments.ONLY_VISIBLE)
.stream()
+ .filter(segment -> !segment.isTombstone())
.filter(segment -> granularity.isAligned(segment.getInterval()))
.count();
}
@@ -374,4 +496,22 @@ public class CompactionSupervisorTest extends EmbeddedClusterTestBase
{
return List.of(CompactionEngine.NATIVE, CompactionEngine.MSQ);
}
+
+ /**
+ * Provides test parameters for both compaction engines and partition types.
+ */
+ public static List<Object[]> getEngineAndPartitionType()
+ {
+ List<Object[]> params = new ArrayList<>();
+ for (CompactionEngine engine : List.of(CompactionEngine.NATIVE, CompactionEngine.MSQ)) {
+ for (String partitionType : List.of("range", "hash")) {
+ if (engine == CompactionEngine.MSQ && "hash".equals(partitionType)) {
+ // MSQ does not support hash partitioning in this context
+ continue;
+ }
+ params.add(new Object[]{engine, partitionType});
+ }
+ }
+ return params;
+ }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 02f11e8f5e2..c4f37192ee1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -761,6 +761,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask
if (cardinalityRunner.getReports().isEmpty()) {
String msg = "No valid rows for hash partitioning."
+ " All rows may have invalid timestamps or have been
filtered out.";
+
LOG.warn(msg);
return TaskStatus.success(getId(), msg);
}
@@ -876,8 +877,17 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask
if (intervalToPartitions.isEmpty()) {
String msg = "No valid rows for range partitioning."
+ " All rows may have invalid timestamps or multiple
dimension values.";
- LOG.warn(msg);
- return TaskStatus.success(getId(), msg);
+
+ if (getIngestionMode() == IngestionMode.REPLACE) {
+ // In REPLACE mode, publish segments (and tombstones, when called for) even when no new data was produced
+ publishSegments(toolbox, Collections.emptyMap());
+ TaskStatus taskStatus = TaskStatus.success(getId(), msg);
+ updateAndWriteCompletionReports(taskStatus);
+ return taskStatus;
+ } else {
+ LOG.warn(msg);
+ return TaskStatus.success(getId(), msg);
+ }
}
}
catch (Exception e) {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index 53a7259f5bc..3fed0194f4d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -1003,6 +1003,108 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
}
}
+ @Test
+ public void testRunParallelWithRangePartitioningFilteringAllRows() throws Exception
+ {
+ allowSegmentFetchesByCompactionTask = true;
+
+ // Range partitioning is not supported with segment lock yet
+ Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
+
+ runIndexTask(null, true);
+
+ Collection<DataSegment> usedSegments = getCoordinatorClient().fetchUsedSegments(
+ DATA_SOURCE,
+ ImmutableList.of(INTERVAL_TO_INDEX)
+ ).get();
+ Assert.assertEquals(3, usedSegments.size());
+
+ // Compact with a transform that filters out ALL rows
+ final Builder builder = new Builder(DATA_SOURCE, getSegmentCacheManagerFactory());
+ final CompactionTask compactionTask = builder
+ .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null), true) // dropExisting=true
+ .tuningConfig(newTuningConfig(
+ new SingleDimensionPartitionsSpec(7, null, "dim", false),
+ 2,
+ true
+ ))
+ .transformSpec(new CompactionTransformSpec(
+ new SelectorDimFilter("dim", "nonexistent_value", null) // Filters out all rows
+ ))
+ .build();
+
+ runTask(compactionTask);
+
+ usedSegments = getCoordinatorClient().fetchUsedSegments(
+ DATA_SOURCE,
+ ImmutableList.of(INTERVAL_TO_INDEX)
+ ).get();
+
+ Assert.assertNotNull(usedSegments);
+
+ int tombstoneCount = 0;
+ for (DataSegment segment : usedSegments) {
+ if (segment.isTombstone()) {
+ tombstoneCount++;
+ }
+ }
+
+ Assert.assertTrue("Expected tombstones when all rows filtered in REPLACE mode", tombstoneCount > 0);
+ }
+
+ @Test
+ public void testRunParallelRangePartitioningFilterAllRowsReplaceLegacyMode() throws Exception
+ {
+ allowSegmentFetchesByCompactionTask = true;
+
+ Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
+
+ runIndexTask(null, true);
+
+ Collection<DataSegment> usedSegments = getCoordinatorClient().fetchUsedSegments(
+ DATA_SOURCE,
+ ImmutableList.of(INTERVAL_TO_INDEX)
+ ).get();
+ Assert.assertEquals(3, usedSegments.size());
+
+ final Builder builder = new Builder(DATA_SOURCE, getSegmentCacheManagerFactory());
+ final CompactionTask compactionTask = builder
+ .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null), false) // dropExisting=false -> REPLACE_LEGACY mode
+ .tuningConfig(newTuningConfig(
+ new SingleDimensionPartitionsSpec(7, null, "dim", false),
+ 2,
+ true
+ ))
+ .transformSpec(new CompactionTransformSpec(
+ new SelectorDimFilter("dim", "nonexistent_value", null) // Filters all rows
+ ))
+ .build();
+
+ runTask(compactionTask);
+
+ // In REPLACE_LEGACY mode, should NOT create tombstones when all rows filtered
+ // Original segments should remain unchanged
+ usedSegments = getCoordinatorClient().fetchUsedSegments(
+ DATA_SOURCE,
+ ImmutableList.of(INTERVAL_TO_INDEX)
+ ).get();
+
+ Assert.assertNotNull(usedSegments);
+
+ Assert.assertEquals(
+ "Original segments should remain in REPLACE_LEGACY mode when all rows
filtered",
+ 3,
+ usedSegments.size()
+ );
+
+ for (DataSegment segment : usedSegments) {
+ Assert.assertFalse(
+ "No tombstones should be created in REPLACE_LEGACY mode",
+ segment.isTombstone()
+ );
+ }
+ }
+
@Override
protected TaskToolbox createTaskToolbox(Task task, TaskActionClient actionClient) throws IOException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]