JakeDern commented on PR #8001:
URL: https://github.com/apache/arrow-rs/pull/8001#issuecomment-3134275403

   @asubiotto @alamb I have an initial implementation of both writer and reader 
ready for some feedback. Thanks @asubiotto for the starter code, some of the 
additional tests were especially helpful!
   
   Something I'd like thoughts on, after digging through both the Go and Rust 
writer code: the ability of the IPC writers in Rust to produce delta 
dictionaries is very limited.
   
   Both the ipc writers in Go and Rust look at dictionaries on incoming 
RecordBatches one at a time, doing a comparison of the new dictionary with the 
old in order to determine if the new is a superset of old. I thought this was 
odd because the following test I wrote in Rust produces 0 delta dictionaries 
whereas I thought it would produce 2:
   
   ```rust
   #[test]
   fn test_deltas() {
       // Dictionary resets at ["C", "D"]
       let batches: &[&[&str]] = &[&["A"], &["A", "B"], &["C", "D"], &["A", "B", "E"]];
       run_test(batches, false);
   }
   ```
   
   However, since the writer only looks at the dictionary values of the last 
RecordBatch, it has to completely reset when `&["C", "D"]` comes in, and 
again when `&["A", "B", "E"]` comes in.
   
   This was inconsistent with what I'd seen in Go where similar code _did_ 
produce two delta dictionaries, despite seeming to follow the same/similar 
algorithm.
   
   So I dug a bit more and wrote some Go code that feeds a single `ipc.Writer` 
from two different dictionary builders, then read the result from Rust. Batches 
0, 2, and 3 are written using one dictionary builder and batch 1 is written 
using another:
   
   ```go
        dictType := &arrow.DictionaryType{
                IndexType: arrow.PrimitiveTypes.Int16,
                ValueType: arrow.BinaryTypes.String,
                Ordered:   false,
        }
   
        schema := arrow.NewSchema([]arrow.Field{
                {Name: "foo", Type: dictType},
        }, nil)
   
        buf := bytes.NewBuffer([]byte{})
        writer := ipc.NewWriter(buf, ipc.WithSchema(schema), ipc.WithDictionaryDeltas(true))
   
        allocator := memory.NewGoAllocator()
   
        dict_builder := array.NewDictionaryBuilder(allocator, dictType)
   
        builder := array.NewStringBuilder(allocator)
        builder.AppendStringValues([]string{"A", "B", "C"}, []bool{})
        dict_builder.AppendArray(array.NewStringData(builder.NewArray().Data()))
        record := array.NewRecord(schema, []arrow.Array{
                dict_builder.NewArray(),
        }, 3)
   
        if err := writer.Write(record); err != nil {
                panic(err)
        }
   
        // Second, independent dictionary builder (the writer has not seen its values)
        dict_builder2 := array.NewDictionaryBuilder(allocator, dictType)
        builder.AppendStringValues([]string{"A", "B", "D"}, []bool{})
        dict_builder2.AppendArray(array.NewStringData(builder.NewArray().Data()))
        record2 := array.NewRecord(schema, []arrow.Array{
                dict_builder2.NewArray(),
        }, 3)
   
        if err := writer.Write(record2); err != nil {
                panic(err)
        }
   
        builder.AppendStringValues([]string{"A", "B", "E"}, []bool{})
        dict_builder.AppendArray(array.NewStringData(builder.NewArray().Data()))
        record3 := array.NewRecord(schema, []arrow.Array{
                dict_builder.NewArray(),
        }, 3)
   
        if err := writer.Write(record3); err != nil {
                panic(err)
        }
   
        builder.AppendStringValues([]string{"A", "B", "D"}, []bool{})
        dict_builder.AppendArray(array.NewStringData(builder.NewArray().Data()))
        record4 := array.NewRecord(schema, []arrow.Array{
                dict_builder.NewArray(),
        }, 3)
   
        if err := writer.Write(record4); err != nil {
                panic(err)
        }
   
        // write buf out to ~/delta_test/delta2.arrow
        if err := os.WriteFile("/home/jakedern/delta_test/delta2.arrow", buf.Bytes(), 0644); err != nil {
                panic(fmt.Errorf("failed to write delta file: %w", err))
        }
   ```
   
   ```text
   [arrow-ipc/src/reader.rs:717:9] batch.isDelta() = false
   [arrow-ipc/src/reader.rs:722:9] &dictionary_values = StringArray
   [
     "A",
     "B",
     "C",
   ]
   [arrow-ipc/src/reader.rs:717:9] batch.isDelta() = false
   [arrow-ipc/src/reader.rs:722:9] &dictionary_values = StringArray
   [
     "A",
     "B",
     "D",
   ]
   [arrow-ipc/src/reader.rs:717:9] batch.isDelta() = false
   [arrow-ipc/src/reader.rs:722:9] &dictionary_values = StringArray
   [
     "A",
     "B",
     "C",
     "E",
   ]
   [arrow-ipc/src/reader.rs:717:9] batch.isDelta() = true
   [arrow-ipc/src/reader.rs:725:5] &dictionary_values = StringArray
   [
     "D",
   ]
   ```
   
   What we see is that writing a RecordBatch from a different builder basically 
resets the `ipc.Writer` because it doesn't know about values from the first 
builder. The insight that I missed the first time is that in Go, there is a 
[dictionary](https://github.com/apache/arrow-go/blob/679d97b05d9aa2279cbe56a4ef5cfa94aa657db8/arrow/array/data.go#L42)
 stapled to the RecordBatches by the RecordBatchWriter which contains all of 
the values produced by that builder. And the ipc writer uses that rather than 
the RecordBatch values.
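
To make the output above easier to follow, here is a toy model in plain Rust of a writer that remembers only the last dictionary it saw. It is a sketch under assumptions, not the arrow-go or arrow-rs implementation: `ToyWriter` and `DictMessage` are hypothetical names, and "delta" is modeled as "the new dictionary starts with the old one". Feeding it the cumulative dictionaries of the two builders from the Go program gives the same replacement/delta pattern as the reader output.

```rust
/// What the toy writer emits for one incoming dictionary (hypothetical type).
#[derive(Debug, PartialEq)]
enum DictMessage {
    /// Full dictionary, i.e. isDelta = false.
    Replacement(Vec<String>),
    /// Only the newly appended values, i.e. isDelta = true.
    Delta(Vec<String>),
    /// Dictionary is identical to the last one; nothing to write.
    Unchanged,
}

/// Hypothetical writer state: only the most recently seen dictionary.
#[derive(Default)]
struct ToyWriter {
    last: Option<Vec<String>>,
}

impl ToyWriter {
    fn write(&mut self, dict: &[&str]) -> DictMessage {
        let new: Vec<String> = dict.iter().map(|s| s.to_string()).collect();
        let msg = match &self.last {
            Some(old) if *old == new => DictMessage::Unchanged,
            // The new dictionary extends the old one: emit only the tail.
            Some(old) if new.starts_with(old) => DictMessage::Delta(new[old.len()..].to_vec()),
            // Anything else forces a full replacement.
            _ => DictMessage::Replacement(new.clone()),
        };
        self.last = Some(new);
        msg
    }
}

fn main() {
    let mut writer = ToyWriter::default();
    let dicts: [&[&str]; 4] = [
        &["A", "B", "C"],           // batch 0, builder 1
        &["A", "B", "D"],           // batch 1, builder 2
        &["A", "B", "C", "E"],      // batch 2, builder 1 (cumulative)
        &["A", "B", "C", "E", "D"], // batch 3, builder 1 (cumulative)
    ];
    for d in dicts.iter() {
        println!("{:?}", writer.write(d));
    }
}
```

Only the last write is a delta (`["D"]`), because it is the only one whose dictionary extends the dictionary written immediately before it.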
   
   The takeaway is that in Go we need some cooperation between the builder 
and the ipc writer to get delta dictionaries a reasonable amount of the time. 
If we shovel RecordBatches from different builders into the same ipc writer 
then we get bad behavior for delta dictionaries.
   
   In Rust, we don't have that `dictionary` field and therefore our ability to 
write delta dictionaries is always pretty bad. Unless each batch's dictionary 
is a superset of the previous batch's, we get no deltas and 
just waste time comparing dictionaries. Additionally we [reset the internal 
dictionary](https://github.com/apache/arrow-rs/blob/94230402c2d31e7da5dc73d1a284cf17940c093c/arrow-array/src/builder/generic_bytes_dictionary_builder.rs#L435)
 after creating every batch in Rust, presumably because we can't do anything 
with it anyway.
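
As an illustration of why the reset matters, here is a toy dictionary builder in plain Rust that can finish a batch either way. This is a sketch only: `ToyDictBuilder`, `finish_reset`, and `finish_keep` are hypothetical names and not the arrow-rs builder API. With a reset, consecutive dictionaries are unrelated; without one, each dictionary extends the previous one, which is the shape a comparison-based writer can turn into a delta.

```rust
use std::collections::HashMap;

/// Hypothetical dictionary builder; not the arrow-rs API.
#[derive(Default)]
struct ToyDictBuilder {
    values: Vec<String>,            // dictionary values in insertion order
    lookup: HashMap<String, usize>, // value -> key
    keys: Vec<usize>,               // keys of the batch under construction
}

impl ToyDictBuilder {
    fn append(&mut self, v: &str) {
        let next = self.values.len();
        let key = *self.lookup.entry(v.to_string()).or_insert(next);
        if key == next {
            // First time this value is seen: extend the dictionary.
            self.values.push(v.to_string());
        }
        self.keys.push(key);
    }

    /// Finish a batch and forget the dictionary.
    fn finish_reset(&mut self) -> (Vec<usize>, Vec<String>) {
        self.lookup.clear();
        (std::mem::take(&mut self.keys), std::mem::take(&mut self.values))
    }

    /// Finish a batch but keep the dictionary for later batches.
    fn finish_keep(&mut self) -> (Vec<usize>, Vec<String>) {
        (std::mem::take(&mut self.keys), self.values.clone())
    }
}

fn main() {
    let mut resetting = ToyDictBuilder::default();
    let mut keeping = ToyDictBuilder::default();
    for batch in [["A", "B"], ["B", "C"]].iter() {
        for v in batch.iter() {
            resetting.append(v);
            keeping.append(v);
        }
        println!("reset: {:?}", resetting.finish_reset().1);
        println!("keep:  {:?}", keeping.finish_keep().1);
    }
}
```

For the batches `["A", "B"]` then `["B", "C"]`, the resetting builder yields dictionaries `["A", "B"]` and `["B", "C"]`, while the keeping builder yields `["A", "B"]` and `["A", "B", "C"]`.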
   
   My questions are:
   
   1.  Is there a better way to handle this than requiring cooperation between 
the dictionary builder and the ipc writers? It was difficult for me to figure 
out this information and I could imagine that this would surprise a lot of 
people who expect it to "just work" and who aren't getting delta dictionaries 
in different circumstances.
   2. Is there a straightforward way/desire to add a similar `dictionary` field 
to Rust data, so that we can at least get delta dictionaries under conditions 
similar to the Go writers?
   
   Curious to hear any thoughts, thank you both for the help so far!


