400Ping commented on code in PR #877:
URL: https://github.com/apache/mahout/pull/877#discussion_r2708620303


##########
qdp/qdp-core/src/readers/numpy.rs:
##########
@@ -155,9 +428,260 @@ impl DataReader for NumpyReader {
     }
 }
 
+/// Streaming reader for NumPy `.npy` files containing 2D float64 arrays.
+///
+/// Reads data in chunks without loading the entire file into memory.
+pub struct NumpyStreamingReader {
+    file: File,
+    header: NpyHeader,
+    row_cursor: usize,
+    column_buf: Vec<f64>,
+}
+
+impl NumpyStreamingReader {
+    /// Create a new streaming NumPy reader.
+    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+        let path = path.as_ref();
+
+        match path.try_exists() {
+            Ok(false) => {
+                return Err(MahoutError::Io(format!(
+                    "NumPy file not found: {}",
+                    path.display()
+                )));
+            }
+            Err(e) => {
+                return Err(MahoutError::Io(format!(
+                    "Failed to check if NumPy file exists at {}: {}",
+                    path.display(),
+                    e
+                )));
+            }
+            Ok(true) => {}
+        }
+
+        let mut file = File::open(path)
+            .map_err(|e| MahoutError::Io(format!("Failed to open NumPy file: 
{}", e)))?;
+        let header = read_npy_header(path, &mut file)?;
+
+        Ok(Self {
+            file,
+            header,
+            row_cursor: 0,
+            column_buf: Vec::new(),
+        })
+    }
+}
+
+impl DataReader for NumpyStreamingReader {
+    fn read_batch(&mut self) -> Result<(Vec<f64>, usize, usize)> {
+        let total_elements = self.header.total_elements();
+        let mut data = vec![0.0; total_elements];
+        let mut written = 0;
+        while written < total_elements {
+            let n = self.read_chunk(&mut data[written..])?;
+            if n == 0 {
+                break;
+            }
+            written += n;
+        }
+        if written != total_elements {
+            data.truncate(written);
+        }
+
+        Ok((data, self.header.num_samples, self.header.sample_size))
+    }
+
+    fn get_sample_size(&self) -> Option<usize> {
+        Some(self.header.sample_size)
+    }
+
+    fn get_num_samples(&self) -> Option<usize> {
+        Some(self.header.num_samples)
+    }
+}
+
+impl StreamingDataReader for NumpyStreamingReader {
+    fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+        if self.row_cursor >= self.header.num_samples {
+            return Ok(0);
+        }
+
+        let sample_size = self.header.sample_size;
+        let max_rows = buffer.len() / sample_size;
+        if max_rows == 0 {
+            return Err(MahoutError::InvalidInput(format!(
+                "Buffer too small for one sample (need {} elements)",
+                sample_size
+            )));
+        }
+
+        let remaining_rows = self.header.num_samples - self.row_cursor;
+        let rows_to_read = std::cmp::min(max_rows, remaining_rows);
+        let elem_count = rows_to_read * sample_size;
+
+        if !self.header.fortran_order {
+            let offset = self.header.data_offset
+                + (self.row_cursor * sample_size * std::mem::size_of::<f64>()) 
as u64;
+            read_f64s_at(&mut self.file, offset, &mut buffer[..elem_count])?;
+        } else {
+            if self.column_buf.len() < rows_to_read {
+                self.column_buf.resize(rows_to_read, 0.0);
+            }
+            for col in 0..sample_size {
+                let offset = self.header.data_offset
+                    + ((col * self.header.num_samples + self.row_cursor)
+                        * std::mem::size_of::<f64>()) as u64;
+                let column = &mut self.column_buf[..rows_to_read];
+                read_f64s_at(&mut self.file, offset, column)?;
+                for row in 0..rows_to_read {
+                    buffer[row * sample_size + col] = column[row];
+                }
+            }
+        }
+
+        self.row_cursor += rows_to_read;
+        Ok(elem_count)
+    }
+
+    fn total_rows(&self) -> usize {
+        self.header.num_samples
+    }
+}
+
+/// Memory-mapped reader for NumPy `.npy` files containing 2D float64 arrays.
+///
+/// Maps the file into memory and streams slices without an extra read + 
flatten pass.
+pub struct NumpyMmapReader {
+    mmap: Mmap,
+    header: NpyHeader,
+    row_cursor: usize,
+    column_buf: Vec<f64>,
+}
+
+impl NumpyMmapReader {
+    /// Create a new memory-mapped NumPy reader.
+    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+        let path = path.as_ref();
+
+        match path.try_exists() {
+            Ok(false) => {
+                return Err(MahoutError::Io(format!(
+                    "NumPy file not found: {}",
+                    path.display()
+                )));
+            }
+            Err(e) => {
+                return Err(MahoutError::Io(format!(
+                    "Failed to check if NumPy file exists at {}: {}",
+                    path.display(),
+                    e
+                )));
+            }
+            Ok(true) => {}
+        }
+
+        let mut file = File::open(path)
+            .map_err(|e| MahoutError::Io(format!("Failed to open NumPy file: 
{}", e)))?;
+        let header = read_npy_header(path, &mut file)?;
+        let mmap = unsafe {
+            Mmap::map(&file)
+                .map_err(|e| MahoutError::Io(format!("Failed to mmap NumPy 
file: {}", e)))?
+        };
+
+        Ok(Self {
+            mmap,
+            header,
+            row_cursor: 0,
+            column_buf: Vec::new(),
+        })
+    }
+}
+
+impl DataReader for NumpyMmapReader {
+    fn read_batch(&mut self) -> Result<(Vec<f64>, usize, usize)> {
+        let total_elements = self.header.total_elements();
+        let mut data = vec![0.0; total_elements];
+        let mut written = 0;
+        while written < total_elements {
+            let n = self.read_chunk(&mut data[written..])?;
+            if n == 0 {
+                break;
+            }
+            written += n;
+        }
+        if written != total_elements {
+            data.truncate(written);
+        }
+
+        Ok((data, self.header.num_samples, self.header.sample_size))
+    }
+
+    fn get_sample_size(&self) -> Option<usize> {
+        Some(self.header.sample_size)
+    }
+
+    fn get_num_samples(&self) -> Option<usize> {
+        Some(self.header.num_samples)
+    }
+}
+
+impl StreamingDataReader for NumpyMmapReader {
+    fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+        if self.row_cursor >= self.header.num_samples {
+            return Ok(0);
+        }
+
+        let sample_size = self.header.sample_size;
+        let max_rows = buffer.len() / sample_size;
+        if max_rows == 0 {
+            return Err(MahoutError::InvalidInput(format!(
+                "Buffer too small for one sample (need {} elements)",
+                sample_size
+            )));
+        }
+
+        let remaining_rows = self.header.num_samples - self.row_cursor;
+        let rows_to_read = std::cmp::min(max_rows, remaining_rows);
+        let elem_count = rows_to_read * sample_size;
+        let data_base = self.header.data_offset as usize;
+
+        if !self.header.fortran_order {
+            let start = data_base + self.row_cursor * sample_size * 
std::mem::size_of::<f64>();
+            let end = start + elem_count * std::mem::size_of::<f64>();
+            let bytes = &self.mmap[start..end];
+            copy_f64s_from_bytes(bytes, &mut buffer[..elem_count])?;

Review Comment:
   wait I double check rich didn't put zero copy into the issue list 
https://github.com/apache/mahout/issues/718 weird



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to