JakeDern commented on code in PR #8001: URL: https://github.com/apache/arrow-rs/pull/8001#discussion_r2263733837
########## arrow-ipc/src/writer.rs: ########## @@ -760,34 +812,108 @@ impl DictionaryTracker { /// * If the tracker has not been configured to error on replacement or this dictionary /// has never been seen before, return `Ok(true)` to indicate that the dictionary was just /// inserted. - pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> { - let dict_data = column.to_data(); - let dict_values = &dict_data.child_data()[0]; - - // If a dictionary with this id was already emitted, check if it was the same. - if let Some(last) = self.written.get(&dict_id) { - if ArrayData::ptr_eq(&last.child_data()[0], dict_values) { - // Same dictionary values => no need to emit it again - return Ok(false); + pub fn insert( + &mut self, + dict_id: i64, + column: &ArrayRef, + compute_delta: bool, + ) -> Result<DictionaryUpdate, ArrowError> { + let new_data = column.to_data(); + let new_values = &new_data.child_data()[0]; + + // If there is no existing dictionary with this ID, we always insert + let Some(old) = self.written.get(&dict_id) else { + self.written.insert(dict_id, new_data); + return Ok(DictionaryUpdate::New); + }; + + // Fast path - If the array data points to the same buffer as the + // existing then they're the same. + let old_values = &old.child_data()[0]; + if ArrayData::ptr_eq(old_values, new_values) { + return Ok(DictionaryUpdate::None); + } + + // Slow path - Compare the dictionaries value by value + let comparison = compare_dictionaries(old_values, new_values); + if matches!(comparison, DictionaryComparison::Equal) { + return Ok(DictionaryUpdate::None); + } + + const REPLACEMENT_ERROR: &str = + "Dictionary replacement detected when writing IPC file format. 
\ + Arrow IPC files only support a single dictionary for a given field \ + across all batches."; + + match comparison { + DictionaryComparison::NotEqual => { + if self.error_on_replacement { + return Err(ArrowError::InvalidArgumentError( + REPLACEMENT_ERROR.to_string(), + )); + } + + self.written.insert(dict_id, new_data); + Ok(DictionaryUpdate::Replaced) } - if self.error_on_replacement { - // If error on replacement perform a logical comparison - if last.child_data()[0] == *dict_values { - // Same dictionary values => no need to emit it again - return Ok(false); + DictionaryComparison::Delta => { + if compute_delta { + let delta = + new_values.slice(old_values.len(), new_values.len() - old_values.len()); + self.written.insert(dict_id, new_data); + Ok(DictionaryUpdate::Delta(delta)) + } else { + if self.error_on_replacement { + return Err(ArrowError::InvalidArgumentError( + REPLACEMENT_ERROR.to_string(), + )); + } + + self.written.insert(dict_id, new_data); + Ok(DictionaryUpdate::Replaced) } - return Err(ArrowError::InvalidArgumentError( - "Dictionary replacement detected when writing IPC file format. \ - Arrow IPC files only support a single dictionary for a given field \ - across all batches." - .to_string(), - )); } + DictionaryComparison::Equal => unreachable!("Already checked equal case"), + } + } +} + +/// Describes how two dictionary arrays compare to each other. +#[derive(Debug, Clone)] +enum DictionaryComparison { + /// Neither a delta, nor an exact match + NotEqual, + /// Exact element-wise match + Equal, + /// The two arrays are dictionary deltas of each other, meaning the first + /// is a prefix of the second. + Delta, +} + +// Compares two dictionaries and returns a [`DictionaryComparison`]. 
+fn compare_dictionaries(old: &ArrayData, new: &ArrayData) -> DictionaryComparison { + // Check for exact match + let existing_len = old.len(); + let new_len = new.len(); + if existing_len == new_len { + if *old == *new { Review Comment: Would it be faster to directly compare the bytes of the underlying array data? Honestly, the underlying data model is still a bit confusing to me 😅, so I don't know whether that's a reasonable approach. But I'm happy to take any direction here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org