This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c25a556827 Fix bug in auto compaction preserveExistingMetrics feature 
(#12438)
c25a556827 is described below

commit c25a5568275c506115914cc5622a829c89f8e384
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Fri Apr 15 15:47:47 2022 -0700

    Fix bug in auto compaction preserveExistingMetrics feature (#12438)
    
    * fix bug
    
    * fix test
    
    * fix IT
---
 .../coordinator/duty/ITAutoCompactionTest.java     |  96 ++++++++++++++++
 .../incremental/OnheapIncrementalIndex.java        |  10 +-
 .../druid/segment/data/IncrementalIndexTest.java   |  69 ++++++++++++
 .../ClientCompactionTaskQueryTuningConfig.java     |   7 +-
 .../server/coordinator/duty/CompactSegments.java   |   2 +-
 .../duty/NewestSegmentFirstIterator.java           |   2 +-
 .../coordinator/duty/CompactSegmentsTest.java      | 125 +++++++++++++++++++++
 .../duty/NewestSegmentFirstIteratorTest.java       |  18 +--
 .../duty/NewestSegmentFirstPolicyTest.java         |  26 ++---
 9 files changed, 323 insertions(+), 32 deletions(-)

diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 7dd7591615..32d563f25b 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -45,6 +45,7 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import 
org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggregatorFactory;
 import 
org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;
@@ -121,6 +122,101 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
     fullDatasourceName = "wikipedia_index_test_" + UUID.randomUUID() + 
config.getExtraDatasourceNameSuffix();
   }
 
+  @Test
+  public void 
testAutoCompactionRowWithMetricAndRowWithoutMetricShouldPreserveExistingMetricsUsingAggregatorWithDifferentReturnType()
 throws Exception
+  {
+    // added = null, count = 2, sum_added = 62, quantilesDoublesSketch = 2, 
thetaSketch = 2, HLLSketchBuild = 2
+    loadData(INDEX_TASK_WITH_ROLLUP_FOR_PRESERVE_METRICS);
+    // added = 31, count = null, sum_added = null, quantilesDoublesSketch = 
null, thetaSketch = null, HLLSketchBuild = null
+    loadData(INDEX_TASK_WITHOUT_ROLLUP_FOR_PRESERVE_METRICS);
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      final List<String> intervalsBeforeCompaction = 
coordinator.getSegmentIntervals(fullDatasourceName);
+      intervalsBeforeCompaction.sort(null);
+      // 2 segments across 1 days...
+      verifySegmentsCount(2);
+      ArrayList<Object> nullList = new ArrayList<Object>();
+      nullList.add(null);
+      Map<String, Object> queryAndResultFields = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "added",
+          "%%EXPECTED_COUNT_RESULT%%", 2,
+          "%%EXPECTED_SCAN_RESULT%%", 
ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(nullList)), 
ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(31))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+      queryAndResultFields = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "count",
+          "%%EXPECTED_COUNT_RESULT%%", 2,
+          "%%EXPECTED_SCAN_RESULT%%", 
ImmutableList.of(ImmutableMap.of("events", 
ImmutableList.of(ImmutableList.of(2))), ImmutableMap.of("events", 
ImmutableList.of(nullList)))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+      queryAndResultFields = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "sum_added",
+          "%%EXPECTED_COUNT_RESULT%%", 2,
+          "%%EXPECTED_SCAN_RESULT%%", 
ImmutableList.of(ImmutableMap.of("events", 
ImmutableList.of(ImmutableList.of(62))), ImmutableMap.of("events", 
ImmutableList.of(nullList)))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+      queryAndResultFields = ImmutableMap.of(
+          "%%QUANTILESRESULT%%", 2,
+          "%%THETARESULT%%", 2.0,
+          "%%HLLRESULT%%", 2
+      );
+      verifyQuery(INDEX_ROLLUP_SKETCH_QUERIES_RESOURCE, queryAndResultFields);
+
+      submitCompactionConfig(
+          MAX_ROWS_PER_SEGMENT_COMPACTED,
+          NO_SKIP_OFFSET,
+          new UserCompactionTaskGranularityConfig(null, null, true),
+          new 
UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))),
+          null,
+          new AggregatorFactory[]{
+              new CountAggregatorFactory("count"),
+              // FloatSumAggregator combine method takes in two Float but 
return Double
+              new FloatSumAggregatorFactory("sum_added", "added"),
+              new SketchMergeAggregatorFactory("thetaSketch", "user", 16384, 
true, false, null),
+              new HllSketchBuildAggregatorFactory("HLLSketchBuild", "user", 
12, TgtHllType.HLL_4.name(), false),
+              new DoublesSketchAggregatorFactory("quantilesDoublesSketch", 
"delta", 128, 1000000000L)
+          },
+          false
+      );
+      // should now only have 1 row after compaction
+      // added = null, count = 3, sum_added = 93.0
+      forceTriggerAutoCompaction(1);
+
+      queryAndResultFields = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "added",
+          "%%EXPECTED_COUNT_RESULT%%", 1,
+          "%%EXPECTED_SCAN_RESULT%%", 
ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(nullList)))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+      queryAndResultFields = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "count",
+          "%%EXPECTED_COUNT_RESULT%%", 1,
+          "%%EXPECTED_SCAN_RESULT%%", 
ImmutableList.of(ImmutableMap.of("events", 
ImmutableList.of(ImmutableList.of(3))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+      queryAndResultFields = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "sum_added",
+          "%%EXPECTED_COUNT_RESULT%%", 1,
+          "%%EXPECTED_SCAN_RESULT%%", 
ImmutableList.of(ImmutableMap.of("events", 
ImmutableList.of(ImmutableList.of(93.0f))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+      queryAndResultFields = ImmutableMap.of(
+          "%%QUANTILESRESULT%%", 3,
+          "%%THETARESULT%%", 3.0,
+          "%%HLLRESULT%%", 3
+      );
+      verifyQuery(INDEX_ROLLUP_SKETCH_QUERIES_RESOURCE, queryAndResultFields);
+
+      verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
+      checkCompactionIntervals(intervalsBeforeCompaction);
+
+      List<TaskResponseObject> compactTasksBefore = 
indexer.getCompleteTasksForDataSource(fullDatasourceName);
+      // Verify rollup segments does not get compacted again
+      forceTriggerAutoCompaction(1);
+      List<TaskResponseObject> compactTasksAfter = 
indexer.getCompleteTasksForDataSource(fullDatasourceName);
+      Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size());
+    }
+  }
+
   @Test
   public void 
testAutoCompactionRowWithMetricAndRowWithoutMetricShouldPreserveExistingMetrics()
 throws Exception
   {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index a28a0a620f..f521c3913b 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -452,13 +452,13 @@ public class OnheapIncrementalIndex extends 
IncrementalIndex
   @Override
   public float getMetricFloatValue(int rowOffset, int aggOffset)
   {
-    return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), 
aggOffset, Aggregator::getFloat);
+    return ((Number) getMetricHelper(getMetricAggs(), 
concurrentGet(rowOffset), aggOffset, Aggregator::getFloat)).floatValue();
   }
 
   @Override
   public long getMetricLongValue(int rowOffset, int aggOffset)
   {
-    return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), 
aggOffset, Aggregator::getLong);
+    return ((Number) getMetricHelper(getMetricAggs(), 
concurrentGet(rowOffset), aggOffset, Aggregator::getLong)).longValue();
   }
 
   @Override
@@ -470,7 +470,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
   @Override
   protected double getMetricDoubleValue(int rowOffset, int aggOffset)
   {
-    return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), 
aggOffset, Aggregator::getDouble);
+    return ((Number) getMetricHelper(getMetricAggs(), 
concurrentGet(rowOffset), aggOffset, Aggregator::getDouble)).doubleValue();
   }
 
   @Override
@@ -544,7 +544,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
    * If preserveExistingMetrics flag is set, then this method will combine 
values from two aggregators, the aggregator
    * for aggregating from input into output field and the aggregator for 
combining already aggregated field, as needed
    */
-  private <T> T getMetricHelper(AggregatorFactory[] metrics, Aggregator[] 
aggs, int aggOffset, Function<Aggregator, T> getMetricTypeFunction)
+  private <T> Object getMetricHelper(AggregatorFactory[] metrics, Aggregator[] 
aggs, int aggOffset, Function<Aggregator, T> getMetricTypeFunction)
   {
     if (preserveExistingMetrics) {
       // Since the preserveExistingMetrics flag is set, we will have to check 
and possibly retrieve the aggregated values
@@ -564,7 +564,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
         AggregatorFactory aggregatorFactory = metrics[aggOffset];
         T aggregatedFromSource = getMetricTypeFunction.apply(aggs[aggOffset]);
         T aggregatedFromCombined = getMetricTypeFunction.apply(aggs[aggOffset 
+ metrics.length]);
-        return (T) aggregatorFactory.combine(aggregatedFromSource, 
aggregatedFromCombined);
+        return aggregatorFactory.combine(aggregatedFromSource, 
aggregatedFromCombined);
       }
     } else {
       // If preserveExistingMetrics flag is not set then we simply get metrics 
from the list of Aggregator, aggs, using the
diff --git 
a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
 
b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
index 3257ec6d49..77309ff277 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
@@ -50,6 +50,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
 import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
+import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.filter.BoundDimFilter;
 import org.apache.druid.query.filter.SelectorDimFilter;
@@ -799,6 +800,74 @@ public class IncrementalIndexTest extends 
InitializedNullHandlingTest
     }
   }
 
+  @Test
+  public void 
testSchemaRollupWithRowWithExistingMetricsAndWithoutMetricUsingAggregatorWithDifferentReturnType()
 throws IndexSizeExceededException
+  {
+    AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{
+        new CountAggregatorFactory("count"),
+        // FloatSumAggregator combine method takes in two Float but return 
Double
+        new FloatSumAggregatorFactory("sum_of_x", "x")
+    };
+    final IncrementalIndex index = indexCreator.createIndex((Object) 
aggregatorFactories);
+    index.add(
+        new MapBasedInputRow(
+            1481871600000L,
+            Arrays.asList("name", "host"),
+            ImmutableMap.of("name", "name1", "host", "host", "x", 2)
+        )
+    );
+    index.add(
+        new MapBasedInputRow(
+            1481871600000L,
+            Arrays.asList("name", "host"),
+            ImmutableMap.of("name", "name1", "host", "host", "x", 3)
+        )
+    );
+    index.add(
+        new MapBasedInputRow(
+            1481871600000L,
+            Arrays.asList("name", "host"),
+            ImmutableMap.of("name", "name1", "host", "host", "count", 2, 
"sum_of_x", 4)
+        )
+    );
+    index.add(
+        new MapBasedInputRow(
+            1481871600000L,
+            Arrays.asList("name", "host"),
+            ImmutableMap.of("name", "name1", "host", "host", "count", 3, 
"sum_of_x", 5)
+        )
+    );
+
+    Assert.assertEquals(index.isRollup() ? 1 : 4, index.size());
+    Iterator<Row> iterator = index.iterator();
+    int rowCount = 0;
+    while (iterator.hasNext()) {
+      rowCount++;
+      Row row = iterator.next();
+      Assert.assertEquals(1481871600000L, row.getTimestampFromEpoch());
+      if (index.isRollup()) {
+        // All rows are rollup into one row
+        Assert.assertEquals(isPreserveExistingMetrics ? 7 : 4, 
row.getMetric("count").intValue());
+        Assert.assertEquals(isPreserveExistingMetrics ? 14 : 5, 
row.getMetric("sum_of_x").intValue());
+      } else {
+        // We still have 4 rows
+        if (rowCount == 1 || rowCount == 2) {
+          Assert.assertEquals(1, row.getMetric("count").intValue());
+          Assert.assertEquals(1 + rowCount, 
row.getMetric("sum_of_x").intValue());
+        } else {
+          if (isPreserveExistingMetrics) {
+            Assert.assertEquals(rowCount - 1, 
row.getMetric("count").intValue());
+            Assert.assertEquals(1 + rowCount, 
row.getMetric("sum_of_x").intValue());
+          } else {
+            Assert.assertEquals(1, row.getMetric("count").intValue());
+            // The rows does not have the dim "x", hence metric is null 
(useDefaultValueForNull=false) or 0 (useDefaultValueForNull=true)
+            Assert.assertEquals(NullHandling.sqlCompatible() ? null : 0.0f, 
row.getMetric("sum_of_x"));
+          }
+        }
+      }
+    }
+  }
+
   @Test
   public void testSchemaRollupWithRowWithOnlyExistingMetrics() throws 
IndexSizeExceededException
   {
diff --git 
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java
 
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java
index f35257e244..374f61c512 100644
--- 
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java
+++ 
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java
@@ -79,13 +79,14 @@ public class ClientCompactionTaskQueryTuningConfig
 
   public static ClientCompactionTaskQueryTuningConfig from(
       @Nullable UserCompactionTaskQueryTuningConfig 
userCompactionTaskQueryTuningConfig,
-      @Nullable Integer maxRowsPerSegment
+      @Nullable Integer maxRowsPerSegment,
+      @Nullable Boolean preserveExistingMetrics
   )
   {
     if (userCompactionTaskQueryTuningConfig == null) {
       return new ClientCompactionTaskQueryTuningConfig(
           maxRowsPerSegment,
-          new OnheapIncrementalIndex.Spec(true),
+          new OnheapIncrementalIndex.Spec(preserveExistingMetrics),
           null,
           null,
           null,
@@ -107,7 +108,7 @@ public class ClientCompactionTaskQueryTuningConfig
     } else {
       AppendableIndexSpec appendableIndexSpecToUse = 
userCompactionTaskQueryTuningConfig.getAppendableIndexSpec() != null
                                                      ? 
userCompactionTaskQueryTuningConfig.getAppendableIndexSpec()
-                                                     : new 
OnheapIncrementalIndex.Spec(true);
+                                                     : new 
OnheapIncrementalIndex.Spec(preserveExistingMetrics);
       return new ClientCompactionTaskQueryTuningConfig(
           maxRowsPerSegment,
           appendableIndexSpecToUse,
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index ddae02298b..c809e8168f 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -452,7 +452,7 @@ public class CompactSegments implements 
CoordinatorCustomDuty
             "coordinator-issued",
             segmentsToCompact,
             config.getTaskPriority(),
-            
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment()),
+            
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment(), config.getMetricsSpec() != null),
             granularitySpec,
             dimensionsSpec,
             config.getMetricsSpec(),
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
index 8e2f5f3d2c..4f2f1afca5 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
@@ -338,7 +338,7 @@ public class NewestSegmentFirstIterator implements 
CompactionSegmentIterator
   {
     Preconditions.checkState(!candidates.isEmpty(), "Empty candidates");
     final ClientCompactionTaskQueryTuningConfig tuningConfig =
-        ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment());
+        ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment(), null);
     final PartitionsSpec partitionsSpecFromConfig = 
findPartitionsSpecFromConfig(tuningConfig);
     final CompactionState lastCompactionState = 
candidates.segments.get(0).getLastCompactionState();
     if (lastCompactionState == null) {
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 2b91a89abb..08b32feff3 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -69,6 +69,7 @@ import 
org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
@@ -1741,6 +1742,130 @@ public class CompactSegmentsTest
     Assert.assertEquals(expected, actual);
   }
 
+  @Test
+  public void testCompactWithMetricsSpecShouldSetPreserveExistingMetricsTrue()
+  {
+    final HttpIndexingServiceClient mockIndexingServiceClient = 
Mockito.mock(HttpIndexingServiceClient.class);
+    final CompactSegments compactSegments = new 
CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+    final List<DataSourceCompactionConfig> compactionConfigs = new 
ArrayList<>();
+    final String dataSource = DATA_SOURCE_PREFIX + 0;
+    compactionConfigs.add(
+        new DataSourceCompactionConfig(
+            dataSource,
+            0,
+            500L,
+            null,
+            new Period("PT0H"), // smaller than segment interval
+            new UserCompactionTaskQueryTuningConfig(
+                null,
+                null,
+                null,
+                null,
+                null,
+                partitionsSpec,
+                null,
+                null,
+                null,
+                null,
+                null,
+                3,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
+            ),
+            null,
+            null,
+            new AggregatorFactory[] {new CountAggregatorFactory("cnt")},
+            null,
+            null,
+            null
+        )
+    );
+    doCompactSegments(compactSegments, compactionConfigs);
+    ArgumentCaptor<ClientCompactionTaskQueryTuningConfig> 
clientCompactionTaskQueryTuningConfigArgumentCaptor = ArgumentCaptor.forClass(
+        ClientCompactionTaskQueryTuningConfig.class);
+    Mockito.verify(mockIndexingServiceClient).compactSegments(
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.anyInt(),
+        clientCompactionTaskQueryTuningConfigArgumentCaptor.capture(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any()
+    );
+    
Assert.assertNotNull(clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue());
+    
Assert.assertNotNull(clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue().getAppendableIndexSpec());
+    Assert.assertTrue(((OnheapIncrementalIndex.Spec) 
clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue().getAppendableIndexSpec()).isPreserveExistingMetrics());
+  }
+
+  @Test
+  public void 
testCompactWithoutMetricsSpecShouldSetPreserveExistingMetricsFalse()
+  {
+    final HttpIndexingServiceClient mockIndexingServiceClient = 
Mockito.mock(HttpIndexingServiceClient.class);
+    final CompactSegments compactSegments = new 
CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+    final List<DataSourceCompactionConfig> compactionConfigs = new 
ArrayList<>();
+    final String dataSource = DATA_SOURCE_PREFIX + 0;
+    compactionConfigs.add(
+        new DataSourceCompactionConfig(
+            dataSource,
+            0,
+            500L,
+            null,
+            new Period("PT0H"), // smaller than segment interval
+            new UserCompactionTaskQueryTuningConfig(
+                null,
+                null,
+                null,
+                null,
+                null,
+                partitionsSpec,
+                null,
+                null,
+                null,
+                null,
+                null,
+                3,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
+            ),
+            null,
+            null,
+            null,
+            null,
+            null,
+            null
+        )
+    );
+    doCompactSegments(compactSegments, compactionConfigs);
+    ArgumentCaptor<ClientCompactionTaskQueryTuningConfig> 
clientCompactionTaskQueryTuningConfigArgumentCaptor = ArgumentCaptor.forClass(
+        ClientCompactionTaskQueryTuningConfig.class);
+    Mockito.verify(mockIndexingServiceClient).compactSegments(
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.anyInt(),
+        clientCompactionTaskQueryTuningConfigArgumentCaptor.capture(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any()
+    );
+    
Assert.assertNotNull(clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue());
+    
Assert.assertNotNull(clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue().getAppendableIndexSpec());
+    Assert.assertFalse(((OnheapIncrementalIndex.Spec) 
clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue().getAppendableIndexSpec()).isPreserveExistingMetrics());
+  }
+
   private void verifySnapshot(
       CompactSegments compactSegments,
       AutoCompactionSnapshot.AutoCompactionScheduleStatus scheduleStatus,
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
index fd7549c629..cbcd1b906b 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
@@ -101,7 +101,7 @@ public class NewestSegmentFirstIteratorTest
     Assert.assertEquals(
         new DynamicPartitionsSpec(null, Long.MAX_VALUE),
         NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
-            
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment())
+            
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment(), null)
         )
     );
   }
@@ -145,7 +145,7 @@ public class NewestSegmentFirstIteratorTest
     Assert.assertEquals(
         new DynamicPartitionsSpec(null, Long.MAX_VALUE),
         NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
-            
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment())
+            
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment(), null)
         )
     );
   }
@@ -189,7 +189,7 @@ public class NewestSegmentFirstIteratorTest
     Assert.assertEquals(
         new DynamicPartitionsSpec(null, 1000L),
         NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
-            
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment())
+            
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment(), null)
         )
     );
   }
@@ -233,7 +233,7 @@ public class NewestSegmentFirstIteratorTest
     Assert.assertEquals(
         new DynamicPartitionsSpec(100, 1000L),
         NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
-            
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment())
+            
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment(), null)
         )
     );
   }
@@ -277,7 +277,7 @@ public class NewestSegmentFirstIteratorTest
     Assert.assertEquals(
         new DynamicPartitionsSpec(100, 1000L),
         NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
-            
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment())
+            
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment(), null)
         )
     );
   }
@@ -321,7 +321,7 @@ public class NewestSegmentFirstIteratorTest
     Assert.assertEquals(
         new DynamicPartitionsSpec(null, Long.MAX_VALUE),
         NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
-            
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment())
+            
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment(), null)
         )
     );
   }
@@ -365,7 +365,7 @@ public class NewestSegmentFirstIteratorTest
     Assert.assertEquals(
         new DynamicPartitionsSpec(null, Long.MAX_VALUE),
         NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
-            
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment())
+            
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment(), null)
         )
     );
   }
@@ -409,7 +409,7 @@ public class NewestSegmentFirstIteratorTest
     Assert.assertEquals(
         new HashedPartitionsSpec(null, 10, ImmutableList.of("dim")),
         NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
-            
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment())
+            
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment(), null)
         )
     );
   }
@@ -453,7 +453,7 @@ public class NewestSegmentFirstIteratorTest
     Assert.assertEquals(
         new SingleDimensionPartitionsSpec(10000, null, "dim", false),
         NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
-            
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment())
+            
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment(), null)
         )
     );
   }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
index 2026873103..1b7570a972 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
@@ -700,7 +700,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new 
TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
+    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
 
     // Create segments that were compacted (CompactionState != null) and have 
segmentGranularity=DAY
     final VersionedIntervalTimeline<String, DataSegment> timeline = 
createTimeline(
@@ -733,7 +733,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new 
TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
+    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
 
     // Create segments that were compacted (CompactionState != null) and have 
segmentGranularity=DAY
     final VersionedIntervalTimeline<String, DataSegment> timeline = 
createTimeline(
@@ -766,7 +766,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new 
TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
+    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
 
     // Create segments that were compacted (CompactionState != null) and have 
segmentGranularity=DAY
     final VersionedIntervalTimeline<String, DataSegment> timeline = 
createTimeline(
@@ -809,7 +809,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new 
TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
+    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
 
     // Create segments that were compacted (CompactionState != null) and have 
segmentGranularity=DAY
     final VersionedIntervalTimeline<String, DataSegment> timeline = 
createTimeline(
@@ -852,7 +852,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new 
TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
+    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
 
     // Create segments that were compacted (CompactionState != null) and have 
segmentGranularity=DAY
     final VersionedIntervalTimeline<String, DataSegment> timeline = 
createTimeline(
@@ -904,7 +904,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new 
TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
+    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
 
     // Create segments that were compacted (CompactionState != null) and have 
segmentGranularity=DAY
     final VersionedIntervalTimeline<String, DataSegment> timeline = 
createTimeline(
@@ -955,7 +955,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new 
TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
+    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
 
     // Create segments that were compacted (CompactionState != null) and have
     // rollup=false for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1015,7 +1015,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new 
TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
+    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
 
     // Create segments that were compacted (CompactionState != null) and have
     // queryGranularity=DAY for interval 
2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1075,7 +1075,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new 
TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
+    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
 
     // Create segments that were compacted (CompactionState != null) and have
     // Dimensions=["foo", "bar"] for interval 
2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1174,7 +1174,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new 
TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
+    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
 
     // Create segments that were compacted (CompactionState != null) and have
     // filter=SelectorDimFilter("dim1", "foo", null) for interval 
2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1298,7 +1298,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new 
TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
+    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
 
     // Create segments that were compacted (CompactionState != null) and have
     // metricsSpec={CountAggregatorFactory("cnt")} for interval 
2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1447,7 +1447,7 @@ public class NewestSegmentFirstPolicyTest
     // Different indexSpec as what is set in the auto compaction config
     IndexSpec newIndexSpec = new IndexSpec(new ConciseBitmapSerdeFactory(), 
null, null, null);
     Map<String, Object> newIndexSpecMap = mapper.convertValue(newIndexSpec, 
new TypeReference<Map<String, Object>>() {});
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
+    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
 
     // Create segments that were compacted (CompactionState != null) and have 
segmentGranularity=DAY
     final VersionedIntervalTimeline<String, DataSegment> timeline = 
createTimeline(
@@ -1496,7 +1496,7 @@ public class NewestSegmentFirstPolicyTest
   public void testIteratorDoesNotReturnSegmentWithChangingAppendableIndexSpec()
   {
     NullHandling.initializeForTests();
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
+    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
     final VersionedIntervalTimeline<String, DataSegment> timeline = 
createTimeline(
         new SegmentGenerateSpec(
             Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to