This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0fe6603ece [multistage] add observability logging for thread scheduling (#9933)
0fe6603ece is described below

commit 0fe6603ece46dad5fe765cfdf7fc44a38cfe9301
Author: Almog Gavra <[email protected]>
AuthorDate: Thu Dec 8 16:27:00 2022 -0800

    [multistage] add observability logging for thread scheduling (#9933)
---
 .../query/runtime/executor/OpChainScheduler.java   |  5 ++
 .../runtime/executor/OpChainSchedulerService.java  | 33 +++++++--
 .../runtime/executor/RoundRobinScheduler.java      | 16 ++++-
 .../pinot/query/runtime/operator/OpChain.java      | 29 ++++----
 .../pinot/query/runtime/operator/OpChainStats.java | 81 ++++++++++++++++++++++
 .../query/runtime/plan/PhysicalPlanVisitor.java    |  2 +-
 .../executor/OpChainSchedulerServiceTest.java      | 12 ++--
 .../runtime/executor/RoundRobinSchedulerTest.java  | 14 ++--
 8 files changed, 157 insertions(+), 35 deletions(-)

diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
index 0e198dcd7e..8b35772542 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
@@ -55,4 +55,9 @@ public interface OpChainScheduler {
    *         prior to this call
    */
   OpChain next();
+
+  /**
+   * @return the number of operator chains that are awaiting execution
+   */
+  int size();
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
index 32472e044a..78d2f9d6b7 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
@@ -25,7 +25,6 @@ import org.apache.pinot.core.util.trace.TraceRunnable;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.operator.OpChain;
-import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,7 +61,8 @@ public class OpChainSchedulerService extends 
AbstractExecutionThreadService {
 
   @Override
   protected void triggerShutdown() {
-    // this wil just notify all waiters that the scheduler is shutting down
+    LOGGER.info("Triggered shutdown on OpChainScheduler...");
+    // this will just notify all waiters that the scheduler is shutting down
     _monitor.enter();
     _monitor.leave();
   }
@@ -78,18 +78,19 @@ public class OpChainSchedulerService extends 
AbstractExecutionThreadService {
         }
 
         OpChain operatorChain = _scheduler.next();
+        LOGGER.trace("({}): Scheduling", operatorChain);
         _workerPool.submit(new TraceRunnable() {
           @Override
           public void runJob() {
             try {
-              ThreadResourceUsageProvider timer = 
operatorChain.getAndStartTimer();
+              LOGGER.trace("({}): Executing", operatorChain);
+              operatorChain.getStats().executing();
 
               // so long as there's work to be done, keep getting the next 
block
               // when the operator chain returns a NOOP block, then yield the 
execution
               // of this to another worker
               TransferableBlock result = operatorChain.getRoot().nextBlock();
               while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
-                LOGGER.debug("Got block with {} rows.", result.getNumRows());
                 result = operatorChain.getRoot().nextBlock();
               }
 
@@ -97,10 +98,15 @@ public class OpChainSchedulerService extends 
AbstractExecutionThreadService {
                 // not complete, needs to re-register for scheduling
                 register(operatorChain, false);
               } else {
-                LOGGER.debug("Execution time: " + timer.getThreadTimeNs());
+                LOGGER.debug("({}): Completed {}",
+                    operatorChain,
+                    operatorChain.getStats());
               }
             } catch (Exception e) {
-              LOGGER.error("Failed to execute query!", e);
+              LOGGER.error("({}): Failed to execute operator chain! {}",
+                  operatorChain,
+                  operatorChain.getStats(),
+                  e);
             }
           }
         });
@@ -117,12 +123,26 @@ public class OpChainSchedulerService extends 
AbstractExecutionThreadService {
    */
   public final void register(OpChain operatorChain) {
+    // we want to track the time that it takes from registering an
+    // operator chain to when it completes, so start the timer before
+    // the chain is handed to the scheduler and can be picked up by a worker
+    operatorChain.getStats().startExecutionTimer();
+
     register(operatorChain, true);
+    LOGGER.debug("({}): Scheduler is now handling operator chain listening to mailboxes {}. "
+            + "There are a total of {} chains awaiting execution.",
+        operatorChain,
+        operatorChain.getReceivingMailbox(),
+        _scheduler.size());
   }
 
   public final void register(OpChain operatorChain, boolean isNew) {
     _monitor.enter();
     try {
+      LOGGER.trace("({}): Registered operator chain (new: {}). Total: {}",
+          operatorChain, isNew, _scheduler.size());
+
       _scheduler.register(operatorChain, isNew);
+      operatorChain.getStats().queued();
     } finally {
       _monitor.leave();
     }
@@ -138,6 +158,7 @@ public class OpChainSchedulerService extends 
AbstractExecutionThreadService {
   public final void onDataAvailable(MailboxIdentifier mailbox) {
     _monitor.enter();
     try {
+      LOGGER.trace("Notified onDataAvailable for mailbox {}", mailbox);
       _scheduler.onDataAvailable(mailbox);
     } finally {
       _monitor.leave();
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
index 5fc8649291..fe628d52b4 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
@@ -64,6 +64,7 @@ public class RoundRobinScheduler implements OpChainScheduler {
     // immediately be considered ready in case it does not need
     // read from any mailbox (e.g. with a LiteralValueOperator)
     (isNew ? _ready : _available).add(operatorChain);
+    trace("registered " + operatorChain);
   }
 
   @Override
@@ -85,6 +86,7 @@ public class RoundRobinScheduler implements OpChainScheduler {
     //
     // TODO: fix the memory leak by adding a close(opChain) callback
     _seenMail.add(mailbox);
+    trace("got mail for " + mailbox);
   }
 
   @Override
@@ -95,7 +97,14 @@ public class RoundRobinScheduler implements OpChainScheduler 
{
 
   @Override
   public OpChain next() {
-    return _ready.poll();
+    OpChain op = _ready.poll();
+    trace("Polled " + op);
+    return op;
+  }
+
+  @Override
+  public int size() {
+    return _ready.size() + _available.size();
   }
 
   private void computeReady() {
@@ -121,4 +130,9 @@ public class RoundRobinScheduler implements 
OpChainScheduler {
       }
     }
   }
+
+  private void trace(String operation) {
+    LOGGER.trace("({}) Ready: {}, Available: {}, Mail: {}",
+        operation, _ready, _available, _seenMail);
+  }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
index af9cd6faef..0bdc656164 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
@@ -18,15 +18,12 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
-import com.google.common.base.Suppliers;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.function.Supplier;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
-import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 
 
 /**
@@ -36,28 +33,32 @@ import 
org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 public class OpChain {
 
   private final Operator<TransferableBlock> _root;
-  // TODO: build timers that are partial-execution aware
-  private final Supplier<ThreadResourceUsageProvider> _timer;
   private final Set<MailboxIdentifier> _receivingMailbox;
+  private final OpChainStats _stats;
+  private final String _id;
 
-  public OpChain(Operator<TransferableBlock> root, List<MailboxIdentifier> 
receivingMailboxes) {
+  public OpChain(Operator<TransferableBlock> root, List<MailboxIdentifier> 
receivingMailboxes, long requestId,
+      int stageId) {
     _root = root;
     _receivingMailbox = new HashSet<>(receivingMailboxes);
-
-    // use memoized supplier so that the timing doesn't start until the
-    // first time we get the timer
-    _timer = Suppliers.memoize(ThreadResourceUsageProvider::new)::get;
+    _id = String.format("%s_%s", requestId, stageId);
+    _stats = new OpChainStats(_id);
   }
 
   public Operator<TransferableBlock> getRoot() {
     return _root;
   }
 
-  public ThreadResourceUsageProvider getAndStartTimer() {
-    return _timer.get();
-  }
-
   public Set<MailboxIdentifier> getReceivingMailbox() {
     return _receivingMailbox;
   }
+
+  public OpChainStats getStats() {
+    return _stats;
+  }
+
+  @Override
+  public String toString() {
+    return "OpChain{ " + _id + "}";
+  }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
new file mode 100644
index 0000000000..58327c40da
--- /dev/null
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Suppliers;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+
+
+/**
+ * {@code OpChainStats} tracks execution statistics for {@link OpChain}s.
+ */
+public class OpChainStats {
+
+  // Wall-clock time since execution timing was first started (register() starts it at registration).
+  // Not a per-thread CPU timer: a chain is re-queued and resumed on different worker threads.
+  private final Stopwatch _executeStopwatch = Stopwatch.createUnstarted();
+  // cumulative wall-clock time spent waiting in the scheduler queue
+  private final Stopwatch _queuedStopwatch = Stopwatch.createUnstarted();
+  private final AtomicLong _queuedCount = new AtomicLong();
+  private final String _id;
+
+  public OpChainStats(String id) {
+    _id = id;
+  }
+
+  /** Called when a worker starts running the chain; ends the current queued period. */
+  public synchronized void executing() {
+    startExecutionTimer();
+    if (_queuedStopwatch.isRunning()) {
+      _queuedStopwatch.stop();
+    }
+  }
+
+  /** Called on every (re-)registration with the scheduler; starts a queued period. */
+  public synchronized void queued() {
+    _queuedCount.incrementAndGet();
+    if (!_queuedStopwatch.isRunning()) {
+      _queuedStopwatch.start();
+    }
+  }
+
+  // Starts the execution timer on the first call only; it is never stopped afterwards.
+  // Synchronized: Stopwatch is not thread-safe and start() throws if it is already running.
+  public synchronized void startExecutionTimer() {
+    if (!_executeStopwatch.isRunning()) {
+      _executeStopwatch.start();
+    }
+  }
+
+  // Reading an unstarted Stopwatch yields 0ms, so logging never starts a timer as a side effect.
+  @Override
+  public synchronized String toString() {
+    return String.format("(%s) Queued Count: %s, Executing Time: %sms, Queued Time: %sms",
+        _id,
+        _queuedCount.get(),
+        _executeStopwatch.elapsed(TimeUnit.MILLISECONDS),
+        _queuedStopwatch.elapsed(TimeUnit.MILLISECONDS)
+    );
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 9f7002baf2..6754b675af 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -58,7 +58,7 @@ public class PhysicalPlanVisitor implements 
StageNodeVisitor<Operator<Transferab
 
   public static OpChain build(StageNode node, PlanRequestContext context) {
     Operator<TransferableBlock> root = node.visit(INSTANCE, context);
-    return new OpChain(root, context.getReceivingMailboxes());
+    return new OpChain(root, context.getReceivingMailboxes(), 
context.getRequestId(), context.getStageId());
   }
 
   @Override
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index 0a378ed291..5a94c3708f 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -71,7 +71,7 @@ public class OpChainSchedulerServiceTest {
   }
 
   private OpChain getChain(Operator<TransferableBlock> operator) {
-    return new OpChain(operator, ImmutableList.of());
+    return new OpChain(operator, ImmutableList.of(), 123, 1);
   }
 
   @Test
@@ -91,7 +91,7 @@ public class OpChainSchedulerServiceTest {
 
     // When:
     scheduler.startAsync().awaitRunning();
-    scheduler.register(new OpChain(_operatorA, ImmutableList.of()));
+    scheduler.register(new OpChain(_operatorA, ImmutableList.of(), 123, 1));
 
     // Then:
     Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be 
called in less than 10 seconds");
@@ -114,7 +114,7 @@ public class OpChainSchedulerServiceTest {
     });
 
     // When:
-    scheduler.register(new OpChain(_operatorA, ImmutableList.of()));
+    scheduler.register(new OpChain(_operatorA, ImmutableList.of(), 123, 1));
     scheduler.startAsync().awaitRunning();
 
     // Then:
@@ -141,7 +141,7 @@ public class OpChainSchedulerServiceTest {
 
     // When:
     scheduler.startAsync().awaitRunning();
-    scheduler.register(new OpChain(_operatorA, ImmutableList.of()));
+    scheduler.register(new OpChain(_operatorA, ImmutableList.of(), 123, 1));
 
     // Then:
     Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be 
called in less than 10 seconds");
@@ -182,8 +182,8 @@ public class OpChainSchedulerServiceTest {
 
     // When:
     scheduler.startAsync().awaitRunning();
-    scheduler.register(new OpChain(_operatorA, ImmutableList.of()));
-    scheduler.register(new OpChain(_operatorB, ImmutableList.of()));
+    scheduler.register(new OpChain(_operatorA, ImmutableList.of(), 123, 1));
+    scheduler.register(new OpChain(_operatorB, ImmutableList.of(), 123, 1));
 
     // Then:
     Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be 
called in less than 10 seconds");
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
index 4862c6f2f5..1dd3a80cfd 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
@@ -56,7 +56,7 @@ public class RoundRobinSchedulerTest {
   @Test
   public void shouldScheduleNewOpChainsImmediately() {
     // Given:
-    OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1));
+    OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1), 123, 
1);
     RoundRobinScheduler scheduler = new RoundRobinScheduler();
 
     // When:
@@ -70,7 +70,7 @@ public class RoundRobinSchedulerTest {
   @Test
   public void shouldNotScheduleRescheduledOpChainsImmediately() {
     // Given:
-    OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1));
+    OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1), 123, 
1);
     RoundRobinScheduler scheduler = new RoundRobinScheduler();
 
     // When:
@@ -83,8 +83,8 @@ public class RoundRobinSchedulerTest {
   @Test
   public void shouldScheduleRescheduledOpChainOnDataAvailable() {
     // Given:
-    OpChain chain1 = new OpChain(_operator, ImmutableList.of(MAILBOX_1));
-    OpChain chain2 = new OpChain(_operator, ImmutableList.of(MAILBOX_2));
+    OpChain chain1 = new OpChain(_operator, ImmutableList.of(MAILBOX_1), 123, 
1);
+    OpChain chain2 = new OpChain(_operator, ImmutableList.of(MAILBOX_2), 123, 
1);
     RoundRobinScheduler scheduler = new RoundRobinScheduler();
 
     // When:
@@ -101,7 +101,7 @@ public class RoundRobinSchedulerTest {
   @Test
   public void shouldScheduleRescheduledOpChainOnDataAvailableBeforeRegister() {
     // Given:
-    OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1));
+    OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1), 123, 
1);
     RoundRobinScheduler scheduler = new RoundRobinScheduler();
 
     // When:
@@ -116,7 +116,7 @@ public class RoundRobinSchedulerTest {
   @Test
   public void 
shouldNotScheduleRescheduledOpChainOnDataAvailableForDifferentMailbox() {
     // Given:
-    OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1));
+    OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1), 123, 
1);
     RoundRobinScheduler scheduler = new RoundRobinScheduler();
 
     // When:
@@ -130,7 +130,7 @@ public class RoundRobinSchedulerTest {
   @Test
   public void shouldScheduleRescheduledOpChainOnDataAvailableForAnyMailbox() {
     // Given:
-    OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1, 
MAILBOX_2));
+    OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1, 
MAILBOX_2), 123, 1);
     RoundRobinScheduler scheduler = new RoundRobinScheduler();
 
     // When:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to