This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new e5650748a2 arrow-ipc: add reset method to DictionaryTracker (#9196)
e5650748a2 is described below

commit e5650748a2150624a53297ea82cec516367769d9
Author: albertlockett <[email protected]>
AuthorDate: Sat Jan 17 11:21:52 2026 -0500

    arrow-ipc: add reset method to DictionaryTracker (#9196)
    
    # Which issue does this PR close?
    
    - Closes #9195.
    
    # Rationale for this change
    When creating a new IPC stream, efficiency can be gained by reusing some
    of the types involved in building the stream to avoid some allocation
    cost. `DictionaryTracker` is a type that's difficult to reuse because
    there's no way to reset the internal state.
    
    The linked issue has more discussion of my actual use case.
    
    # What changes are included in this PR?
    Adds a method called `clear` to the `DictionaryTracker` that resets its
    internal state, so the tracker can be reused when creating a new IPC
    stream, avoiding the allocation cost of creating a new instance.
    
    # Are these changes tested?
    
    Yes, I added a test. The test fails unless this new method is
    called between streams.
    
    # Are there any user-facing changes?
    
    Yes: a new public method, `clear`, on `DictionaryTracker`.
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 arrow-ipc/src/writer.rs | 86 +++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 84 insertions(+), 2 deletions(-)

diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index 86376c8e5e..23217fec6d 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -829,6 +829,7 @@ pub enum DictionaryUpdate {
 /// isn't allowed in the `FileWriter`.
 #[derive(Debug)]
 pub struct DictionaryTracker {
+    // NOTE: When adding fields, update the clear() method accordingly.
     written: HashMap<i64, ArrayData>,
     dict_ids: Vec<i64>,
     error_on_replacement: bool,
@@ -986,6 +987,16 @@ impl DictionaryTracker {
             DictionaryComparison::Equal => unreachable!("Already checked equal 
case"),
         }
     }
+
+    /// Clears the state of the dictionary tracker.
+    ///
+    /// This allows the dictionary tracker to be reused for a new IPC stream 
while avoiding the
+    /// allocation cost of creating a new instance. This method should not be 
called if
+    /// the dictionary tracker will be used to continue writing to an existing 
IPC stream.
+    pub fn clear(&mut self) {
+        self.dict_ids.clear();
+        self.written.clear();
+    }
 }
 
 /// Describes how two dictionary arrays compare to each other.
@@ -1192,9 +1203,11 @@ impl<W: Write> FileWriter<W> {
         let mut fbb = FlatBufferBuilder::new();
         let dictionaries = fbb.create_vector(&self.dictionary_blocks);
         let record_batches = fbb.create_vector(&self.record_blocks);
-        let mut dictionary_tracker = DictionaryTracker::new(true);
+
+        // dictionaries are already written, so we can reset dictionary 
tracker to reuse for schema
+        self.dictionary_tracker.clear();
         let schema = IpcSchemaEncoder::new()
-            .with_dictionary_tracker(&mut dictionary_tracker)
+            .with_dictionary_tracker(&mut self.dictionary_tracker)
             .schema_to_fb_offset(&mut fbb, &self.schema);
         let fb_custom_metadata = (!self.custom_metadata.is_empty())
             .then(|| crate::convert::metadata_to_fb(&mut fbb, 
&self.custom_metadata));
@@ -4174,4 +4187,73 @@ mod tests {
         let all_passed = (0..20).all(|_| create_hash() == expected);
         assert!(all_passed);
     }
+
+    #[test]
+    fn test_dictionary_tracker_reset() {
+        let data_gen = IpcDataGenerator::default();
+        let mut dictionary_tracker = DictionaryTracker::new(false);
+        let writer_options = IpcWriteOptions::default();
+        let mut compression_ctx = CompressionContext::default();
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "a",
+            DataType::Dictionary(Box::new(DataType::UInt8), 
Box::new(DataType::Utf8)),
+            false,
+        )]));
+
+        let mut write_single_batch_stream =
+            |batch: RecordBatch, dict_tracker: &mut DictionaryTracker| -> 
Vec<u8> {
+                let mut buffer = Vec::new();
+
+                // create a new IPC stream:
+                let stream_header = 
data_gen.schema_to_bytes_with_dictionary_tracker(
+                    &schema,
+                    dict_tracker,
+                    &writer_options,
+                );
+                _ = write_message(&mut buffer, stream_header, 
&writer_options).unwrap();
+
+                let (encoded_dicts, encoded_batch) = data_gen
+                    .encode(&batch, dict_tracker, &writer_options, &mut 
compression_ctx)
+                    .unwrap();
+                for encoded_dict in encoded_dicts {
+                    _ = write_message(&mut buffer, encoded_dict, 
&writer_options).unwrap();
+                }
+                _ = write_message(&mut buffer, encoded_batch, 
&writer_options).unwrap();
+
+                buffer
+            };
+
+        let batch1 = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(DictionaryArray::new(
+                UInt8Array::from_iter_values([0]),
+                Arc::new(StringArray::from_iter_values(["a"])),
+            ))],
+        )
+        .unwrap();
+        let buffer = write_single_batch_stream(batch1.clone(), &mut 
dictionary_tracker);
+
+        // ensure we can read the stream back
+        let mut reader = StreamReader::try_new(Cursor::new(buffer), 
None).unwrap();
+        let read_batch = reader.next().unwrap().unwrap();
+        assert_eq!(read_batch, batch1);
+
+        // reset the dictionary tracker so it can be used for next stream
+        dictionary_tracker.clear();
+
+        // now write a 2nd stream and ensure we can also read it:
+        let batch2 = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(DictionaryArray::new(
+                UInt8Array::from_iter_values([0]),
+                Arc::new(StringArray::from_iter_values(["a"])),
+            ))],
+        )
+        .unwrap();
+        let buffer = write_single_batch_stream(batch2.clone(), &mut 
dictionary_tracker);
+        let mut reader = StreamReader::try_new(Cursor::new(buffer), 
None).unwrap();
+        let read_batch = reader.next().unwrap().unwrap();
+        assert_eq!(read_batch, batch2);
+    }
 }

Reply via email to