machichima commented on code in PR #708:
URL: https://github.com/apache/mahout/pull/708#discussion_r2617091446


##########
qdp/qdp-core/src/io.rs:
##########
@@ -534,47 +572,116 @@ impl ParquetBlockReader {
                     }
                     let column = batch.column(0);
 
-                    let list_array = column
-                        .as_any()
-                        .downcast_ref::<ListArray>()
-                        .ok_or_else(|| MahoutError::Io("Failed to downcast to 
ListArray".to_string()))?;
-
-                    if list_array.len() == 0 {
-                        continue;
-                    }
-
-                    // Extract sample size from first element
-                    let first_value = list_array.value(0);
-                    let float_array = first_value
-                        .as_any()
-                        .downcast_ref::<Float64Array>()
-                        .ok_or_else(|| MahoutError::Io("List values must be 
Float64".to_string()))?;
-
-                    let current_sample_size = float_array.len();
+                    let (current_sample_size, batch_values) = match 
column.data_type() {
+                        DataType::List(_) => {
+                            let list_array = column
+                                .as_any()
+                                .downcast_ref::<ListArray>()
+                                .ok_or_else(|| MahoutError::Io("Failed to 
downcast to ListArray".to_string()))?;
+
+                            if list_array.len() == 0 {
+                                continue;
+                            }
+
+                            let mut batch_values = Vec::new();
+                            let mut current_sample_size = None;
+                            for i in 0..list_array.len() {
+                                let value_array = list_array.value(i);
+                                let float_array = value_array
+                                    .as_any()
+                                    .downcast_ref::<Float64Array>()
+                                    .ok_or_else(|| MahoutError::Io("List 
values must be Float64".to_string()))?;
+
+                                if i == 0 {
+                                    current_sample_size = 
Some(float_array.len());
+                                }
+
+                                if float_array.null_count() == 0 {
+                                    
batch_values.extend_from_slice(float_array.values());
+                                } else {
+                                    return Err(MahoutError::Io("Null value 
encountered in Float64Array during quantum encoding. Please check data quality 
at the source.".to_string()));
+                                }
+                            }
+
+                            (current_sample_size.expect("list_array.len() > 0 
ensures at least one element"), batch_values)
+                        }
+                        DataType::FixedSizeList(_, size) => {
+                            let list_array = column
+                                .as_any()
+                                .downcast_ref::<FixedSizeListArray>()
+                                .ok_or_else(|| MahoutError::Io("Failed to 
downcast to FixedSizeListArray".to_string()))?;
+
+                            if list_array.len() == 0 {
+                                continue;
+                            }
+
+                            let current_sample_size = *size as usize;
+
+                            let values = list_array.values();
+                            let float_array = values
+                                .as_any()
+                                .downcast_ref::<Float64Array>()
+                                .ok_or_else(|| MahoutError::Io("FixedSizeList 
values must be Float64".to_string()))?;
+
+                            let mut batch_values = Vec::new();
+                            if float_array.null_count() == 0 {
+                                
batch_values.extend_from_slice(float_array.values());
+                            } else {
+                                return Err(MahoutError::Io("Null value 
encountered in Float64Array during quantum encoding. Please check data quality 
at the source.".to_string()));
+                            }
+
+                            (current_sample_size, batch_values)
+                        }
+                        _ => {
+                            return Err(MahoutError::Io(format!(
+                                "Expected List<Float64> or 
FixedSizeList<Float64>, got {:?}",
+                                column.data_type()
+                            )));
+                        }
+                    };
 
                     if self.sample_size.is_none() {
                         self.sample_size = Some(current_sample_size);
                         limit = calc_limit(current_sample_size);
-                    }
 
-                    // Extract all values from this batch
-                    let mut batch_values = Vec::new();
-                    for i in 0..list_array.len() {
-                        let value_array = list_array.value(i);
-                        let float_array = value_array
-                            .as_any()
-                            .downcast_ref::<Float64Array>()
-                            .ok_or_else(|| MahoutError::Io("List values must 
be Float64".to_string()))?;
-
-                        if float_array.null_count() == 0 {
-                            
batch_values.extend_from_slice(float_array.values());
-                        } else {
-                            return Err(MahoutError::Io("Null value encountered 
in Float64Array during quantum encoding. Please check data quality at the 
source.".to_string()));
+                        while self.leftover_cursor < self.leftover_data.len() 
&& written < limit {
+                            let available = self.leftover_data.len() - 
self.leftover_cursor;
+                            let space_left = limit - written;
+                            let to_copy = std::cmp::min(available, space_left);
+
+                            if to_copy > 0 {
+                                
buffer[written..written+to_copy].copy_from_slice(
+                                    
&self.leftover_data[self.leftover_cursor..self.leftover_cursor+to_copy]
+                                );
+                                written += to_copy;
+                                self.leftover_cursor += to_copy;
+
+                                if self.leftover_cursor == 
self.leftover_data.len() {
+                                    self.leftover_data.clear();
+                                    self.leftover_cursor = 0;
+                                    break;
+                                }
+                            } else {
+                                break;
+                            }
+
+                            if written >= limit {
+                                return Ok(written);
+                            }
+                        }

Review Comment:
   I think handling the leftover data here is redundant, since we already handle it above?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to