This is an automated email from the ASF dual-hosted git repository.

lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e37fe93f093 Add support for a custom `DimensionSchema` in 
`DataSourceMSQDestination` (#16864)
e37fe93f093 is described below

commit e37fe93f093a2d2991d39ff3245fc1d07f1e85d4
Author: Vishesh Garg <[email protected]>
AuthorDate: Fri Aug 16 15:24:49 2024 +0530

    Add support for a custom `DimensionSchema` in `DataSourceMSQDestination` 
(#16864)
    
    This PR adds support for passing a custom DimensionSchema map to an MSQ 
query destination of type DataSourceMSQDestination.
---
 .../org/apache/druid/msq/exec/ControllerImpl.java  | 49 +++++++++++------
 .../druid/msq/indexing/MSQCompactionRunner.java    | 10 +++-
 .../org/apache/druid/msq/indexing/MSQSpec.java     |  3 +-
 .../destination/DataSourceMSQDestination.java      | 26 +++++++--
 .../apache/druid/msq/sql/MSQTaskQueryMaker.java    |  3 +-
 .../druid/msq/exec/MSQParseExceptionsTest.java     |  4 +-
 .../msq/indexing/MSQCompactionRunnerTest.java      | 17 +++---
 .../druid/msq/indexing/MSQControllerTaskTest.java  |  3 +-
 .../destination/DataSourceMSQDestinationTest.java  | 26 ++++++++-
 .../sql/resources/SqlStatementResourceTest.java    |  1 +
 .../msq/util/SqlStatementResourceHelperTest.java   |  1 +
 .../coordinator/duty/ITAutoCompactionTest.java     | 62 ++++++++++++++++++++++
 12 files changed, 167 insertions(+), 38 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 839839db4e4..278a85685dd 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -1950,7 +1950,8 @@ public class ControllerImpl implements Controller
             destination.getSegmentSortOrder(),
             columnMappings,
             isRollupQuery,
-            querySpec.getQuery()
+            querySpec.getQuery(),
+            destination.getDimensionToSchemaMap()
         );
 
     return new DataSchema(
@@ -2122,13 +2123,34 @@ public class ControllerImpl implements Controller
     return new StringTuple(array);
   }
 
+  private static DimensionSchema getDimensionSchema(
+      final String outputColumnName,
+      @Nullable final ColumnType queryType,
+      QueryContext context,
+      @Nullable Map<String, DimensionSchema> dimensionToSchemaMap
+  )
+  {
+    if (dimensionToSchemaMap != null && 
dimensionToSchemaMap.containsKey(outputColumnName)) {
+      return dimensionToSchemaMap.get(outputColumnName);
+    }
+    // In case of ingestion, or when metrics are converted to dimensions when 
compaction is performed without rollup,
+    // we won't have an entry in the map. For those cases, use the default 
config.
+    return DimensionSchemaUtils.createDimensionSchema(
+        outputColumnName,
+        queryType,
+        MultiStageQueryContext.useAutoColumnSchemas(context),
+        MultiStageQueryContext.getArrayIngestMode(context)
+    );
+  }
+
   private static Pair<List<DimensionSchema>, List<AggregatorFactory>> 
makeDimensionsAndAggregatorsForIngestion(
       final RowSignature querySignature,
       final ClusterBy queryClusterBy,
       final List<String> segmentSortOrder,
       final ColumnMappings columnMappings,
       final boolean isRollupQuery,
-      final Query<?> query
+      final Query<?> query,
+      @Nullable final Map<String, DimensionSchema> dimensionToSchemaMap
   )
   {
     // Log a warning unconditionally if arrayIngestMode is MVD, since the 
behaviour is incorrect, and is subject to
@@ -2214,18 +2236,14 @@ public class ControllerImpl implements Controller
               outputColumnAggregatorFactories,
               outputColumnName,
               type,
-              query.context()
+              query.context(),
+              dimensionToSchemaMap
           );
         } else {
           // complex columns only
           if 
(DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.containsKey(type.getComplexTypeName()))
 {
             dimensions.add(
-                DimensionSchemaUtils.createDimensionSchema(
-                    outputColumnName,
-                    type,
-                    
MultiStageQueryContext.useAutoColumnSchemas(query.context()),
-                    MultiStageQueryContext.getArrayIngestMode(query.context())
-                )
+                getDimensionSchema(outputColumnName, type, query.context(), 
dimensionToSchemaMap)
             );
           } else if (!isRollupQuery) {
             aggregators.add(new PassthroughAggregatorFactory(outputColumnName, 
type.getComplexTypeName()));
@@ -2236,7 +2254,8 @@ public class ControllerImpl implements Controller
                 outputColumnAggregatorFactories,
                 outputColumnName,
                 type,
-                query.context()
+                query.context(),
+                dimensionToSchemaMap
             );
           }
         }
@@ -2263,19 +2282,15 @@ public class ControllerImpl implements Controller
       Map<String, AggregatorFactory> outputColumnAggregatorFactories,
       String outputColumn,
       ColumnType type,
-      QueryContext context
+      QueryContext context,
+      Map<String, DimensionSchema> dimensionToSchemaMap
   )
   {
     if (outputColumnAggregatorFactories.containsKey(outputColumn)) {
       aggregators.add(outputColumnAggregatorFactories.get(outputColumn));
     } else {
       dimensions.add(
-          DimensionSchemaUtils.createDimensionSchema(
-              outputColumn,
-              type,
-              MultiStageQueryContext.useAutoColumnSchemas(context),
-              MultiStageQueryContext.getArrayIngestMode(context)
-          )
+          getDimensionSchema(outputColumn, type, context, dimensionToSchemaMap)
       );
     }
   }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
index efc2cbb2afb..3abf4b59c7d 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
@@ -81,6 +81,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class MSQCompactionRunner implements CompactionRunner
@@ -237,7 +238,11 @@ public class MSQCompactionRunner implements 
CompactionRunner
         dataSchema.getDataSource(),
         dataSchema.getGranularitySpec().getSegmentGranularity(),
         null,
-        ImmutableList.of(replaceInterval)
+        ImmutableList.of(replaceInterval),
+        dataSchema.getDimensionsSpec()
+                  .getDimensions()
+                  .stream()
+                  .collect(Collectors.toMap(DimensionSchema::getName, 
Function.identity()))
     );
   }
 
@@ -494,9 +499,10 @@ public class MSQCompactionRunner implements 
CompactionRunner
     // Used for writing the data schema during segment generation phase.
     context.putIfAbsent(MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS, 
false);
     // Add appropriate finalization to native query context i.e. for the 
GroupBy query
-    context.put(QueryContexts.FINALIZE_KEY, false);
+    context.putIfAbsent(QueryContexts.FINALIZE_KEY, false);
     // Only scalar or array-type dimensions are allowed as grouping keys.
     
context.putIfAbsent(GroupByQueryConfig.CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING, 
false);
+    context.putIfAbsent(MultiStageQueryContext.CTX_ARRAY_INGEST_MODE, "array");
     return context;
   }
 
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQSpec.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQSpec.java
index 065471d32ba..4bb4e32754f 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQSpec.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQSpec.java
@@ -28,7 +28,6 @@ import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
 import org.apache.druid.query.Query;
 import org.apache.druid.sql.calcite.planner.ColumnMappings;
 
-import javax.annotation.Nullable;
 import java.util.Map;
 import java.util.Objects;
 
@@ -43,7 +42,7 @@ public class MSQSpec
   @JsonCreator
   public MSQSpec(
       @JsonProperty("query") Query<?> query,
-      @JsonProperty("columnMappings") @Nullable ColumnMappings columnMappings,
+      @JsonProperty("columnMappings") ColumnMappings columnMappings,
       @JsonProperty("destination") MSQDestination destination,
       @JsonProperty("assignmentStrategy") WorkerAssignmentStrategy 
assignmentStrategy,
       @JsonProperty("tuningConfig") MSQTuningConfig tuningConfig
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java
index ea3072bfe45..74be1329467 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.granularity.Granularity;
@@ -35,6 +36,7 @@ import org.joda.time.Interval;
 import javax.annotation.Nullable;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 
@@ -49,18 +51,23 @@ public class DataSourceMSQDestination implements 
MSQDestination
   @Nullable
   private final List<Interval> replaceTimeChunks;
 
+  @Nullable
+  private final Map<String, DimensionSchema> dimensionToSchemaMap;
+
   @JsonCreator
   public DataSourceMSQDestination(
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("segmentGranularity") Granularity segmentGranularity,
       @JsonProperty("segmentSortOrder") @Nullable List<String> 
segmentSortOrder,
-      @JsonProperty("replaceTimeChunks") @Nullable List<Interval> 
replaceTimeChunks
+      @JsonProperty("replaceTimeChunks") @Nullable List<Interval> 
replaceTimeChunks,
+      @JsonProperty("dimensionToSchemaMap") @Nullable Map<String, 
DimensionSchema> dimensionToSchemaMap
   )
   {
     this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
     this.segmentGranularity = Preconditions.checkNotNull(segmentGranularity, 
"segmentGranularity");
     this.segmentSortOrder = segmentSortOrder != null ? segmentSortOrder : 
Collections.emptyList();
     this.replaceTimeChunks = replaceTimeChunks;
+    this.dimensionToSchemaMap = dimensionToSchemaMap;
 
     if (replaceTimeChunks != null) {
       // Verify that if replaceTimeChunks is provided, it is nonempty.
@@ -125,6 +132,17 @@ public class DataSourceMSQDestination implements 
MSQDestination
     return replaceTimeChunks;
   }
 
+  /**
+   * Returns the map of dimension name to its schema.
+   */
+  @Nullable
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public Map<String, DimensionSchema> getDimensionToSchemaMap()
+  {
+    return dimensionToSchemaMap;
+  }
+
   /**
    * Whether this object is in replace-existing-time-chunks mode.
    */
@@ -158,13 +176,14 @@ public class DataSourceMSQDestination implements 
MSQDestination
     return Objects.equals(dataSource, that.dataSource)
            && Objects.equals(segmentGranularity, that.segmentGranularity)
            && Objects.equals(segmentSortOrder, that.segmentSortOrder)
-           && Objects.equals(replaceTimeChunks, that.replaceTimeChunks);
+           && Objects.equals(replaceTimeChunks, that.replaceTimeChunks)
+           && Objects.equals(dimensionToSchemaMap, that.dimensionToSchemaMap);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(dataSource, segmentGranularity, segmentSortOrder, 
replaceTimeChunks);
+    return Objects.hash(dataSource, segmentGranularity, segmentSortOrder, 
replaceTimeChunks, dimensionToSchemaMap);
   }
 
   @Override
@@ -175,6 +194,7 @@ public class DataSourceMSQDestination implements 
MSQDestination
            ", segmentGranularity=" + segmentGranularity +
            ", segmentSortOrder=" + segmentSortOrder +
            ", replaceTimeChunks=" + replaceTimeChunks +
+           ", dimensionToSchemaMap=" + dimensionToSchemaMap +
            '}';
   }
 
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index c6396c0b306..7af34a1eb55 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -246,7 +246,8 @@ public class MSQTaskQueryMaker implements QueryMaker
           targetDataSource.getDestinationName(),
           segmentGranularityObject,
           segmentSortOrder,
-          replaceTimeChunks
+          replaceTimeChunks,
+          null
       );
       MultiStageQueryContext.validateAndGetTaskLockType(sqlQueryContext,
                                                         
dataSourceMSQDestination.isReplaceTimeChunks());
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java
index 330f1cdbbe6..bc8d517ffba 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java
@@ -225,7 +225,7 @@ public class MSQParseExceptionsTest extends MSQTestBase
                         new ColumnMapping("v1", "agent_category")
                     )
                 ))
-                .destination(new DataSourceMSQDestination("foo1", 
Granularities.ALL, null, null))
+                .destination(new DataSourceMSQDestination("foo1", 
Granularities.ALL, null, null, null))
                 .tuningConfig(MSQTuningConfig.defaultConfig())
                 .build())
         .setQueryContext(DEFAULT_MSQ_CONTEXT)
@@ -318,7 +318,7 @@ public class MSQParseExceptionsTest extends MSQTestBase
                         new ColumnMapping("agent_category", "agent_category")
                     )
                 ))
-                .destination(new DataSourceMSQDestination("foo1", 
Granularities.ALL, null, null))
+                .destination(new DataSourceMSQDestination("foo1", 
Granularities.ALL, null, null, null))
                 .tuningConfig(MSQTuningConfig.defaultConfig())
                 .build())
         .setQueryContext(runtimeContext)
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
index d868ddf20e5..9c4bb91637c 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
@@ -74,6 +74,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class MSQCompactionRunnerTest
@@ -87,13 +88,9 @@ public class MSQCompactionRunnerTest
   private static final GranularityType QUERY_GRANULARITY = 
GranularityType.HOUR;
   private static List<String> PARTITION_DIMENSIONS;
 
-  private static final StringDimensionSchema DIM1 = new StringDimensionSchema(
-      "string_dim",
-      null,
-      null
-  );
-  private static final LongDimensionSchema LONG_DIMENSION_SCHEMA = new 
LongDimensionSchema("long_dim");
-  private static final List<DimensionSchema> DIMENSIONS = 
ImmutableList.of(DIM1, LONG_DIMENSION_SCHEMA);
+  private static final StringDimensionSchema STRING_DIMENSION = new 
StringDimensionSchema("string_dim", null, null);
+  private static final LongDimensionSchema LONG_DIMENSION = new 
LongDimensionSchema("long_dim");
+  private static final List<DimensionSchema> DIMENSIONS = 
ImmutableList.of(STRING_DIMENSION, LONG_DIMENSION);
   private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
   private static final AggregatorFactory AGG1 = new 
CountAggregatorFactory("agg_0");
   private static final AggregatorFactory AGG2 = new 
LongSumAggregatorFactory("sum_added", "sum_added");
@@ -291,7 +288,8 @@ public class MSQCompactionRunnerTest
             DATA_SOURCE,
             SEGMENT_GRANULARITY.getDefaultGranularity(),
             null,
-            Collections.singletonList(COMPACTION_INTERVAL)
+            Collections.singletonList(COMPACTION_INTERVAL),
+            
DIMENSIONS.stream().collect(Collectors.toMap(DimensionSchema::getName, 
Function.identity()))
         ),
         actualMSQSpec.getDestination()
     );
@@ -360,7 +358,8 @@ public class MSQCompactionRunnerTest
             DATA_SOURCE,
             SEGMENT_GRANULARITY.getDefaultGranularity(),
             null,
-            Collections.singletonList(COMPACTION_INTERVAL)
+            Collections.singletonList(COMPACTION_INTERVAL),
+            
DIMENSIONS.stream().collect(Collectors.toMap(DimensionSchema::getName, 
Function.identity()))
         ),
         actualMSQSpec.getDestination()
     );
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
index 9df6c38f30e..e969e209387 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
@@ -58,7 +58,8 @@ public class MSQControllerTaskTest
           "target",
           Granularities.DAY,
           null,
-          INTERVALS
+          INTERVALS,
+          null
       ))
       .query(new Druids.ScanQueryBuilder()
                  
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestinationTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestinationTest.java
index 6d3a5ebfd9b..242c00213e2 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestinationTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestinationTest.java
@@ -20,9 +20,14 @@
 package org.apache.druid.msq.indexing.destination;
 
 
+import com.google.common.collect.ImmutableMap;
 import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.junit.Test;
 
+import java.util.Map;
+
 public class DataSourceMSQDestinationTest
 {
 
@@ -30,7 +35,26 @@ public class DataSourceMSQDestinationTest
   public void testEquals()
   {
     EqualsVerifier.forClass(DataSourceMSQDestination.class)
-                  .withNonnullFields("dataSource", "segmentGranularity", 
"segmentSortOrder")
+                  .withNonnullFields("dataSource", "segmentGranularity", 
"segmentSortOrder", "dimensionToSchemaMap")
+                  .withPrefabValues(
+                      Map.class,
+                      ImmutableMap.of(
+                          "language",
+                          new StringDimensionSchema(
+                              "language",
+                              DimensionSchema.MultiValueHandling.SORTED_ARRAY,
+                              false
+                          )
+                      ),
+                      ImmutableMap.of(
+                          "region",
+                          new StringDimensionSchema(
+                              "region",
+                              DimensionSchema.MultiValueHandling.SORTED_ARRAY,
+                              false
+                          )
+                      )
+                  )
                   .usingGetClass()
                   .verify();
   }
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
index 4ea2993050e..2a753e21d16 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
@@ -199,6 +199,7 @@ public class SqlStatementResourceTest extends MSQTestBase
                  "test",
                  Granularities.DAY,
                  null,
+                 null,
                  null
              ))
              .tuningConfig(
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
index 1966d1e5b10..58856adf366 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
@@ -375,6 +375,7 @@ public class SqlStatementResourceHelperTest
             "test",
             Granularities.DAY,
             null,
+            null,
             null
         )
     );
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 24ede82d0de..31a6bccffc7 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -25,7 +25,9 @@ import com.google.inject.Inject;
 import org.apache.commons.io.IOUtils;
 import org.apache.datasketches.hll.TgtHllType;
 import org.apache.druid.data.input.MaxSizeSplitHintSpec;
+import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.indexer.CompactionEngine;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
@@ -52,6 +54,8 @@ import 
org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggrega
 import 
org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;
 import 
org.apache.druid.query.aggregation.datasketches.theta.SketchMergeAggregatorFactory;
 import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.segment.AutoTypeColumnSchema;
+import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -515,6 +519,42 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
     }
   }
 
+  @Test(dataProvider = "engine")
+  public void 
testAutoCompactionPreservesCreateBitmapIndexInDimensionSchema(CompactionEngine 
engine) throws Exception
+  {
+    loadData(INDEX_TASK);
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      final List<String> intervalsBeforeCompaction = 
coordinator.getSegmentIntervals(fullDatasourceName);
+      intervalsBeforeCompaction.sort(null);
+      // 4 segments across 2 days (4 total)...
+      verifySegmentsCount(4);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+
+      LOG.info("Auto compaction test with YEAR segment granularity, 
dropExisting is true");
+      Granularity newSegmentGranularity = Granularities.YEAR;
+
+      List<DimensionSchema> dimensionSchemas = ImmutableList.of(
+          new StringDimensionSchema("language", 
DimensionSchema.MultiValueHandling.SORTED_ARRAY, false),
+          new AutoTypeColumnSchema("deleted", ColumnType.DOUBLE)
+      );
+
+      submitCompactionConfig(
+          MAX_ROWS_PER_SEGMENT_COMPACTED,
+          NO_SKIP_OFFSET,
+          new UserCompactionTaskGranularityConfig(newSegmentGranularity, null, 
true),
+          new UserCompactionTaskDimensionsConfig(dimensionSchemas),
+          null,
+          new AggregatorFactory[] {new LongSumAggregatorFactory("added", 
"added")},
+          true,
+          engine
+      );
+      //...compacted into 1 segment for the entire year.
+      forceTriggerAutoCompaction(1);
+      verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
+      verifySegmentsCompactedDimensionSchema(dimensionSchemas);
+    }
+  }
+
   @Test
   public void testAutoCompactionDutySubmitAndVerifyCompaction() throws 
Exception
   {
@@ -1941,6 +1981,28 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
       
Assert.assertNotNull(compactedSegment.getLastCompactionState().getPartitionsSpec());
       
Assert.assertEquals(compactedSegment.getLastCompactionState().getPartitionsSpec(),
 partitionsSpec);
     }
+
+  }
+
+  private void verifySegmentsCompactedDimensionSchema(List<DimensionSchema> 
dimensionSchemas)
+  {
+    List<DataSegment> segments = 
coordinator.getFullSegmentsMetadata(fullDatasourceName);
+    List<DataSegment> foundCompactedSegments = new ArrayList<>();
+    for (DataSegment segment : segments) {
+      if (segment.getLastCompactionState() != null) {
+        foundCompactedSegments.add(segment);
+      }
+    }
+    for (DataSegment compactedSegment : foundCompactedSegments) {
+      MatcherAssert.assertThat(
+          dimensionSchemas,
+          Matchers.containsInAnyOrder(
+              compactedSegment.getLastCompactionState()
+                              .getDimensionsSpec()
+                              .getDimensions()
+                              .toArray(new DimensionSchema[0]))
+      );
+    }
   }
 
   private void updateCompactionTaskSlot(double compactionTaskSlotRatio, int 
maxCompactionTaskSlots, Boolean useAutoScaleSlots) throws Exception


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to