This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new e5650748a2 arrow-ipc: add reset method to DictionaryTracker (#9196)
e5650748a2 is described below
commit e5650748a2150624a53297ea82cec516367769d9
Author: albertlockett <[email protected]>
AuthorDate: Sat Jan 17 11:21:52 2026 -0500
arrow-ipc: add reset method to DictionaryTracker (#9196)
# Which issue does this PR close?
- Closes #9195.
# Rationale for this change
When creating a new IPC stream, allocation cost can be avoided by reusing
some of the types involved in building the stream. `DictionaryTracker` is
difficult to reuse because there is no way to reset its internal state.
The linked issue discusses my actual use case in more detail.
# What changes are included in this PR?
Adds a method called `clear` to `DictionaryTracker` so it can be reused
when creating a new IPC stream, avoiding the allocation cost of creating a
new instance.
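For illustration, a minimal sketch of the intended reuse pattern (the per-stream encoding steps are elided as comments; `DictionaryTracker::new` and the new `clear` method are the crate APIs shown, while the loop structure is hypothetical):

```rust
use arrow_ipc::writer::DictionaryTracker;

fn main() {
    // Construct the tracker once, outside the per-stream loop.
    // `false` allows dictionary replacement rather than erroring on it.
    let mut tracker = DictionaryTracker::new(false);

    for _stream in 0..3 {
        // ... encode the schema message and record batches for one IPC
        //     stream, passing `&mut tracker` to the encoder ...

        // Reset the tracker's internal state so this same instance can
        // serve the next stream instead of allocating a fresh one.
        tracker.clear();
    }
}
```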
# Are these changes tested?
Yes, I added a test. The test fails unless this new method is called.
# Are there any user-facing changes?
A new public method on `DictionaryTracker`.
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
arrow-ipc/src/writer.rs | 86 +++++++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 84 insertions(+), 2 deletions(-)
diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index 86376c8e5e..23217fec6d 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -829,6 +829,7 @@ pub enum DictionaryUpdate {
/// isn't allowed in the `FileWriter`.
#[derive(Debug)]
pub struct DictionaryTracker {
+ // NOTE: When adding fields, update the clear() method accordingly.
written: HashMap<i64, ArrayData>,
dict_ids: Vec<i64>,
error_on_replacement: bool,
@@ -986,6 +987,16 @@ impl DictionaryTracker {
DictionaryComparison::Equal => unreachable!("Already checked equal case"),
}
}
+
+ /// Clears the state of the dictionary tracker.
+ ///
+ /// This allows the dictionary tracker to be reused for a new IPC stream while avoiding the
+ /// allocation cost of creating a new instance. This method should not be called if
+ /// the dictionary tracker will be used to continue writing to an existing IPC stream.
+ pub fn clear(&mut self) {
+ self.dict_ids.clear();
+ self.written.clear();
+ }
}
/// Describes how two dictionary arrays compare to each other.
@@ -1192,9 +1203,11 @@ impl<W: Write> FileWriter<W> {
let mut fbb = FlatBufferBuilder::new();
let dictionaries = fbb.create_vector(&self.dictionary_blocks);
let record_batches = fbb.create_vector(&self.record_blocks);
- let mut dictionary_tracker = DictionaryTracker::new(true);
+
+ // dictionaries are already written, so we can reset dictionary tracker to reuse for schema
+ self.dictionary_tracker.clear();
let schema = IpcSchemaEncoder::new()
- .with_dictionary_tracker(&mut dictionary_tracker)
+ .with_dictionary_tracker(&mut self.dictionary_tracker)
.schema_to_fb_offset(&mut fbb, &self.schema);
let fb_custom_metadata = (!self.custom_metadata.is_empty())
.then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));
@@ -4174,4 +4187,73 @@ mod tests {
let all_passed = (0..20).all(|_| create_hash() == expected);
assert!(all_passed);
}
+
+ #[test]
+ fn test_dictionary_tracker_reset() {
+ let data_gen = IpcDataGenerator::default();
+ let mut dictionary_tracker = DictionaryTracker::new(false);
+ let writer_options = IpcWriteOptions::default();
+ let mut compression_ctx = CompressionContext::default();
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "a",
+ DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
+ false,
+ )]));
+
+ let mut write_single_batch_stream =
+ |batch: RecordBatch, dict_tracker: &mut DictionaryTracker| -> Vec<u8> {
+ let mut buffer = Vec::new();
+
+ // create a new IPC stream:
+ let stream_header = data_gen.schema_to_bytes_with_dictionary_tracker(
+ &schema,
+ dict_tracker,
+ &writer_options,
+ );
+ _ = write_message(&mut buffer, stream_header, &writer_options).unwrap();
+
+ let (encoded_dicts, encoded_batch) = data_gen
+ .encode(&batch, dict_tracker, &writer_options, &mut compression_ctx)
+ .unwrap();
+ for encoded_dict in encoded_dicts {
+ _ = write_message(&mut buffer, encoded_dict, &writer_options).unwrap();
+ }
+ _ = write_message(&mut buffer, encoded_batch, &writer_options).unwrap();
+
+ buffer
+ };
+
+ let batch1 = RecordBatch::try_new(
+ schema.clone(),
+ vec![Arc::new(DictionaryArray::new(
+ UInt8Array::from_iter_values([0]),
+ Arc::new(StringArray::from_iter_values(["a"])),
+ ))],
+ )
+ .unwrap();
+ let buffer = write_single_batch_stream(batch1.clone(), &mut dictionary_tracker);
+
+ // ensure we can read the stream back
+ let mut reader = StreamReader::try_new(Cursor::new(buffer), None).unwrap();
+ let read_batch = reader.next().unwrap().unwrap();
+ assert_eq!(read_batch, batch1);
+
+ // reset the dictionary tracker so it can be used for next stream
+ dictionary_tracker.clear();
+
+ // now write a 2nd stream and ensure we can also read it:
+ let batch2 = RecordBatch::try_new(
+ schema.clone(),
+ vec![Arc::new(DictionaryArray::new(
+ UInt8Array::from_iter_values([0]),
+ Arc::new(StringArray::from_iter_values(["a"])),
+ ))],
+ )
+ .unwrap();
+ let buffer = write_single_batch_stream(batch2.clone(), &mut dictionary_tracker);
+ let mut reader = StreamReader::try_new(Cursor::new(buffer), None).unwrap();
+ let read_batch = reader.next().unwrap().unwrap();
+ assert_eq!(read_batch, batch2);
+ }
}