kfaraz commented on code in PR #13463:
URL: https://github.com/apache/druid/pull/13463#discussion_r1047359656
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -615,7 +615,9 @@ public void updatePartialKeyStatisticsInformation(int
stageNumber, int workerNum
queryKernel.addPartialKeyStatisticsForStageAndWorker(stageId,
workerNumber, partialKeyStatisticsInformation);
if
(queryKernel.getStagePhase(stageId).equals(ControllerStagePhase.MERGING_STATISTICS))
{
- List<String> workerTaskIds = workerTaskLauncher.getTaskList();
+ // we only need tasks which are active for this stage.
+ List<String> workerTaskIds = workerTaskLauncher.getTaskList()
Review Comment:
Doesn't `getTaskList()` already return the list of active tasks only?
Or can it include active tasks from other stages too?
If yes, can we be sure of the order of the items in the returned list such
that `subList` always works as expected?
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java:
##########
@@ -571,16 +571,37 @@ public void postFinish()
@Override
public ClusterByStatisticsSnapshot fetchStatisticsSnapshot(StageId stageId)
{
+ if (stageKernelMap.get(stageId) == null) {
+ throw new ISE("Requested statistics snapshot for non-existent stageId
%s.", stageId);
+ }
+ if (stageKernelMap.get(stageId).getResultKeyStatisticsSnapshot() == null) {
+ throw new ISE(
+ "Requested statistics snapshot is not generated yet for stageId[%s]",
+ stageId
+ );
+ }
return stageKernelMap.get(stageId).getResultKeyStatisticsSnapshot();
Review Comment:
Style: might be easier to read as an `if-else-if` chain.
##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/WorkerChatHandlerTest.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.frame.key.ClusterByPartitions;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskReport;
+import org.apache.druid.indexing.common.TaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.msq.counters.CounterSnapshotsTree;
+import org.apache.druid.msq.exec.Worker;
+import org.apache.druid.msq.kernel.StageId;
+import org.apache.druid.msq.kernel.WorkOrder;
+import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMergerV9;
+import
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.server.security.AuthConfig;
+import org.apache.druid.server.security.AuthenticationResult;
+import org.apache.druid.sql.calcite.util.CalciteTests;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.core.Response;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+public class WorkerChatHandlerTest
+{
+ private static final StageId TEST_STAGE = new StageId("123", 0);
+ @Mock
+ private HttpServletRequest req;
+
+ private TaskToolbox toolbox;
+ private AutoCloseable mocks;
+
+ private final TestWorker worker = new TestWorker();
+
+ @Before
+ public void setUp()
+ {
+ ObjectMapper mapper = new DefaultObjectMapper();
+ IndexIO indexIO = new IndexIO(mapper, () -> 0);
+ IndexMergerV9 indexMerger = new IndexMergerV9(
+ mapper,
+ indexIO,
+ OffHeapMemorySegmentWriteOutMediumFactory.instance()
+ );
+
+ mocks = MockitoAnnotations.openMocks(this);
Review Comment:
Style: Since there is a single mocked instance, I suppose it might be
simpler to just use `Mockito.mock()` for the `HttpServletRequest req` rather
than using `MockitoAnnotations`.
If you do want to use `@Mock` however, the preferred method would be
annotate the class with `@RunWith(MockitoJUnitRunner)`
##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/WorkerChatHandlerTest.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.frame.key.ClusterByPartitions;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskReport;
+import org.apache.druid.indexing.common.TaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.msq.counters.CounterSnapshotsTree;
+import org.apache.druid.msq.exec.Worker;
+import org.apache.druid.msq.kernel.StageId;
+import org.apache.druid.msq.kernel.WorkOrder;
+import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMergerV9;
+import
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.server.security.AuthConfig;
+import org.apache.druid.server.security.AuthenticationResult;
+import org.apache.druid.sql.calcite.util.CalciteTests;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.core.Response;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+public class WorkerChatHandlerTest
+{
+ private static final StageId TEST_STAGE = new StageId("123", 0);
+ @Mock
+ private HttpServletRequest req;
+
+ private TaskToolbox toolbox;
+ private AutoCloseable mocks;
+
+ private final TestWorker worker = new TestWorker();
+
+ @Before
+ public void setUp()
+ {
+ ObjectMapper mapper = new DefaultObjectMapper();
+ IndexIO indexIO = new IndexIO(mapper, () -> 0);
+ IndexMergerV9 indexMerger = new IndexMergerV9(
+ mapper,
+ indexIO,
+ OffHeapMemorySegmentWriteOutMediumFactory.instance()
+ );
+
+ mocks = MockitoAnnotations.openMocks(this);
+ Mockito.when(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
+ .thenReturn(new AuthenticationResult("druid", "druid", null, null));
+ TaskToolbox.Builder builder = new TaskToolbox.Builder();
+ toolbox = builder.authorizerMapper(CalciteTests.TEST_AUTHORIZER_MAPPER)
+ .indexIO(indexIO)
+ .indexMergerV9(indexMerger)
+ .taskReportFileWriter(
+ new TaskReportFileWriter()
Review Comment:
Nit: You could use the existing `NoopTaskReportFileWriter` here instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]