Copilot commented on code in PR #17797:
URL: https://github.com/apache/pinot/pull/17797#discussion_r2875543044
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -91,14 +101,42 @@ public void registeredReader(Reader reader) {
_blocks.registerReader(reader);
}
+ /// Sets the context of the thread owning this [ReceivingMailbox].
+ public synchronized void setThreadContext(@Nullable QueryThreadContext threadContext) {
+ assert _threadContext == null;
+ // NOTE: In production code, threadContext should never be null. It might be null in tests when QueryThreadContext
+ // is not set up.
+ if (threadContext == null) {
+ return;
+ }
+ _threadContext = threadContext;
+ if (_untrackedCpuTimeNs > 0 || _untrackedAllocatedBytes > 0) {
+ updateResourceUsage(threadContext, _untrackedCpuTimeNs, _untrackedAllocatedBytes);
+ _untrackedCpuTimeNs = 0;
+ _untrackedAllocatedBytes = 0;
+ }
Review Comment:
`setThreadContext()` relies on a Java `assert` to enforce single-assignment
(`_threadContext == null`). Asserts are typically disabled in production, so
this invariant won’t be enforced and the mailbox context could be overwritten
silently, leading to incorrect resource attribution. Use a runtime check (e.g.,
`Preconditions.checkState(_threadContext == null, ...)`) or make the method
idempotent by returning early when `_threadContext` is already set (optionally
verifying it matches).
```suggestion
    // NOTE: In production code, threadContext should never be null. It might be null in tests when QueryThreadContext
    // is not set up.
    if (threadContext == null) {
      return;
    }
    if (_threadContext == null) {
      _threadContext = threadContext;
      if (_untrackedCpuTimeNs > 0 || _untrackedAllocatedBytes > 0) {
        updateResourceUsage(threadContext, _untrackedCpuTimeNs, _untrackedAllocatedBytes);
        _untrackedCpuTimeNs = 0;
        _untrackedAllocatedBytes = 0;
      }
      return;
    }
    // Multiple non-null contexts for the same mailbox indicate a misuse; allow idempotent re-setting
    // with the same instance but disallow changing to a different context.
    Preconditions.checkState(_threadContext == threadContext,
        "ReceivingMailbox %s already has a different thread context set", _id);
```
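For comparison, the stricter fail-fast option mentioned above can be sketched in isolation. `ContextHolder` is a hypothetical stand-in, not the Pinot `ReceivingMailbox`, and it throws `IllegalStateException` directly instead of assuming Guava's `Preconditions` is available. Unlike `assert`, this check runs whether or not the JVM was started with `-ea`:

```java
/** Hypothetical stand-in for a mailbox; names are illustrative, not Pinot's. */
final class ContextHolder {
  private Object _threadContext;

  /** Rejects a second non-null assignment at runtime, independent of the -ea flag. */
  synchronized void setThreadContext(Object threadContext) {
    if (threadContext == null) {
      return; // mirrors the null-tolerant behaviour of the reviewed method
    }
    if (_threadContext != null) {
      throw new IllegalStateException("thread context already set");
    }
    _threadContext = threadContext;
  }

  synchronized Object getThreadContext() {
    return _threadContext;
  }
}
```

Whether to fail fast like this or to tolerate re-setting the same instance (as in the suggestion) depends on whether callers can legitimately invoke the setter more than once.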
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -139,7 +182,18 @@ public ReceivingMailboxStatus offerRaw(List<ByteBuffer> byteBuffers, long timeou
}
stats = List.of();
}
- return offerPrivate(block, stats, timeoutMs);
+ ReceivingMailboxStatus status = offerPrivate(block, stats, timeoutMs);
+ long cpuTimeNs = resourceSnapshot.getCpuTimeNs();
+ long allocatedBytes = resourceSnapshot.getAllocatedBytes();
+ synchronized (this) {
+ if (_threadContext != null) {
+ updateResourceUsage(_threadContext, cpuTimeNs, allocatedBytes);
+ } else {
+ _untrackedCpuTimeNs += cpuTimeNs;
Review Comment:
Minor style: there’s an extra space in `_untrackedCpuTimeNs += cpuTimeNs;`.
Consider fixing to a single space to match surrounding style.
```suggestion
_untrackedCpuTimeNs += cpuTimeNs;
```
##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/ReceivingMailboxTest.java:
##########
@@ -328,4 +328,39 @@ CompletableFuture<ReceivingMailbox.ReceivingMailboxStatus> offer(MseBlock block,
ExecutorService executor) {
return CompletableFuture.supplyAsync(() -> receivingMailbox.offer(block, List.of(), 10_000), executor);
}
+
+ @Test
+ public void testResourceTracking()
+ throws Exception {
+ ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
+ ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
+
+ QueryThreadContext threadContext = mock(QueryThreadContext.class);
+
+ // Receive after setting thread context
+ TestReceivingMailbox receivingMailbox = new TestReceivingMailbox("id", threadContext);
+ receivingMailbox.setThreadContext(threadContext);
+ receivingMailbox.offerRaw(DataBlockUtils.serialize(DATA_BLOCK.asSerialized().getDataBlock()), 10_000);
+
+ // Receive before setting thread context
+ receivingMailbox = new TestReceivingMailbox("id", threadContext);
+ receivingMailbox.offerRaw(DataBlockUtils.serialize(DATA_BLOCK.asSerialized().getDataBlock()), 10_000);
+ receivingMailbox.setThreadContext(threadContext);
+ }
+
+ private static class TestReceivingMailbox extends ReceivingMailbox {
+ final QueryThreadContext _expectedThreadContext;
+
+ public TestReceivingMailbox(String id, QueryThreadContext expectedThreadContext) {
+ super(id);
+ _expectedThreadContext = expectedThreadContext;
+ }
+
+ @Override
+ void updateResourceUsage(QueryThreadContext threadContext, long cpuTimeNs, long allocatedBytes) {
+ assertSame(threadContext, _expectedThreadContext);
+ assertTrue(cpuTimeNs > 0);
+ assertTrue(allocatedBytes > 0);
Review Comment:
`cpuTimeNs > 0` / `allocatedBytes > 0` can be flaky: the underlying
measurements may be unsupported (in which case enabling is a no-op and deltas
stay 0) or the operation may be too small to register a positive delta.
Consider skipping the test when measurements aren’t actually enabled (check
`ThreadResourceUsageProvider.isThread*MeasurementEnabled()` after setting),
and/or relax the assertions to `>= 0` while still asserting that
`updateResourceUsage()` is invoked.
```suggestion
assertTrue(cpuTimeNs >= 0);
assertTrue(allocatedBytes >= 0);
```
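To illustrate the "unsupported measurement" case, here is a minimal sketch that uses only the JDK's `java.lang.management.ThreadMXBean`. It does not show how `ThreadResourceUsageProvider` is implemented; `CpuMeasurementProbe` and its methods are hypothetical names. A test could use a probe like this to skip strict `> 0` assertions when the JVM cannot measure CPU time:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

/** Hypothetical helper; not part of Pinot. */
final class CpuMeasurementProbe {
  private CpuMeasurementProbe() {
  }

  /** True only if the JVM supports current-thread CPU time and measurement is enabled. */
  static boolean cpuTimeMeasurable() {
    ThreadMXBean bean = ManagementFactory.getThreadMXBean();
    // Short-circuit: isThreadCpuTimeEnabled() may throw when measurement is unsupported.
    return bean.isCurrentThreadCpuTimeSupported() && bean.isThreadCpuTimeEnabled();
  }

  /** CPU-time delta in nanoseconds for the task, or -1 when it cannot be measured. */
  static long cpuDeltaNs(Runnable task) {
    if (!cpuTimeMeasurable()) {
      return -1;
    }
    ThreadMXBean bean = ManagementFactory.getThreadMXBean();
    long before = bean.getCurrentThreadCpuTime();
    task.run();
    return bean.getCurrentThreadCpuTime() - before;
  }
}
```

Even when measurement is available, a very short task may report a delta of 0, which is why `>= 0` is the safer assertion.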
##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/ReceivingMailboxTest.java:
##########
@@ -328,4 +328,39 @@ CompletableFuture<ReceivingMailbox.ReceivingMailboxStatus> offer(MseBlock block,
ExecutorService executor) {
return CompletableFuture.supplyAsync(() -> receivingMailbox.offer(block, List.of(), 10_000), executor);
}
+
+ @Test
+ public void testResourceTracking()
+ throws Exception {
+ ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
+ ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
+
+ QueryThreadContext threadContext = mock(QueryThreadContext.class);
+
+ // Receive after setting thread context
+ TestReceivingMailbox receivingMailbox = new TestReceivingMailbox("id", threadContext);
+ receivingMailbox.setThreadContext(threadContext);
+ receivingMailbox.offerRaw(DataBlockUtils.serialize(DATA_BLOCK.asSerialized().getDataBlock()), 10_000);
+
+ // Receive before setting thread context
+ receivingMailbox = new TestReceivingMailbox("id", threadContext);
+ receivingMailbox.offerRaw(DataBlockUtils.serialize(DATA_BLOCK.asSerialized().getDataBlock()), 10_000);
+ receivingMailbox.setThreadContext(threadContext);
Review Comment:
This test enables thread CPU/memory measurement globally via
`ThreadResourceUsageProvider` but never restores the previous settings. Because
these flags are static, this can make the overall test suite order-dependent.
Capture the prior enabled states and restore them in a `finally` block (or an
`@AfterMethod`).
```suggestion
    boolean prevCpuEnabled = ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled();
    boolean prevMemEnabled = ThreadResourceUsageProvider.isThreadMemoryMeasurementEnabled();
    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
    ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
    try {
      QueryThreadContext threadContext = mock(QueryThreadContext.class);
      // Receive after setting thread context
      TestReceivingMailbox receivingMailbox = new TestReceivingMailbox("id", threadContext);
      receivingMailbox.setThreadContext(threadContext);
      receivingMailbox.offerRaw(DataBlockUtils.serialize(DATA_BLOCK.asSerialized().getDataBlock()), 10_000);
      // Receive before setting thread context
      receivingMailbox = new TestReceivingMailbox("id", threadContext);
      receivingMailbox.offerRaw(DataBlockUtils.serialize(DATA_BLOCK.asSerialized().getDataBlock()), 10_000);
      receivingMailbox.setThreadContext(threadContext);
    } finally {
      ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(prevCpuEnabled);
      ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(prevMemEnabled);
    }
```