Copilot commented on code in PR #945:
URL: https://github.com/apache/mahout/pull/945#discussion_r2729489977


##########
qdp/qdp-core/src/gpu/overlap_tracker.rs:
##########
@@ -0,0 +1,453 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Overlap tracking for H2D copy and compute operations.
+//!
+//! Uses CUDA events to measure the overlap between host-to-device copy 
operations
+//! and compute operations, enabling verification of the >60% overlap target.
+
+use crate::error::{MahoutError, Result};
+use crate::gpu::cuda_ffi::{
+    CUDA_EVENT_DEFAULT, CUDA_SUCCESS, cudaEventCreateWithFlags, 
cudaEventDestroy,
+    cudaEventElapsedTime, cudaEventRecord, cudaEventSynchronize,
+};
+use cudarc::driver::safe::CudaStream;
+use std::ffi::c_void;
+
+/// Tracks overlap between H2D copy and compute operations using CUDA events.
+///
+/// Creates events for each pool slot to track copy and compute start/end 
times.
+/// Can be optionally enabled via environment variable to minimize overhead 
when disabled.
+pub struct OverlapTracker {
+    copy_start_events: Vec<*mut c_void>,
+    copy_end_events: Vec<*mut c_void>,
+    compute_start_events: Vec<*mut c_void>,
+    compute_end_events: Vec<*mut c_void>,
+    pool_size: usize,
+    enabled: bool,
+}
+
+impl OverlapTracker {
+    /// Create a new OverlapTracker for the given pool size.
+    ///
+    /// If disabled, no events are created and all operations are no-ops.
+    pub fn new(pool_size: usize, enabled: bool) -> Result<Self> {
+        if !enabled {
+            return Ok(Self {
+                copy_start_events: Vec::new(),
+                copy_end_events: Vec::new(),
+                compute_start_events: Vec::new(),
+                compute_end_events: Vec::new(),
+                pool_size,
+                enabled: false,
+            });
+        }
+
+        let mut copy_start: Vec<*mut c_void> = Vec::with_capacity(pool_size);
+        let mut copy_end: Vec<*mut c_void> = Vec::with_capacity(pool_size);
+        let mut compute_start: Vec<*mut c_void> = 
Vec::with_capacity(pool_size);
+        let mut compute_end: Vec<*mut c_void> = Vec::with_capacity(pool_size);
+
+        unsafe {
+            for _ in 0..pool_size {
+                let mut ev: *mut c_void = std::ptr::null_mut();
+                let ret = cudaEventCreateWithFlags(&mut ev, 
CUDA_EVENT_DEFAULT);
+                if ret != CUDA_SUCCESS {
+                    Self::cleanup_events(&[&copy_start, &copy_end, 
&compute_start, &compute_end]);
+                    return Err(MahoutError::Cuda(format!(
+                        "Failed to create CUDA event: {}",
+                        ret
+                    )));
+                }
+                copy_start.push(ev);
+
+                let mut ev: *mut c_void = std::ptr::null_mut();
+                let ret = cudaEventCreateWithFlags(&mut ev, 
CUDA_EVENT_DEFAULT);
+                if ret != CUDA_SUCCESS {
+                    Self::cleanup_events(&[&copy_start, &copy_end, 
&compute_start, &compute_end]);
+                    return Err(MahoutError::Cuda(format!(
+                        "Failed to create CUDA event: {}",
+                        ret
+                    )));
+                }
+                copy_end.push(ev);
+
+                let mut ev: *mut c_void = std::ptr::null_mut();
+                let ret = cudaEventCreateWithFlags(&mut ev, 
CUDA_EVENT_DEFAULT);
+                if ret != CUDA_SUCCESS {
+                    Self::cleanup_events(&[&copy_start, &copy_end, 
&compute_start, &compute_end]);
+                    return Err(MahoutError::Cuda(format!(
+                        "Failed to create CUDA event: {}",
+                        ret
+                    )));
+                }
+                compute_start.push(ev);
+
+                let mut ev: *mut c_void = std::ptr::null_mut();
+                let ret = cudaEventCreateWithFlags(&mut ev, 
CUDA_EVENT_DEFAULT);
+                if ret != CUDA_SUCCESS {
+                    Self::cleanup_events(&[&copy_start, &copy_end, 
&compute_start, &compute_end]);
+                    return Err(MahoutError::Cuda(format!(
+                        "Failed to create CUDA event: {}",
+                        ret
+                    )));
+                }
+                compute_end.push(ev);
+            }
+        }
+
+        Ok(Self {
+            copy_start_events: copy_start,
+            copy_end_events: copy_end,
+            compute_start_events: compute_start,
+            compute_end_events: compute_end,
+            pool_size,
+            enabled,
+        })
+    }
+
+    unsafe fn cleanup_events(events: &[&Vec<*mut c_void>]) {
+        for event_vec in events {
+            for ev in event_vec.iter() {
+                if !ev.is_null() {
+                    unsafe {
+                        let _ = cudaEventDestroy(*ev);
+                    }
+                }
+            }
+        }
+    }
+
+    fn validate_slot(&self, slot: usize) -> Result<()> {
+        if slot >= self.pool_size {
+            return Err(MahoutError::InvalidInput(format!(
+                "Slot {} out of range (max: {})",
+                slot,
+                self.pool_size.saturating_sub(1)
+            )));
+        }
+        Ok(())
+    }
+
+    fn record_event(&self, event: *mut c_void, stream: &CudaStream) -> 
Result<()> {
+        // Validate event is not null before recording
+        // Ref: 
https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__EVENT.html
+        // cudaEventRecord returns cudaErrorInvalidResourceHandle if event is 
NULL
+        if event.is_null() {
+            return Err(MahoutError::Cuda(
+                "Cannot record event: event is null (invalid resource 
handle)".to_string(),
+            ));
+        }
+
+        unsafe {
+            // Ref: 
https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__EVENT.html
+            // cudaEventRecord captures the contents of stream at the time of 
this call
+            // The operation is asynchronous - use cudaEventQuery or 
cudaEventSynchronize
+            // to determine when the event has actually been recorded
+            let ret = cudaEventRecord(event, stream.stream as *mut c_void);
+            if ret != CUDA_SUCCESS {
+                return Err(MahoutError::Cuda(format!(
+                    "cudaEventRecord failed: {} ({}). Event may be invalid or 
stream may be invalid.",
+                    ret,
+                    crate::error::cuda_error_to_string(ret)
+                )));
+            }
+        }
+        Ok(())
+    }
+
+    /// Record the start of a copy operation on the copy stream.
+    pub fn record_copy_start(&self, stream: &CudaStream, slot: usize) -> 
Result<()> {
+        if !self.enabled {
+            return Ok(());
+        }
+        self.validate_slot(slot)?;
+        self.record_event(self.copy_start_events[slot], stream)
+    }
+
+    /// Record the end of a copy operation on the copy stream.
+    pub fn record_copy_end(&self, stream: &CudaStream, slot: usize) -> 
Result<()> {
+        if !self.enabled {
+            return Ok(());
+        }
+        self.validate_slot(slot)?;
+        self.record_event(self.copy_end_events[slot], stream)
+    }
+
+    /// Record the start of a compute operation on the compute stream.
+    pub fn record_compute_start(&self, stream: &CudaStream, slot: usize) -> 
Result<()> {
+        if !self.enabled {
+            return Ok(());
+        }
+        self.validate_slot(slot)?;
+        self.record_event(self.compute_start_events[slot], stream)
+    }
+
+    /// Record the end of a compute operation on the compute stream.
+    pub fn record_compute_end(&self, stream: &CudaStream, slot: usize) -> 
Result<()> {
+        if !self.enabled {
+            return Ok(());
+        }
+        self.validate_slot(slot)?;
+        self.record_event(self.compute_end_events[slot], stream)
+    }
+
+    /// Calculate the overlap ratio for a specific chunk.
+    ///
+    /// Returns overlap ratio (0.0-1.0): min(copy_time, compute_time) / 
max(copy_time, compute_time)
+    ///
+    /// Ref: 
https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__EVENT.html
+    /// Events must be recorded before querying. This function waits for 
events to complete.
+    ///
+    /// Note: For detailed timing diagnostics, use 
`calculate_overlap_with_timing()`.
+    pub fn calculate_overlap(&self, chunk_idx: usize) -> Result<f64> {
+        self.calculate_overlap_with_timing(chunk_idx)
+            .map(|(overlap, _, _, _)| overlap)
+    }
+
+    /// Calculate overlap ratio with detailed timing information.
+    ///
+    /// Returns (overlap_ratio, copy_time_ms, compute_time_ms, overlap_time_ms)
+    /// for detailed diagnostics at DEBUG level.
+    fn calculate_overlap_with_timing(&self, chunk_idx: usize) -> Result<(f64, 
f32, f32, f32)> {
+        if !self.enabled {
+            return Ok((0.0, 0.0, 0.0, 0.0));
+        }
+
+        let slot = chunk_idx % self.pool_size;
+
+        // Validate events are not null before querying
+        // Ref: 
https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__EVENT.html
+        // cudaEventQuery returns cudaErrorInvalidResourceHandle if event is 
NULL
+        if self.copy_end_events[slot].is_null() || 
self.compute_end_events[slot].is_null() {
+            return Err(MahoutError::Cuda(format!(
+                "Event is null for chunk {} slot {}: events may not have been 
created",
+                chunk_idx, slot
+            )));
+        }
+
+        unsafe {
+            // Wait for events to complete before calculating elapsed time
+            // Ref: 
https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__EVENT.html
+            //
+            // Critical: According to CUDA docs (2026):

Review Comment:
   The comment cites "CUDA docs (2026)", but a bare year does not identify 
which documentation was consulted and may simply be wrong. Either change the 
year to reflect when the documentation was actually consulted, or make the 
reference more specific about which version of the CUDA documentation is 
meant. Consider using a specific CUDA version number instead of a year.
   ```suggestion
               // Critical: According to the CUDA Runtime API documentation:
   ```



##########
qdp/qdp-core/src/gpu/pool_metrics.rs:
##########
@@ -0,0 +1,215 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Pool utilization metrics for diagnosing pool starvation.
+//!
+//! Provides lightweight, thread-safe metrics tracking for pinned buffer pool
+//! utilization. Uses lock-free atomic operations to minimize performance 
impact.
+
+use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
+
+/// Pool utilization metrics (thread-safe, lock-free design)
+///
+/// Tracks pool availability, acquire operations, and wait times to diagnose
+/// pool starvation issues. Uses atomic operations with Relaxed ordering for
+/// minimal performance overhead.
+#[derive(Debug)]
+pub struct PoolMetrics {
+    min_available: AtomicUsize,
+    max_available: AtomicUsize,
+    total_acquires: AtomicU64,
+    total_waits: AtomicU64, // Number of times pool was empty when acquiring
+    total_wait_time_ns: AtomicU64, // Total wait time in nanoseconds
+}
+
+impl PoolMetrics {
+    /// Create a new PoolMetrics instance with all counters initialized to 
zero.
+    pub fn new() -> Self {
+        Self {
+            min_available: AtomicUsize::new(usize::MAX),
+            max_available: AtomicUsize::new(0),
+            total_acquires: AtomicU64::new(0),
+            total_waits: AtomicU64::new(0),
+            total_wait_time_ns: AtomicU64::new(0),
+        }
+    }
+
+    /// Record an acquire operation with the number of available buffers at 
that time.
+    pub fn record_acquire(&self, available: usize) {
+        let current_min = self.min_available.load(Ordering::Relaxed);
+        if available < current_min {
+            self.min_available.store(available, Ordering::Relaxed);
+        }
+
+        let current_max = self.max_available.load(Ordering::Relaxed);
+        if available > current_max {
+            self.max_available.store(available, Ordering::Relaxed);

Review Comment:
   The min/max tracking has a race condition. Between loading the current value 
and storing the new value, another thread could modify it, leading to incorrect 
min/max values. Consider using compare_exchange in a loop (or the atomic 
fetch_min/fetch_max operations) to ensure atomicity, or document that these 
values are approximate and may be incorrect under concurrent access.
   ```suggestion
           // Update minimum available using a compare-and-swap loop to avoid 
races.
           loop {
               let current_min = self.min_available.load(Ordering::Relaxed);
               if available >= current_min {
                   break;
               }
               match self.min_available.compare_exchange(
                   current_min,
                   available,
                   Ordering::Relaxed,
                   Ordering::Relaxed,
               ) {
                   Ok(_) => break,
                   Err(_) => continue,
               }
           }
   
           // Update maximum available using a compare-and-swap loop to avoid 
races.
           loop {
               let current_max = self.max_available.load(Ordering::Relaxed);
               if available <= current_max {
                   break;
               }
               match self.max_available.compare_exchange(
                   current_max,
                   available,
                   Ordering::Relaxed,
                   Ordering::Relaxed,
               ) {
                   Ok(_) => break,
                   Err(_) => continue,
               }
   ```



##########
qdp/qdp-core/src/error.rs:
##########
@@ -59,6 +59,8 @@ pub fn cuda_error_to_string(code: i32) -> &'static str {
         12 => "cudaErrorInvalidDevicePointer",
         17 => "cudaErrorInvalidMemcpyDirection",
         30 => "cudaErrorUnknown",
+        400 => "cudaErrorInvalidResourceHandle",
+        600 => "cudaErrorInvalidResourceHandle", // Alternative error code 
mapping

Review Comment:
   Error code 600 does not correspond to cudaErrorInvalidResourceHandle. In the 
CUDA Runtime API, cudaErrorInvalidResourceHandle is error code 400, while 600 
is cudaErrorNotReady. This mapping should be verified and corrected if needed, 
or a comment should explain why both codes map to the same error.
   ```suggestion
   
   ```



##########
qdp/qdp-core/src/gpu/buffer_pool.rs:
##########
@@ -107,6 +110,32 @@ impl PinnedBufferPool {
 
     /// Acquire a pinned buffer, blocking until one is available.
     pub fn acquire(self: &Arc<Self>) -> PinnedBufferHandle {
+        self.acquire_with_metrics(None)
+    }
+
+    /// Acquire a pinned buffer with optional metrics tracking.
+    ///
+    /// # Arguments
+    /// * `metrics` - Optional PoolMetrics instance for tracking utilization
+    ///
+    /// If metrics is provided, records the number of available buffers at 
acquire time
+    /// and tracks wait times if the pool is empty.
+    pub fn acquire_with_metrics(
+        self: &Arc<Self>,
+        metrics: Option<&PoolMetrics>,
+    ) -> PinnedBufferHandle {
+        let available = self.available();
+
+        if let Some(m) = metrics {
+            m.record_acquire(available);
+        }
+
+        let start_time = if metrics.is_some() {
+            Some(Instant::now())
+        } else {
+            None
+        };
+
         let mut free = self.lock_free();

Review Comment:
   There's a time-of-check to time-of-use (TOCTOU) race condition here. The 
available() call at line 127 locks and reads the pool size, then unlocks. By 
the time lock_free() is called at line 139, another thread could have modified 
the pool, making the recorded available count inaccurate. Consider recording 
the available count after acquiring the lock on line 139 by using free.len() 
instead of calling self.available() before the lock.



##########
qdp/qdp-core/src/gpu/overlap_tracker.rs:
##########
@@ -0,0 +1,453 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Overlap tracking for H2D copy and compute operations.
+//!
+//! Uses CUDA events to measure the overlap between host-to-device copy 
operations
+//! and compute operations, enabling verification of the >60% overlap target.
+
+use crate::error::{MahoutError, Result};
+use crate::gpu::cuda_ffi::{
+    CUDA_EVENT_DEFAULT, CUDA_SUCCESS, cudaEventCreateWithFlags, 
cudaEventDestroy,
+    cudaEventElapsedTime, cudaEventRecord, cudaEventSynchronize,
+};
+use cudarc::driver::safe::CudaStream;
+use std::ffi::c_void;
+
+/// Tracks overlap between H2D copy and compute operations using CUDA events.
+///
+/// Creates events for each pool slot to track copy and compute start/end 
times.
+/// Can be optionally enabled via environment variable to minimize overhead 
when disabled.
+pub struct OverlapTracker {
+    copy_start_events: Vec<*mut c_void>,
+    copy_end_events: Vec<*mut c_void>,
+    compute_start_events: Vec<*mut c_void>,
+    compute_end_events: Vec<*mut c_void>,
+    pool_size: usize,
+    enabled: bool,
+}
+
+impl OverlapTracker {
+    /// Create a new OverlapTracker for the given pool size.
+    ///
+    /// If disabled, no events are created and all operations are no-ops.
+    pub fn new(pool_size: usize, enabled: bool) -> Result<Self> {
+        if !enabled {
+            return Ok(Self {
+                copy_start_events: Vec::new(),
+                copy_end_events: Vec::new(),
+                compute_start_events: Vec::new(),
+                compute_end_events: Vec::new(),
+                pool_size,
+                enabled: false,
+            });
+        }
+
+        let mut copy_start: Vec<*mut c_void> = Vec::with_capacity(pool_size);
+        let mut copy_end: Vec<*mut c_void> = Vec::with_capacity(pool_size);
+        let mut compute_start: Vec<*mut c_void> = 
Vec::with_capacity(pool_size);
+        let mut compute_end: Vec<*mut c_void> = Vec::with_capacity(pool_size);
+
+        unsafe {
+            for _ in 0..pool_size {
+                let mut ev: *mut c_void = std::ptr::null_mut();
+                let ret = cudaEventCreateWithFlags(&mut ev, 
CUDA_EVENT_DEFAULT);
+                if ret != CUDA_SUCCESS {
+                    Self::cleanup_events(&[&copy_start, &copy_end, 
&compute_start, &compute_end]);
+                    return Err(MahoutError::Cuda(format!(
+                        "Failed to create CUDA event: {}",
+                        ret
+                    )));
+                }
+                copy_start.push(ev);
+
+                let mut ev: *mut c_void = std::ptr::null_mut();
+                let ret = cudaEventCreateWithFlags(&mut ev, 
CUDA_EVENT_DEFAULT);
+                if ret != CUDA_SUCCESS {
+                    Self::cleanup_events(&[&copy_start, &copy_end, 
&compute_start, &compute_end]);
+                    return Err(MahoutError::Cuda(format!(
+                        "Failed to create CUDA event: {}",
+                        ret
+                    )));
+                }
+                copy_end.push(ev);
+
+                let mut ev: *mut c_void = std::ptr::null_mut();
+                let ret = cudaEventCreateWithFlags(&mut ev, 
CUDA_EVENT_DEFAULT);
+                if ret != CUDA_SUCCESS {
+                    Self::cleanup_events(&[&copy_start, &copy_end, 
&compute_start, &compute_end]);
+                    return Err(MahoutError::Cuda(format!(
+                        "Failed to create CUDA event: {}",
+                        ret
+                    )));
+                }
+                compute_start.push(ev);
+
+                let mut ev: *mut c_void = std::ptr::null_mut();
+                let ret = cudaEventCreateWithFlags(&mut ev, 
CUDA_EVENT_DEFAULT);
+                if ret != CUDA_SUCCESS {
+                    Self::cleanup_events(&[&copy_start, &copy_end, 
&compute_start, &compute_end]);
+                    return Err(MahoutError::Cuda(format!(
+                        "Failed to create CUDA event: {}",
+                        ret
+                    )));
+                }
+                compute_end.push(ev);
+            }
+        }
+
+        Ok(Self {
+            copy_start_events: copy_start,
+            copy_end_events: copy_end,
+            compute_start_events: compute_start,
+            compute_end_events: compute_end,
+            pool_size,
+            enabled,
+        })
+    }
+
+    unsafe fn cleanup_events(events: &[&Vec<*mut c_void>]) {
+        for event_vec in events {
+            for ev in event_vec.iter() {
+                if !ev.is_null() {
+                    unsafe {
+                        let _ = cudaEventDestroy(*ev);
+                    }

Review Comment:
   The inner unsafe block on line 126 is redundant since the entire function is 
already marked as unsafe on line 122. The inner unsafe can be removed.
   ```suggestion
                       let _ = cudaEventDestroy(*ev);
   ```



##########
qdp/qdp-core/src/gpu/buffer_pool.rs:
##########
@@ -115,10 +144,22 @@ impl PinnedBufferPool {
                     pool: Arc::clone(self),
                 };
             }
-            free = self
-                .available_cv
-                .wait(free)
-                .unwrap_or_else(|poisoned| poisoned.into_inner());
+
+            // Record wait if metrics enabled
+            if let Some(m) = metrics {
+                let wait_start = start_time.unwrap();
+                free = self
+                    .available_cv
+                    .wait(free)
+                    .unwrap_or_else(|poisoned| poisoned.into_inner());
+                let wait_time = wait_start.elapsed();
+                m.record_wait(wait_time.as_nanos() as u64);

Review Comment:
   The wait time measurement logic is incorrect. The code records the wait time 
for each condvar wait iteration, but start_time is set before entering the 
loop. This means if a thread waits multiple times (multiple iterations of the 
loop), each wait will be measured from the original start_time, not from the 
beginning of that specific wait. This will result in incorrect cumulative wait 
times. The start_time should be captured immediately before each wait call, or 
the implementation should track whether this is the first wait.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to