Reading the source code of exec_plan.cc, I found that DeclarationToReader calls DeclarationToRecordBatchGenerator, which ignores the sequence_output parameter in SinkNodeOptions. It also calls validate, which should fail if SinkNodeOptions actually honored sequence_output. So it seems that DeclarationToReader cannot preserve the input batch order? If so, how does Substrait work in this scenario? Does it also produce output out of order?

On Tue, Jul 25, 2023 at 19:12, Wenbo Hu <huwenbo1...@gmail.com> wrote:
>
> Hi,
> I'm trying to zip two streams that have the same order but go through different processing.
> For example, the original stream comes with two columns, 'id' and 'age', and is split into two streams processed in a distributed way using Acero:
> 1. hash 'id' into a stream with a single column 'bucket_id', and
> 2. classify 'age' into ['child', 'teenage', 'adult', ...].
> The two streams are then zipped back into a single stream.
>
> [     'id'    |    'age'     | many other columns ]
>       |              |                |
> [ 'bucket_id' ] [ 'classify' ]        |
>       |              |                |
> [        zipped_stream       | many_other_columns ]
>
> I was expecting both bucket_id and classify to keep the same order as the original stream before they are zipped.
> According to the documentation, "ordered execution" uses batch_index to indicate the order of batches.
> However, setting sequence_output to true in the QueryOptions passed to acero::DeclarationToReader does not guarantee that the order is kept if the input stream is not ordered. Yet it doesn't fail during execution either (bucket_id and classify do not specify any ordering). How can I make Acero produce a stream that keeps the order of the original input?
>
> --
> ---------------------
> Best Regards,
> Wenbo Hu
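
To make the expected behaviour concrete, here is a minimal standalone Python sketch of what "keeping the order" would mean for the two branches. It does not use Arrow or Acero at all: `Batch`, `resequence`, `rows` and `zip_streams` are hypothetical names, and it assumes every output batch of each branch carries the batch_index of the input batch it was derived from, with indices starting at 0 and no gaps.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Iterator, List, Tuple


@dataclass
class Batch:
    """Hypothetical stand-in for an exec batch: an index plus one column."""
    batch_index: int
    values: List[object]


def resequence(batches: Iterable[Batch]) -> Iterator[Batch]:
    """Yield batches strictly in batch_index order (0, 1, 2, ...),
    buffering any batch that arrives before its predecessors."""
    pending: Dict[int, Batch] = {}
    next_index = 0
    for batch in batches:
        pending[batch.batch_index] = batch
        # Release every buffered batch that is now contiguous with the output.
        while next_index in pending:
            yield pending.pop(next_index)
            next_index += 1
    if pending:
        raise ValueError(f"missing batch_index {next_index}; stream has gaps")


def rows(batches: Iterable[Batch]) -> List[object]:
    """Flatten resequenced batches into one list of row values."""
    return [v for batch in resequence(batches) for v in batch.values]


def zip_streams(left: Iterable[Batch], right: Iterable[Batch]) -> List[Tuple[object, object]]:
    """Zip two derived streams row by row after restoring input order."""
    a, b = rows(left), rows(right)
    if len(a) != len(b):
        raise ValueError("streams have different lengths")
    return list(zip(a, b))


# Both branches see the same two input batches but finish in different orders.
bucket_ids = [Batch(1, [7, 3]), Batch(0, [5, 1])]
classes = [Batch(0, ["child", "adult"]), Batch(1, ["teenage", "adult"])]
print(zip_streams(bucket_ids, classes))
# [(5, 'child'), (1, 'adult'), (7, 'teenage'), (3, 'adult')]
```

This only models ordered consumption (buffer out-of-order batches, release them by index); it is not a claim about how Acero implements sequence_output internally.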