alamb commented on code in PR #8001:
URL: https://github.com/apache/arrow-rs/pull/8001#discussion_r2270742132


##########
arrow-ipc/src/reader.rs:
##########
@@ -25,9 +25,10 @@
 //! [`Seek`]: std::io::Seek
 
 mod stream;
-
 pub use stream::*;
 
+use arrow_select::concat;

Review Comment:
   yeah, this is unfortunate -- to get `concat` we need to add this
dependency, but I don't really see a way around it



##########
arrow-ipc/src/reader.rs:
##########
@@ -1597,6 +1660,122 @@ impl<R: Read> RecordBatchReader for StreamReader<R> {
     }
 }
 
+/// Representation of a fully parsed IpcMessage from the underlying stream.
+/// Parsing this kind of message is done by higher level constructs such as
+/// [`StreamReader`], because fully interpreting the messages into a record
+/// batch or dictionary batch requires access to stream state such as schema
+/// and the full dictionary cache.
+#[derive(Debug)]
+#[allow(dead_code)]

Review Comment:
   Why is this dead code?  Maybe it should be `#[cfg(test)]` instead?



##########
arrow-ipc/src/writer.rs:
##########
@@ -760,34 +812,108 @@ impl DictionaryTracker {
     /// * If the tracker has not been configured to error on replacement or 
this dictionary
     ///   has never been seen before, return `Ok(true)` to indicate that the 
dictionary was just
     ///   inserted.
-    pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, 
ArrowError> {
-        let dict_data = column.to_data();
-        let dict_values = &dict_data.child_data()[0];
-
-        // If a dictionary with this id was already emitted, check if it was 
the same.
-        if let Some(last) = self.written.get(&dict_id) {
-            if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
-                // Same dictionary values => no need to emit it again
-                return Ok(false);
+    pub fn insert(
+        &mut self,
+        dict_id: i64,
+        column: &ArrayRef,
+        compute_delta: bool,
+    ) -> Result<DictionaryUpdate, ArrowError> {
+        let new_data = column.to_data();
+        let new_values = &new_data.child_data()[0];
+
+        // If there is no existing dictionary with this ID, we always insert
+        let Some(old) = self.written.get(&dict_id) else {
+            self.written.insert(dict_id, new_data);
+            return Ok(DictionaryUpdate::New);
+        };
+
+        // Fast path - If the array data points to the same buffer as the
+        // existing then they're the same.
+        let old_values = &old.child_data()[0];
+        if ArrayData::ptr_eq(old_values, new_values) {
+            return Ok(DictionaryUpdate::None);
+        }
+
+        // Slow path - Compare the dictionaries value by value
+        let comparison = compare_dictionaries(old_values, new_values);
+        if matches!(comparison, DictionaryComparison::Equal) {
+            return Ok(DictionaryUpdate::None);
+        }
+
+        const REPLACEMENT_ERROR: &str =
+            "Dictionary replacement detected when writing IPC file format. \
+                 Arrow IPC files only support a single dictionary for a given 
field \
+                 across all batches.";
+
+        match comparison {
+            DictionaryComparison::NotEqual => {
+                if self.error_on_replacement {
+                    return Err(ArrowError::InvalidArgumentError(
+                        REPLACEMENT_ERROR.to_string(),
+                    ));
+                }
+
+                self.written.insert(dict_id, new_data);
+                Ok(DictionaryUpdate::Replaced)
             }
-            if self.error_on_replacement {
-                // If error on replacement perform a logical comparison
-                if last.child_data()[0] == *dict_values {
-                    // Same dictionary values => no need to emit it again
-                    return Ok(false);
+            DictionaryComparison::Delta => {
+                if compute_delta {
+                    let delta =
+                        new_values.slice(old_values.len(), new_values.len() - 
old_values.len());
+                    self.written.insert(dict_id, new_data);
+                    Ok(DictionaryUpdate::Delta(delta))
+                } else {
+                    if self.error_on_replacement {
+                        return Err(ArrowError::InvalidArgumentError(
+                            REPLACEMENT_ERROR.to_string(),
+                        ));
+                    }
+
+                    self.written.insert(dict_id, new_data);
+                    Ok(DictionaryUpdate::Replaced)
                 }
-                return Err(ArrowError::InvalidArgumentError(
-                    "Dictionary replacement detected when writing IPC file 
format. \
-                     Arrow IPC files only support a single dictionary for a 
given field \
-                     across all batches."
-                        .to_string(),
-                ));
             }
+            DictionaryComparison::Equal => unreachable!("Already checked equal 
case"),
+        }
+    }
+}
+
+/// Describes how two dictionary arrays compare to each other.
+#[derive(Debug, Clone)]
+enum DictionaryComparison {
+    /// Neither a delta, nor an exact match
+    NotEqual,
+    /// Exact element-wise match
+    Equal,
+    /// The two arrays are dictionary deltas of each other, meaning the first
+    /// is a prefix of the second.
+    Delta,
+}
+
+// Compares two dictionaries and returns a [`DictionaryComparison`].
+fn compare_dictionaries(old: &ArrayData, new: &ArrayData) -> 
DictionaryComparison {
+    // Check for exact match
+    let existing_len = old.len();
+    let new_len = new.len();
+    if existing_len == new_len {
+        if *old == *new {

Review Comment:
   I am not sure now to be honest 😬 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to