JakeDern commented on PR #8001: URL: https://github.com/apache/arrow-rs/pull/8001#issuecomment-3134275403
@asubiotto @alamb I have an initial implementation of both the writer and the reader ready for some feedback. Thanks @asubiotto for the starter code; some of the additional tests were especially helpful!

After digging through both the Go and Rust writer code, I'm curious to get thoughts on one thing: the Rust IPC writers have very limited ability to produce delta dictionaries. Both the Go and Rust IPC writers look at the dictionaries on incoming RecordBatches one at a time. They compare the new dictionary with the old one to determine whether the new one is a superset of the old. I thought this was odd, because the following test I wrote in Rust produces 0 delta dictionaries, whereas I expected it to produce 2:

```rust
#[test]
fn test_deltas() {
    // Dictionary resets at ["C", "D"]
    let batches: &[&[&str]] = &[&["A"], &["A", "B"], &["C", "D"], &["A", "B", "E"]];
    // `run_test` is a test helper from this PR (not shown here)
    run_test(batches, false);
}
```

However, the writer only looks at the dictionary values of the last RecordBatch. So it has to completely reset when `&["C", "D"]` comes in, and again when `&["A", "B", "E"]` comes in. This was inconsistent with what I'd seen in Go. There, similar code _did_ produce two delta dictionaries, despite seeming to follow the same or a similar algorithm. So I dug a bit more and wrote some Go code that uses two different dictionary builders, and then read the result from Rust.
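The per-batch comparison described above can be sketched roughly as follows. This is a minimal, std-only illustration, not the arrow-rs or arrow-go code. `LastDictTracker` and `DictMessage` are hypothetical names. The sketch also assumes that a delta is only possible when the previously written values are a prefix of the new ones, so that keys already written stay valid. Because only the last batch's values are remembered, any batch whose dictionary does not extend the previous one forces a full replacement.

```rust
/// What the writer would emit for one dictionary-encoded field.
#[derive(Debug, PartialEq)]
enum DictMessage<'a> {
    /// First dictionary for this field: send all values.
    Initial(Vec<&'a str>),
    /// Same values as last time: nothing new to send.
    Unchanged,
    /// Previous values are a prefix of the new ones: send only the tail.
    Delta(Vec<&'a str>),
    /// Anything else: send a full replacement dictionary.
    Replacement(Vec<&'a str>),
}

/// Hypothetical tracker that only remembers the dictionary values
/// of the last batch it saw (the per-batch behaviour described above).
struct LastDictTracker<'a> {
    last: Option<Vec<&'a str>>,
}

impl<'a> LastDictTracker<'a> {
    fn new() -> Self {
        Self { last: None }
    }

    fn observe(&mut self, values: &[&'a str]) -> DictMessage<'a> {
        let msg = match &self.last {
            None => DictMessage::Initial(values.to_vec()),
            Some(old) if old.as_slice() == values => DictMessage::Unchanged,
            // Assumed delta rule: old values must be a prefix of the new ones.
            Some(old) if values.starts_with(old) => {
                DictMessage::Delta(values[old.len()..].to_vec())
            }
            Some(_) => DictMessage::Replacement(values.to_vec()),
        };
        // Only the most recent batch's values are kept for the next comparison.
        self.last = Some(values.to_vec());
        msg
    }
}

fn main() {
    let batches: &[&[&str]] = &[&["A", "B"], &["A", "B"], &["A", "B", "C"], &["C", "D"]];
    let mut tracker = LastDictTracker::new();
    for values in batches {
        println!("{:?}", tracker.observe(values));
    }
}
```

Running `main` prints an initial dictionary, then `Unchanged`, then a delta of `["C"]`, and finally a full replacement for `["C", "D"]`. Once the values stop extending the previous batch, everything seen earlier is forgotten.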
Batches 0, 2, and 3 are written using one dictionary builder, and batch 1 is written using another:

```go
package main

import (
	"bytes"
	"fmt"
	"os"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/ipc"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
	dictType := &arrow.DictionaryType{
		IndexType: arrow.PrimitiveTypes.Int16,
		ValueType: arrow.BinaryTypes.String,
		Ordered:   false,
	}
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "foo", Type: dictType},
	}, nil)

	buf := bytes.NewBuffer([]byte{})
	writer := ipc.NewWriter(buf, ipc.WithSchema(schema), ipc.WithDictionaryDeltas(true))

	allocator := memory.NewGoAllocator()
	dictBuilder := array.NewDictionaryBuilder(allocator, dictType)
	builder := array.NewStringBuilder(allocator)

	// Batch 0: first dictionary builder
	builder.AppendStringValues([]string{"A", "B", "C"}, []bool{})
	if err := dictBuilder.AppendArray(array.NewStringData(builder.NewArray().Data())); err != nil {
		panic(err)
	}
	record := array.NewRecord(schema, []arrow.Array{dictBuilder.NewArray()}, 3)
	if err := writer.Write(record); err != nil {
		panic(err)
	}

	// Batch 1: a second, fresh dictionary builder ("reset" builder)
	dictBuilder2 := array.NewDictionaryBuilder(allocator, dictType)
	builder.AppendStringValues([]string{"A", "B", "D"}, []bool{})
	if err := dictBuilder2.AppendArray(array.NewStringData(builder.NewArray().Data())); err != nil {
		panic(err)
	}
	record2 := array.NewRecord(schema, []arrow.Array{dictBuilder2.NewArray()}, 3)
	if err := writer.Write(record2); err != nil {
		panic(err)
	}

	// Batch 2: back to the first dictionary builder
	builder.AppendStringValues([]string{"A", "B", "E"}, []bool{})
	if err := dictBuilder.AppendArray(array.NewStringData(builder.NewArray().Data())); err != nil {
		panic(err)
	}
	record3 := array.NewRecord(schema, []arrow.Array{dictBuilder.NewArray()}, 3)
	if err := writer.Write(record3); err != nil {
		panic(err)
	}

	// Batch 3: first dictionary builder again
	builder.AppendStringValues([]string{"A", "B", "D"}, []bool{})
	if err := dictBuilder.AppendArray(array.NewStringData(builder.NewArray().Data())); err != nil {
		panic(err)
	}
	record4 := array.NewRecord(schema, []arrow.Array{dictBuilder.NewArray()}, 3)
	if err := writer.Write(record4); err != nil {
		panic(err)
	}

	// Close the writer so the end-of-stream marker is written to buf
	if err := writer.Close(); err != nil {
		panic(err)
	}

	// Write buf out to ~/delta_test/delta2.arrow
	if err := os.WriteFile("/home/jakedern/delta_test/delta2.arrow", buf.Bytes(), 0644); err != nil {
		panic(fmt.Errorf("failed to write delta file: %w", err))
	}
}
```

Reading the file back with the Rust reader (with `dbg!` calls added to `arrow-ipc/src/reader.rs`) prints:

```text
[arrow-ipc/src/reader.rs:717:9] batch.isDelta() = false
[arrow-ipc/src/reader.rs:722:9] &dictionary_values = StringArray
[
  "A",
  "B",
  "C",
]
[arrow-ipc/src/reader.rs:717:9] batch.isDelta() = false
[arrow-ipc/src/reader.rs:722:9] &dictionary_values = StringArray
[
  "A",
  "B",
  "D",
]
[arrow-ipc/src/reader.rs:717:9] batch.isDelta() = false
[arrow-ipc/src/reader.rs:722:9] &dictionary_values = StringArray
[
  "A",
  "B",
  "C",
  "E",
]
[arrow-ipc/src/reader.rs:717:9] batch.isDelta() = true
[arrow-ipc/src/reader.rs:725:5] &dictionary_values = StringArray
[
  "D",
]
```

What we see is that writing a RecordBatch from a different builder basically resets the `ipc.Writer`, because the writer doesn't know about the values from the first builder. The insight I missed the first time is this: in Go, the dictionary builder attaches a [dictionary](https://github.com/apache/arrow-go/blob/679d97b05d9aa2279cbe56a4ef5cfa94aa657db8/arrow/array/data.go#L42) to the array data in each RecordBatch. It contains all of the values produced by that builder, and the IPC writer uses it rather than the values in the individual RecordBatch.

The takeaway is that in Go, the builder and the IPC writer need to cooperate to get delta dictionaries a reasonable amount of the time. If we shovel RecordBatches from different builders into the same IPC writer, delta dictionaries behave badly.

In Rust, we don't have that `dictionary` field, so our ability to write delta dictionaries is always pretty poor. Unless each batch's dictionary values are a superset of the previous batch's, we get no deltas and just waste time comparing dictionaries. Additionally, we [reset the internal dictionary](https://github.com/apache/arrow-rs/blob/94230402c2d31e7da5dc73d1a284cf17940c093c/arrow-array/src/builder/generic_bytes_dictionary_builder.rs#L435) after creating every batch in Rust, presumably because we can't do anything with it anyway.

My questions are:

1. Is there a better way to handle this than requiring cooperation between the dictionary builder and the IPC writers? It was difficult for me to figure this out. I can imagine it surprising a lot of people who expect it to "just work" and who aren't getting delta dictionaries in other circumstances.
2. Is there a straightforward way (and a desire) to add a similar `dictionary` field to Rust array data? Then we could at least get delta dictionaries under similar conditions to the Go writers.

Curious to hear any thoughts. Thank you both for the help so far!
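To make the Go behaviour concrete, here is a minimal, std-only Rust sketch of the builder/writer cooperation described above. `CumulativeDictBuilder`, `DeltaWriter` and `DictMessage` are hypothetical names, not arrow-rs APIs. The builder never resets its dictionary and hands the full accumulated dictionary to the writer with each batch, standing in for Go's `dictionary` field. The writer-side check assumes a delta is possible only when the previously sent values are a prefix of the new dictionary. Replaying the four batches from the Go experiment reproduces the `isDelta` pattern in the `dbg!` output: three non-delta dictionaries, then a delta containing only `"D"`.

```rust
use std::collections::HashMap;

/// What the writer would emit for one dictionary-encoded field.
#[derive(Debug, PartialEq)]
enum DictMessage {
    Initial(Vec<String>),
    Unchanged,
    Delta(Vec<String>),
    Replacement(Vec<String>),
}

/// Hypothetical builder that keeps its dictionary across batches (no reset).
#[derive(Default)]
struct CumulativeDictBuilder {
    values: Vec<String>,
    index: HashMap<String, i16>, // i16 keys, matching the Int16 index type above
}

impl CumulativeDictBuilder {
    /// Encodes one batch; returns its keys plus the full dictionary so far.
    fn batch(&mut self, input: &[&str]) -> (Vec<i16>, Vec<String>) {
        let mut keys = Vec::with_capacity(input.len());
        for v in input {
            let key = match self.index.get(*v).copied() {
                Some(k) => k,
                None => {
                    // New value: append it to the end of the dictionary.
                    let k = self.values.len() as i16;
                    self.values.push(v.to_string());
                    self.index.insert(v.to_string(), k);
                    k
                }
            };
            keys.push(key);
        }
        (keys, self.values.clone())
    }
}

/// Hypothetical writer-side check against the last dictionary sent.
#[derive(Default)]
struct DeltaWriter {
    last: Option<Vec<String>>,
}

impl DeltaWriter {
    fn write(&mut self, dict: Vec<String>) -> DictMessage {
        let msg = match &self.last {
            None => DictMessage::Initial(dict.clone()),
            Some(old) if *old == dict => DictMessage::Unchanged,
            Some(old) if dict.starts_with(old) => DictMessage::Delta(dict[old.len()..].to_vec()),
            Some(_) => DictMessage::Replacement(dict.clone()),
        };
        self.last = Some(dict);
        msg
    }
}

fn main() {
    // Batches 0, 2 and 3 come from one builder, batch 1 from another.
    let (mut b1, mut b2) = (CumulativeDictBuilder::default(), CumulativeDictBuilder::default());
    let (_, d0) = b1.batch(&["A", "B", "C"]);
    let (_, d1) = b2.batch(&["A", "B", "D"]);
    let (_, d2) = b1.batch(&["A", "B", "E"]);
    let (keys, d3) = b1.batch(&["A", "B", "D"]);

    let mut writer = DeltaWriter::default();
    for dict in [d0, d1, d2, d3] {
        println!("{:?}", writer.write(dict));
    }
    println!("keys of last batch: {:?}", keys);
}
```

If all four batches went through `b1` instead, the same writer would see one initial dictionary, then deltas of `["D"]` and `["E"]`, then an unchanged dictionary. A Rust equivalent would need the builder to stop resetting its dictionary and to pass it along with each batch, which is roughly what question 2 asks about.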