rich7420 commented on code in PR #708:
URL: https://github.com/apache/mahout/pull/708#discussion_r2616351697


##########
qdp/qdp-core/src/lib.rs:
##########
@@ -145,14 +158,160 @@ impl QdpEngine {
     ) -> Result<*mut DLManagedTensor> {
         crate::profile_scope!("Mahout::EncodeFromParquet");
 
-        // Read Parquet directly using Arrow (faster than pandas)
-        let (batch_data, num_samples, sample_size) = {
-            crate::profile_scope!("IO::ReadParquetBatch");
-            crate::io::read_parquet_batch(path)?
-        };
+        #[cfg(target_os = "linux")]
+        {
+            if encoding_method != "amplitude" {
+                return Err(MahoutError::NotImplemented("Only amplitude encoding supported for streaming".into()));
+            }
 
-        // Encode using fused batch kernel
-        self.encode_batch(&batch_data, num_samples, sample_size, num_qubits, encoding_method)
+            // Initialize reader
+            let mut reader_core = crate::io::ParquetBlockReader::new(path)?;
+            let num_samples = reader_core.total_samples;
+
+            // Allocate GPU memory once
+            let total_state_vector = GpuStateVector::new_batch(&self.device, num_samples, num_qubits)?;
+
+            // Initialize dual-stream pipeline context
+            let ctx = PipelineContext::new(&self.device)?;
+
+            // Double-buffered device input (ping-pong)
+            let dev_in_a = unsafe { self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+                .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", e)))?;
+            let dev_in_b = unsafe { self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+                .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", e)))?;
+
+            // Setup Producer-Consumer channels
+            let (full_buf_tx, full_buf_rx): (SyncSender<(PinnedBuffer, usize)>, Receiver<(PinnedBuffer, usize)>) = sync_channel(2);
+            let (empty_buf_tx, empty_buf_rx): (SyncSender<PinnedBuffer>, Receiver<PinnedBuffer>) = sync_channel(2);
+
+            // CRITICAL FIX: Pre-read first chunk to determine sample_size
+            // This data must be processed, not discarded!
+            let mut host_buf_first = PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?;
+            let first_len = reader_core.read_chunk(host_buf_first.as_slice_mut())?;
+
+            let sample_size = reader_core.get_sample_size()

Review Comment:
   I think we could add an early return.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to