HashidaTKS commented on a change in pull request #10527:
URL: https://github.com/apache/arrow/pull/10527#discussion_r667522059



##########
File path: csharp/src/Apache.Arrow/Ipc/MessageSerializer.cs
##########
@@ -73,13 +73,27 @@ internal static Schema GetSchema(Flatbuf.Schema schema)
             return new Schema(fields, metadata, copyCollections: false);
         }
 
-        private static Field FieldFromFlatbuffer(Flatbuf.Field flatbufField)
+        private static Field FieldFromFlatbuffer(Flatbuf.Field flatbufField, 
ref DictionaryMemo dictionaryMemo)
         {
             Field[] childFields = flatbufField.ChildrenLength > 0 ? new 
Field[flatbufField.ChildrenLength] : null;
             for (int i = 0; i < flatbufField.ChildrenLength; i++)
             {
                 Flatbuf.Field? childFlatbufField = flatbufField.Children(i);
-                childFields[i] = FieldFromFlatbuffer(childFlatbufField.Value);
+                childFields[i] = FieldFromFlatbuffer(childFlatbufField.Value, 
ref dictionaryMemo);
+            }
+
+            Flatbuf.DictionaryEncoding? dictionaryEncoding = 
flatbufField.Dictionary;
+            IArrowType type = GetFieldArrowType(flatbufField, childFields);
+
+            if (dictionaryEncoding.HasValue)
+            {
+                Flatbuf.Int? indexTypeAsInt = 
dictionaryEncoding.Value.IndexType;
+                if (!indexTypeAsInt.HasValue)

Review comment:
       I fixed it.

##########
File path: csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs
##########
@@ -723,4 +868,62 @@ public virtual void Dispose()
             }
         }
     }
+
+    internal static class DictionaryCollector
+    {
+        internal static void Collect(RecordBatch recordBatch, ref 
DictionaryMemo dictionaryMemo)
+        {
+            Schema schema = recordBatch.Schema;
+            for (int i = 0; i < schema.Fields.Count; i++)
+            {
+                {

Review comment:
       I fixed it.

##########
File path: csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs
##########
@@ -723,4 +868,62 @@ public virtual void Dispose()
             }
         }
     }
+
+    internal static class DictionaryCollector
+    {
+        internal static void Collect(RecordBatch recordBatch, ref 
DictionaryMemo dictionaryMemo)
+        {
+            Schema schema = recordBatch.Schema;
+            for (int i = 0; i < schema.Fields.Count; i++)
+            {
+                {
+                    Field field = schema.GetFieldByIndex(i);
+                    IArrowArray array = recordBatch.Column(i);
+
+                    CollectDictionary(field, array, ref dictionaryMemo);
+                }
+            }
+        }
+
+        private static void CollectDictionary(Field field, IArrowArray array, 
ref DictionaryMemo dictionaryMemo)
+        {
+            if (field.DataType.TypeId == ArrowTypeId.Dictionary)
+            {
+                IArrowArray dictionary = (array as DictionaryArray).Dictionary;
+                dictionaryMemo ??= new DictionaryMemo();
+                long id = dictionaryMemo.GetOrAssignId(field);
+
+                dictionaryMemo.AddOrReplaceDictionary(id, dictionary);
+                WalkChildren(dictionary, ref dictionaryMemo);

Review comment:
       We didn't.
   
   I added tests for a DictionaryArray with children and a ListArray with 
DictionaryType children.
   While doing so, I found and fixed some bugs in the serialization of those arrays.
   
   Thank you!

##########
File path: csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs
##########
@@ -723,4 +868,62 @@ public virtual void Dispose()
             }
         }
     }
+
+    internal static class DictionaryCollector
+    {
+        internal static void Collect(RecordBatch recordBatch, ref 
DictionaryMemo dictionaryMemo)
+        {
+            Schema schema = recordBatch.Schema;
+            for (int i = 0; i < schema.Fields.Count; i++)
+            {
+                {
+                    Field field = schema.GetFieldByIndex(i);
+                    IArrowArray array = recordBatch.Column(i);
+
+                    CollectDictionary(field, array, ref dictionaryMemo);
+                }
+            }
+        }
+
+        private static void CollectDictionary(Field field, IArrowArray array, 
ref DictionaryMemo dictionaryMemo)
+        {
+            if (field.DataType.TypeId == ArrowTypeId.Dictionary)
+            {
+                IArrowArray dictionary = (array as DictionaryArray).Dictionary;
+                dictionaryMemo ??= new DictionaryMemo();
+                long id = dictionaryMemo.GetOrAssignId(field);
+
+                dictionaryMemo.AddOrReplaceDictionary(id, dictionary);
+                WalkChildren(dictionary, ref dictionaryMemo);
+            }
+            else
+            {
+                WalkChildren(array, ref dictionaryMemo);
+            }
+        }
+
+        private static void WalkChildren(IArrowArray array, ref DictionaryMemo 
dictionaryMemo)
+        {
+            ArrayData[] children = array.Data.Children;
+
+            if (children == null)
+            {
+                return;
+            }
+
+            if (!(array.Data.DataType is NestedType nestedType))
+            {
+                return;
+            }
+
+            for (int i = 0; i < nestedType.Fields.Count; i++)
+            {
+                Field childField = nestedType.Fields[i];
+                ArrayData child = children[i];
+                IArrowArray childArray = ArrowArrayFactory.BuildArray(child);

Review comment:
       Right, it was wasteful.
   I modified it to walk over the children using `ArrayData` and to call 
`BuildArray()` only when it is needed.

##########
File path: csharp/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs
##########
@@ -59,6 +97,75 @@ protected async ValueTask<RecordBatch> 
ReadRecordBatchAsync(CancellationToken ca
         {
             await ReadSchemaAsync().ConfigureAwait(false);
 
+            await 
ReadInitialDictionariesAsync(cancellationToken).ConfigureAwait(false);
+
+            return await 
ReadArrowObjectAsync(cancellationToken).ConfigureAwait(false);
+        }
+
+
+        protected RecordBatch ReadRecordBatch()
+        {
+            ReadSchema();
+
+            ReadInitialDictionaries();
+
+            return ReadArrowObject();

Review comment:
       I tested reading interleaved dictionaries by creating a test file with 
Python and reading it with C#, and it seemed to work fine.
   The code for creating the test file is below. It is based on the following 
test code from the Python implementation:
   
   
https://github.com/apache/arrow/blob/master/python/pyarrow/tests/test_ipc.py#L417
   
   ```
   import pyarrow as pa
   
   ty = pa.dictionary(pa.int8(), pa.utf8())
   data = [["foo", "foo", None],
               ["foo", "bar", "foo"],
               ["foo", "bar"],
               ["foo", None, "bar", "quux"], 
               ["bar", "quux"],
               ]
   batches = [
           pa.RecordBatch.from_arrays([pa.array(v, type=ty)], names=['dicts'])
           for v in data]
   schema = batches[0].schema
   
   def write_batches():
       with pa.RecordBatchStreamWriter("./dictionary_batch_test.batch", schema 
= schema) as writer:
           for batch in batches:
               writer.write_batch(batch)
   
   st = write_batches()
   ```
   
   We should add C# tests for this after `ArrowStreamWriter` supports writing 
interleaved dictionaries.

##########
File path: csharp/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs
##########
@@ -59,6 +97,75 @@ protected async ValueTask<RecordBatch> 
ReadRecordBatchAsync(CancellationToken ca
         {
             await ReadSchemaAsync().ConfigureAwait(false);
 
+            await 
ReadInitialDictionariesAsync(cancellationToken).ConfigureAwait(false);
+
+            return await 
ReadArrowObjectAsync(cancellationToken).ConfigureAwait(false);
+        }
+
+
+        protected RecordBatch ReadRecordBatch()
+        {
+            ReadSchema();
+
+            ReadInitialDictionaries();
+
+            return ReadArrowObject();

Review comment:
       Sorry, it slipped my mind. That was not intentional.
   I modified the reader so that it can read interleaved dictionaries.
   
   During the modification, I realized that `ReadInitialDictionariesAsync` and 
`ReadInitialDictionaries` are unnecessary, so I removed them.
   
   Related to this, `ArrowStreamWriter` does not support writing interleaved 
dictionaries for now.
   I would like to implement that in a separate PR, because this PR would 
otherwise become too large.
   Does that sound OK?

##########
File path: csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs
##########
@@ -723,4 +868,62 @@ public virtual void Dispose()
             }
         }
     }
+
+    internal static class DictionaryCollector
+    {
+        internal static void Collect(RecordBatch recordBatch, ref 
DictionaryMemo dictionaryMemo)
+        {
+            Schema schema = recordBatch.Schema;
+            for (int i = 0; i < schema.Fields.Count; i++)
+            {
+                {
+                    Field field = schema.GetFieldByIndex(i);
+                    IArrowArray array = recordBatch.Column(i);
+
+                    CollectDictionary(field, array, ref dictionaryMemo);
+                }
+            }
+        }
+
+        private static void CollectDictionary(Field field, IArrowArray array, 
ref DictionaryMemo dictionaryMemo)
+        {
+            if (field.DataType.TypeId == ArrowTypeId.Dictionary)
+            {
+                IArrowArray dictionary = (array as DictionaryArray).Dictionary;
+                dictionaryMemo ??= new DictionaryMemo();
+                long id = dictionaryMemo.GetOrAssignId(field);
+
+                dictionaryMemo.AddOrReplaceDictionary(id, dictionary);
+                WalkChildren(dictionary, ref dictionaryMemo);
+            }
+            else
+            {
+                WalkChildren(array, ref dictionaryMemo);
+            }
+        }
+
+        private static void WalkChildren(IArrowArray array, ref DictionaryMemo 
dictionaryMemo)
+        {
+            ArrayData[] children = array.Data.Children;
+
+            if (children == null)
+            {
+                return;
+            }
+
+            if (!(array.Data.DataType is NestedType nestedType))
+            {
+                return;
+            }
+
+            for (int i = 0; i < nestedType.Fields.Count; i++)
+            {
+                Field childField = nestedType.Fields[i];
+                ArrayData child = children[i];
+                IArrowArray childArray = ArrowArrayFactory.BuildArray(child);
+
+                CollectDictionary(childField, childArray, ref dictionaryMemo);
+            }
+        }

Review comment:
       I fixed it.

##########
File path: csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs
##########
@@ -248,6 +263,13 @@ private protected void 
WriteRecordBatchInternal(RecordBatch recordBatch)
                 HasWrittenSchema = true;
             }
 
+            if (!HasWrittenDictionaryBatch)
+            {
+                DictionaryCollector.Collect(recordBatch, ref _dictionaryMemo);

Review comment:
       The benchmark results are below.
   
   before changes:
   
   |                                    Method |   Count |       Mean |     
Error |     StdDev |     Median | Gen 0 | Gen 1 | Gen 2 |   Allocated |
   |------------------------------------------ |-------- 
|-----------:|----------:|-----------:|-----------:|------:|------:|------:|------------:|
   |               ArrowReaderWithMemoryStream |   10000 |   2.869 ms | 0.0561 
ms |  0.0982 ms |   2.841 ms |     - |     - |     - |    10.09 KB |
   | ArrowReaderWithMemoryStream_ManagedMemory |   10000 |   2.371 ms | 0.0474 
ms |  0.1116 ms |   2.335 ms |     - |     - |     - |  1444.86 KB |
   |                     ArrowReaderWithMemory |   10000 |   2.122 ms | 0.1173 
ms |  0.3365 ms |   2.160 ms |     - |     - |     - |     7.67 KB |
   |               ArrowReaderWithMemoryStream | 1000000 | 220.505 ms | 4.3805 
ms | 12.5684 ms | 219.305 ms |     - |     - |     - |     8.13 KB |
   | ArrowReaderWithMemoryStream_ManagedMemory | 1000000 | 211.168 ms | 4.1733 
ms | 11.1395 ms | 206.210 ms |     - |     - |     - | 142425.4 KB |
   |                     ArrowReaderWithMemory | 1000000 | 166.486 ms | 3.9192 
ms | 11.5558 ms | 165.832 ms |     - |     - |     - |     7.59 KB |
   
   |     Method | BatchLength | ColumnSetCount |       Mean |     Error |    
StdDev |     Median | Gen 0 | Gen 1 | Gen 2 | Allocated |
   |----------- |------------ |--------------- 
|-----------:|----------:|----------:|-----------:|------:|------:|------:|----------:|
   | WriteBatch |       10000 |             10 |   2.844 ms | 0.1597 ms | 
0.4633 ms |   2.732 ms |     - |     - |     - |  78.06 KB |
   | WriteBatch |       10000 |             14 |   3.913 ms | 0.2181 ms | 
0.6362 ms |   3.692 ms |     - |     - |     - | 112.16 KB |
   | WriteBatch |     1000000 |             10 | 239.486 ms | 4.7245 ms | 
4.4193 ms | 239.822 ms |     - |     - |     - |   79.1 KB |
   | WriteBatch |     1000000 |             14 | 331.087 ms | 6.2670 ms | 
7.4604 ms | 331.364 ms |     - |     - |     - | 111.48 KB |
   
   
   
   after changes:
   
   |                                    Method |   Count |       Mean |     
Error |     StdDev |     Median |     Gen 0 | Gen 1 | Gen 2 |    Allocated |
   |------------------------------------------ |-------- 
|-----------:|----------:|-----------:|-----------:|----------:|------:|------:|-------------:|
   |               ArrowReaderWithMemoryStream |   10000 |   2.820 ms | 0.0560 
ms |  0.1319 ms |   2.788 ms | 1000.0000 |     - |     - |     10.27 KB |
   | ArrowReaderWithMemoryStream_ManagedMemory |   10000 |   2.376 ms | 0.0475 
ms |  0.1252 ms |   2.329 ms |         - |     - |     - |   1445.03 KB |
   |                     ArrowReaderWithMemory |   10000 |   2.233 ms | 0.0305 
ms |  0.0255 ms |   2.235 ms |         - |     - |     - |      7.81 KB |
   |               ArrowReaderWithMemoryStream | 1000000 | 206.608 ms | 4.1289 
ms | 10.5095 ms | 201.813 ms |         - |     - |     - |       8.3 KB |
   | ArrowReaderWithMemoryStream_ManagedMemory | 1000000 | 226.408 ms | 4.4877 
ms | 11.7435 ms | 222.866 ms |         - |     - |     - | 142425.57 KB |
   |                     ArrowReaderWithMemory | 1000000 | 149.248 ms | 2.3519 
ms |  5.1625 ms | 147.150 ms |         - |     - |     - |      7.73 KB |
   
   
   |     Method | BatchLength | ColumnSetCount |       Mean |     Error |     
StdDev |     Median | Gen 0 | Gen 1 | Gen 2 | Allocated |
   |----------- |------------ |--------------- 
|-----------:|----------:|-----------:|-----------:|------:|------:|------:|----------:|
   | WriteBatch |       10000 |             10 |   2.794 ms | 0.1591 ms |  
0.4566 ms |   2.639 ms |     - |     - |     - |  78.97 KB |
   | WriteBatch |       10000 |             14 |   3.991 ms | 0.2347 ms |  
0.6772 ms |   3.776 ms |     - |     - |     - | 113.35 KB |
   | WriteBatch |     1000000 |             10 | 245.601 ms | 8.2668 ms | 
23.8516 ms | 240.295 ms |     - |     - |     - |  77.97 KB |
   | WriteBatch |     1000000 |             14 | 317.090 ms | 6.2694 ms | 
13.4956 ms | 317.577 ms |     - |     - |     - | 112.35 KB |




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to