Thank you both for the extra information.  Acero can't actually merge
the streams today; I was thinking more of DataFusion and Velox, which would
often want to keep the streams separate, especially if some kind of
filtering or transformation could be applied before a sorted merge.
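
To make that concrete, here is a rough Ruby sketch of filtering each stream
on its own and only then doing the sorted merge.  It is not DataFusion or
Velox code: plain sorted arrays stand in for the per-endpoint streams, and
filtered_sorted_merge is a made-up name.

```ruby
# Hypothetical sketch: each element of `streams` is an already-sorted Array
# standing in for the rows read from one Flight endpoint.
def filtered_sorted_merge(streams, &keep)
  # Filter each stream independently; filtering preserves its sort order.
  filtered = streams.map { |rows| rows.select(&keep) }
  positions = Array.new(filtered.size, 0)
  merged = []
  loop do
    # Find the stream whose next unread row is the smallest.
    best = nil
    filtered.each_with_index do |rows, i|
      next if positions[i] >= rows.size
      if best.nil? || rows[positions[i]] < filtered[best][positions[best]]
        best = i
      end
    end
    break if best.nil? # every stream is exhausted
    merged << filtered[best][positions[best]]
    positions[best] += 1
  end
  merged
end
```

A real engine would run the per-stream filters in parallel and use a heap
for the merge; the linear scan here just keeps the sketch short.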

However, I also very much agree that both scenarios are valid.  First, if
there are a lot of partitions (e.g. far more than the number of parallelism
units) then you probably don't want parallel paths for all of them.
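
As a rough illustration of that first point, a client might cap the number
of parallel readers with a small worker pool rather than starting one
thread per endpoint.  This is only a sketch: read_with_bounded_parallelism
is a made-up name, and the block passed in stands in for a real Flight read.

```ruby
# Hypothetical sketch: read many endpoints with at most `max_workers`
# threads.  The block stands in for whatever actually reads one endpoint.
def read_with_bounded_parallelism(endpoints, max_workers, &read)
  queue = Queue.new
  endpoints.each_with_index { |endpoint, index| queue << [endpoint, index] }
  results = Array.new(endpoints.size)
  worker_count = [max_workers, endpoints.size].min
  workers = Array.new(worker_count) do
    Thread.new do
      loop do
        begin
          # Non-blocking pop raises ThreadError once the queue is empty.
          endpoint, index = queue.pop(true)
        rescue ThreadError
          break
        end
        # Store by index so results keep the endpoint order.
        results[index] = read.call(endpoint)
      end
    end
  end
  workers.each(&:join)
  results
end
```

Storing each result at its endpoint's index means the caller still sees
the endpoints in their original order, whichever worker finished first.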

Second, as you said, simpler clients (e.g. those where all filtering is
done downstream, or those that don't need any filtering at all) will
appreciate Flight's ability to merge for them.  It makes the client more
complex but given that clients are already doing this to some extent it
seems worthwhile.
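
For the simple-client case, the decision could look roughly like the sketch
below.  FlightInfoSketch and read_as_single_stream are made-up names, not
the Arrow Flight API, and I am assuming the flag shows up as a boolean on
the FlightInfo.

```ruby
# Hypothetical stand-in for a FlightInfo that carries the proposed flag.
FlightInfoSketch = Struct.new(:endpoints, :ordered)

# Return all rows as one stream.  The block stands in for reading one
# endpoint and must return an Array of rows.
def read_as_single_stream(info, &read)
  if info.ordered
    # Endpoint order is meaningful: read sequentially and concatenate.
    info.endpoints.flat_map { |endpoint| read.call(endpoint) }
  else
    # No ordering promised: read every endpoint in its own thread.
    threads = info.endpoints.map do |endpoint|
      Thread.new { read.call(endpoint) }
    end
    # Thread#value joins the thread and returns the block's result.
    threads.flat_map(&:value)
  end
end
```

A client that wants both ordering and parallelism could also prefetch the
endpoints in parallel and still emit them in endpoint order; the sequential
branch above is just the simplest thing that preserves the order.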

On Thu, Apr 27, 2023 at 7:45 PM David Li <lidav...@apache.org> wrote:

> In addition to Kou's response:
>
> The individual endpoints have always represented a subset of a single
> stream of data. So each endpoint in a FlightInfo is a partition of the
> overall result set.
>
> Not all clients want to deal with reading all the Flight streams
> themselves and may want a single stream of data. (For example: ADBC exposes
> both paths. The JDBC driver also has to deal with this.) So some client
> libraries have to deal with the question of whether to read in parallel and
> whether to keep the result in order or not. A more advanced use case, like
> Acero, would probably read the endpoints itself and could use this flag to
> decide how to merge the streams.
>
> On Fri, Apr 28, 2023, at 09:56, Sutou Kouhei wrote:
> > Hi,
> >
> >> This seems of very limited value if, for example, the user desired DESC
> >> order, then the endpoint would return
> >>
> >> Endpoint 1: (C, B, A)
> >> Endpoint 2: (F, E, D)
> >
> > As David said, the server returns
> >
> > Endpoint 2: (F, E, D)
> > Endpoint 1: (C, B, A)
> >
> > in this case.
> >
> > Here is a use case, I think:
> >
> > A system has time series data. Each node in the system has
> > data for one day. If a client requests "SELECT * FROM data
> > WHERE server = 'server1' ORDER BY created_at DESC", the
> > system returns the following:
> >
> > Endpoint 20230428: (DATA_FOR_2023_04_28)
> > Endpoint 20230427: (DATA_FOR_2023_04_27)
> > Endpoint 20230426: (DATA_FOR_2023_04_26)
> > ...
> >
> > If we have the "ordered" flag, the client can assume that
> > received data are sorted. In other words, if the client
> > reads data from Endpoint 20230428 -> Endpoint 20230427 ->
> > Endpoint 20230426, the data the client read is sorted.
> >
> > If we don't have the "ordered" flag and we use "the relative
> > ordering of data from different endpoints is implementation
> > defined", we can't implement a general purpose Flight based
> > client library (Flight SQL based client library, Flight SQL
> > based ADBC driver and so on). The client library will have
> > the following code:
> >
> >   # TODO: How to detect server_type?
> >   if server_type == "DB1"
> >     # DB1 returns ordered results, so read the endpoints in order.
> >     endpoints.each do |endpoint|
> >       yield(endpoint.read)
> >     end
> >   else
> >     # Other DBs don't return ordered results.
> >     # So, we read data in parallel for performance.
> >     threads = endpoints.collect do |endpoint|
> >       Thread.new do
> >         yield(endpoint.read)
> >       end
> >     end
> >     threads.each do |thread|
> >       thread.join
> >     end
> >   end
> >
> > The client library needs to add 'or server_type == "DB2"' to
> > 'if server_type == "DB1"' when DB2 also adds support for
> > ordered results. If only DB2 2.0 or later supports ordered
> > results, the client library needs a further condition: 'or
> > (server_type == "DB2" and server_version >= 2.0)'.
> >
> > So I think that the "ordered" flag is useful.
> >
> >
> > Thanks,
> > --
> > kou
> >
> > In <CAFhtnRxzMaoqmzWPkqsLoJZW5jmx=d_i9ojd9xy1ydkgkgz...@mail.gmail.com>
> >   "Re: [DISCUSS][Format][Flight] Ordered data support" on Thu, 27 Apr
> > 2023 10:55:32 -0400,
> >   Andrew Lamb <al...@influxdata.com> wrote:
> >
> >> I wonder if we have considered simply removing the statement "There is no
> >> ordering defined on endpoints. Hence, if the returned data has an ordering,
> >> it should be returned in a single endpoint." and replacing it with
> >> something that says "the relative ordering of data from different endpoints
> >> is implementation defined"
> >>
> >> I am struggling to come up with a concrete use case for the "ordered" flag.
> >>
> >> The ticket references "distributed sort" but most distributed sort
> >> algorithms I know of would produce multiple sorted streams that need to be
> >> merged together. For example
> >>
> >> Endpoint 1: (B, C, D)
> >> Endpoint 2: (A, E, F)
> >>
> >> It is not clear how the "ordered" flag would help here
> >>
> >> If the intent is somehow to signal the client it doesn't have to merge
> >> (e.g. with data like)
> >>
> >> Endpoint 1: (A, B, C)
> >> Endpoint 2: (D, E, F)
> >>
> >> This seems of very limited value if, for example, the user desired DESC
> >> order, then the endpoint would return
> >>
> >> Endpoint 1: (C, B, A)
> >> Endpoint 2: (F, E, D)
> >>
> >> Which doesn't seem to conform to the updated definition
> >>
> >> Andrew
> >>
> >>
> >> On Tue, Apr 25, 2023 at 8:56 PM Sutou Kouhei <k...@clear-code.com> wrote:
> >>
> >>> Hi,
> >>>
> >>> I would like to propose adding support for ordered data to
> >>> Apache Arrow Flight. If anyone has comments on this
> >>> proposal, please share them here or on the issue for this
> >>> proposal: https://github.com/apache/arrow/issues/34852
> >>>
> >>> This is one of the proposals in "[DISCUSS] Flight RPC/Flight
> >>> SQL/ADBC enhancements":
> >>>
> >>>   https://lists.apache.org/thread/247z3t06mf132nocngc1jkp3oqglz7jp
> >>>
> >>> See also the "Flight RPC: Ordered Data" section in the
> >>> design document for the proposals:
> >>>
> >>>
> >>>   https://docs.google.com/document/d/1jhPyPZSOo2iy0LqIJVUs9KWPyFULVFJXTILDfkadx2g/edit#
> >>>
> >>> Background:
> >>>
> >>> Currently, the endpoints within a FlightInfo explicitly have
> >>> no ordering.
> >>>
> >>> This is unnecessarily limiting. Systems can and do implement
> >>> distributed sorts, but they can't reflect this in the
> >>> current specification.
> >>>
> >>> Proposal:
> >>>
> >>> Add a flag to FlightInfo. If the flag is set, the client may
> >>> assume that the data is sorted in the same order as the
> >>> endpoints. Otherwise, the client cannot make any assumptions
> >>> (as before).
> >>>
> >>> This is a compatible change because the client can just
> >>> ignore the flag.
> >>>
> >>> Implementation:
> >>>
> >>> https://github.com/apache/arrow/pull/35178 is an
> >>> implementation of this proposal. The pull request contains the
> >>> following:
> >>>
> >>> 1. Format changes:
> >>>
> >>>    https://github.com/apache/arrow/pull/35178/files#diff-53b6c132dcc789483c879f667a1c675792b77aae9a056b257d6b20287bb09dba
> >>>    * format/Flight.proto
> >>>
> >>> 2. Documentation changes:
> >>>
> >>>    https://github.com/apache/arrow/pull/35178/files#diff-839518fb41e923de682e8587f0b6fdb00eb8f3361d360c2f7249284a136a7d89
> >>>    * docs/source/format/Flight.rst
> >>>
> >>> 3. The C++ implementation and an integration test:
> >>>    * cpp/src/arrow/flight/
> >>>
> >>> 4. The Java implementation and an integration test (thanks to David Li!):
> >>>    * java/flight/
> >>>
> >>> 5. The Go implementation and an integration test:
> >>>    * go/arrow/flight/
> >>>    * go/arrow/internal/flight_integration/
> >>>
> >>> Next:
> >>>
> >>> I'll start a vote for this proposal after we reach a consensus
> >>> on this proposal.
> >>>
> >>> It's the standard process for format changes.
> >>> See also:
> >>>
> >>> * [VOTE] Formalize how to change format
> >>>   https://lists.apache.org/thread/jlc4wtt09rfszlzqdl55vrc4dxzscr4c
> >>> * GH-35084: [Docs][Format] Add how to change format specification
> >>>   https://github.com/apache/arrow/pull/35174
> >>>
> >>>
> >>> Thanks,
> >>> --
> >>> kou
> >>>
>
