cecemei commented on code in PR #18950:
URL: https://github.com/apache/druid/pull/18950#discussion_r2770161133


##########
multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java:
##########
@@ -239,4 +248,20 @@ public void configure(Binder binder)
               .to(WindowOperatorQueryKit.class);
     binder.bind(WindowOperatorQueryKit.class).in(LazySingleton.class);
   }
+
+  @Provides
+  IndexerControllerContext.Builder providesContextBuilder(Injector injector)

Review Comment:
   There's no functional change; it's just easier for testing, so that we can 
inject the context in tests.



##########
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java:
##########
@@ -75,6 +75,11 @@
  */
 public class IndexerControllerContext implements ControllerContext
 {
+  public interface Builder

Review Comment:
   updated



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java:
##########
@@ -66,7 +68,46 @@ public class TaskActionTestKit extends ExternalResource
   private BlockingExecutorService metadataCachePollExec;
 
   private boolean useSegmentMetadataCache = false;
+  private boolean useCentralizedDatasourceSchema = false;
+  private boolean batchSegmentAllocation = true;
   private boolean skipSegmentPayloadFetchForAllocation = new 
TaskLockConfig().isBatchAllocationReduceMetadataIO();
+  private AtomicBoolean configFinalized = new AtomicBoolean();
+
+  public TaskActionTestKit setUseSegmentMetadataCache(boolean 
useSegmentMetadataCache)
+  {
+    if (configFinalized.get()) {
+      throw new IllegalStateException("Test config already finalized");
+    }
+    this.useSegmentMetadataCache = useSegmentMetadataCache;
+    return this;
+  }
+
+  public TaskActionTestKit setUseCentralizedDatasourceSchema(boolean 
useCentralizedDatasourceSchema)
+  {
+    if (configFinalized.get()) {
+      throw new IllegalStateException("Test config already finalized");
+    }
+    this.useCentralizedDatasourceSchema = useCentralizedDatasourceSchema;
+    return this;
+  }
+
+  public TaskActionTestKit setBatchSegmentAllocation(boolean 
batchSegmentAllocation)
+  {
+    if (configFinalized.get()) {

Review Comment:
   Well, this is just to prevent people from accidentally calling it inside a 
test method.



##########
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java:
##########
@@ -411,12 +411,18 @@ private static List<DimensionSpec> getAggregateDimensions(
   {
     List<DimensionSpec> dimensionSpecs = new ArrayList<>();
 
-    if (isQueryGranularityEmptyOrNone(dataSchema)) {
-      // Dimensions in group-by aren't allowed to have time column name as the 
output name.
-      dimensionSpecs.add(new 
DefaultDimensionSpec(ColumnHolder.TIME_COLUMN_NAME, TIME_VIRTUAL_COLUMN, 
ColumnType.LONG));
-    } else {
-      // The changed granularity would result in a new virtual column that 
needs to be aggregated upon.
-      dimensionSpecs.add(new DefaultDimensionSpec(TIME_VIRTUAL_COLUMN, 
TIME_VIRTUAL_COLUMN, ColumnType.LONG));
+    if 
(!dataSchema.getDimensionsSpec().getDimensionNames().contains(ColumnHolder.TIME_COLUMN_NAME))
 {

Review Comment:
   added



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java:
##########
@@ -66,7 +68,46 @@ public class TaskActionTestKit extends ExternalResource
   private BlockingExecutorService metadataCachePollExec;
 
   private boolean useSegmentMetadataCache = false;
+  private boolean useCentralizedDatasourceSchema = false;
+  private boolean batchSegmentAllocation = true;
   private boolean skipSegmentPayloadFetchForAllocation = new 
TaskLockConfig().isBatchAllocationReduceMetadataIO();
+  private AtomicBoolean configFinalized = new AtomicBoolean();
+
+  public TaskActionTestKit setUseSegmentMetadataCache(boolean 
useSegmentMetadataCache)
+  {
+    if (configFinalized.get()) {
+      throw new IllegalStateException("Test config already finalized");
+    }
+    this.useSegmentMetadataCache = useSegmentMetadataCache;
+    return this;
+  }
+
+  public TaskActionTestKit setUseCentralizedDatasourceSchema(boolean 
useCentralizedDatasourceSchema)

Review Comment:
   It's not supported by the MSQ engine; the schema is not saved.



##########
multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java:
##########
@@ -239,4 +248,20 @@ public void configure(Binder binder)
               .to(WindowOperatorQueryKit.class);
     binder.bind(WindowOperatorQueryKit.class).in(LazySingleton.class);
   }
+
+  @Provides
+  IndexerControllerContext.Builder providesContextBuilder(Injector injector)

Review Comment:
   The controller context needs the injector... I originally added a separate 
class as well, but I think some tests break since the Guice module asks for the 
class instance even before it asks to provide the context instance. It's not 
ideal, but the injector is used in many places.



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java:
##########
@@ -66,7 +68,46 @@ public class TaskActionTestKit extends ExternalResource
   private BlockingExecutorService metadataCachePollExec;
 
   private boolean useSegmentMetadataCache = false;
+  private boolean useCentralizedDatasourceSchema = false;
+  private boolean batchSegmentAllocation = true;
   private boolean skipSegmentPayloadFetchForAllocation = new 
TaskLockConfig().isBatchAllocationReduceMetadataIO();
+  private AtomicBoolean configFinalized = new AtomicBoolean();
+
+  public TaskActionTestKit setUseSegmentMetadataCache(boolean 
useSegmentMetadataCache)
+  {
+    if (configFinalized.get()) {
+      throw new IllegalStateException("Test config already finalized");
+    }
+    this.useSegmentMetadataCache = useSegmentMetadataCache;
+    return this;
+  }
+
+  public TaskActionTestKit setUseCentralizedDatasourceSchema(boolean 
useCentralizedDatasourceSchema)
+  {
+    if (configFinalized.get()) {
+      throw new IllegalStateException("Test config already finalized");
+    }
+    this.useCentralizedDatasourceSchema = useCentralizedDatasourceSchema;
+    return this;
+  }
+
+  public TaskActionTestKit setBatchSegmentAllocation(boolean 
batchSegmentAllocation)
+  {
+    if (configFinalized.get()) {

Review Comment:
   The before method is called before @Test — won't that mean we can't change 
the config between tests, but only in setup?



##########
processing/src/main/java/org/apache/druid/timeline/CompactionState.java:
##########
@@ -206,4 +206,92 @@ public static Function<Set<DataSegment>, Set<DataSegment>> 
addCompactionStateToS
         .map(s -> s.withLastCompactionState(compactionState))
         .collect(Collectors.toSet());
   }
+
+
+  public Builder toBuilder()

Review Comment:
   Ah, I thought this toBuilder() is the more fluent style? Like:
   > getDefaultCompactionState().toBuilder().dimensionSpec().build()



##########
multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java:
##########
@@ -1012,7 +1016,10 @@ public void testReplaceOnFoo1WithWhereExtern(String 
contextName, Map<String, Obj
                              Collections.emptyList(),
                              Collections.singletonList(new 
StringDimensionSchema("user")),
                              GranularityType.HOUR,
-                             
Intervals.of("2016-06-27T01:00:00.000Z/2016-06-27T02:00:00.000Z")
+                             
Intervals.of("2016-06-27T01:00:00.000Z/2016-06-27T02:00:00.000Z"),
+                             new CompactionTransformSpec(

Review Comment:
   Tests without filters are still the same (without a transform spec).



##########
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java:
##########
@@ -108,7 +113,10 @@ public IndexerControllerContext(
     this.clientFactory = clientFactory;
     this.overlordClient = overlordClient;
     this.memoryIntrospector = injector.getInstance(MemoryIntrospector.class);
-    final StorageConnectorProvider storageConnectorProvider = 
injector.getInstance(Key.get(StorageConnectorProvider.class, 
MultiStageQuery.class));
+    final StorageConnectorProvider storageConnectorProvider = 
injector.getInstance(Key.get(

Review Comment:
   this has been reverted



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestSpyTaskActionClient.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import 
org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
+import 
org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import 
org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction;
+import org.apache.druid.indexing.common.actions.TaskAction;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.segment.SegmentSchemaMapping;
+import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
+import org.apache.druid.timeline.DataSegment;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Test utility that wraps a {@link TaskActionClient} to track published 
segments and their schema mappings.
+ * <p>
+ * This spy intercepts {@link SegmentTransactionalInsertAction}, {@link 
SegmentTransactionalReplaceAction},
+ * and {@link SegmentTransactionalAppendAction} submissions to collect the 
segments and schema mappings being
+ * published. All other task actions are delegated to the wrapped client 
without modification.
+ * <p>
+ * Useful for verifying that tasks publish the expected segments and schemas 
in integration tests.
+ */
+public class TestSpyTaskActionClient implements TaskActionClient

Review Comment:
   this is done



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunBase.java:
##########
@@ -137,30 +138,35 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-@RunWith(Parameterized.class)
-public class CompactionTaskRunTest extends IngestionTestBase
+public abstract class CompactionTaskRunBase

Review Comment:
   The motivation to change this test is that it doesn't cover the MSQ runner, 
and while re-writing it I found some tests too long to read; some issues in the 
MSQ runner were also discovered by making it run with the MSQ runner. The test 
coverage has improved a lot in this class. Prior to this PR, this test only 
tested the native compaction runner and also didn't use a real task action 
client. I was surprised by how little test coverage we have on the MSQ 
compaction runner, and wanted to use this test file for any MSQ compaction 
runner related changes in the future.
   
   The changes in this PR were actually mostly not covered by this test (before 
this PR); only the interval locking is related. I re-ran the old test with the 
interval lock change: the failures are due to the interval diff in the 
compaction state, and another test failed because the lock interval now covers 
the input interval. So it all seems expected.



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java:
##########
@@ -66,7 +68,46 @@ public class TaskActionTestKit extends ExternalResource
   private BlockingExecutorService metadataCachePollExec;
 
   private boolean useSegmentMetadataCache = false;
+  private boolean useCentralizedDatasourceSchema = false;
+  private boolean batchSegmentAllocation = true;
   private boolean skipSegmentPayloadFetchForAllocation = new 
TaskLockConfig().isBatchAllocationReduceMetadataIO();
+  private AtomicBoolean configFinalized = new AtomicBoolean();
+
+  public TaskActionTestKit setUseSegmentMetadataCache(boolean 
useSegmentMetadataCache)
+  {
+    if (configFinalized.get()) {
+      throw new IllegalStateException("Test config already finalized");
+    }
+    this.useSegmentMetadataCache = useSegmentMetadataCache;
+    return this;
+  }
+
+  public TaskActionTestKit setUseCentralizedDatasourceSchema(boolean 
useCentralizedDatasourceSchema)
+  {
+    if (configFinalized.get()) {
+      throw new IllegalStateException("Test config already finalized");
+    }
+    this.useCentralizedDatasourceSchema = useCentralizedDatasourceSchema;
+    return this;
+  }
+
+  public TaskActionTestKit setBatchSegmentAllocation(boolean 
batchSegmentAllocation)

Review Comment:
   I could not reproduce the test cases any more; maybe the issue disappeared 
after we switched to using the widened interval. Anyway, I added the 
segmentQueue test param.



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunBase.java:
##########
@@ -571,513 +479,277 @@ public void 
testRunIndexAndCompactAtTheSameTimeForDifferentInterval() throws Exc
       }
     }
 
-    Assert.assertTrue(compactionFuture.get().lhs.isSuccess());
-
-    dataSegmentsWithSchemas = compactionFuture.get().rhs;
-    verifySchema(dataSegmentsWithSchemas);
-    segments = new ArrayList<>(dataSegmentsWithSchemas.getSegments());
-    Assert.assertEquals(3, segments.size());
-
-    for (int i = 0; i < 3; i++) {
-      Assert.assertEquals(
-          Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
-          segments.get(i).getInterval()
-      );
-      Assert.assertEquals(
-          getDefaultCompactionState(
-              Granularities.HOUR,
-              Granularities.MINUTE,
-              ImmutableList.of(Intervals.of(
-                  "2014-01-01T0%d:00:00/2014-01-01T0%d:00:00",
-                  i,
-                  i + 1
-              ))
-          ),
-          segments.get(i).getLastCompactionState()
-      );
-      if (lockGranularity == LockGranularity.SEGMENT) {
-        Assert.assertEquals(
-            new 
NumberedOverwriteShardSpec(PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, 0, 2, 
(short) 1, (short) 1),
-            segments.get(i).getShardSpec()
-        );
-      } else {
-        Assert.assertEquals(new NumberedShardSpec(0, 1), 
segments.get(i).getShardSpec());
-      }
-    }
-  }
-
-  @Test
-  public void testWithSegmentGranularity() throws Exception

Review Comment:
   Segment granularity has been tested very thoroughly in this test after this 
change, so we don't need it any more.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java:
##########
@@ -60,6 +60,7 @@ public CompactionInputSpec getInputSpec()
     return inputSpec;
   }
 
+  @Deprecated

Review Comment:
   added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to