Replacing
```
ac::Declaration source{
    "record_batch_reader_source",
    ac::RecordBatchReaderSourceNodeOptions{std::move(input)}};
```
with
```
ac::RecordBatchSourceNodeOptions rb_source_options{
    input->schema(), [input]() {
      return arrow::MakeFunctionIterator([input] { return input->Next(); });
    }};
ac::Declaration source{"record_batch_source", std::move(rb_source_options)};
```
Works as expected.
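
For reference, a minimal end-to-end sketch of consuming this source with
sequenced output (the ConsumeOrdered wrapper is mine, not an Arrow API; it
assumes `input` is a std::shared_ptr<arrow::RecordBatchReader>, and it relies
on the DeclarationToReader sequence_output fix discussed below being in place):

```
#include <arrow/acero/exec_plan.h>
#include <arrow/acero/options.h>
#include <arrow/record_batch.h>
#include <arrow/result.h>
#include <arrow/util/iterator.h>

namespace ac = arrow::acero;

arrow::Status ConsumeOrdered(std::shared_ptr<arrow::RecordBatchReader> input) {
  ac::RecordBatchSourceNodeOptions rb_source_options{
      input->schema(), [input]() {
        return arrow::MakeFunctionIterator([input] { return input->Next(); });
      }};
  ac::Declaration source{"record_batch_source", std::move(rb_source_options)};

  ac::QueryOptions query_options;
  query_options.sequence_output = true;  // emit batches in batch-index order

  ARROW_ASSIGN_OR_RAISE(
      auto reader, ac::DeclarationToReader(std::move(source), query_options));
  while (true) {
    ARROW_ASSIGN_OR_RAISE(auto batch, reader->Next());
    if (batch == nullptr) break;  // end of stream
    // ... consume `batch` in the original input order ...
  }
  return arrow::Status::OK();
}
```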

Wenbo Hu <huwenbo1...@gmail.com> wrote on Wed, Jul 26, 2023 at 10:22:
>
> Hi,
>   I'll open an issue on the DeclarationToReader problem.
>   I think the key problem is that the input stream is unordered. The
> input stream is an ArrowArrayStream imported from the Python side, and
> then declared as a "record_batch_reader_source", which is an unordered
> source node. So the behavior is expected.
>   I think RecordBatchReaderSourceNodeOptions should add an ordering
> parameter to describe the input stream's ordering, as sketched below.
> Otherwise, I need to convert the "record_batch_reader_source" into a
> "record_batch_source" with a record batch generator.
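> Something like the following is what I have in mind; the `ordering`
> field is hypothetical (it does not exist in Arrow today), though the
> arrow::compute::Ordering type it would use already does:
>
> ```
> ac::RecordBatchReaderSourceNodeOptions options{std::move(input)};
> // Hypothetical field: declare that the reader yields batches in
> // implicit order.
> options.ordering = arrow::compute::Ordering::Implicit();
> ac::Declaration source{"record_batch_reader_source", std::move(options)};
> ```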
>   Also, I'd like to discuss the dataset scanner: does it produce a
> stable sequence of record batches (an implicit ordering) when the
> underlying storage is unchanged? In my situation, the downstream
> executor may crash and then ask to continue from an intermediate state
> (with a restart offset). I could turn that into a fetch node that skips
> the leading rows, but that doesn't seem efficient. Maybe I should
> inspect the fragments in the dataset to skip reading unnecessary files,
> and build a FileSystemDataset on the fly, as sketched below?
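> A rough sketch of that idea (my own helper, not an Arrow API; it
> assumes the restart offset maps to whole fragments and that fragment
> order is the stable scan order):
>
> ```
> #include <arrow/compute/expression.h>
> #include <arrow/dataset/dataset.h>
> #include <arrow/dataset/file_base.h>
>
> namespace ds = arrow::dataset;
>
> // Rebuild the dataset without its first `skip` fragments so a scan
> // never opens the files that precede the restart offset.
> arrow::Result<std::shared_ptr<ds::FileSystemDataset>> SkipFragments(
>     const std::shared_ptr<ds::FileSystemDataset>& dataset, size_t skip) {
>   ARROW_ASSIGN_OR_RAISE(auto fragment_it, dataset->GetFragments());
>   std::vector<std::shared_ptr<ds::FileFragment>> kept;
>   size_t index = 0;
>   for (auto maybe_fragment : fragment_it) {
>     ARROW_ASSIGN_OR_RAISE(auto fragment, maybe_fragment);
>     if (index++ >= skip) {
>       kept.push_back(std::static_pointer_cast<ds::FileFragment>(fragment));
>     }
>   }
>   return ds::FileSystemDataset::Make(
>       dataset->schema(), arrow::compute::literal(true), dataset->format(),
>       dataset->filesystem(), std::move(kept));
> }
> ```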
>
Weston Pace <weston.p...@gmail.com> wrote on Tue, Jul 25, 2023 at 23:44:
> >
> > > Reading the source code of exec_plan.cc, DeclarationToReader calls
> > > DeclarationToRecordBatchGenerator, which ignores the sequence_output
> > > parameter in SinkNodeOptions; it also calls Validate, which should
> > > fail if the SinkNodeOptions honored sequence_output. So it seems
> > > that DeclarationToReader cannot follow the input batch order?
> >
> > These methods should not be ignoring sequence_output.  Do you want to open
> > a bug?  This should be a straightforward one to fix.
> >
> > > Then how does Substrait work in this scenario? Does it produce
> > > unordered output as well?
> >
> > Probably.  Much of the internal Substrait testing is probably using
> > DeclarationToTable or DeclarationToBatches.  Ordered execution hasn't
> > been adopted widely yet because the old scanner doesn't set the batch
> > index and the new scanner isn't ready yet.  This limits its usefulness
> > to data that is already in memory (the in-memory sources do set the
> > batch index).
> >
> > I think your understanding of the concept is correct, however.  Can you
> > share a sample plan that is not working for you?  If you use
> > DeclarationToTable, do you get consistently ordered results?
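> > A quick check along those lines might look like this (a sketch only;
> > it assumes the `ac` alias for arrow::acero and that the plan's source
> > can be recreated, since a one-shot reader is exhausted after the first
> > run):
> >
> > ```
> > // Run the same declaration twice and compare the results row-for-row.
> > arrow::Status CheckOrdering(const ac::Declaration& plan) {
> >   ARROW_ASSIGN_OR_RAISE(auto first, ac::DeclarationToTable(plan));
> >   ARROW_ASSIGN_OR_RAISE(auto second, ac::DeclarationToTable(plan));
> >   if (!first->Equals(*second)) {
> >     return arrow::Status::Invalid("results are not consistently ordered");
> >   }
> >   return arrow::Status::OK();
> > }
> > ```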
> >
> > On Tue, Jul 25, 2023 at 7:06 AM Wenbo Hu <huwenbo1...@gmail.com> wrote:
> >
> > > Reading the source code of exec_plan.cc, DeclarationToReader calls
> > > DeclarationToRecordBatchGenerator, which ignores the sequence_output
> > > parameter in SinkNodeOptions; it also calls Validate, which should
> > > fail if the SinkNodeOptions honored sequence_output. So it seems
> > > that DeclarationToReader cannot follow the input batch order?
> > > Then how does Substrait work in this scenario? Does it produce
> > > unordered output as well?
> > >
> > > Wenbo Hu <huwenbo1...@gmail.com> wrote on Tue, Jul 25, 2023 at 19:12:
> > > >
> > > > Hi,
> > > >     I'm trying to zip two streams that have the same order but
> > > > different processing.
> > > >     For example, the original stream comes with two columns, 'id'
> > > > and 'age', and splits into two streams processed in a distributed
> > > > fashion using Acero: 1. hash 'id' into a stream with the single
> > > > column 'bucket_id', and 2. classify 'age' into ['child', 'teenage',
> > > > 'adult', ...]. The two results are then zipped into a single stream:
> > > >
> > > >   [    'id'     |    'age'     | many other columns ]
> > > >         |              |                 |
> > > >   ['bucket_id']   ['classify']           |
> > > >         |              |                 |
> > > >   [     zipped_stream       |  many_other_columns   ]
> > > > I was expecting both bucket_id and classify to keep the same order
> > > > as the original stream before they are zipped.
> > > > According to the documentation, "ordered execution" uses batch_index
> > > > to indicate the order of batches,
> > > > but acero::DeclarationToReader with QueryOptions.sequence_output set
> > > > to true does not keep the order when the input stream is unordered.
> > > > It doesn't fail during execution either (the bucket_id and classify
> > > > streams do not specify any ordering). How can I make Acero produce
> > > > a stream that keeps the order of the original input?
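> > > > For reference, the knob I'm setting looks like this (assuming `ac`
> > > > aliases arrow::acero and `plan` is the zipped declaration):
> > > >
> > > > ```
> > > > ac::QueryOptions opts;
> > > > opts.sequence_output = true;  // request batches in batch-index order
> > > > auto maybe_reader = ac::DeclarationToReader(std::move(plan), opts);
> > > > ```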
> > > > --
> > > > ---------------------
> > > > Best Regards,
> > > > Wenbo Hu,
> > >
> > >
> > >
> > > --
> > > ---------------------
> > > Best Regards,
> > > Wenbo Hu,
> > >
>
>
>
> --
> ---------------------
> Best Regards,
> Wenbo Hu,



-- 
---------------------
Best Regards,
Wenbo Hu,
