alamb commented on code in PR #8001: URL: https://github.com/apache/arrow-rs/pull/8001#discussion_r2263151212
########## arrow-array/src/builder/generic_bytes_dictionary_builder.rs: ########## @@ -463,6 +463,38 @@ where DictionaryArray::from(unsafe { builder.build_unchecked() }) } + /// Builds the `DictionaryArray` without resetting the values builder or Review Comment: thank you -- this comment is quite clear ########## arrow-ipc/src/tests/delta_dictionary.rs: ########## @@ -0,0 +1,432 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{ + reader::IpcMessage, + writer::{DictionaryHandling, IpcWriteOptions, StreamWriter}, +}; +use crate::{ + reader::{FileReader, StreamReader}, + writer::FileWriter, +}; +use arrow_array::{ + builder::StringDictionaryBuilder, types::Int32Type, Array, ArrayRef, DictionaryArray, + RecordBatch, StringArray, +}; +use arrow_schema::{DataType, Field, Schema}; +use std::io::Cursor; +use std::sync::Arc; + +#[test] +fn test_mixed_delta() { Review Comment: What is it about this test that can't be put into the normal integration test? It looks like it is something we should be able to do via the public API ########## arrow-ipc/src/tests/delta_dictionary.rs: ########## @@ -0,0 +1,432 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{ + reader::IpcMessage, + writer::{DictionaryHandling, IpcWriteOptions, StreamWriter}, +}; +use crate::{ + reader::{FileReader, StreamReader}, + writer::FileWriter, +}; +use arrow_array::{ + builder::StringDictionaryBuilder, types::Int32Type, Array, ArrayRef, DictionaryArray, + RecordBatch, StringArray, +}; +use arrow_schema::{DataType, Field, Schema}; +use std::io::Cursor; +use std::sync::Arc; + +#[test] +fn test_mixed_delta() { + let batches: &[&[&str]] = &[ + &["A"], + &["A", "B"], + &["C"], + &["D", "E"], + &["A", "B", "C", "D", "E"], + ]; + + run_delta_sequence_test( + batches, + &[ + MessageType::Dict, Review Comment: I wonder if it would be valuable to also add the expected dictionary contents here like ```suggestion (MessageType::Dict , ["A"]), ``` 🤔 ########## arrow-ipc/src/tests/delta_dictionary.rs: ########## @@ -0,0 +1,432 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{ + reader::IpcMessage, + writer::{DictionaryHandling, IpcWriteOptions, StreamWriter}, +}; +use crate::{ + reader::{FileReader, StreamReader}, + writer::FileWriter, +}; +use arrow_array::{ + builder::StringDictionaryBuilder, types::Int32Type, Array, ArrayRef, DictionaryArray, + RecordBatch, StringArray, +}; +use arrow_schema::{DataType, Field, Schema}; +use std::io::Cursor; +use std::sync::Arc; + +#[test] +fn test_mixed_delta() { + let batches: &[&[&str]] = &[ + &["A"], + &["A", "B"], + &["C"], + &["D", "E"], + &["A", "B", "C", "D", "E"], + ]; + + run_delta_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::DeltaDict, + MessageType::RecordBatch, + MessageType::DeltaDict, + MessageType::RecordBatch, + MessageType::DeltaDict, + MessageType::RecordBatch, + MessageType::RecordBatch, + ], + ); + + run_resend_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::Dict, + MessageType::RecordBatch, + MessageType::Dict, + MessageType::RecordBatch, + MessageType::Dict, + MessageType::RecordBatch, + MessageType::RecordBatch, + ], + ); +} + +#[test] +fn test_disjoint_delta() { + let batches: &[&[&str]] = &[&["A"], &["B"], &["C", "E"]]; + run_delta_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::DeltaDict, + MessageType::RecordBatch, + MessageType::DeltaDict, + MessageType::RecordBatch, + ], + ); + + run_resend_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::Dict, + 
MessageType::RecordBatch, + MessageType::Dict, + MessageType::RecordBatch, + ], + ); +} + +#[test] +fn test_increasing_delta() { + let batches: &[&[&str]] = &[&["A"], &["A", "B"], &["A", "B", "C"]]; + run_delta_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::DeltaDict, + MessageType::RecordBatch, + MessageType::DeltaDict, + MessageType::RecordBatch, + ], + ); + + run_resend_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::Dict, + MessageType::RecordBatch, + MessageType::Dict, + MessageType::RecordBatch, + ], + ); +} + +#[test] +fn test_single_delta() { + let batches: &[&[&str]] = &[&["A", "B", "C"], &["D"]]; + run_delta_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::DeltaDict, + MessageType::RecordBatch, + ], + ); + + run_resend_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::Dict, + MessageType::RecordBatch, + ], + ); +} + +#[test] +fn test_single_same_value_sequence() { + let batches: &[&[&str]] = &[&["A"], &["A"], &["A"], &["A"]]; + run_delta_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::RecordBatch, + MessageType::RecordBatch, + MessageType::RecordBatch, + ], + ); + + run_delta_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::RecordBatch, + MessageType::RecordBatch, + MessageType::RecordBatch, + ], + ); +} + +#[test] +fn test_multi_same_value_sequence() { + let batches: &[&[&str]] = &[&["A", "B", "C"], &["A", "B", "C"]]; + run_delta_sequence_test(batches, &[MessageType::Dict, MessageType::RecordBatch]); +} + Review Comment: 👏 -- this is a very nice set of tests. The only other thing I can think of would be to test sending a dictionary array with zero rows. ########## arrow-ipc/tests/test_delta_dictionary.rs: ########## @@ -0,0 +1,549 @@ +// Licensed to 
the Apache Software Foundation
(ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_array::{ + builder::{ListBuilder, PrimitiveDictionaryBuilder, StringDictionaryBuilder}, + Array, ArrayRef, DictionaryArray, RecordBatch, StringArray, +}; +use arrow_ipc::reader::StreamReader; +use arrow_ipc::writer::{DictionaryHandling, IpcWriteOptions, StreamWriter}; +use arrow_schema::{ArrowError, DataType, Field, Schema}; +use std::io::Cursor; +use std::sync::Arc; + +#[test] +fn test_dictionary_handling_option() { + // Test that DictionaryHandling can be set + let _options = IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta); + + // Verify it was set (we can't access private field directly) + // This test just verifies the API exists +} + +#[test] +fn test_nested_dictionary_with_delta() -> Result<(), ArrowError> { + // Test writing nested dictionaries with delta option + // Create a simple nested structure for testing + + // Create dictionary arrays + let mut dict_builder = StringDictionaryBuilder::<arrow_array::types::Int32Type>::new(); + dict_builder.append_value("hello"); + dict_builder.append_value("world"); + let dict_array = dict_builder.finish(); + + // Create a list of dictionaries + let mut list_builder = + 
ListBuilder::new(StringDictionaryBuilder::<arrow_array::types::Int32Type>::new()); + list_builder.values().append_value("item1"); + list_builder.values().append_value("item2"); + list_builder.append(true); + list_builder.values().append_value("item3"); + list_builder.append(true); + let list_array = list_builder.finish(); + + // Create schema with nested dictionaries + let schema = Arc::new(Schema::new(vec![ + Field::new("dict", dict_array.data_type().clone(), true), + Field::new("list_of_dict", list_array.data_type().clone(), true), + ])); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(dict_array) as ArrayRef, + Arc::new(list_array) as ArrayRef, + ], + )?; + + // Write with delta dictionary handling + let mut buffer = Vec::new(); + { + let options = + IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta); + let mut writer = StreamWriter::try_new_with_options(&mut buffer, &schema, options)?; + writer.write(&batch)?; + writer.finish()?; + } + + // Verify it writes without error + assert!(!buffer.is_empty()); + + Ok(()) +} + +#[test] +fn test_read_delta_dictionary_error() -> Result<(), ArrowError> { + // This test verifies that reading delta dictionaries returns appropriate error + // until the feature is fully implemented + + // Create a dictionary array + let mut builder = StringDictionaryBuilder::<arrow_array::types::Int32Type>::new(); + builder.append_value("test"); + let array = builder.finish(); + + let schema = Arc::new(Schema::new(vec![Field::new( + "dict", + array.data_type().clone(), + true, + )])); + + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array) as ArrayRef])?; + + // Write normally (not delta) + let mut buffer = Vec::new(); + { + let mut writer = StreamWriter::try_new(&mut buffer, &schema)?; + writer.write(&batch)?; + writer.finish()?; + } + + // Reading should work for non-delta dictionaries + let reader = StreamReader::try_new(Cursor::new(buffer), None)?; + let batches: 
Result<Vec<_>, _> = reader.collect(); + assert!(batches.is_ok()); Review Comment: can we also assert the batches made it through successfully (aka that `batches == [batch]`)? ########## arrow-ipc/src/tests/delta_dictionary.rs: ########## @@ -0,0 +1,432 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::{ + reader::IpcMessage, + writer::{DictionaryHandling, IpcWriteOptions, StreamWriter}, +}; +use crate::{ + reader::{FileReader, StreamReader}, + writer::FileWriter, +}; +use arrow_array::{ + builder::StringDictionaryBuilder, types::Int32Type, Array, ArrayRef, DictionaryArray, + RecordBatch, StringArray, +}; +use arrow_schema::{DataType, Field, Schema}; +use std::io::Cursor; +use std::sync::Arc; + +#[test] +fn test_mixed_delta() { + let batches: &[&[&str]] = &[ + &["A"], + &["A", "B"], + &["C"], + &["D", "E"], + &["A", "B", "C", "D", "E"], + ]; + + run_delta_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::DeltaDict, + MessageType::RecordBatch, + MessageType::DeltaDict, + MessageType::RecordBatch, + MessageType::DeltaDict, + MessageType::RecordBatch, + MessageType::RecordBatch, + ], + ); + + run_resend_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::Dict, + MessageType::RecordBatch, + MessageType::Dict, + MessageType::RecordBatch, + MessageType::Dict, + MessageType::RecordBatch, + MessageType::RecordBatch, + ], + ); +} + +#[test] +fn test_disjoint_delta() { + let batches: &[&[&str]] = &[&["A"], &["B"], &["C", "E"]]; + run_delta_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::DeltaDict, + MessageType::RecordBatch, + MessageType::DeltaDict, + MessageType::RecordBatch, + ], + ); + + run_resend_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::Dict, + MessageType::RecordBatch, + MessageType::Dict, + MessageType::RecordBatch, + ], + ); +} + +#[test] +fn test_increasing_delta() { + let batches: &[&[&str]] = &[&["A"], &["A", "B"], &["A", "B", "C"]]; + run_delta_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::DeltaDict, + MessageType::RecordBatch, + MessageType::DeltaDict, + MessageType::RecordBatch, + ], + ); + + 
run_resend_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::Dict, + MessageType::RecordBatch, + MessageType::Dict, + MessageType::RecordBatch, + ], + ); +} + +#[test] +fn test_single_delta() { + let batches: &[&[&str]] = &[&["A", "B", "C"], &["D"]]; + run_delta_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::DeltaDict, + MessageType::RecordBatch, + ], + ); + + run_resend_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::Dict, + MessageType::RecordBatch, + ], + ); +} + +#[test] +fn test_single_same_value_sequence() { + let batches: &[&[&str]] = &[&["A"], &["A"], &["A"], &["A"]]; + run_delta_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::RecordBatch, + MessageType::RecordBatch, + MessageType::RecordBatch, + ], + ); + + run_delta_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::RecordBatch, + MessageType::RecordBatch, + MessageType::RecordBatch, + ], + ); +} + +#[test] +fn test_multi_same_value_sequence() { + let batches: &[&[&str]] = &[&["A", "B", "C"], &["A", "B", "C"]]; + run_delta_sequence_test(batches, &[MessageType::Dict, MessageType::RecordBatch]); +} + +#[derive(Debug, PartialEq)] +enum MessageType { + Schema, + Dict, + DeltaDict, + RecordBatch, +} + +impl From<&IpcMessage> for MessageType { + fn from(value: &IpcMessage) -> Self { + match value { + IpcMessage::Schema(_) => MessageType::Schema, + IpcMessage::DictionaryBatch { + id: _, + is_delta, + values: _, + } => match is_delta { + true => MessageType::DeltaDict, + false => MessageType::Dict, + }, + IpcMessage::RecordBatch(_) => MessageType::RecordBatch, + } + } +} + +fn run_resend_sequence_test(batches: &[&[&str]], sequence: &[MessageType]) { + let opts = IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Resend); + run_sequence_test(batches, sequence, opts); 
+} + +fn run_delta_sequence_test(batches: &[&[&str]], sequence: &[MessageType]) { + let opts = IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta); + run_sequence_test(batches, sequence, opts); +} + +fn run_sequence_test(batches: &[&[&str]], sequence: &[MessageType], options: IpcWriteOptions) { + let stream_buf = write_all_to_stream(options.clone(), batches); + let ipc_stream = get_ipc_message_stream(stream_buf); + for (message, expected_type) in ipc_stream.iter().zip(sequence.iter()) { + let actual_type: MessageType = message.into(); + assert_eq!(actual_type, *expected_type, "Message type mismatch"); + } +} + +fn get_ipc_message_stream(buf: Vec<u8>) -> Vec<IpcMessage> { + let mut reader = StreamReader::try_new(Cursor::new(buf), None).unwrap(); + let mut results = vec![]; + + loop { + match reader.next_ipc_message() { + Ok(Some(message)) => results.push(message), + Ok(None) => break, // End of stream + Err(e) => panic!("Error reading IPC message: {:?}", e), + } + } + + results +} + +#[test] +fn test_replace_same_length() { + let batches: &[&[&str]] = &[ + &["A", "B", "C", "D", "E", "F"], + &["A", "G", "H", "I", "J", "K"], + ]; + run_parity_test(batches); +} + +#[test] +fn test_sparse_deltas() { + let batches: &[&[&str]] = &[ + &["A"], + &["C"], + &["E", "F", "D"], + &["FOO"], + &["parquet", "B"], + &["123", "B", "C"], + ]; + run_parity_test(batches); +} + +#[test] +fn test_deltas_with_reset() { + // Dictionary resets at ["C", "D"] + let batches: &[&[&str]] = &[&["A"], &["A", "B"], &["C", "D"], &["A", "B", "C", "D"]]; + run_parity_test(batches); +} + +/// FileWriter can only tolerate very specific patterns of delta dictionaries, +/// because the dictionary cannot be replaced/reset. 
+#[test] +fn test_deltas_with_file() { + let batches: &[&[&str]] = &[&["A"], &["A", "B"], &["A", "B", "C"], &["A", "B", "C", "D"]]; + run_parity_test(batches); +} + +/// Encode all batches three times and compare all three for the same results +/// on the other end. +/// +/// - Stream encoding with delta +/// - Stream encoding without delta +/// - File encoding with delta (File format does not allow replacement +/// dictionaries) +fn run_parity_test(batches: &[&[&str]]) { + let delta_options = + IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta); + let delta_stream_buf = write_all_to_stream(delta_options.clone(), batches); + + let resend_options = + IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Resend); + let resend_stream_buf = write_all_to_stream(resend_options.clone(), batches); + + let delta_file_buf = write_all_to_file(delta_options, batches); + + let mut streams = vec![ + get_stream_batches(delta_stream_buf), + get_stream_batches(resend_stream_buf), + get_file_batches(delta_file_buf), + ]; + + let (first_stream, other_streams) = streams.split_first_mut().unwrap(); + + let mut idx = 0; + while let Some(batch) = first_stream.next() { + let first_dict = extract_dictionary(batch); + let expected_values = batches[idx]; + assert_eq!(expected_values, &dict_to_vec(first_dict.clone())); + + for stream in other_streams.iter_mut() { + let next_batch = stream + .next() + .expect("All streams should yield same number of elements"); + let next_dict = extract_dictionary(next_batch); + assert_eq!(expected_values, &dict_to_vec(next_dict.clone())); + assert_eq!(first_dict, next_dict); + } + + idx += 1; + } + + for stream in other_streams.iter_mut() { + assert!( + stream.next().is_none(), + "All streams should yield same number of elements" + ); + } +} + +fn dict_to_vec(dict: DictionaryArray<Int32Type>) -> Vec<String> { + let values: Vec<String> = dict + .values() + .as_any() + .downcast_ref::<StringArray>() + .unwrap() + 
.iter() + .map(|v| v.map(|s| s.to_string()).unwrap_or_default()) + .collect(); + + dict.keys() + .iter() + .map(|i| values[i.unwrap() as usize].clone()) + .collect() +} + +fn get_stream_batches(buf: Vec<u8>) -> Box<dyn Iterator<Item = RecordBatch>> { + let reader = StreamReader::try_new(Cursor::new(buf), None).unwrap(); + Box::new( + reader + .collect::<Vec<Result<_, _>>>() + .into_iter() + .map(|r| r.unwrap()), + ) +} + +fn get_file_batches(buf: Vec<u8>) -> Box<dyn Iterator<Item = RecordBatch>> { + let reader = FileReader::try_new(Cursor::new(buf), None).unwrap(); + Box::new( + reader + .collect::<Vec<Result<_, _>>>() + .into_iter() + .map(|r| r.unwrap()), + ) +} + +fn extract_dictionary(batch: RecordBatch) -> DictionaryArray<arrow_array::types::Int32Type> { + batch + .column(0) + .as_any() + .downcast_ref::<DictionaryArray<arrow_array::types::Int32Type>>() + .unwrap() + .clone() +} + +fn write_all_to_file(options: IpcWriteOptions, vals: &[&[&str]]) -> Vec<u8> { + let batches = build_batches(vals); + let mut buf: Vec<u8> = Vec::new(); + let mut writer = + FileWriter::try_new_with_options(&mut buf, &batches[0].schema(), options).unwrap(); + for batch in batches { + writer.write(&batch).unwrap(); + } + writer.finish().unwrap(); + buf +} + +fn write_all_to_stream(options: IpcWriteOptions, vals: &[&[&str]]) -> Vec<u8> { + let batches = build_batches(vals); + + let mut buf: Vec<u8> = Vec::new(); + let mut writer = + StreamWriter::try_new_with_options(&mut buf, &batches[0].schema(), options).unwrap(); + for batch in batches { + writer.write(&batch).unwrap(); + } + + writer.finish().unwrap(); + + buf +} + +fn build_batches(vals: &[&[&str]]) -> Vec<RecordBatch> { + let mut builder = StringDictionaryBuilder::<arrow_array::types::Int32Type>::new(); + vals.iter().map(|v| build_batch(v, &mut builder)).collect() +} + +fn build_batch( + vals: &[&str], + builder: &mut StringDictionaryBuilder<arrow_array::types::Int32Type>, +) -> RecordBatch { + for &val in vals { + if 
val.is_empty() { + builder.append_null(); + } else { + builder.append_value(val); + } + } + let array = builder.finish_preserve_values(); Review Comment: I think this new API is quite neat, but it may not be obvious to people how to use the API to build dictionary arrays that can be efficiently sent over IPC with delta dictionaries. Would you be willing to add a second example to the `StreamWriter` docs? https://docs.rs/arrow-ipc/56.0.0/arrow_ipc/writer/struct.StreamWriter.html#example Basically it would show using a dictionary builder, calling `finish_preserve_values`, and then sending a second batch. This is not required for this PR -- we can do it as a follow-on too. ########## arrow-ipc/src/writer.rs: ########## @@ -760,34 +812,108 @@ impl DictionaryTracker { /// * If the tracker has not been configured to error on replacement or this dictionary /// has never been seen before, return `Ok(true)` to indicate that the dictionary was just /// inserted. - pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> { - let dict_data = column.to_data(); - let dict_values = &dict_data.child_data()[0]; - - // If a dictionary with this id was already emitted, check if it was the same. - if let Some(last) = self.written.get(&dict_id) { - if ArrayData::ptr_eq(&last.child_data()[0], dict_values) { - // Same dictionary values => no need to emit it again - return Ok(false); + pub fn insert( + &mut self, + dict_id: i64, + column: &ArrayRef, + compute_delta: bool, + ) -> Result<DictionaryUpdate, ArrowError> { + let new_data = column.to_data(); + let new_values = &new_data.child_data()[0]; + + // If there is no existing dictionary with this ID, we always insert + let Some(old) = self.written.get(&dict_id) else { + self.written.insert(dict_id, new_data); + return Ok(DictionaryUpdate::New); + }; + + // Fast path - If the array data points to the same buffer as the + // existing then they're the same.
+ let old_values = &old.child_data()[0]; + if ArrayData::ptr_eq(old_values, new_values) { + return Ok(DictionaryUpdate::None); + } + + // Slow path - Compare the dictionaries value by value + let comparison = compare_dictionaries(old_values, new_values); + if matches!(comparison, DictionaryComparison::Equal) { + return Ok(DictionaryUpdate::None); + } + + const REPLACEMENT_ERROR: &str = + "Dictionary replacement detected when writing IPC file format. \ + Arrow IPC files only support a single dictionary for a given field \ + across all batches."; + + match comparison { + DictionaryComparison::NotEqual => { + if self.error_on_replacement { + return Err(ArrowError::InvalidArgumentError( + REPLACEMENT_ERROR.to_string(), + )); + } + + self.written.insert(dict_id, new_data); + Ok(DictionaryUpdate::Replaced) } - if self.error_on_replacement { - // If error on replacement perform a logical comparison - if last.child_data()[0] == *dict_values { - // Same dictionary values => no need to emit it again - return Ok(false); + DictionaryComparison::Delta => { + if compute_delta { + let delta = + new_values.slice(old_values.len(), new_values.len() - old_values.len()); + self.written.insert(dict_id, new_data); + Ok(DictionaryUpdate::Delta(delta)) + } else { + if self.error_on_replacement { + return Err(ArrowError::InvalidArgumentError( + REPLACEMENT_ERROR.to_string(), + )); + } + + self.written.insert(dict_id, new_data); + Ok(DictionaryUpdate::Replaced) } - return Err(ArrowError::InvalidArgumentError( - "Dictionary replacement detected when writing IPC file format. \ - Arrow IPC files only support a single dictionary for a given field \ - across all batches." - .to_string(), - )); } + DictionaryComparison::Equal => unreachable!("Already checked equal case"), + } + } +} + +/// Describes how two dictionary arrays compare to each other. 
+#[derive(Debug, Clone)] +enum DictionaryComparison { + /// Neither a delta, nor an exact match + NotEqual, + /// Exact element-wise match + Equal, + /// The two arrays are dictionary deltas of each other, meaning the first + /// is a prefix of the second. + Delta, +} + +// Compares two dictionaries and returns a [`DictionaryComparison`]. +fn compare_dictionaries(old: &ArrayData, new: &ArrayData) -> DictionaryComparison { + // Check for exact match + let existing_len = old.len(); + let new_len = new.len(); + if existing_len == new_len { + if *old == *new { Review Comment: this compares the entire array element by element, which could be slow. However, it appears to be what the current code does too, so if it turns out to be too slow we can potentially make it faster in the future. ########## arrow-ipc/src/writer.rs: ########## @@ -363,21 +374,32 @@ impl IpcDataGenerator { dict_id_seq, )?; - // It's importnat to only take the dict_id at this point, because the dict ID + // It's important to only take the dict_id at this point, because the dict ID // sequence is assigned depth-first, so we need to first encode children and have // them take their assigned dict IDs before we take the dict ID for this field. let dict_id = dict_id_seq.next().ok_or_else(|| { ArrowError::IpcError(format!("no dict id for field {}", field.name())) })?; - let emit = dictionary_tracker.insert(dict_id, column)?; - - if emit { - encoded_dictionaries.push(self.dictionary_batch_to_bytes( - dict_id, - dict_values, - write_options, - )?); + let compute_delta = write_options.dictionary_handling == DictionaryHandling::Delta; + match dictionary_tracker.insert(dict_id, column, compute_delta)?
{ Review Comment: I wonder if the API would be clearer if you just passed `DictionaryHandling` directly into the `DictionaryTracker` (rather than a flag). ########## arrow-ipc/src/writer.rs: ########## @@ -760,34 +812,108 @@ impl DictionaryTracker { /// * If the tracker has not been configured to error on replacement or this dictionary /// has never been seen before, return `Ok(true)` to indicate that the dictionary was just /// inserted. - pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> { - let dict_data = column.to_data(); - let dict_values = &dict_data.child_data()[0]; - - // If a dictionary with this id was already emitted, check if it was the same. - if let Some(last) = self.written.get(&dict_id) { - if ArrayData::ptr_eq(&last.child_data()[0], dict_values) { - // Same dictionary values => no need to emit it again - return Ok(false); + pub fn insert( Review Comment: I think this is a change to the [public API](https://docs.rs/arrow-ipc/56.0.0/arrow_ipc/writer/struct.DictionaryTracker.html#method.insert), and thus if we make this change we will have to wait until the next major release to merge this PR (Oct 2025) - https://github.com/apache/arrow-rs/issues/7835 An alternative approach would be to add a new method and deprecate the old one as described here: https://github.com/apache/arrow-rs?tab=readme-ov-file#deprecation-guidelines So something like this (note the old method must map the new `DictionaryUpdate` back to its original `bool` result): ```rust // deprecated, use insert_column instead pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> { // call new API let compute_deltas = false; // preserve the old contract: `true` means the dictionary must be emitted self.insert_column(dict_id, column, compute_deltas) .map(|update| !matches!(update, DictionaryUpdate::None)) } // new API pub fn insert_column( &mut self, dict_id: i64, column: &ArrayRef, compute_delta: bool, ) -> Result<DictionaryUpdate, 
ArrowError> { ... ``` ########## arrow-ipc/src/tests/delta_dictionary.rs: ########## @@ -0,0 +1,432 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{ + reader::IpcMessage, + writer::{DictionaryHandling, IpcWriteOptions, StreamWriter}, +}; +use crate::{ + reader::{FileReader, StreamReader}, + writer::FileWriter, +}; +use arrow_array::{ + builder::StringDictionaryBuilder, types::Int32Type, Array, ArrayRef, DictionaryArray, + RecordBatch, StringArray, +}; +use arrow_schema::{DataType, Field, Schema}; +use std::io::Cursor; +use std::sync::Arc; + +#[test] +fn test_mixed_delta() { + let batches: &[&[&str]] = &[ + &["A"], + &["A", "B"], + &["C"], + &["D", "E"], + &["A", "B", "C", "D", "E"], + ]; + + run_delta_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::DeltaDict, + MessageType::RecordBatch, + MessageType::DeltaDict, + MessageType::RecordBatch, + MessageType::DeltaDict, + MessageType::RecordBatch, + MessageType::RecordBatch, + ], + ); + + run_resend_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::Dict, + MessageType::RecordBatch, + MessageType::Dict, + MessageType::RecordBatch, + MessageType::Dict, + MessageType::RecordBatch, + MessageType::RecordBatch, + ], + ); +} + +#[test] +fn test_disjoint_delta() { + let batches: &[&[&str]] = &[&["A"], &["B"], &["C", "E"]]; + run_delta_sequence_test( + batches, + &[ + 
MessageType::Dict, + MessageType::RecordBatch, + MessageType::DeltaDict, + MessageType::RecordBatch, + MessageType::DeltaDict, + MessageType::RecordBatch, + ], + ); + + run_resend_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::Dict, + MessageType::RecordBatch, + MessageType::Dict, + MessageType::RecordBatch, + ], + ); +} + +#[test] +fn test_increasing_delta() { + let batches: &[&[&str]] = &[&["A"], &["A", "B"], &["A", "B", "C"]]; + run_delta_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::DeltaDict, + MessageType::RecordBatch, + MessageType::DeltaDict, + MessageType::RecordBatch, + ], + ); + + run_resend_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::Dict, + MessageType::RecordBatch, + MessageType::Dict, + MessageType::RecordBatch, + ], + ); +} + +#[test] +fn test_single_delta() { + let batches: &[&[&str]] = &[&["A", "B", "C"], &["D"]]; + run_delta_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::DeltaDict, + MessageType::RecordBatch, + ], + ); + + run_resend_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::Dict, + MessageType::RecordBatch, + ], + ); +} + +#[test] +fn test_single_same_value_sequence() { + let batches: &[&[&str]] = &[&["A"], &["A"], &["A"], &["A"]]; + run_delta_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::RecordBatch, + MessageType::RecordBatch, + MessageType::RecordBatch, + ], + ); + + run_delta_sequence_test( + batches, + &[ + MessageType::Dict, + MessageType::RecordBatch, + MessageType::RecordBatch, + MessageType::RecordBatch, + MessageType::RecordBatch, + ], + ); +} + +#[test] +fn test_multi_same_value_sequence() { + let batches: &[&[&str]] = &[&["A", "B", "C"], &["A", "B", "C"]]; + run_delta_sequence_test(batches, &[MessageType::Dict, MessageType::RecordBatch]); +} 
+ +#[derive(Debug, PartialEq)] +enum MessageType { + Schema, + Dict, + DeltaDict, + RecordBatch, +} + +impl From<&IpcMessage> for MessageType { + fn from(value: &IpcMessage) -> Self { + match value { + IpcMessage::Schema(_) => MessageType::Schema, + IpcMessage::DictionaryBatch { + id: _, + is_delta, + values: _, + } => match is_delta { + true => MessageType::DeltaDict, + false => MessageType::Dict, + }, + IpcMessage::RecordBatch(_) => MessageType::RecordBatch, + } + } +} + +fn run_resend_sequence_test(batches: &[&[&str]], sequence: &[MessageType]) { + let opts = IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Resend); + run_sequence_test(batches, sequence, opts); +} + +fn run_delta_sequence_test(batches: &[&[&str]], sequence: &[MessageType]) { + let opts = IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta); + run_sequence_test(batches, sequence, opts); +} + +fn run_sequence_test(batches: &[&[&str]], sequence: &[MessageType], options: IpcWriteOptions) { + let stream_buf = write_all_to_stream(options.clone(), batches); + let ipc_stream = get_ipc_message_stream(stream_buf); + for (message, expected_type) in ipc_stream.iter().zip(sequence.iter()) { + let actual_type: MessageType = message.into(); + assert_eq!(actual_type, *expected_type, "Message type mismatch"); + } +} + +fn get_ipc_message_stream(buf: Vec<u8>) -> Vec<IpcMessage> { + let mut reader = StreamReader::try_new(Cursor::new(buf), None).unwrap(); + let mut results = vec![]; + + loop { + match reader.next_ipc_message() { + Ok(Some(message)) => results.push(message), + Ok(None) => break, // End of stream + Err(e) => panic!("Error reading IPC message: {:?}", e), + } + } + + results +} + +#[test] +fn test_replace_same_length() { + let batches: &[&[&str]] = &[ + &["A", "B", "C", "D", "E", "F"], + &["A", "G", "H", "I", "J", "K"], + ]; + run_parity_test(batches); +} + +#[test] +fn test_sparse_deltas() { + let batches: &[&[&str]] = &[ + &["A"], + &["C"], + &["E", 
"F", "D"], + &["FOO"], + &["parquet", "B"], + &["123", "B", "C"], + ]; + run_parity_test(batches); +} + +#[test] +fn test_deltas_with_reset() { + // Dictionary resets at ["C", "D"] + let batches: &[&[&str]] = &[&["A"], &["A", "B"], &["C", "D"], &["A", "B", "C", "D"]]; + run_parity_test(batches); +} + +/// FileWriter can only tolerate very specific patterns of delta dictionaries, +/// because the dictionary cannot be replaced/reset. +#[test] +fn test_deltas_with_file() { + let batches: &[&[&str]] = &[&["A"], &["A", "B"], &["A", "B", "C"], &["A", "B", "C", "D"]]; + run_parity_test(batches); +} + +/// Encode all batches three times and compare all three for the same results Review Comment: 😍 ########## arrow-ipc/tests/test_delta_dictionary.rs: ########## @@ -0,0 +1,549 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use arrow_array::{ + builder::{ListBuilder, PrimitiveDictionaryBuilder, StringDictionaryBuilder}, + Array, ArrayRef, DictionaryArray, RecordBatch, StringArray, +}; +use arrow_ipc::reader::StreamReader; +use arrow_ipc::writer::{DictionaryHandling, IpcWriteOptions, StreamWriter}; +use arrow_schema::{ArrowError, DataType, Field, Schema}; +use std::io::Cursor; +use std::sync::Arc; + +#[test] +fn test_dictionary_handling_option() { + // Test that DictionaryHandling can be set + let _options = IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta); + + // Verify it was set (we can't access private field directly) + // This test just verifies the API exists +} + +#[test] +fn test_nested_dictionary_with_delta() -> Result<(), ArrowError> { + // Test writing nested dictionaries with delta option + // Create a simple nested structure for testing + + // Create dictionary arrays + let mut dict_builder = StringDictionaryBuilder::<arrow_array::types::Int32Type>::new(); + dict_builder.append_value("hello"); + dict_builder.append_value("world"); + let dict_array = dict_builder.finish(); + + // Create a list of dictionaries + let mut list_builder = + ListBuilder::new(StringDictionaryBuilder::<arrow_array::types::Int32Type>::new()); + list_builder.values().append_value("item1"); + list_builder.values().append_value("item2"); + list_builder.append(true); + list_builder.values().append_value("item3"); + list_builder.append(true); + let list_array = list_builder.finish(); + + // Create schema with nested dictionaries + let schema = Arc::new(Schema::new(vec![ + Field::new("dict", dict_array.data_type().clone(), true), + Field::new("list_of_dict", list_array.data_type().clone(), true), + ])); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(dict_array) as ArrayRef, + Arc::new(list_array) as ArrayRef, + ], + )?; + + // Write with delta dictionary handling + let mut buffer = Vec::new(); + { + let options = + 
IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta); + let mut writer = StreamWriter::try_new_with_options(&mut buffer, &schema, options)?; + writer.write(&batch)?; + writer.finish()?; + } + + // Verify it writes without error + assert!(!buffer.is_empty()); Review Comment: I think we should verify the data can be read back without error as well ########## arrow-ipc/tests/test_delta_dictionary.rs: ########## @@ -0,0 +1,549 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use arrow_array::{ + builder::{ListBuilder, PrimitiveDictionaryBuilder, StringDictionaryBuilder}, + Array, ArrayRef, DictionaryArray, RecordBatch, StringArray, +}; +use arrow_ipc::reader::StreamReader; +use arrow_ipc::writer::{DictionaryHandling, IpcWriteOptions, StreamWriter}; +use arrow_schema::{ArrowError, DataType, Field, Schema}; +use std::io::Cursor; +use std::sync::Arc; + +#[test] +fn test_dictionary_handling_option() { + // Test that DictionaryHandling can be set + let _options = IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta); + + // Verify it was set (we can't access private field directly) Review Comment: Strictly speaking, the other tests already cover using the option (and therefore that it exists), so this one is redundant. However, I don't think it is a problem to leave it in. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org