This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c0e491c843e add missing compaction status evaluator for projections (#17905)
c0e491c843e is described below
commit c0e491c843ee7ed08d8a5827802c5d3d488c0db7
Author: Clint Wylie <[email protected]>
AuthorDate: Mon Apr 14 05:03:02 2025 -0700
add missing compaction status evaluator for projections (#17905)
---
.../druid/server/compaction/CompactionStatus.java | 13 ++-
.../server/compaction/CompactionStatusTest.java | 110 +++++++++++++++++++++
2 files changed, 122 insertions(+), 1 deletion(-)
diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
index 77786993ac9..98194eefdbd 100644
--- a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
+++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
@@ -73,7 +73,8 @@ public class CompactionStatus
Evaluator::rollupIsUpToDate,
Evaluator::dimensionsSpecIsUpToDate,
Evaluator::metricsSpecIsUpToDate,
- Evaluator::transformSpecFilterIsUpToDate
+ Evaluator::transformSpecFilterIsUpToDate,
+ Evaluator::projectionsAreUpToDate
);
private final State state;
@@ -340,6 +341,16 @@ public class CompactionStatus
);
}
+ private CompactionStatus projectionsAreUpToDate()
+ {
+ return CompactionStatus.completeIfEqual(
+ "projections",
+ compactionConfig.getProjections(),
+ lastCompactionState.getProjections(),
+ String::valueOf
+ );
+ }
+
private CompactionStatus segmentGranularityIsUpToDate()
{
if (configuredGranularitySpec == null
diff --git a/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTest.java b/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTest.java
index 836a0a0b190..41d9a8d3d5f 100644
--- a/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTest.java
+++ b/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTest.java
@@ -21,6 +21,9 @@ package org.apache.druid.server.compaction;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.indexer.granularity.GranularitySpec;
import org.apache.druid.indexer.granularity.UniformGranularitySpec;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
@@ -30,8 +33,11 @@ import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.TestDataSource;
+import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import
org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
@@ -43,6 +49,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
+import java.util.List;
public class CompactionStatusTest
{
@@ -321,6 +328,109 @@ public class CompactionStatusTest
Assert.assertTrue(status.isComplete());
}
+ @Test
+ public void testStatusWhenProjectionsMatch()
+ {
+ final GranularitySpec currentGranularitySpec
+ = new UniformGranularitySpec(Granularities.HOUR, null, null);
+ final PartitionsSpec currentPartitionsSpec = new
DynamicPartitionsSpec(100, 0L);
+ final IndexSpec currentIndexSpec
+ =
IndexSpec.builder().withDimensionCompression(CompressionStrategy.ZSTD).build();
+ final AggregateProjectionSpec projection1 = new AggregateProjectionSpec(
+ "foo",
+ VirtualColumns.create(
+ Granularities.toVirtualColumn(Granularities.HOUR,
Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME)
+ ),
+ List.of(
+ new
LongDimensionSchema(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME),
+ new StringDimensionSchema("a")
+ ),
+ new AggregatorFactory[]{
+ new LongSumAggregatorFactory("sum_long", "long")
+ }
+ );
+ final CompactionState lastCompactionState = new CompactionState(
+ currentPartitionsSpec,
+ null,
+ null,
+ null,
+ currentIndexSpec,
+ currentGranularitySpec,
+ List.of(projection1)
+ );
+ final DataSourceCompactionConfig compactionConfig =
InlineSchemaDataSourceCompactionConfig
+ .builder()
+ .forDataSource(TestDataSource.WIKI)
+ .withTuningConfig(createTuningConfig(currentPartitionsSpec,
currentIndexSpec))
+ .withGranularitySpec(new
UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null))
+ .withProjections(List.of(projection1))
+ .build();
+
+ final DataSegment segment =
DataSegment.builder(WIKI_SEGMENT).lastCompactionState(lastCompactionState).build();
+ final CompactionStatus status = CompactionStatus.compute(
+ CompactionCandidate.from(Collections.singletonList(segment)),
+ compactionConfig,
+ OBJECT_MAPPER
+ );
+ Assert.assertTrue(status.isComplete());
+ }
+
+ @Test
+ public void testStatusWhenProjectionsMismatch()
+ {
+ final GranularitySpec currentGranularitySpec
+ = new UniformGranularitySpec(Granularities.HOUR, null, null);
+ final PartitionsSpec currentPartitionsSpec = new
DynamicPartitionsSpec(100, 0L);
+ final IndexSpec currentIndexSpec
+ =
IndexSpec.builder().withDimensionCompression(CompressionStrategy.ZSTD).build();
+ final AggregateProjectionSpec projection1 = new AggregateProjectionSpec(
+ "1",
+ VirtualColumns.create(
+ Granularities.toVirtualColumn(Granularities.HOUR,
Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME)
+ ),
+ List.of(
+ new
LongDimensionSchema(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME),
+ new StringDimensionSchema("a")
+ ),
+ new AggregatorFactory[]{
+ new LongSumAggregatorFactory("sum_long", "long")
+ }
+ );
+ final AggregateProjectionSpec projection2 = new AggregateProjectionSpec(
+ "2",
+ VirtualColumns.EMPTY,
+ Collections.emptyList(),
+ new AggregatorFactory[]{
+ new LongSumAggregatorFactory("sum_long", "long")
+ }
+ );
+
+ final CompactionState lastCompactionState = new CompactionState(
+ currentPartitionsSpec,
+ null,
+ null,
+ null,
+ currentIndexSpec,
+ currentGranularitySpec,
+ List.of(projection1)
+ );
+ final DataSourceCompactionConfig compactionConfig =
InlineSchemaDataSourceCompactionConfig
+ .builder()
+ .forDataSource(TestDataSource.WIKI)
+ .withTuningConfig(createTuningConfig(currentPartitionsSpec,
currentIndexSpec))
+ .withGranularitySpec(new
UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null))
+ .withProjections(List.of(projection1, projection2))
+ .build();
+
+ final DataSegment segment =
DataSegment.builder(WIKI_SEGMENT).lastCompactionState(lastCompactionState).build();
+ final CompactionStatus status = CompactionStatus.compute(
+ CompactionCandidate.from(Collections.singletonList(segment)),
+ compactionConfig,
+ OBJECT_MAPPER
+ );
+ Assert.assertFalse(status.isComplete());
+ }
+
private void verifyCompactionStatusIsPendingBecause(
CompactionState lastCompactionState,
DataSourceCompactionConfig compactionConfig,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org