That would be great!

On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks <dwe...@netflix.com> wrote:

> Hey Gautam,
>
> We also have a couple people looking into vectorized reading (into Arrow
> memory).  I think it would be good for us to get together and see if we can
> collaborate on a common approach for this.
>
> I'll reach out directly and see if we can get together.
>
> -Dan
>
> On Sun, Jul 21, 2019 at 10:35 PM Gautam <gautamkows...@gmail.com> wrote:
>
>> Figured this out. I'm returning the `ColumnarBatch` iterator directly,
>> without projection, with the schema set appropriately in `readSchema()`.
>> The empty result was due to `valuesRead` not being set correctly on
>> `FileIterator`. Fixed that and things are working. Will circle back with
>> numbers soon.
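
The fix itself is not shown in the thread. As a minimal sketch of the kind of bookkeeping involved, assuming the iterator decides `hasNext()` by comparing a `valuesRead` counter against a total row count, the counter has to advance by the number of rows in each batch. `BatchIterator` and `countRows` are hypothetical names for illustration, not Iceberg classes.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

public class ValuesReadSketch {

  /** Hypothetical batch iterator: each call to next() yields the row count of one batch. */
  static final class BatchIterator implements Iterator<Integer> {
    private final long totalValues;
    private final int batchSize;
    private long valuesRead = 0;

    BatchIterator(long totalValues, int batchSize) {
      this.totalValues = totalValues;
      this.batchSize = batchSize;
    }

    @Override
    public boolean hasNext() {
      // Iteration ends only when every row has been accounted for.
      return valuesRead < totalValues;
    }

    @Override
    public Integer next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      int rows = (int) Math.min(batchSize, totalValues - valuesRead);
      // Advance by the rows in this batch, not by one per call.
      valuesRead += rows;
      return rows;
    }
  }

  /** Sums the row counts of all batches produced by the iterator. */
  static long countRows(Iterator<Integer> batches) {
    long total = 0;
    while (batches.hasNext()) {
      total += batches.next();
    }
    return total;
  }

  public static void main(String[] args) {
    // 10 rows in batches of 4 arrive as batches of 4, 4 and 2 rows.
    System.out.println(countRows(new BatchIterator(10, 4)));
  }
}
```

If the counter is left untouched or set to the total up front, a consumer sees either a stream that never ends or no batches at all, which would be consistent with the empty result mentioned above.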
>>
>> On Fri, Jul 19, 2019 at 5:22 PM Gautam <gautamkows...@gmail.com> wrote:
>>
>>> Hey Guys,
>>> Sorry about the delay on this. I just got back to getting a basic
>>> working implementation in Iceberg for vectorization on primitive types.
>>>
>>> *Here's what I have so far:*
>>>
>>> I have added `ParquetValueReader` implementations for some basic
>>> primitive types that build the respective Arrow vector (`ValueVector`),
>>> viz. `IntVector` for ints, `VarCharVector` for strings, and so on.
>>> Underneath each value vector reader there are column iterators that read
>>> from the Parquet page stores (row groups) in chunks. These `ValueVector`s
>>> are lined up as `ArrowColumnVector`s (a `ColumnVector` wrapper backed by
>>> Arrow) and stitched together using a `ColumnarBatchReader` (which, as the
>>> name suggests, wraps `ColumnarBatch`es in the iterator). I've verified
>>> that these pieces work properly with the underlying interfaces. I've also
>>> made changes to Iceberg's `Reader` to implement `planBatchPartitions()`
>>> (adding the `SupportsScanColumnarBatch` mixin to the reader), so the
>>> reader now expects `ColumnarBatch` instances (instead of `InternalRow`).
>>> The query planning runtime works fine with these changes.
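
The layout described above (per-column readers that each pull a chunk from their column iterator, with the resulting vectors lined up into one batch) can be sketched in plain Java. This is only an illustration under stated assumptions: `IntColumn`, `Batch`, `readColumn` and `nextBatch` are hypothetical stand-ins for the Arrow vectors, Spark's `ColumnarBatch` and the reader classes, and it uses no Arrow or Spark APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class BatchStitchSketch {

  /** Hypothetical stand-in for one value vector: a fixed-capacity int column. */
  static final class IntColumn {
    final int[] values;
    int count; // number of valid entries in values

    IntColumn(int capacity) {
      this.values = new int[capacity];
    }
  }

  /** Hypothetical stand-in for a columnar batch: equal-length columns plus a row count. */
  static final class Batch {
    final List<IntColumn> columns;
    final int numRows;

    Batch(List<IntColumn> columns, int numRows) {
      this.columns = columns;
      this.numRows = numRows;
    }
  }

  /** Pulls up to batchSize values from one column iterator into a fresh column. */
  static IntColumn readColumn(Iterator<Integer> source, int batchSize) {
    IntColumn col = new IntColumn(batchSize);
    while (col.count < batchSize && source.hasNext()) {
      col.values[col.count++] = source.next();
    }
    return col;
  }

  /** Reads one chunk from every column iterator and lines the chunks up as a batch. */
  static Batch nextBatch(List<Iterator<Integer>> sources, int batchSize) {
    List<IntColumn> columns = new ArrayList<>();
    int numRows = -1;
    for (Iterator<Integer> source : sources) {
      IntColumn col = readColumn(source, batchSize);
      // All columns of a batch must cover the same rows.
      if (numRows >= 0 && col.count != numRows) {
        throw new IllegalStateException("columns are out of step");
      }
      numRows = col.count;
      columns.add(col);
    }
    return new Batch(columns, Math.max(numRows, 0));
  }

  public static void main(String[] args) {
    List<Iterator<Integer>> sources = new ArrayList<>();
    sources.add(Arrays.asList(1, 2, 3, 4, 5).iterator());
    sources.add(Arrays.asList(10, 20, 30, 40, 50).iterator());

    Batch first = nextBatch(sources, 3);
    Batch second = nextBatch(sources, 3);
    System.out.println(first.numRows + " " + second.numRows); // prints "3 2"
  }
}
```

The row-count check reflects the constraint that every column in a batch describes the same set of rows; a real reader would fill Arrow buffers rather than Java arrays.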
>>>
>>> However, it fails during query execution. It's currently failing at
>>> this line of code:
>>> https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414
>>>
>>> This code, I think, tries to apply the iterator's schema projection on
>>> the `InternalRow` instances. This seems to be tightly coupled to
>>> `InternalRow`, as Spark's Catalyst expressions have implemented
>>> `UnsafeProjection` for `InternalRow` only. If I take this out and just
>>> return the `Iterator<ColumnarBatch>` I built, it returns an empty result
>>> on the client. I'm guessing this is because Spark is unaware of the
>>> iterator's schema? There's a TODO in the code that says "*remove the
>>> projection by reporting the iterator's schema back to Spark*". Is there
>>> a simple way to communicate that to Spark for my new iterator? Any
>>> pointers on how to get around this?
>>>
>>>
>>> Thanks and Regards,
>>> -Gautam.
>>>
>>>
>>>
>>>
>>> On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue <rb...@netflix.com> wrote:
>>>
>>>> Replies inline.
>>>>
>>>> On Fri, Jun 14, 2019 at 1:11 AM Gautam <gautamkows...@gmail.com> wrote:
>>>>
>>>>> Thanks for responding Ryan,
>>>>>
>>>>> A couple of follow-up questions on ParquetValueReader for Arrow:
>>>>>
>>>>> I'd like to start by testing Arrow out with readers for primitive
>>>>> types and incrementally add Struct/Array support. Also, ArrowWriter [1]
>>>>> currently doesn't have converters for the map type. How can I default
>>>>> these types to regular materialization whilst supporting Arrow for
>>>>> primitives?
>>>>>
>>>>
>>>> We should look at what Spark does to handle maps.
>>>>
>>>> I think we should get the prototype working with test cases that don't
>>>> have maps, structs, or lists. Just getting primitives working is a good
>>>> start and just won't hit these problems.
>>>>
>>>>
>>>>> Let me know if this makes sense...
>>>>>
>>>>> - I extend PrimitiveReader (for Arrow) so that it loads primitive types
>>>>> into ArrowColumnVectors of the corresponding column types by iterating
>>>>> over the underlying ColumnIterator *n times*, where n is the batch size.
>>>>>
>>>>
>>>> Sounds good to me. I'm not sure about extending vs wrapping because I'm
>>>> not too familiar with the Arrow APIs.
>>>>
>>>>
>>>>> - Reader.newParquetIterable() maps primitive column types to the
>>>>> newly added ArrowParquetValueReader, but for other types (nested types,
>>>>> etc.) uses the current *InternalRow*-based ValueReaders
>>>>>
>>>>
>>>> Sounds good for primitives, but I would just leave the nested types
>>>> un-implemented for now.
>>>>
>>>>
>>>>> - Stitch the column vectors together to create a ColumnarBatch (since
>>>>> the *SupportsScanColumnarBatch* mixin currently expects this),
>>>>> *although I'm a bit lost on how the stitching of columns happens
>>>>> currently*, and on how the ArrowColumnVectors could be stitched
>>>>> alongside regular columns that don't have Arrow-based support?
>>>>>
>>>>
>>>> I don't think that you can mix regular columns and Arrow columns. It
>>>> has to be all one or the other. That's why it's easier to start with
>>>> primitives, then add structs, then lists, and finally maps.
>>>>
>>>>
>>>>> - Reader returns readTasks as `InputPartition<ColumnarBatch>` so
>>>>> that DataSourceV2ScanExec starts using ColumnarBatch scans
>>>>>
>>>>
>>>> We will probably need two paths. One for columnar batches and one for
>>>> row-based reads. That doesn't need to be done right away and what you
>>>> already have in your working copy makes sense as a start.
>>>>
>>>>
>>>>> That's a lot of questions! :-) But I hope I'm making sense.
>>>>>
>>>>> -Gautam.
>>>>>
>>>>>
>>>>>
>>>>> [1] -
>>>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
>>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>>>
>>>
