cryptoe commented on code in PR #13353:
URL: https://github.com/apache/druid/pull/13353#discussion_r1064634362
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java:
##########
@@ -38,27 +41,35 @@
import org.apache.druid.msq.input.stage.StageInputSlice;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
+import org.apache.druid.msq.kernel.worker.WorkerStagePhase;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
import javax.annotation.Nullable;
import java.util.List;
+import java.util.stream.IntStream;
/**
* Controller-side state machine for each stage. Used by {@link
ControllerQueryKernel} to form the overall state
* machine for an entire query.
- *
+ * <p>
* Package-private: stage trackers are an internal implementation detail of
{@link ControllerQueryKernel}, not meant
* for separate use.
*/
class ControllerStageTracker
{
+ private static final Logger log = new Logger(ControllerStageTracker.class);
private final StageDefinition stageDef;
+
private final int workerCount;
private final WorkerInputs workerInputs;
+
+ // worker-> workerStagePhase
+ private final Int2ObjectMap<WorkerStagePhase> workerToPhase = new
Int2ObjectOpenHashMap<>();
Review Comment:
// worker-> workerStagePhase
// Controller keeps track of the stage with this map.
// Currently, we rely on the serial nature of the state machine to keep
things in sync between the controller and the worker.
// So the worker state in the controller can go out of sync with the
actual worker state.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java:
##########
@@ -38,27 +41,35 @@
import org.apache.druid.msq.input.stage.StageInputSlice;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
+import org.apache.druid.msq.kernel.worker.WorkerStagePhase;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
import javax.annotation.Nullable;
import java.util.List;
+import java.util.stream.IntStream;
/**
* Controller-side state machine for each stage. Used by {@link
ControllerQueryKernel} to form the overall state
* machine for an entire query.
- *
+ * <p>
* Package-private: stage trackers are an internal implementation detail of
{@link ControllerQueryKernel}, not meant
* for separate use.
*/
class ControllerStageTracker
{
+ private static final Logger log = new Logger(ControllerStageTracker.class);
private final StageDefinition stageDef;
+
private final int workerCount;
private final WorkerInputs workerInputs;
+
+ // worker-> workerStagePhase
+ private final Int2ObjectMap<WorkerStagePhase> workerToPhase = new
Int2ObjectOpenHashMap<>();
Review Comment:
Added the comments.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]