Replacing

```
ac::Declaration source{"record_batch_reader_source",
                       ac::RecordBatchReaderSourceNodeOptions{std::move(input)}};
```

with

```
ac::RecordBatchSourceNodeOptions rb_source_options{
    input->schema(),
    [input]() { return arrow::MakeFunctionIterator([input] { return input->Next(); }); }};
ac::Declaration source{"record_batch_source", std::move(rb_source_options)};
```

works as expected.
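For completeness, here is a sketch of how the plan built on that source might then be run with sequenced output requested. This assumes the `DeclarationToReader` overload that takes `QueryOptions` (the same `sequence_output` option discussed further down-thread); the `RunSequenced` helper name is hypothetical, and per the bug discussed below, `sequence_output` may be ignored by `DeclarationToReader` in affected versions:

```cpp
#include <arrow/acero/exec_plan.h>  // ac::Declaration, ac::QueryOptions
#include <arrow/record_batch.h>
#include <arrow/result.h>

namespace ac = arrow::acero;

// Hypothetical helper: run a declaration, asking Acero to emit batches in
// batch_index order. Note that the thread below reports DeclarationToReader
// was ignoring sequence_output at the time of writing, so this may have no
// effect until that bug is fixed.
arrow::Result<std::unique_ptr<arrow::RecordBatchReader>> RunSequenced(
    ac::Declaration plan) {
  ac::QueryOptions opts;
  opts.sequence_output = true;  // request output ordered by batch_index
  return ac::DeclarationToReader(std::move(plan), std::move(opts));
}
```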
Wenbo Hu <huwenbo1...@gmail.com> wrote on Wed, Jul 26, 2023 at 10:22:
>
> Hi,
>     I'll open an issue on the DeclarationToReader problem.
>     I think the key problem is that the input stream is unordered. The
> input stream is an ArrowArrayStream imported from the Python side and then
> declared as a "record_batch_reader_source", which is an unordered
> source node. So the behavior is expected.
>     I think RecordBatchReaderSourceNodeOptions should add an ordering
> parameter to indicate the ordering of the input stream. Otherwise, I need
> to convert the record_batch_reader_source into a "record_batch_source"
> with a record batch generator.
>     Also, I'd like to have a discussion about the dataset scanner: does it
> produce a stable sequence of record batches (as an implicit ordering) when
> the underlying storage is not changed? In my situation, the downstream
> executor may crash and then request to continue from an intermediate state
> (with a restart offset). I'd like to turn that into a fetch node that
> skips the leading rows, but that does not seem optimal. Maybe I should
> inspect the fragments in the dataset to skip reading unnecessary files,
> and build a FileSystemDataset on the fly?
>
> Weston Pace <weston.p...@gmail.com> wrote on Tue, Jul 25, 2023 at 23:44:
> >
> > > Reading the source code of exec_plan.cc, DeclarationToReader calls
> > > DeclarationToRecordBatchGenerator, which ignores the sequence_output
> > > parameter in SinkNodeOptions; it also calls Validate, which should
> > > fail if the SinkNodeOptions honored sequence_output. Then it seems
> > > that DeclarationToReader cannot follow the input batch order?
> >
> > These methods should not be ignoring sequence_output. Do you want to
> > open a bug? This should be a straightforward one to fix.
> >
> > > Then how does Substrait work in this scenario? Does it output
> > > out of order as well?
> >
> > Probably. Much of the internal Substrait testing is probably done with
> > DeclarationToTable or DeclarationToBatches. The ordered execution hasn't
> > been adopted widely yet because the old scanner doesn't set the batch
> > index and the new scanner isn't ready yet. This limits its usefulness to
> > data that is already in memory (the in-memory sources do set the batch
> > index).
> >
> > I think your understanding of the concept is correct, however. Can you
> > share a sample plan that is not working for you? If you use
> > DeclarationToTable, do you get consistently ordered results?
> >
> > On Tue, Jul 25, 2023 at 7:06 AM Wenbo Hu <huwenbo1...@gmail.com> wrote:
> >
> > > Reading the source code of exec_plan.cc, DeclarationToReader calls
> > > DeclarationToRecordBatchGenerator, which ignores the sequence_output
> > > parameter in SinkNodeOptions; it also calls Validate, which should
> > > fail if the SinkNodeOptions honored sequence_output. Then it seems
> > > that DeclarationToReader cannot follow the input batch order?
> > > Then how does Substrait work in this scenario? Does it output
> > > out of order as well?
> > >
> > > Wenbo Hu <huwenbo1...@gmail.com> wrote on Tue, Jul 25, 2023 at 19:12:
> > > >
> > > > Hi,
> > > >     I'm trying to zip two streams that have the same order but
> > > > different processing. For example, the original stream comes with
> > > > two columns, 'id' and 'age', and splits into two streams processed
> > > > distributedly using Acero: 1. hash 'id' into a stream with a single
> > > > column 'bucket_id', and 2. classify 'age' into ['child', 'teenage',
> > > > 'adult', ...]. Then zip them back into a single stream.
> > > >
> > > > [ 'id'      |   'age'     | many other columns ]
> > > >      |            |                |
> > > > ['bucket_id'] ['classify']         |
> > > >      |            |                |
> > > > [    zipped_stream       | many_other_columns  ]
> > > >
> > > > I was expecting both bucket_id and classify to keep the same order
> > > > as the original stream before they are zipped.
> > > > According to the documentation, "ordered execution" uses batch_index
> > > > to indicate the order of the batches, but acero::DeclarationToReader
> > > > with a QueryOptions whose sequence_output is set to true does not
> > > > keep the order if the input stream is not ordered. It doesn't fail
> > > > during execution either (bucket_id and classify do not specify any
> > > > ordering). How can I make Acero produce a stream that keeps the
> > > > order of the original input?
> > > > --
> > > > ---------------------
> > > > Best Regards,
> > > > Wenbo Hu
> > >
> > > --
> > > ---------------------
> > > Best Regards,
> > > Wenbo Hu
>
> --
> ---------------------
> Best Regards,
> Wenbo Hu

--
---------------------
Best Regards,
Wenbo Hu
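On the restart-offset idea mentioned up-thread, a rough sketch of resuming by skipping leading rows with a fetch node might look like the following. This assumes Acero's "fetch" node and `FetchNodeOptions` as found in recent Arrow releases; the `ResumeFrom` helper is hypothetical and exact option semantics may vary by version, so treat this as an illustration rather than a tested recipe:

```cpp
#include <arrow/acero/exec_plan.h>  // ac::Declaration
#include <arrow/acero/options.h>    // ac::FetchNodeOptions

namespace ac = arrow::acero;

// Hypothetical helper: wrap an ordered source declaration so the plan
// resumes at `restart_offset` rows, emitting up to `count` rows after
// the skip. The "fetch" node requires an ordered input, which is exactly
// the ordering problem this thread is about.
ac::Declaration ResumeFrom(ac::Declaration ordered_source,
                           int64_t restart_offset, int64_t count) {
  return ac::Declaration{"fetch",
                         {std::move(ordered_source)},
                         ac::FetchNodeOptions{restart_offset, count}};
}
```

As the thread notes, skipping rows this way still reads and discards the leading data; inspecting dataset fragments to skip whole files would avoid that I/O entirely.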