Copilot commented on code in PR #17797:
URL: https://github.com/apache/pinot/pull/17797#discussion_r2875543044


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -91,14 +101,42 @@ public void registeredReader(Reader reader) {
     _blocks.registerReader(reader);
   }
 
+  /// Sets the context of the thread owning this [ReceivingMailbox].
+  public synchronized void setThreadContext(@Nullable QueryThreadContext threadContext) {
+    assert _threadContext == null;
+    // NOTE: In production code, threadContext should never be null. It might be null in tests when QueryThreadContext
+    //       is not set up.
+    if (threadContext == null) {
+      return;
+    }
+    _threadContext = threadContext;
+    if (_untrackedCpuTimeNs > 0 || _untrackedAllocatedBytes > 0) {
+      updateResourceUsage(threadContext, _untrackedCpuTimeNs, _untrackedAllocatedBytes);
+      _untrackedCpuTimeNs = 0;
+      _untrackedAllocatedBytes = 0;
+    }

Review Comment:
   `setThreadContext()` relies on a Java `assert` to enforce single assignment
(`_threadContext == null`). Assertions are disabled by default unless the JVM
runs with `-ea`, so in production this invariant won’t be enforced and the
mailbox context could be silently overwritten, leading to incorrect resource
attribution. Use a runtime check (e.g.,
`Preconditions.checkState(_threadContext == null, ...)`) or make the method
idempotent by returning early when `_threadContext` is already set (optionally
verifying it matches).
   ```suggestion
       // NOTE: In production code, threadContext should never be null. It might be null in tests when QueryThreadContext
       //       is not set up.
       if (threadContext == null) {
         return;
       }
   
       if (_threadContext == null) {
         _threadContext = threadContext;
         if (_untrackedCpuTimeNs > 0 || _untrackedAllocatedBytes > 0) {
           updateResourceUsage(threadContext, _untrackedCpuTimeNs, _untrackedAllocatedBytes);
           _untrackedCpuTimeNs = 0;
           _untrackedAllocatedBytes = 0;
         }
         return;
       }
   
       // Multiple non-null contexts for the same mailbox indicate a misuse; allow idempotent re-setting
       // with the same instance but disallow changing to a different context.
       Preconditions.checkState(_threadContext == threadContext,
           "ReceivingMailbox %s already has a different thread context set", _id);
   ```
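
The behaviour the suggestion aims for can be sketched in isolation. The following is a minimal, self-contained illustration and not the Pinot class: `ContextHolder`, `record`, and `setContext` are hypothetical names, a plain `Object` stands in for `QueryThreadContext`, and an explicit `IllegalStateException` (the exception type `Preconditions.checkState` throws) is used so the example needs no Guava dependency.

```java
// Hypothetical stand-in for the mailbox; names are illustrative, not Pinot's.
final class ContextHolder {
  private Object _context;          // stands in for QueryThreadContext
  private long _untrackedCpuTimeNs; // usage seen before a context was attached
  private long _reportedCpuTimeNs;  // usage attributed to the attached context

  // Attributes usage to the context if present, otherwise buffers it.
  synchronized void record(long cpuTimeNs) {
    if (_context != null) {
      _reportedCpuTimeNs += cpuTimeNs;
    } else {
      _untrackedCpuTimeNs += cpuTimeNs;
    }
  }

  // Runtime-checked single assignment: null is ignored, the same instance is a
  // no-op, and a different instance fails even when assertions are disabled.
  synchronized void setContext(Object context) {
    if (context == null) {
      return;
    }
    if (_context == null) {
      _context = context;
      _reportedCpuTimeNs += _untrackedCpuTimeNs; // flush buffered usage once
      _untrackedCpuTimeNs = 0;
      return;
    }
    if (_context != context) {
      throw new IllegalStateException("A different context is already set");
    }
  }

  synchronized long reportedCpuTimeNs() {
    return _reportedCpuTimeNs;
  }

  public static void main(String[] args) {
    ContextHolder holder = new ContextHolder();
    Object context = new Object();
    holder.record(5);           // buffered: no context yet
    holder.setContext(context); // flushes the buffered usage
    holder.setContext(context); // same instance: no-op
    System.out.println("reported ns: " + holder.reportedCpuTimeNs());
    try {
      holder.setContext(new Object()); // different instance: rejected
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Unlike `assert`, the explicit check runs regardless of JVM flags, so a conflicting second assignment fails loudly instead of silently redirecting resource attribution.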



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -139,7 +182,18 @@ public ReceivingMailboxStatus offerRaw(List<ByteBuffer> byteBuffers, long timeou
       }
       stats = List.of();
     }
-    return offerPrivate(block, stats, timeoutMs);
+    ReceivingMailboxStatus status = offerPrivate(block, stats, timeoutMs);
+    long cpuTimeNs = resourceSnapshot.getCpuTimeNs();
+    long allocatedBytes = resourceSnapshot.getAllocatedBytes();
+    synchronized (this) {
+      if (_threadContext != null) {
+        updateResourceUsage(_threadContext, cpuTimeNs, allocatedBytes);
+      } else {
+        _untrackedCpuTimeNs +=  cpuTimeNs;

Review Comment:
   Minor style: there’s an extra space in `_untrackedCpuTimeNs +=  cpuTimeNs;`. 
Consider fixing to a single space to match surrounding style.
   ```suggestion
           _untrackedCpuTimeNs += cpuTimeNs;
   ```



##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/ReceivingMailboxTest.java:
##########
@@ -328,4 +328,39 @@ CompletableFuture<ReceivingMailbox.ReceivingMailboxStatus> offer(MseBlock block,
       ExecutorService executor) {
     return CompletableFuture.supplyAsync(() -> receivingMailbox.offer(block, List.of(), 10_000), executor);
   }
+
+  @Test
+  public void testResourceTracking()
+      throws Exception {
+    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
+    ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
+
+    QueryThreadContext threadContext = mock(QueryThreadContext.class);
+
+    // Receive after setting thread context
+    TestReceivingMailbox receivingMailbox = new TestReceivingMailbox("id", threadContext);
+    receivingMailbox.setThreadContext(threadContext);
+    receivingMailbox.offerRaw(DataBlockUtils.serialize(DATA_BLOCK.asSerialized().getDataBlock()), 10_000);
+
+    // Receive before setting thread context
+    receivingMailbox = new TestReceivingMailbox("id", threadContext);
+    receivingMailbox.offerRaw(DataBlockUtils.serialize(DATA_BLOCK.asSerialized().getDataBlock()), 10_000);
+    receivingMailbox.setThreadContext(threadContext);
+  }
+
+  private static class TestReceivingMailbox extends ReceivingMailbox {
+    final QueryThreadContext _expectedThreadContext;
+
+    public TestReceivingMailbox(String id, QueryThreadContext expectedThreadContext) {
+      super(id);
+      _expectedThreadContext = expectedThreadContext;
+    }
+
+    @Override
+    void updateResourceUsage(QueryThreadContext threadContext, long cpuTimeNs, long allocatedBytes) {
+      assertSame(threadContext, _expectedThreadContext);
+      assertTrue(cpuTimeNs > 0);
+      assertTrue(allocatedBytes > 0);

Review Comment:
   `cpuTimeNs > 0` / `allocatedBytes > 0` can be flaky: the underlying 
measurements may be unsupported (in which case enabling is a no-op and deltas 
stay 0) or the operation may be too small to register a positive delta. 
Consider skipping the test when measurements aren’t actually enabled (check 
`ThreadResourceUsageProvider.isThread*MeasurementEnabled()` after setting), 
and/or relax the assertions to `>= 0` while still asserting that 
`updateResourceUsage()` is invoked.
   ```suggestion
         assertTrue(cpuTimeNs >= 0);
         assertTrue(allocatedBytes >= 0);
   ```
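
The suggestion relaxes the bounds but does not show how to keep asserting that the callback runs, or how to detect an unsupported measurement. The sketch below illustrates both using only the JDK's `ThreadMXBean`, which is an assumption about what sits underneath `ThreadResourceUsageProvider`; `CpuDeltaCheck`, `cpuTimeMeasurable`, and `cpuDeltaNs` are hypothetical names. In a TestNG test, the unsupported case could instead throw `org.testng.SkipException`.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper; not part of Pinot.
final class CpuDeltaCheck {
  // True only if this JVM can measure the current thread's CPU time and the
  // feature is switched on.
  static boolean cpuTimeMeasurable() {
    ThreadMXBean bean = ManagementFactory.getThreadMXBean();
    return bean.isCurrentThreadCpuTimeSupported() && bean.isThreadCpuTimeEnabled();
  }

  // CPU time consumed by `work` on the calling thread, or 0 when it cannot be
  // measured. The work always runs, measurable or not.
  static long cpuDeltaNs(Runnable work) {
    if (!cpuTimeMeasurable()) {
      work.run();
      return 0;
    }
    ThreadMXBean bean = ManagementFactory.getThreadMXBean();
    long start = bean.getCurrentThreadCpuTime();
    work.run();
    return bean.getCurrentThreadCpuTime() - start;
  }

  public static void main(String[] args) {
    AtomicInteger callbackCalls = new AtomicInteger();
    long deltaNs = cpuDeltaNs(() -> callbackCalls.incrementAndGet());

    // Assert the invocation count rather than a strictly positive delta: a
    // tiny operation may legitimately register zero nanoseconds.
    if (callbackCalls.get() != 1) {
      throw new IllegalStateException("callback was not invoked exactly once");
    }
    if (deltaNs < 0) {
      throw new IllegalStateException("CPU delta must not be negative");
    }
    System.out.println("measurable=" + cpuTimeMeasurable() + ", callback invoked once");
  }
}
```

Counting invocations keeps the test meaningful on platforms where the measurement is unavailable, while the `>= 0` bound still catches a sign error in the delta computation.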



##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/ReceivingMailboxTest.java:
##########
@@ -328,4 +328,39 @@ CompletableFuture<ReceivingMailbox.ReceivingMailboxStatus> offer(MseBlock block,
       ExecutorService executor) {
     return CompletableFuture.supplyAsync(() -> receivingMailbox.offer(block, List.of(), 10_000), executor);
   }
+
+  @Test
+  public void testResourceTracking()
+      throws Exception {
+    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
+    ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
+
+    QueryThreadContext threadContext = mock(QueryThreadContext.class);
+
+    // Receive after setting thread context
+    TestReceivingMailbox receivingMailbox = new TestReceivingMailbox("id", threadContext);
+    receivingMailbox.setThreadContext(threadContext);
+    receivingMailbox.offerRaw(DataBlockUtils.serialize(DATA_BLOCK.asSerialized().getDataBlock()), 10_000);
+
+    // Receive before setting thread context
+    receivingMailbox = new TestReceivingMailbox("id", threadContext);
+    receivingMailbox.offerRaw(DataBlockUtils.serialize(DATA_BLOCK.asSerialized().getDataBlock()), 10_000);
+    receivingMailbox.setThreadContext(threadContext);

Review Comment:
   This test enables thread CPU/memory measurement globally via 
`ThreadResourceUsageProvider` but never restores the previous settings. Because 
these flags are static, this can make the overall test suite order-dependent. 
Capture the prior enabled states and restore them in a `finally` block (or an 
`@AfterMethod`).
   ```suggestion
       boolean prevCpuEnabled = ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled();
       boolean prevMemEnabled = ThreadResourceUsageProvider.isThreadMemoryMeasurementEnabled();
       ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
       ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
       try {
         QueryThreadContext threadContext = mock(QueryThreadContext.class);
   
         // Receive after setting thread context
         TestReceivingMailbox receivingMailbox = new TestReceivingMailbox("id", threadContext);
         receivingMailbox.setThreadContext(threadContext);
         receivingMailbox.offerRaw(DataBlockUtils.serialize(DATA_BLOCK.asSerialized().getDataBlock()), 10_000);
   
         // Receive before setting thread context
         receivingMailbox = new TestReceivingMailbox("id", threadContext);
         receivingMailbox.offerRaw(DataBlockUtils.serialize(DATA_BLOCK.asSerialized().getDataBlock()), 10_000);
         receivingMailbox.setThreadContext(threadContext);
       } finally {
         ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(prevCpuEnabled);
         ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(prevMemEnabled);
       }
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

