This is an automated email from the ASF dual-hosted git repository.

capistrant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ef444c1cc0 bugfix: Create tombstones when needed while doing REPLACE mode with range partitioning plus parallel indexing (#18938)
7ef444c1cc0 is described below

commit 7ef444c1cc075e8935bfd9372729befd9c2d1bf6
Author: Lucas Capistrant <[email protected]>
AuthorDate: Wed Feb 4 11:24:18 2026 -0600

    bugfix: Create tombstones when needed while doing REPLACE mode with range partitioning plus parallel indexing (#18938)
    
    * Create tombstones for range and hashed partitioning when everything has been filtered out
    
    * MSQ compaction doesn't support hash partitioning
    
    * cleanup test file
    
    * Cleanup verbose comments in test code
    
    * Hashed partitioning doesn't actually need the special handling
    
    * fix checkstyle
    
    * test coverage
---
 .../embedded/compact/CompactionSupervisorTest.java | 140 +++++++++++++++++++++
 .../parallel/ParallelIndexSupervisorTask.java      |  14 ++-
 .../common/task/CompactionTaskParallelRunTest.java | 102 +++++++++++++++
 3 files changed, 254 insertions(+), 2 deletions(-)

diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
index 5355f1601ae..9aba7f75bac 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
@@ -24,6 +24,7 @@ import org.apache.druid.catalog.guice.CatalogCoordinatorModule;
 import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.indexer.CompactionEngine;
 import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexing.common.task.IndexTask;
 import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
 import org.apache.druid.indexing.overlord.Segments;
@@ -31,14 +32,18 @@ import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.query.filter.NotDimFilter;
+import org.apache.druid.query.filter.SelectorDimFilter;
 import org.apache.druid.rpc.UpdateResponse;
 import org.apache.druid.segment.metadata.DefaultIndexingStateFingerprintMapper;
 import org.apache.druid.segment.metadata.IndexingStateCache;
 import org.apache.druid.segment.metadata.IndexingStateFingerprintMapper;
+import org.apache.druid.segment.transform.CompactionTransformSpec;
 import org.apache.druid.server.coordinator.ClusterCompactionConfig;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
 import org.apache.druid.testing.embedded.EmbeddedBroker;
 import org.apache.druid.testing.embedded.EmbeddedCoordinator;
@@ -56,6 +61,7 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -268,6 +274,121 @@ public class CompactionSupervisorTest extends EmbeddedClusterTestBase
     verifySegmentsHaveNullLastCompactionStateAndNonNullFingerprint();
   }
 
+
+  /**
+   * Tests that when a compaction task filters out all rows using a transform spec,
+   * tombstones are created to properly drop the old segments. This test covers both
+   * hash and range partitioning strategies.
+   *
+   * This regression test addresses a bug where compaction with transforms that filter
+   * all rows would succeed but not create tombstones, leaving old segments visible
+   * and causing indefinite compaction retries.
+   */
+  @MethodSource("getEngineAndPartitionType")
+  @ParameterizedTest(name = "compactionEngine={0}, partitionType={1}")
+  public void test_compactionWithTransformFilteringAllRows_createsTombstones(
+      CompactionEngine compactionEngine,
+      String partitionType
+  )
+  {
+    configureCompaction(compactionEngine);
+
+    runIngestionAtGranularity(
+        "DAY",
+        "2025-06-01T00:00:00.000Z,hat,105"
+        + "\n2025-06-02T00:00:00.000Z,shirt,210"
+        + "\n2025-06-03T00:00:00.000Z,shirt,150"
+    );
+
+    int initialSegmentCount = getNumSegmentsWith(Granularities.DAY);
+    Assertions.assertEquals(3, initialSegmentCount, "Should have 3 initial segments");
+
+    InlineSchemaDataSourceCompactionConfig.Builder builder = InlineSchemaDataSourceCompactionConfig
+        .builder()
+        .forDataSource(dataSource)
+        .withSkipOffsetFromLatest(Period.seconds(0))
+        .withGranularitySpec(
+            new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null)
+        )
+        .withTransformSpec(
+            // This filter keeps only rows where item != "shirt", so every row in the two "shirt" days is filtered out
+            new CompactionTransformSpec(
+                new NotDimFilter(new SelectorDimFilter("item", "shirt", null))
+            )
+        );
+
+    if (compactionEngine == CompactionEngine.NATIVE) {
+      builder = builder.withIoConfig(
+          // Enable REPLACE mode to create tombstones when no segments are produced
+          new UserCompactionTaskIOConfig(true)
+      );
+    }
+
+    // Add partitioning spec based on test parameter
+    if ("range".equals(partitionType)) {
+      builder.withTuningConfig(
+          new UserCompactionTaskQueryTuningConfig(
+              null,
+              null,
+              null,
+              null,
+              null,
+              new DimensionRangePartitionsSpec(null, 5000, List.of("item"), false),
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              null
+          )
+      );
+    } else {
+      // Hash partitioning
+      builder.withTuningConfig(
+          new UserCompactionTaskQueryTuningConfig(
+              null,
+              null,
+              null,
+              null,
+              null,
+              new HashedPartitionsSpec(null, null, null),
+              null,
+              null,
+              null,
+              null,
+              null,
+              2,
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              null
+          )
+      );
+    }
+
+    InlineSchemaDataSourceCompactionConfig compactionConfig = builder.build();
+
+    runCompactionWithSpec(compactionConfig);
+    waitForAllCompactionTasksToFinish();
+
+    int finalSegmentCount = getNumSegmentsWith(Granularities.DAY);
+    Assertions.assertEquals(
+        1,
+        finalSegmentCount,
+        "2 of 3 segments should be dropped via tombstones when transform 
filters all rows where item = 'shirt'"
+    );
+  }
+
   private void verifySegmentsHaveNullLastCompactionStateAndNonNullFingerprint()
   {
     overlord
@@ -352,6 +473,7 @@ public class CompactionSupervisorTest extends EmbeddedClusterTestBase
         .segmentsMetadataStorage()
         .retrieveAllUsedSegments(dataSource, Segments.ONLY_VISIBLE)
         .stream()
+        .filter(segment -> !segment.isTombstone())
         .filter(segment -> granularity.isAligned(segment.getInterval()))
         .count();
   }
@@ -374,4 +496,22 @@ public class CompactionSupervisorTest extends EmbeddedClusterTestBase
   {
     return List.of(CompactionEngine.NATIVE, CompactionEngine.MSQ);
   }
+
+  /**
+   * Provides test parameters for both compaction engines and partition types.
+   */
+  public static List<Object[]> getEngineAndPartitionType()
+  {
+    List<Object[]> params = new ArrayList<>();
+    for (CompactionEngine engine : List.of(CompactionEngine.NATIVE, CompactionEngine.MSQ)) {
+      for (String partitionType : List.of("range", "hash")) {
+        if (engine == CompactionEngine.MSQ && "hash".equals(partitionType)) {
+          // MSQ does not support hash partitioning in this context
+          continue;
+        }
+        params.add(new Object[]{engine, partitionType});
+      }
+    }
+    return params;
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 02f11e8f5e2..c4f37192ee1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -761,6 +761,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask
       if (cardinalityRunner.getReports().isEmpty()) {
         String msg = "No valid rows for hash partitioning."
                      + " All rows may have invalid timestamps or have been 
filtered out.";
+
         LOG.warn(msg);
         return TaskStatus.success(getId(), msg);
       }
@@ -876,8 +877,17 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask
       if (intervalToPartitions.isEmpty()) {
         String msg = "No valid rows for range partitioning."
                      + " All rows may have invalid timestamps or multiple 
dimension values.";
-        LOG.warn(msg);
-        return TaskStatus.success(getId(), msg);
+
+        if (getIngestionMode() == IngestionMode.REPLACE) {
+          // In REPLACE mode, publish segments (and tombstones, when called for) even when no new data was produced
+          publishSegments(toolbox, Collections.emptyMap());
+          TaskStatus taskStatus = TaskStatus.success(getId(), msg);
+          updateAndWriteCompletionReports(taskStatus);
+          return taskStatus;
+        } else {
+          LOG.warn(msg);
+          return TaskStatus.success(getId(), msg);
+        }
       }
     }
     catch (Exception e) {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index 53a7259f5bc..3fed0194f4d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -1003,6 +1003,108 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
     }
   }
 
+  @Test
+  public void testRunParallelWithRangePartitioningFilteringAllRows() throws Exception
+  {
+    allowSegmentFetchesByCompactionTask = true;
+
+    // Range partitioning is not supported with segment lock yet
+    Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
+
+    runIndexTask(null, true);
+
+    Collection<DataSegment> usedSegments = getCoordinatorClient().fetchUsedSegments(
+        DATA_SOURCE,
+        ImmutableList.of(INTERVAL_TO_INDEX)
+    ).get();
+    Assert.assertEquals(3, usedSegments.size());
+
+    // Compact with a transform that filters out ALL rows
+    final Builder builder = new Builder(DATA_SOURCE, getSegmentCacheManagerFactory());
+    final CompactionTask compactionTask = builder
+        .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null), true) // dropExisting=true
+        .tuningConfig(newTuningConfig(
+            new SingleDimensionPartitionsSpec(7, null, "dim", false),
+            2,
+            true
+        ))
+        .transformSpec(new CompactionTransformSpec(
+            new SelectorDimFilter("dim", "nonexistent_value", null) // Filters 
out all rows
+        ))
+        .build();
+
+    runTask(compactionTask);
+
+    usedSegments = getCoordinatorClient().fetchUsedSegments(
+        DATA_SOURCE,
+        ImmutableList.of(INTERVAL_TO_INDEX)
+    ).get();
+
+    Assert.assertNotNull(usedSegments);
+
+    int tombstoneCount = 0;
+    for (DataSegment segment : usedSegments) {
+      if (segment.isTombstone()) {
+        tombstoneCount++;
+      }
+    }
+
+    Assert.assertTrue("Expected tombstones when all rows filtered in REPLACE 
mode", tombstoneCount > 0);
+  }
+
+  @Test
+  public void testRunParallelRangePartitioningFilterAllRowsReplaceLegacyMode() throws Exception
+  {
+    allowSegmentFetchesByCompactionTask = true;
+
+    Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
+
+    runIndexTask(null, true);
+
+    Collection<DataSegment> usedSegments = getCoordinatorClient().fetchUsedSegments(
+        DATA_SOURCE,
+        ImmutableList.of(INTERVAL_TO_INDEX)
+    ).get();
+    Assert.assertEquals(3, usedSegments.size());
+
+    final Builder builder = new Builder(DATA_SOURCE, getSegmentCacheManagerFactory());
+    final CompactionTask compactionTask = builder
+        .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null), false) // dropExisting=false -> REPLACE_LEGACY mode
+        .tuningConfig(newTuningConfig(
+            new SingleDimensionPartitionsSpec(7, null, "dim", false),
+            2,
+            true
+        ))
+        .transformSpec(new CompactionTransformSpec(
+            new SelectorDimFilter("dim", "nonexistent_value", null) // Filters 
all rows
+        ))
+        .build();
+
+    runTask(compactionTask);
+
+    // In REPLACE_LEGACY mode, should NOT create tombstones when all rows filtered
+    // Original segments should remain unchanged
+    usedSegments = getCoordinatorClient().fetchUsedSegments(
+        DATA_SOURCE,
+        ImmutableList.of(INTERVAL_TO_INDEX)
+    ).get();
+
+    Assert.assertNotNull(usedSegments);
+
+    Assert.assertEquals(
+        "Original segments should remain in REPLACE_LEGACY mode when all rows 
filtered",
+        3,
+        usedSegments.size()
+    );
+
+    for (DataSegment segment : usedSegments) {
+      Assert.assertFalse(
+          "No tombstones should be created in REPLACE_LEGACY mode",
+          segment.isTombstone()
+      );
+    }
+  }
+
   @Override
   protected TaskToolbox createTaskToolbox(Task task, TaskActionClient actionClient) throws IOException
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
