Hi,
  I'll open an issue for the DeclarationToReader problem.
  I think the key problem is that the input stream is unordered. The
input stream is an ArrowArrayStream imported from the Python side and
then declared as a "record_batch_reader_source", which is an unordered
source node. So the behavior is expected.
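
  To make the ordering point concrete, here is a minimal sketch in plain
Python (not Acero code) of what a sequencing sink needs from its input:
every batch has to carry an index, and the sink holds batches back until
the next expected index arrives. The name sequence_by_index and the
(index, payload) tuples are made up for illustration. A source that does
not assign indices gives the sink nothing to reorder by.

```python
from typing import Dict, Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def sequence_by_index(batches: Iterable[Tuple[int, T]]) -> Iterator[T]:
    """Yield payloads in index order from a stream that may arrive out of order.

    Hypothetical helper: indices are assumed to be unique and to start at 0.
    """
    pending: Dict[int, T] = {}  # batches that arrived before their turn
    next_index = 0
    for index, payload in batches:
        pending[index] = payload
        # Emit every batch that is now contiguous with what was already emitted.
        while next_index in pending:
            yield pending.pop(next_index)
            next_index += 1
    if pending:
        # A gap in the indices means some batch never arrived.
        raise ValueError(f"missing batch index {next_index}")


# Batches arrive as 2, 0, 1 but are emitted as 0, 1, 2.
ordered = list(sequence_by_index([(2, "c"), (0, "a"), (1, "b")]))
```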
  I think RecordBatchReaderSourceOptions should gain an ordering
parameter that indicates the ordering of the input stream. Otherwise, I
need to convert the record_batch_reader_source into a
"record_batch_source" with a record batch generator.
  Also, I'd like to discuss the dataset scanner: does it produce a
stable sequence of record batches (as an implicit ordering) when the
underlying storage has not changed? In my situation, the downstream
executor may crash, and it would then ask to continue from an
intermediate state (with a restart offset). I could add a fetch node to
skip the leading rows, but that does not seem efficient. Maybe I should
inspect the fragments in the dataset to skip reading unnecessary files,
and build a FileSystemDataset on the fly?
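
  A rough sketch of the fragment-skipping idea, in plain Python rather
than the dataset API. It assumes the fragments are enumerated in a stable
order and that each fragment's row count is known up front; both are
assumptions, and the first one is exactly what I am asking about. The
names plan_restart and fragment_rows are hypothetical.

```python
from typing import Sequence, Tuple


def plan_restart(fragment_rows: Sequence[int], restart_offset: int) -> Tuple[int, int]:
    """Map a restart offset to (first fragment to read, rows to skip inside it).

    fragment_rows[i] is the assumed row count of fragment i, in a fixed order.
    """
    if restart_offset < 0:
        raise ValueError("restart_offset must be non-negative")
    remaining = restart_offset
    for i, rows in enumerate(fragment_rows):
        if remaining < rows:
            # The restart point falls inside fragment i.
            return i, remaining
        remaining -= rows  # fragment i was fully consumed before the crash
    if remaining == 0:
        # Everything was already consumed; nothing is left to read.
        return len(fragment_rows), 0
    raise ValueError("restart_offset is past the end of the dataset")


# Three fragments with 100, 50 and 200 rows, restarting at row 120:
# fragment 0 is skipped entirely and 20 rows are skipped in fragment 1.
first_fragment, skip_rows = plan_restart([100, 50, 200], 120)
```

Fragments before the returned index would be left out of the rebuilt
dataset, so only the residual skip inside the first remaining fragment
would be left for a fetch node.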

Weston Pace <weston.p...@gmail.com> wrote on Tue, Jul 25, 2023 at 23:44:
>
> > Reading the source code of exec_plan.cc, DeclarationToReader calls
> > DeclarationToRecordBatchGenerator, which ignores the sequence_output
> > parameter in SinkNodeOptions. It also calls validate, which should
> > fail if SinkNodeOptions honors sequence_output. So it seems
> > that DeclarationToReader cannot follow the input batch order?
>
> These methods should not be ignoring sequence_output.  Do you want to open
> a bug?  This should be a straightforward one to fix.
>
> > Then how does Substrait work in this scenario? Does it also produce
> > unordered output?
>
> Probably.  Much of internal Substrait testing is probably using
> DeclarationToTable or DeclarationToBatches.  The ordered execution hasn't
> been adopted widely yet because the old scanner doesn't set the batch index
> and the new scanner isn't ready yet.  This limits the usefulness to data
> that is already in memory (the in-memory sources do set the batch index).
>
> I think your understanding of the concept is correct however.  Can you
> share a sample plan that is not working for you?  If you use
> DeclarationToTable do you get consistently ordered results?
>
> On Tue, Jul 25, 2023 at 7:06 AM Wenbo Hu <huwenbo1...@gmail.com> wrote:
>
> > Reading the source code of exec_plan.cc, DeclarationToReader calls
> > DeclarationToRecordBatchGenerator, which ignores the sequence_output
> > parameter in SinkNodeOptions. It also calls validate, which should
> > fail if SinkNodeOptions honors sequence_output. So it seems
> > that DeclarationToReader cannot follow the input batch order?
> > Then how does Substrait work in this scenario? Does it also produce
> > unordered output?
> >
> > Wenbo Hu <huwenbo1...@gmail.com> wrote on Tue, Jul 25, 2023 at 19:12:
> > >
> > > Hi,
> > >     I'm trying to zip two streams that have the same order but are
> > > processed differently.
> > >     For example, the original stream comes with two columns, 'id' and
> > > 'age', and is split into two streams that are processed in a
> > > distributed fashion using Acero: 1. hash 'id' into a stream with a
> > > single column 'bucket_id', and 2. classify 'age' into
> > > ['child', 'teenage', 'adult',...]. The two streams are then zipped
> > > into a single stream.
> > >
> > >        [     'id'     |     'age'     | many other columns ]
> > >               |               |                 |
> > >         ['bucket_id']   ['classify']            |
> > >               |               |                 |
> > >        [        zipped_stream         | many_other_columns ]
> > > I was expecting both bucket_id and classify to keep the same order as
> > > the original stream before they are zipped.
> > > According to the documentation, "ordered execution" uses batch_index
> > > to indicate the order of batches.
> > > However, calling acero::DeclarationToReader with a QueryOptions whose
> > > sequence_output is set to true does not keep the order if the input
> > > stream is not ordered, and it does not fail during execution either
> > > (bucket_id and classify do not specify any ordering). How can I
> > > make Acero produce a stream that keeps the order of the original
> > > input?
> > > --
> > > ---------------------
> > > Best Regards,
> > > Wenbo Hu,
> >
> >
> >
> > --
> > ---------------------
> > Best Regards,
> > Wenbo Hu,
> >



-- 
---------------------
Best Regards,
Wenbo Hu,
