This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0fe6603ece [multistage] add observability logging for thread scheduling (#9933)
0fe6603ece is described below
commit 0fe6603ece46dad5fe765cfdf7fc44a38cfe9301
Author: Almog Gavra <[email protected]>
AuthorDate: Thu Dec 8 16:27:00 2022 -0800
[multistage] add observability logging for thread scheduling (#9933)
---
.../query/runtime/executor/OpChainScheduler.java | 5 ++
.../runtime/executor/OpChainSchedulerService.java | 33 +++++++--
.../runtime/executor/RoundRobinScheduler.java | 16 ++++-
.../pinot/query/runtime/operator/OpChain.java | 29 ++++----
.../pinot/query/runtime/operator/OpChainStats.java | 81 ++++++++++++++++++++++
.../query/runtime/plan/PhysicalPlanVisitor.java | 2 +-
.../executor/OpChainSchedulerServiceTest.java | 12 ++--
.../runtime/executor/RoundRobinSchedulerTest.java | 14 ++--
8 files changed, 157 insertions(+), 35 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
index 0e198dcd7e..8b35772542 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
@@ -55,4 +55,9 @@ public interface OpChainScheduler {
* prior to this call
*/
OpChain next();
+
+ /**
+ * @return the number of operator chains that are awaiting execution
+ */
+ int size();
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
index 32472e044a..78d2f9d6b7 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
@@ -25,7 +25,6 @@ import org.apache.pinot.core.util.trace.TraceRunnable;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.operator.OpChain;
-import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,7 +61,8 @@ public class OpChainSchedulerService extends
AbstractExecutionThreadService {
@Override
protected void triggerShutdown() {
- // this wil just notify all waiters that the scheduler is shutting down
+ LOGGER.info("Triggered shutdown on OpChainScheduler...");
+ // this will just notify all waiters that the scheduler is shutting down
_monitor.enter();
_monitor.leave();
}
@@ -78,18 +78,19 @@ public class OpChainSchedulerService extends
AbstractExecutionThreadService {
}
OpChain operatorChain = _scheduler.next();
+ LOGGER.trace("({}): Scheduling", operatorChain);
_workerPool.submit(new TraceRunnable() {
@Override
public void runJob() {
try {
- ThreadResourceUsageProvider timer =
operatorChain.getAndStartTimer();
+ LOGGER.trace("({}): Executing", operatorChain);
+ operatorChain.getStats().executing();
// so long as there's work to be done, keep getting the next
block
// when the operator chain returns a NOOP block, then yield the
execution
// of this to another worker
TransferableBlock result = operatorChain.getRoot().nextBlock();
while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
- LOGGER.debug("Got block with {} rows.", result.getNumRows());
result = operatorChain.getRoot().nextBlock();
}
@@ -97,10 +98,15 @@ public class OpChainSchedulerService extends
AbstractExecutionThreadService {
// not complete, needs to re-register for scheduling
register(operatorChain, false);
} else {
- LOGGER.debug("Execution time: " + timer.getThreadTimeNs());
+ LOGGER.debug("({}): Completed {}",
+ operatorChain,
+ operatorChain.getStats());
}
} catch (Exception e) {
- LOGGER.error("Failed to execute query!", e);
+ LOGGER.error("({}): Failed to execute operator chain! {}",
+ operatorChain,
+ operatorChain.getStats(),
+ e);
}
}
});
@@ -117,12 +123,26 @@ public class OpChainSchedulerService extends
AbstractExecutionThreadService {
*/
public final void register(OpChain operatorChain) {
register(operatorChain, true);
+ LOGGER.debug("({}): Scheduler is now handling operator chain listening to
mailboxes {}. "
+ + "There are a total of {} chains awaiting execution.",
+ operatorChain,
+ operatorChain.getReceivingMailbox(),
+ _scheduler.size());
+
+ // we want to track the time that it takes from registering
+ // an operator chain to when it completes, so make sure to
+ // start the timer here
+ operatorChain.getStats().startExecutionTimer();
}
public final void register(OpChain operatorChain, boolean isNew) {
_monitor.enter();
try {
+ LOGGER.trace("({}): Registered operator chain (new: {}). Total: {}",
+ operatorChain, isNew, _scheduler.size());
+
_scheduler.register(operatorChain, isNew);
+ operatorChain.getStats().queued();
} finally {
_monitor.leave();
}
@@ -138,6 +158,7 @@ public class OpChainSchedulerService extends
AbstractExecutionThreadService {
public final void onDataAvailable(MailboxIdentifier mailbox) {
_monitor.enter();
try {
+ LOGGER.trace("Notified onDataAvailable for mailbox {}", mailbox);
_scheduler.onDataAvailable(mailbox);
} finally {
_monitor.leave();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
index 5fc8649291..fe628d52b4 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
@@ -64,6 +64,7 @@ public class RoundRobinScheduler implements OpChainScheduler {
// immediately be considered ready in case it does not need
// read from any mailbox (e.g. with a LiteralValueOperator)
(isNew ? _ready : _available).add(operatorChain);
+ trace("registered " + operatorChain);
}
@Override
@@ -85,6 +86,7 @@ public class RoundRobinScheduler implements OpChainScheduler {
//
// TODO: fix the memory leak by adding a close(opChain) callback
_seenMail.add(mailbox);
+ trace("got mail for " + mailbox);
}
@Override
@@ -95,7 +97,14 @@ public class RoundRobinScheduler implements OpChainScheduler
{
@Override
public OpChain next() {
- return _ready.poll();
+ OpChain op = _ready.poll();
+ trace("Polled " + op);
+ return op;
+ }
+
+ @Override
+ public int size() {
+ return _ready.size() + _available.size();
}
private void computeReady() {
@@ -121,4 +130,9 @@ public class RoundRobinScheduler implements
OpChainScheduler {
}
}
}
+
+ private void trace(String operation) {
+ LOGGER.trace("({}) Ready: {}, Available: {}, Mail: {}",
+ operation, _ready, _available, _seenMail);
+ }
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
index af9cd6faef..0bdc656164 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
@@ -18,15 +18,12 @@
*/
package org.apache.pinot.query.runtime.operator;
-import com.google.common.base.Suppliers;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.function.Supplier;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
-import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
/**
@@ -36,28 +33,32 @@ import
org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
public class OpChain {
private final Operator<TransferableBlock> _root;
- // TODO: build timers that are partial-execution aware
- private final Supplier<ThreadResourceUsageProvider> _timer;
private final Set<MailboxIdentifier> _receivingMailbox;
+ private final OpChainStats _stats;
+ private final String _id;
- public OpChain(Operator<TransferableBlock> root, List<MailboxIdentifier>
receivingMailboxes) {
+ public OpChain(Operator<TransferableBlock> root, List<MailboxIdentifier>
receivingMailboxes, long requestId,
+ int stageId) {
_root = root;
_receivingMailbox = new HashSet<>(receivingMailboxes);
-
- // use memoized supplier so that the timing doesn't start until the
- // first time we get the timer
- _timer = Suppliers.memoize(ThreadResourceUsageProvider::new)::get;
+ _id = String.format("%s_%s", requestId, stageId);
+ _stats = new OpChainStats(_id);
}
public Operator<TransferableBlock> getRoot() {
return _root;
}
- public ThreadResourceUsageProvider getAndStartTimer() {
- return _timer.get();
- }
-
public Set<MailboxIdentifier> getReceivingMailbox() {
return _receivingMailbox;
}
+
+ public OpChainStats getStats() {
+ return _stats;
+ }
+
+ @Override
+ public String toString() {
+ return "OpChain{ " + _id + "}";
+ }
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
new file mode 100644
index 0000000000..58327c40da
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Suppliers;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+
+
+/**
+ * {@code OpChainStats} tracks execution statistics for {@link OpChain}s.
+ */
+public class OpChainStats {
+
+ // use memoized supplier so that the timing doesn't start until the
+ // first time we get the timer
+ private final Supplier<ThreadResourceUsageProvider> _exTimer
+ = Suppliers.memoize(ThreadResourceUsageProvider::new)::get;
+
+ // this is used to make sure that toString() doesn't have side
+ // effects (accidentally starting the timer)
+ private volatile boolean _exTimerStarted = false;
+
+ private final Stopwatch _queuedStopwatch = Stopwatch.createUnstarted();
+ private final AtomicLong _queuedCount = new AtomicLong();
+
+ private final String _id;
+
+ public OpChainStats(String id) {
+ _id = id;
+ }
+
+ public void executing() {
+ startExecutionTimer();
+ if (_queuedStopwatch.isRunning()) {
+ _queuedStopwatch.stop();
+ }
+ }
+
+ public void queued() {
+ _queuedCount.incrementAndGet();
+ if (!_queuedStopwatch.isRunning()) {
+ _queuedStopwatch.start();
+ }
+ }
+
+ public void startExecutionTimer() {
+ _exTimerStarted = true;
+ _exTimer.get();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("(%s) Queued Count: %s, Executing Time: %sms, Queued
Time: %sms",
+ _id,
+ _queuedCount.get(),
+ _exTimerStarted ?
TimeUnit.NANOSECONDS.toMillis(_exTimer.get().getThreadTimeNs()) : 0,
+ _queuedStopwatch.elapsed(TimeUnit.MILLISECONDS)
+ );
+ }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 9f7002baf2..6754b675af 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -58,7 +58,7 @@ public class PhysicalPlanVisitor implements
StageNodeVisitor<Operator<Transferab
public static OpChain build(StageNode node, PlanRequestContext context) {
Operator<TransferableBlock> root = node.visit(INSTANCE, context);
- return new OpChain(root, context.getReceivingMailboxes());
+ return new OpChain(root, context.getReceivingMailboxes(),
context.getRequestId(), context.getStageId());
}
@Override
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index 0a378ed291..5a94c3708f 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -71,7 +71,7 @@ public class OpChainSchedulerServiceTest {
}
private OpChain getChain(Operator<TransferableBlock> operator) {
- return new OpChain(operator, ImmutableList.of());
+ return new OpChain(operator, ImmutableList.of(), 123, 1);
}
@Test
@@ -91,7 +91,7 @@ public class OpChainSchedulerServiceTest {
// When:
scheduler.startAsync().awaitRunning();
- scheduler.register(new OpChain(_operatorA, ImmutableList.of()));
+ scheduler.register(new OpChain(_operatorA, ImmutableList.of(), 123, 1));
// Then:
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be
called in less than 10 seconds");
@@ -114,7 +114,7 @@ public class OpChainSchedulerServiceTest {
});
// When:
- scheduler.register(new OpChain(_operatorA, ImmutableList.of()));
+ scheduler.register(new OpChain(_operatorA, ImmutableList.of(), 123, 1));
scheduler.startAsync().awaitRunning();
// Then:
@@ -141,7 +141,7 @@ public class OpChainSchedulerServiceTest {
// When:
scheduler.startAsync().awaitRunning();
- scheduler.register(new OpChain(_operatorA, ImmutableList.of()));
+ scheduler.register(new OpChain(_operatorA, ImmutableList.of(), 123, 1));
// Then:
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be
called in less than 10 seconds");
@@ -182,8 +182,8 @@ public class OpChainSchedulerServiceTest {
// When:
scheduler.startAsync().awaitRunning();
- scheduler.register(new OpChain(_operatorA, ImmutableList.of()));
- scheduler.register(new OpChain(_operatorB, ImmutableList.of()));
+ scheduler.register(new OpChain(_operatorA, ImmutableList.of(), 123, 1));
+ scheduler.register(new OpChain(_operatorB, ImmutableList.of(), 123, 1));
// Then:
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be
called in less than 10 seconds");
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
index 4862c6f2f5..1dd3a80cfd 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
@@ -56,7 +56,7 @@ public class RoundRobinSchedulerTest {
@Test
public void shouldScheduleNewOpChainsImmediately() {
// Given:
- OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1));
+ OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1), 123,
1);
RoundRobinScheduler scheduler = new RoundRobinScheduler();
// When:
@@ -70,7 +70,7 @@ public class RoundRobinSchedulerTest {
@Test
public void shouldNotScheduleRescheduledOpChainsImmediately() {
// Given:
- OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1));
+ OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1), 123,
1);
RoundRobinScheduler scheduler = new RoundRobinScheduler();
// When:
@@ -83,8 +83,8 @@ public class RoundRobinSchedulerTest {
@Test
public void shouldScheduleRescheduledOpChainOnDataAvailable() {
// Given:
- OpChain chain1 = new OpChain(_operator, ImmutableList.of(MAILBOX_1));
- OpChain chain2 = new OpChain(_operator, ImmutableList.of(MAILBOX_2));
+ OpChain chain1 = new OpChain(_operator, ImmutableList.of(MAILBOX_1), 123,
1);
+ OpChain chain2 = new OpChain(_operator, ImmutableList.of(MAILBOX_2), 123,
1);
RoundRobinScheduler scheduler = new RoundRobinScheduler();
// When:
@@ -101,7 +101,7 @@ public class RoundRobinSchedulerTest {
@Test
public void shouldScheduleRescheduledOpChainOnDataAvailableBeforeRegister() {
// Given:
- OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1));
+ OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1), 123,
1);
RoundRobinScheduler scheduler = new RoundRobinScheduler();
// When:
@@ -116,7 +116,7 @@ public class RoundRobinSchedulerTest {
@Test
public void
shouldNotScheduleRescheduledOpChainOnDataAvailableForDifferentMailbox() {
// Given:
- OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1));
+ OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1), 123,
1);
RoundRobinScheduler scheduler = new RoundRobinScheduler();
// When:
@@ -130,7 +130,7 @@ public class RoundRobinSchedulerTest {
@Test
public void shouldScheduleRescheduledOpChainOnDataAvailableForAnyMailbox() {
// Given:
- OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1,
MAILBOX_2));
+ OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1,
MAILBOX_2), 123, 1);
RoundRobinScheduler scheduler = new RoundRobinScheduler();
// When:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]