[
https://issues.apache.org/jira/browse/ARROW-18317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthew Topol resolved ARROW-18317.
-----------------------------------
Resolution: Fixed
Issue resolved by pull request 14636
[https://github.com/apache/arrow/pull/14636]
> [Go] Problem of dictionary update during a communication via an IPC stream
> ---------------------------------------------------------------------------
>
> Key: ARROW-18317
> URL: https://issues.apache.org/jira/browse/ARROW-18317
> Project: Apache Arrow
> Issue Type: Bug
> Components: Go
> Affects Versions: 10.0.0
> Reporter: Laurent Querel
> Assignee: Matthew Topol
> Priority: Major
> Labels: pull-request-available
> Fix For: 11.0.0, 10.0.1
>
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> Dictionaries do not seem to be updated correctly when sending a record on an
> IPC stream.
> The following example creates a first record with a single field named
> "field", initialized with the value "value_0". This record is then serialized
> with an IPC writer and deserialized with an IPC reader.
> A second record is then created with the value "value_1". After serialization
> and deserialization, the expected value for the field is "value_1" but I get
> "value_0".
> Based on a quick analysis via the debugger, I suspect an error in combining
> the dictionary from step 1 with the dictionary from step 2. The resulting
> dictionary contains the concatenation of the two dictionaries (i.e.
> value_0value_1), but the offset values used to read the field (of the second
> record) refer to "value_0". It may be that the offset arrays are not correctly
> combined when the second record is received.
> Below is a code snippet that reproduces the issue.
> {code:java}
> // TestDictionary reproduces ARROW-18317. Two single-row records with
> // different dictionary values are pushed through the same IPC writer and
> // reader. Each decoded record must match its source when compared as JSON.
> // NOTE: Release methods are not managed in this test for simplicity.
> func TestDictionary(t *testing.T) {
> 	pool := memory.NewGoAllocator()
>
> 	// A schema with a single dictionary-encoded string field (uint16 indices).
> 	dictType := &arrow.DictionaryType{
> 		IndexType: arrow.PrimitiveTypes.Uint16,
> 		ValueType: arrow.BinaryTypes.String,
> 		Ordered:   false,
> 	}
> 	schema := arrow.NewSchema([]arrow.Field{{Name: "field", Type: dictType}}, nil)
>
> 	// The writer and reader live for the whole test, so the second record
> 	// travels on the same stream as the first.
> 	var bufWriter bytes.Buffer
> 	ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema))
> 	bufReader := bytes.NewReader([]byte{})
> 	var ipcReader *ipc.Reader
>
> 	// Iteration 0 sends "value_0". Iteration 1 sends "value_1", which was
> 	// decoded as "value_0" before the fix.
> 	for value := 0; value <= 1; value++ {
> 		record := CreateRecord(t, pool, schema, value)
> 		wantJSON, err := record.MarshalJSON()
> 		require.NoError(t, err)
>
> 		var gotJSON []byte
> 		gotJSON, ipcReader, err = EncodeDecodeIpcStream(t, record, &bufWriter,
> 			ipcWriter, bufReader, ipcReader)
> 		require.NoError(t, err)
> 		require.JSONEq(t, string(wantJSON), string(gotJSON))
> 	}
> }
> // CreateRecord builds a single-row record for schema. Its only field,
> // "field", holds the string "value_<value>".
> // The builder of the schema's first field must be an
> // *array.BinaryDictionaryBuilder (a string dictionary); otherwise the test
> // fails. The caller is responsible for releasing the returned record.
> func CreateRecord(t *testing.T, pool memory.Allocator, schema *arrow.Schema,
> 	value int) arrow.Record {
> 	t.Helper()
> 	rb := array.NewRecordBuilder(pool, schema)
> 	// Per the Arrow Go docs, the record returned by NewRecord keeps its own
> 	// references to the built arrays, so the builder can be released here.
> 	defer rb.Release()
> 	fieldB, ok := rb.Field(0).(*array.BinaryDictionaryBuilder)
> 	if !ok {
> 		t.Fatalf("field 0 builder is %T, want *array.BinaryDictionaryBuilder", rb.Field(0))
> 	}
> 	s := fmt.Sprintf("value_%d", value)
> 	if err := fieldB.AppendString(s); err != nil {
> 		t.Fatalf("appending %q: %v", s, err)
> 	}
> 	return rb.NewRecord()
> }
> // EncodeDecodeIpcStream round-trips one record through an IPC writer/reader
> // pair and returns the decoded record as JSON.
> //
> // Steps:
> //   - write record with ipcWriter (which must write into bufWriter);
> //   - reset bufWriter;
> //   - point bufReader at the bytes that write produced;
> //   - read the next record from ipcReader.
> //
> // The IPC writer and reader are reused from one call to another. Pass a nil
> // ipcReader on the first call and it is created from bufReader; pass the
> // returned reader on later calls.
> // It returns an error instead of panicking when no record can be read.
> func EncodeDecodeIpcStream(t *testing.T,
> 	record arrow.Record,
> 	bufWriter *bytes.Buffer, ipcWriter *ipc.Writer,
> 	bufReader *bytes.Reader, ipcReader *ipc.Reader) ([]byte, *ipc.Reader,
> 	error) {
> 	t.Helper()
> 	// Serialize the record via the ipc writer.
> 	if err := ipcWriter.Write(record); err != nil {
> 		return nil, ipcReader, err
> 	}
> 	serialized := bufWriter.Bytes()
> 	bufWriter.Reset()
> 	// Deserialize via the ipc reader. The reader keeps pulling from
> 	// bufReader, so re-pointing bufReader at the new bytes continues the
> 	// same stream.
> 	bufReader.Reset(serialized)
> 	if ipcReader == nil {
> 		newIpcReader, err := ipc.NewReader(bufReader)
> 		if err != nil {
> 			return nil, newIpcReader, err
> 		}
> 		ipcReader = newIpcReader
> 	}
> 	// Next reports false on error or end of stream. The current record is
> 	// then unset (nil in the Arrow Go implementation), and calling
> 	// MarshalJSON on it would panic, so report the failure as an error.
> 	if !ipcReader.Next() {
> 		if err := ipcReader.Err(); err != nil {
> 			return nil, ipcReader, fmt.Errorf("reading record from ipc stream: %w", err)
> 		}
> 		return nil, ipcReader, fmt.Errorf("reading record from ipc stream: no record available")
> 	}
> 	decoded := ipcReader.Record()
> 	// Return the decoded record as a JSON string.
> 	out, err := decoded.MarshalJSON()
> 	if err != nil {
> 		return nil, ipcReader, err
> 	}
> 	return out, ipcReader, nil
> } {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)