I agree that having a row_index is a good approach. I'm not sure a mask
would be the ideal solution for Iceberg (though it is a reasonable feature
in its own right), because I think position-based deletes in Iceberg are
still done with an anti-join rather than a filter.

That being said, we probably also want to implement a streaming, merge-based
anti-join. I believe delete files are ordered by row_index, so a streaming
approach is likely to be much more performant than a general hash-based
anti-join.
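
To make the idea concrete, here is a minimal sketch in plain Python of what I
mean by a merge-based anti-join. It is not Acero code. The function name
merge_anti_join and the (first_row_index, rows) batch shape are made up for
illustration, and it assumes both the scanned rows and the delete positions
arrive in ascending row_index order for a single file.

```python
from typing import Iterable, Iterator, List, Tuple


def merge_anti_join(
    batches: Iterable[Tuple[int, List[object]]],
    deleted_positions: Iterable[int],
) -> Iterator[Tuple[int, List[object]]]:
    """Drop rows whose row_index appears in deleted_positions.

    Hypothetical helper: `batches` yields (first_row_index, rows) pairs in
    ascending row_index order, and `deleted_positions` yields ascending
    integers. Both inputs are consumed once, in a single forward pass.
    """
    deletes = iter(deleted_positions)
    next_delete = next(deletes, None)
    for first_row_index, rows in batches:
        kept = []
        for offset, row in enumerate(rows):
            row_index = first_row_index + offset
            # Skip delete positions that are already behind the current row
            # (this also consumes duplicate positions).
            while next_delete is not None and next_delete < row_index:
                next_delete = next(deletes, None)
            if next_delete == row_index:
                continue  # this row is deleted
            kept.append(row)
        yield first_row_index, kept


batches = [(0, ["a", "b", "c"]), (3, ["d", "e"])]
result = list(merge_anti_join(batches, [1, 3, 3, 7]))
```

Because each input is only advanced forward, memory use stays constant
regardless of how many delete positions there are, which is the main appeal
over building a hash table of deleted positions.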

On Mon, May 29, 2023 at 4:01 PM Will Jones <will.jones...@gmail.com> wrote:

> Hi Rusty,
>
> At first glance, I think adding a row_index column would make sense. To be
> clear, this would be an index within a file / fragment, not across multiple
> files, which don't necessarily have a known ordering in Acero (IIUC).
>
> However, another approach would be to take a mask argument in the Parquet
> reader. We may wish to do this anyway, to support predicate pushdown
> with Parquet's page index. While Arrow C++ hasn't yet implemented
> predicate pushdown on the page index (right now it only supports row
> groups), Arrow Rust has, and provides an API to pass in a mask to support it. The
> reason for this implementation is described in the blog post "Querying
> Parquet with Millisecond Latency" [1], under "Page Pruning". The
> RowSelection struct API is worth a look [2].
>
> I'm not yet sure which would be preferable, but I think adopting a similar
> pattern to what the Rust community has done may be wise. It's possible that
> row_index is easy to implement while the mask will take time, in which case
> row_index makes sense as an interim solution.
>
> Best,
>
> Will Jones
>
> [1] https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
> [2] https://docs.rs/parquet/40.0.0/parquet/arrow/arrow_reader/struct.RowSelection.html
>
> On Mon, May 29, 2023 at 2:12 PM Rusty Conover <ru...@conover.me.invalid>
> wrote:
>
> > Hi Arrow Team,
> >
> > I wanted to suggest an improvement regarding Acero's Scan node.
> > Currently, it provides useful information such as __fragment_index,
> > __batch_index, __filename, and __last_in_fragment. However, it would
> > be beneficial to have an additional column that returns an overall
> > "row index" from the source.
> >
> > The row index would start from zero and increment for each row
> > retrieved from the source, particularly in the case of Parquet files.
> > Is it currently possible to obtain this row index or would expanding
> > the Scan node's behavior be required?
> >
> > Having this row index column would be valuable in implementing support
> > for Iceberg's position-based delete files, as outlined in the
> > following link:
> >
> > https://iceberg.apache.org/spec/#delete-formats
> >
> > While Iceberg's value-based deletes can already be performed using the
> > support for anti joins, using a projection node does not guarantee the
> > row ordering within an Acero graph. Hence, the inclusion of a
> > dedicated row index column would provide a more reliable solution in
> > this context.
> >
> > Thank you for considering this suggestion.
> >
> > Rusty
> >
>
