This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c0e491c843e add missing compaction status evaluator for projections (#17905)
c0e491c843e is described below

commit c0e491c843ee7ed08d8a5827802c5d3d488c0db7
Author: Clint Wylie <[email protected]>
AuthorDate: Mon Apr 14 05:03:02 2025 -0700

    add missing compaction status evaluator for projections (#17905)
---
 .../druid/server/compaction/CompactionStatus.java  |  13 ++-
 .../server/compaction/CompactionStatusTest.java    | 110 +++++++++++++++++++++
 2 files changed, 122 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
index 77786993ac9..98194eefdbd 100644
--- a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
+++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
@@ -73,7 +73,8 @@ public class CompactionStatus
       Evaluator::rollupIsUpToDate,
       Evaluator::dimensionsSpecIsUpToDate,
       Evaluator::metricsSpecIsUpToDate,
-      Evaluator::transformSpecFilterIsUpToDate
+      Evaluator::transformSpecFilterIsUpToDate,
+      Evaluator::projectionsAreUpToDate
   );
 
   private final State state;
@@ -340,6 +341,16 @@ public class CompactionStatus
       );
     }
 
+    private CompactionStatus projectionsAreUpToDate()
+    {
+      return CompactionStatus.completeIfEqual(
+          "projections",
+          compactionConfig.getProjections(),
+          lastCompactionState.getProjections(),
+          String::valueOf
+      );
+    }
+
     private CompactionStatus segmentGranularityIsUpToDate()
     {
       if (configuredGranularitySpec == null
diff --git a/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTest.java b/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTest.java
index 836a0a0b190..41d9a8d3d5f 100644
--- a/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTest.java
+++ b/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTest.java
@@ -21,6 +21,9 @@ package org.apache.druid.server.compaction;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.indexer.granularity.GranularitySpec;
 import org.apache.druid.indexer.granularity.UniformGranularitySpec;
 import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
@@ -30,8 +33,11 @@ import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.TestDataSource;
+import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.data.CompressionStrategy;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
@@ -43,6 +49,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Collections;
+import java.util.List;
 
 public class CompactionStatusTest
 {
@@ -321,6 +328,109 @@ public class CompactionStatusTest
     Assert.assertTrue(status.isComplete());
   }
 
+  @Test
+  public void testStatusWhenProjectionsMatch()
+  {
+    final GranularitySpec currentGranularitySpec
+        = new UniformGranularitySpec(Granularities.HOUR, null, null);
+    final PartitionsSpec currentPartitionsSpec = new DynamicPartitionsSpec(100, 0L);
+    final IndexSpec currentIndexSpec
+        = IndexSpec.builder().withDimensionCompression(CompressionStrategy.ZSTD).build();
+    final AggregateProjectionSpec projection1 = new AggregateProjectionSpec(
+        "foo",
+        VirtualColumns.create(
+            Granularities.toVirtualColumn(Granularities.HOUR, Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME)
+        ),
+        List.of(
+            new LongDimensionSchema(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME),
+            new StringDimensionSchema("a")
+        ),
+        new AggregatorFactory[]{
+            new LongSumAggregatorFactory("sum_long", "long")
+        }
+    );
+    final CompactionState lastCompactionState = new CompactionState(
+        currentPartitionsSpec,
+        null,
+        null,
+        null,
+        currentIndexSpec,
+        currentGranularitySpec,
+        List.of(projection1)
+    );
+    final DataSourceCompactionConfig compactionConfig = InlineSchemaDataSourceCompactionConfig
+        .builder()
+        .forDataSource(TestDataSource.WIKI)
+        .withTuningConfig(createTuningConfig(currentPartitionsSpec, currentIndexSpec))
+        .withGranularitySpec(new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null))
+        .withProjections(List.of(projection1))
+        .build();
+
+    final DataSegment segment = DataSegment.builder(WIKI_SEGMENT).lastCompactionState(lastCompactionState).build();
+    final CompactionStatus status = CompactionStatus.compute(
+        CompactionCandidate.from(Collections.singletonList(segment)),
+        compactionConfig,
+        OBJECT_MAPPER
+    );
+    Assert.assertTrue(status.isComplete());
+  }
+
+  @Test
+  public void testStatusWhenProjectionsMismatch()
+  {
+    final GranularitySpec currentGranularitySpec
+        = new UniformGranularitySpec(Granularities.HOUR, null, null);
+    final PartitionsSpec currentPartitionsSpec = new DynamicPartitionsSpec(100, 0L);
+    final IndexSpec currentIndexSpec
+        = IndexSpec.builder().withDimensionCompression(CompressionStrategy.ZSTD).build();
+    final AggregateProjectionSpec projection1 = new AggregateProjectionSpec(
+        "1",
+        VirtualColumns.create(
+            Granularities.toVirtualColumn(Granularities.HOUR, Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME)
+        ),
+        List.of(
+            new LongDimensionSchema(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME),
+            new StringDimensionSchema("a")
+        ),
+        new AggregatorFactory[]{
+            new LongSumAggregatorFactory("sum_long", "long")
+        }
+    );
+    final AggregateProjectionSpec projection2 = new AggregateProjectionSpec(
+        "2",
+        VirtualColumns.EMPTY,
+        Collections.emptyList(),
+        new AggregatorFactory[]{
+            new LongSumAggregatorFactory("sum_long", "long")
+        }
+    );
+
+    final CompactionState lastCompactionState = new CompactionState(
+        currentPartitionsSpec,
+        null,
+        null,
+        null,
+        currentIndexSpec,
+        currentGranularitySpec,
+        List.of(projection1)
+    );
+    final DataSourceCompactionConfig compactionConfig = InlineSchemaDataSourceCompactionConfig
+        .builder()
+        .forDataSource(TestDataSource.WIKI)
+        .withTuningConfig(createTuningConfig(currentPartitionsSpec, currentIndexSpec))
+        .withGranularitySpec(new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null))
+        .withProjections(List.of(projection1, projection2))
+        .build();
+
+    final DataSegment segment = DataSegment.builder(WIKI_SEGMENT).lastCompactionState(lastCompactionState).build();
+    final CompactionStatus status = CompactionStatus.compute(
+        CompactionCandidate.from(Collections.singletonList(segment)),
+        compactionConfig,
+        OBJECT_MAPPER
+    );
+    Assert.assertFalse(status.isComplete());
+  }
+
   private void verifyCompactionStatusIsPendingBecause(
       CompactionState lastCompactionState,
       DataSourceCompactionConfig compactionConfig,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to