kfaraz commented on code in PR #18950:
URL: https://github.com/apache/druid/pull/18950#discussion_r2769468924


##########
processing/src/main/java/org/apache/druid/timeline/CompactionState.java:
##########
@@ -206,4 +206,92 @@ public static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateToS
         .map(s -> s.withLastCompactionState(compactionState))
         .collect(Collectors.toSet());
   }
+
+
+  public Builder toBuilder()

Review Comment:
   Nit: Would it be better to make this method static?
   
   ```
   public static Builder builder(CompactionState state) {...}
   ```
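
A minimal sketch of the static-factory shape suggested above. `StateSketch` and its two string fields are hypothetical stand-ins, not the real `CompactionState`; the point is only that `builder(state)` seeds a builder from an existing instance without needing an instance method.

```java
import java.util.Objects;

// Hypothetical stand-in for an immutable state object; not the real CompactionState.
final class StateSketch
{
  private final String partitionsSpec;
  private final String transformSpec;

  private StateSketch(String partitionsSpec, String transformSpec)
  {
    this.partitionsSpec = partitionsSpec;
    this.transformSpec = transformSpec;
  }

  String getPartitionsSpec()
  {
    return partitionsSpec;
  }

  String getTransformSpec()
  {
    return transformSpec;
  }

  // Static factory: returns a builder pre-populated from an existing state.
  static Builder builder(StateSketch state)
  {
    Objects.requireNonNull(state, "state");
    return new Builder()
        .partitionsSpec(state.partitionsSpec)
        .transformSpec(state.transformSpec);
  }

  static final class Builder
  {
    private String partitionsSpec;
    private String transformSpec;

    Builder partitionsSpec(String value)
    {
      this.partitionsSpec = value;
      return this;
    }

    Builder transformSpec(String value)
    {
      this.transformSpec = value;
      return this;
    }

    StateSketch build()
    {
      return new StateSketch(partitionsSpec, transformSpec);
    }
  }
}
```

A caller would then write `StateSketch.builder(existing).transformSpec("filter").build()` to copy a state while overriding one field.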



##########
multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java:
##########
@@ -1012,7 +1016,10 @@ public void testReplaceOnFoo1WithWhereExtern(String contextName, Map<String, Obj
                              Collections.emptyList(),
                              Collections.singletonList(new StringDimensionSchema("user")),
                              GranularityType.HOUR,
-                             Intervals.of("2016-06-27T01:00:00.000Z/2016-06-27T02:00:00.000Z")
+                             Intervals.of("2016-06-27T01:00:00.000Z/2016-06-27T02:00:00.000Z"),
+                             new CompactionTransformSpec(

Review Comment:
   Do all the tests now have a transform spec? Should we retain some tests 
without transform spec too?



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java:
##########
@@ -66,7 +68,46 @@ public class TaskActionTestKit extends ExternalResource
   private BlockingExecutorService metadataCachePollExec;
 
   private boolean useSegmentMetadataCache = false;
+  private boolean useCentralizedDatasourceSchema = false;
+  private boolean batchSegmentAllocation = true;
   private boolean skipSegmentPayloadFetchForAllocation = new TaskLockConfig().isBatchAllocationReduceMetadataIO();
+  private AtomicBoolean configFinalized = new AtomicBoolean();
+
+  public TaskActionTestKit setUseSegmentMetadataCache(boolean useSegmentMetadataCache)
+  {
+    if (configFinalized.get()) {
+      throw new IllegalStateException("Test config already finalized");
+    }
+    this.useSegmentMetadataCache = useSegmentMetadataCache;
+    return this;
+  }
+
+  public TaskActionTestKit setUseCentralizedDatasourceSchema(boolean useCentralizedDatasourceSchema)

Review Comment:
   Did centralized schema also run into an issue similar to batch allocation?



##########
multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java:
##########
@@ -239,4 +248,20 @@ public void configure(Binder binder)
               .to(WindowOperatorQueryKit.class);
     binder.bind(WindowOperatorQueryKit.class).in(LazySingleton.class);
   }
+
+  @Provides
+  IndexerControllerContext.Builder providesContextBuilder(Injector injector)

Review Comment:
   We need not use the `Injector` directly here.
   
   ```suggestion
     IndexerControllerContext.Builder getControllerContextBuilder(
         @EscalatedGlobal ServiceClientFactory serviceClientFactory,
         OverlordClient overlordClient
     )
   ```
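
To illustrate the difference in plain Java (no Guice on the classpath), here is a rough sketch. Every type below is a hypothetical stand-in: the `Map`-based locator plays the role of an `Injector`, and `fromDependencies` plays the role of a provider method that declares its dependencies as parameters.

```java
import java.util.Map;
import java.util.Objects;

// All types here are hypothetical stand-ins, not Druid or Guice classes.
interface ClientFactorySketch
{
  String name();
}

interface OverlordSketch
{
  String name();
}

final class ContextBuilderSketch
{
  private final ClientFactorySketch clientFactory;
  private final OverlordSketch overlord;

  ContextBuilderSketch(ClientFactorySketch clientFactory, OverlordSketch overlord)
  {
    this.clientFactory = Objects.requireNonNull(clientFactory, "clientFactory");
    this.overlord = Objects.requireNonNull(overlord, "overlord");
  }

  String describe()
  {
    return clientFactory.name() + "+" + overlord.name();
  }
}

final class ProviderSketch
{
  // Locator style: the signature hides which dependencies are actually needed,
  // and a missing entry only shows up when the lookup runs.
  static ContextBuilderSketch fromLocator(Map<Class<?>, Object> locator)
  {
    return new ContextBuilderSketch(
        (ClientFactorySketch) locator.get(ClientFactorySketch.class),
        (OverlordSketch) locator.get(OverlordSketch.class)
    );
  }

  // Explicit style: the dependencies are visible in the signature.
  static ContextBuilderSketch fromDependencies(ClientFactorySketch clientFactory, OverlordSketch overlord)
  {
    return new ContextBuilderSketch(clientFactory, overlord);
  }
}
```

Both methods build the same object; the explicit form just makes the required inputs part of the method contract.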



##########
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java:
##########
@@ -75,6 +75,11 @@
  */
 public class IndexerControllerContext implements ControllerContext
 {
+  public interface Builder

Review Comment:
   It might be cleaner to move this into a separate file and call it 
`IndexerControllerContextFactory` to align with other similar factory classes 
in Druid.



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestSpyTaskActionClient.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
+import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction;
+import org.apache.druid.indexing.common.actions.TaskAction;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.segment.SegmentSchemaMapping;
+import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
+import org.apache.druid.timeline.DataSegment;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Test utility that wraps a {@link TaskActionClient} to track published segments and their schema mappings.
+ * <p>
+ * This spy intercepts {@link SegmentTransactionalInsertAction}, {@link SegmentTransactionalReplaceAction},
+ * and {@link SegmentTransactionalAppendAction} submissions to collect the segments and schema mappings being
+ * published. All other task actions are delegated to the wrapped client without modification.
+ * <p>
+ * Useful for verifying that tasks publish the expected segments and schemas in integration tests.
+ */
+public class TestSpyTaskActionClient implements TaskActionClient

Review Comment:
   This action client is very specific to segment transactions and might not 
see wide usage.
   
   I would suggest the following:
   - Move this test class into `CompactionTaskRunBase` itself.
   - Rename it to something simpler, such as `TestTaskActionClient` or 
`WrappingTaskActionClient`. (The "spy" is a little misleading since it suggests 
some usage of Mockito spy utilities.)
   - Avoid javadocs since this code is unit-test-only and seems 
self-explanatory. Add only 1-line javadocs or regular comments where necessary.
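
For reference, a wrapping client of this kind can stay quite small. The sketch below uses hypothetical, simplified interfaces (`ActionSketch`, `ActionClientSketch`) rather than the real `TaskAction` and `TaskActionClient` API, and records only a segment id instead of full segments and schema mappings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal interfaces; the real TaskActionClient API differs.
interface ActionSketch
{
  String describe();
}

interface ActionClientSketch
{
  String submit(ActionSketch action);
}

// Hypothetical publish-style action carrying a single segment id.
final class PublishActionSketch implements ActionSketch
{
  final String segmentId;

  PublishActionSketch(String segmentId)
  {
    this.segmentId = segmentId;
  }

  @Override
  public String describe()
  {
    return "publish:" + segmentId;
  }
}

final class WrappingActionClientSketch implements ActionClientSketch
{
  private final ActionClientSketch delegate;
  private final List<String> publishedSegmentIds = new ArrayList<>();

  WrappingActionClientSketch(ActionClientSketch delegate)
  {
    this.delegate = delegate;
  }

  @Override
  public String submit(ActionSketch action)
  {
    // Record publish actions, then delegate every action unchanged.
    if (action instanceof PublishActionSketch) {
      publishedSegmentIds.add(((PublishActionSketch) action).segmentId);
    }
    return delegate.submit(action);
  }

  List<String> getPublishedSegmentIds()
  {
    return publishedSegmentIds;
  }
}
```

Declared as a nested class of the test base, something of this size arguably needs no more than a one-line comment.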



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java:
##########
@@ -66,7 +68,46 @@ public class TaskActionTestKit extends ExternalResource
   private BlockingExecutorService metadataCachePollExec;
 
   private boolean useSegmentMetadataCache = false;
+  private boolean useCentralizedDatasourceSchema = false;
+  private boolean batchSegmentAllocation = true;
   private boolean skipSegmentPayloadFetchForAllocation = new TaskLockConfig().isBatchAllocationReduceMetadataIO();
+  private AtomicBoolean configFinalized = new AtomicBoolean();
+
+  public TaskActionTestKit setUseSegmentMetadataCache(boolean useSegmentMetadataCache)
+  {
+    if (configFinalized.get()) {
+      throw new IllegalStateException("Test config already finalized");
+    }
+    this.useSegmentMetadataCache = useSegmentMetadataCache;
+    return this;
+  }
+
+  public TaskActionTestKit setUseCentralizedDatasourceSchema(boolean useCentralizedDatasourceSchema)
+  {
+    if (configFinalized.get()) {
+      throw new IllegalStateException("Test config already finalized");
+    }
+    this.useCentralizedDatasourceSchema = useCentralizedDatasourceSchema;
+    return this;
+  }
+
+  public TaskActionTestKit setBatchSegmentAllocation(boolean batchSegmentAllocation)
+  {
+    if (configFinalized.get()) {

Review Comment:
   Why do we want to impose this limit? It should be okay to be able to change 
the config between tests.
   The `before` method would be invoked before every test, right?
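
A rough sketch of what I have in mind, assuming the kit is set up once per test and that setters are called before `before()` runs. `TestKitSketch` is a hypothetical, stripped-down stand-in for `TaskActionTestKit`: the setters stay unguarded, and `before()` simply reads whatever values are current at that point.

```java
// Hypothetical, simplified test kit; not the real TaskActionTestKit.
final class TestKitSketch
{
  private boolean batchSegmentAllocation = true;
  private String activeConfig;

  // No "finalized" guard: the value may be changed between tests.
  TestKitSketch setBatchSegmentAllocation(boolean batchSegmentAllocation)
  {
    this.batchSegmentAllocation = batchSegmentAllocation;
    return this;
  }

  // Stands in for the per-test setup hook; it snapshots the current settings.
  void before()
  {
    activeConfig = "batchSegmentAllocation=" + batchSegmentAllocation;
  }

  // Stands in for the per-test teardown hook.
  void after()
  {
    activeConfig = null;
  }

  String getActiveConfig()
  {
    return activeConfig;
  }
}
```

With this shape, a change made after one test finishes takes effect the next time `before()` runs, so no guard is needed as long as nothing mutates the kit mid-test.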



##########
multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java:
##########
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Futures;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.util.Modules;
+import com.google.inject.util.Providers;
+import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.guice.DruidGuiceExtensions;
+import org.apache.druid.guice.LifecycleModule;
+import org.apache.druid.guice.SegmentWranglerModule;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.granularity.UniformGranularitySpec;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.CompactionIntervalSpec;
+import org.apache.druid.indexing.common.task.CompactionTask;
+import org.apache.druid.indexing.common.task.CompactionTaskRunBase;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.msq.guice.MSQIndexingModule;
+import org.apache.druid.msq.indexing.IndexerControllerContext;
+import org.apache.druid.msq.indexing.MSQCompactionRunner;
+import org.apache.druid.msq.indexing.MSQControllerTask;
+import org.apache.druid.msq.test.MSQTestBase;
+import org.apache.druid.msq.test.MSQTestControllerContext;
+import org.apache.druid.query.ForwardingQueryProcessingPool;
+import org.apache.druid.query.QueryProcessingPool;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
+import org.apache.druid.query.groupby.GroupingEngine;
+import org.apache.druid.query.groupby.TestGroupByBuffers;
+import org.apache.druid.query.policy.NoopPolicyEnforcer;
+import org.apache.druid.query.policy.PolicyEnforcer;
+import org.apache.druid.query.rowsandcols.serde.WireTransferableContext;
+import org.apache.druid.segment.DataSegmentsWithSchemas;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.ReferenceCountedSegmentProvider;
+import org.apache.druid.segment.loading.AcquireSegmentAction;
+import org.apache.druid.segment.loading.AcquireSegmentResult;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.loading.LocalDataSegmentPusher;
+import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
+import org.apache.druid.segment.loading.SegmentCacheManager;
+import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
+import org.apache.druid.segment.serde.ComplexMetrics;
+import org.apache.druid.server.security.Escalator;
+import org.apache.druid.sql.calcite.util.LookylooModule;
+import org.apache.druid.timeline.CompactionState;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for CompactionTask using MSQCompactionRunner.
+ * Extends CompactionTaskRunTest to reuse all test infrastructure.
+ */
+@RunWith(Parameterized.class)
+public class MSQCompactionTaskRunTest extends CompactionTaskRunBase
+{
+  private final ConcurrentHashMap<String, TaskActionClient> taskActionClients = new ConcurrentHashMap<>();
+  private Injector injector;
+
+  @Parameterized.Parameters(name = "name: {0}, inputInterval={5}, segmentGran={6}")
+  public static Iterable<Object[]> constructorFeeder()
+  {
+    final List<Object[]> constructors = new ArrayList<>();
+
+    for (LockGranularity lockGranularity : new LockGranularity[]{LockGranularity.TIME_CHUNK}) {
+      for (boolean useCentralizedDatasourceSchema : new boolean[]{false}) {
+        for (boolean useSegmentMetadataCache : new boolean[]{false, true}) {
+          for (boolean useConcurrentLocks : new boolean[]{false, true}) {
+            for (Interval inputInterval : new Interval[]{TEST_INTERVAL}) {
+              for (Granularity segmentGran : new Granularity[]{Granularities.SIX_HOUR}) {
+                String name = StringUtils.format(

Review Comment:
   This should be done in the `@Parameters` annotation itself.
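
JUnit 4's `Parameterized` runner accepts a `name` pattern with `{index}` and `{0}`, `{1}`, ... placeholders, so the display name can be derived from the parameter row instead of being pre-formatted as an extra parameter. The sketch below only imitates that expansion with `java.text.MessageFormat`; `ParamNameSketch` and the pattern text are hypothetical.

```java
import java.text.MessageFormat;

final class ParamNameSketch
{
  // Expands {0}, {1}, ... placeholders in a pattern from one row of parameters.
  static String describe(String pattern, Object... params)
  {
    return MessageFormat.format(pattern, params);
  }
}
```

For example, `describe("useSegmentMetadataCache={0}, useConcurrentLocks={1}", false, true)` yields `useSegmentMetadataCache=false, useConcurrentLocks=true`, which is the kind of name the annotation pattern can produce on its own.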



##########
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java:
##########
@@ -108,7 +113,10 @@ public IndexerControllerContext(
     this.clientFactory = clientFactory;
     this.overlordClient = overlordClient;
     this.memoryIntrospector = injector.getInstance(MemoryIntrospector.class);
-    final StorageConnectorProvider storageConnectorProvider = injector.getInstance(Key.get(StorageConnectorProvider.class, MultiStageQuery.class));
+    final StorageConnectorProvider storageConnectorProvider = injector.getInstance(Key.get(

Review Comment:
   Suggestion: This change doesn't seem necessary. It's best to avoid formatting 
changes unless they really help with readability.



##########
multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java:
##########
@@ -0,0 +1,600 @@
+/**
+ * Tests for CompactionTask using MSQCompactionRunner.
+ * Extends CompactionTaskRunTest to reuse all test infrastructure.
+ */
+@RunWith(Parameterized.class)
+public class MSQCompactionTaskRunTest extends CompactionTaskRunBase
+{
+  private final ConcurrentHashMap<String, TaskActionClient> taskActionClients = new ConcurrentHashMap<>();
+  private Injector injector;
+
+  @Parameterized.Parameters(name = "name: {0}, inputInterval={5}, segmentGran={6}")
+  public static Iterable<Object[]> constructorFeeder()
+  {
+    final List<Object[]> constructors = new ArrayList<>();
+
+    for (LockGranularity lockGranularity : new LockGranularity[]{LockGranularity.TIME_CHUNK}) {
+      for (boolean useCentralizedDatasourceSchema : new boolean[]{false}) {
+        for (boolean useSegmentMetadataCache : new boolean[]{false, true}) {
+          for (boolean useConcurrentLocks : new boolean[]{false, true}) {
+            for (Interval inputInterval : new Interval[]{TEST_INTERVAL}) {
+              for (Granularity segmentGran : new Granularity[]{Granularities.SIX_HOUR}) {

Review Comment:
   I think only 2 for loops (for `useSegmentMetadataCache` and 
`useConcurrentLocks`) should be enough here. The other parameters seem to be 
taking a single value only.
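
Roughly, the feeder could collapse to something like the sketch below. The string constants are hypothetical placeholders for the single-valued parameters (the real code would pass `LockGranularity.TIME_CHUNK`, `TEST_INTERVAL`, and so on), and the row layout is illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

final class ParamGridSketch
{
  // Hypothetical placeholders for parameters that only ever take one value.
  static final String LOCK_GRANULARITY = "TIME_CHUNK";
  static final String SEGMENT_GRANULARITY = "SIX_HOUR";

  static List<Object[]> constructorFeeder()
  {
    final List<Object[]> rows = new ArrayList<>();
    // Only the two parameters that actually vary need loops.
    for (boolean useSegmentMetadataCache : new boolean[]{false, true}) {
      for (boolean useConcurrentLocks : new boolean[]{false, true}) {
        rows.add(
            new Object[]{LOCK_GRANULARITY, useSegmentMetadataCache, useConcurrentLocks, SEGMENT_GRANULARITY}
        );
      }
    }
    return rows;
  }
}
```

This still yields the same four combinations while making it obvious which parameters are fixed.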



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java:
##########
@@ -66,7 +68,46 @@ public class TaskActionTestKit extends ExternalResource
   private BlockingExecutorService metadataCachePollExec;
 
   private boolean useSegmentMetadataCache = false;
+  private boolean useCentralizedDatasourceSchema = false;
+  private boolean batchSegmentAllocation = true;
   private boolean skipSegmentPayloadFetchForAllocation = new TaskLockConfig().isBatchAllocationReduceMetadataIO();
+  private AtomicBoolean configFinalized = new AtomicBoolean();
+
+  public TaskActionTestKit setUseSegmentMetadataCache(boolean useSegmentMetadataCache)
+  {
+    if (configFinalized.get()) {
+      throw new IllegalStateException("Test config already finalized");
+    }
+    this.useSegmentMetadataCache = useSegmentMetadataCache;
+    return this;
+  }
+
+  public TaskActionTestKit setUseCentralizedDatasourceSchema(boolean useCentralizedDatasourceSchema)
+  {
+    if (configFinalized.get()) {
+      throw new IllegalStateException("Test config already finalized");
+    }
+    this.useCentralizedDatasourceSchema = useCentralizedDatasourceSchema;
+    return this;
+  }
+
+  public TaskActionTestKit setBatchSegmentAllocation(boolean batchSegmentAllocation)

Review Comment:
   Okay, in that case, let's either skip the test case for segment lock and add 
a comment, or add the test case and mark it disabled. That way, we would be 
aware that there is a bug and can address it later.



##########
multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java:
##########
@@ -239,4 +248,20 @@ public void configure(Binder binder)
               .to(WindowOperatorQueryKit.class);
     binder.bind(WindowOperatorQueryKit.class).in(LazySingleton.class);
   }
+
+  @Provides
+  IndexerControllerContext.Builder providesContextBuilder(Injector injector)

Review Comment:
   Also, is there any functional change here or is the code just being moved 
from `MSQControllerTask`?



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunBase.java:
##########
@@ -137,30 +138,35 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-@RunWith(Parameterized.class)
-public class CompactionTaskRunTest extends IngestionTestBase
+public abstract class CompactionTaskRunBase

Review Comment:
   The diff for this class seems big and it is a little difficult to ensure 
that no assertions have actually changed.
   Since we are changing some core logic in this PR, I would advise minimizing 
the changes to this test class so that we can be certain that the new code 
works correctly with all existing tests.
   
   If any refactor is needed in this test, we can do it in a follow-up PR.
   For the time being, if some methods need to be reused for the 
`MSQCompactionTaskRunTest`, they may be copied over or we may make them `public 
static` where applicable.



##########
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java:
##########
@@ -411,12 +411,18 @@ private static List<DimensionSpec> getAggregateDimensions(
   {
     List<DimensionSpec> dimensionSpecs = new ArrayList<>();
 
-    if (isQueryGranularityEmptyOrNone(dataSchema)) {
-      // Dimensions in group-by aren't allowed to have time column name as the output name.
-      dimensionSpecs.add(new DefaultDimensionSpec(ColumnHolder.TIME_COLUMN_NAME, TIME_VIRTUAL_COLUMN, ColumnType.LONG));
-    } else {
-      // The changed granularity would result in a new virtual column that needs to be aggregated upon.
-      dimensionSpecs.add(new DefaultDimensionSpec(TIME_VIRTUAL_COLUMN, TIME_VIRTUAL_COLUMN, ColumnType.LONG));
+    if (!dataSchema.getDimensionsSpec().getDimensionNames().contains(ColumnHolder.TIME_COLUMN_NAME)) {

Review Comment:
   Please add a 1-line comment before this `if` clause clarifying the reason 
behind the check.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java:
##########
@@ -60,6 +60,7 @@ public CompactionInputSpec getInputSpec()
     return inputSpec;
   }
 
+  @Deprecated

Review Comment:
   Nit: Adding a short javadoc indicating why this is deprecated would be nice.
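
Something along these lines would do. The class, the getter name, and the deprecation text below are all hypothetical placeholders, since the actual reason and replacement are for the author to fill in.

```java
// Hypothetical config class; the deprecation reason and replacement are placeholders.
final class IoConfigSketch
{
  private final boolean legacyFlag;

  IoConfigSketch(boolean legacyFlag)
  {
    this.legacyFlag = legacyFlag;
  }

  /**
   * @deprecated Placeholder: state what callers should use instead and why this is going away.
   */
  @Deprecated
  boolean isLegacyFlag()
  {
    return legacyFlag;
  }
}
```

Pairing the `@Deprecated` annotation with a `@deprecated` javadoc tag lets IDEs and generated docs surface the reason at the call site.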



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunBase.java:
##########
@@ -571,513 +479,277 @@ public void testRunIndexAndCompactAtTheSameTimeForDifferentInterval() throws Exc
       }
     }
 
-    Assert.assertTrue(compactionFuture.get().lhs.isSuccess());
-
-    dataSegmentsWithSchemas = compactionFuture.get().rhs;
-    verifySchema(dataSegmentsWithSchemas);
-    segments = new ArrayList<>(dataSegmentsWithSchemas.getSegments());
-    Assert.assertEquals(3, segments.size());
-
-    for (int i = 0; i < 3; i++) {
-      Assert.assertEquals(
-          Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
-          segments.get(i).getInterval()
-      );
-      Assert.assertEquals(
-          getDefaultCompactionState(
-              Granularities.HOUR,
-              Granularities.MINUTE,
-              ImmutableList.of(Intervals.of(
-                  "2014-01-01T0%d:00:00/2014-01-01T0%d:00:00",
-                  i,
-                  i + 1
-              ))
-          ),
-          segments.get(i).getLastCompactionState()
-      );
-      if (lockGranularity == LockGranularity.SEGMENT) {
-        Assert.assertEquals(
-            new NumberedOverwriteShardSpec(PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, 0, 2, (short) 1, (short) 1),
-            segments.get(i).getShardSpec()
-        );
-      } else {
-        Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(i).getShardSpec());
-      }
-    }
-  }
-
-  @Test
-  public void testWithSegmentGranularity() throws Exception

Review Comment:
   Is this test not needed anymore?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

