Thanks both for the suggestions, it makes sense.

I will try with SourceNode with the factory method first because my
service/client API doesn't support parallel read yet. (Parallel reading
while preserving data ordering via flight protocol is something I thought
about a little bit but probably something to solve later)

Li

On Tue, Sep 13, 2022 at 8:39 PM Weston Pace <weston.p...@gmail.com> wrote:

> Yes.  If you need the source node to read in parallel OR if you have
> multiple fragments (especially if those fragments don't have identical
> schemas) then you want a dataset and not just a plain source node.
>
> On Tue, Sep 13, 2022 at 1:55 PM David Li <lidav...@apache.org> wrote:
> >
> > Yeah, I concur with Weston.
> >
> > > To start with I think a custom factory function will be sufficient
> > > (e.g. look at MakeScanNode in scanner.cc for an example).  So the
> > > options would somehow describe the coordinates of the flight endpoint.
> >
> > These 'coordinates' would be a FlightDescriptor.
> >
> > > However, it might be nice if "open a connection to the flight
> > > endpoint" happened during the call to StartProducing and not during
> > > the factory function call.  This could maybe be a follow-up task.
> >
> > The factory would call GetFlightInfo (or maybe GetSchema, from what it
> sounds like) to get the schema, but this wouldn't actually read any data.
> StartProducing would then actually call DoGet to actually read data.
> >
> > ---
> >
> > The reason why I suggested adapting Flight to Dataset, assuming this
> matches the semantics of your service, is because it encapsulates these
> steps, but reuses all the machinery we already have:
> >
> > - Dataset discovery naturally becomes GetFlightInfo. (Semantically, this
> is like beginning execution of a query, and returns one or more partitions
> where the result set can be read.)
> > - Those partitions then each become a Fragment, and then they can be
> read in parallel by Dataset.
> >
> > It sounds like the service in question here isn't quite that complex,
> though, so no need to necessarily go that far.
> >
> > On Tue, Sep 13, 2022, at 19:18, Weston Pace wrote:
> > >> The alternative path of subclassing SourceNode and having
> ExecNode::Init or
> > >> ExecNode::StartProducing seems quite a bit of change (also I don't
> think
> > >> SourceNode is exposed via public header). But let me know if you
> think I am
> > >> missing something.
> > >
> > > Agreed that we don't want to go this route.  David's suggestion is a
> > > good idea.  However, this shouldn't be the responsibility of the
> > > caller exactly.
> > >
> > > In other words (and my lack of detailed knowledge about flight is
> > > probably going to leak here) there should still be a factory function
> > > (e.g. "flight_source" or something like that) and a custom options
> > > object (FlightSourceOptions).
> > >
> > > To start with I think a custom factory function will be sufficient
> > > (e.g. look at MakeScanNode in scanner.cc for an example).  So the
> > > options would somehow describe the coordinates of the flight endpoint.
> > > The factory function would open a connection to the flight endpoint
> > > and convert this into a record batch reader.  Then it would create one
> > > of the node's that Yaron has contributed and return that.
> > >
> > > However, it might be nice if "open a connection to the flight
> > > endpoint" happened during the call to StartProducing and not during
> > > the factory function call.  This could maybe be a follow-up task.
> > > Perhaps source node could change so that, instead of accepting an
> > > AsyncGenerator, it accepts an AsyncGenerator factory function.  Then
> > > it could execute that function during the call to StartProducing.
> > >
> > > On Tue, Sep 13, 2022 at 4:05 PM Li Jin <ice.xell...@gmail.com> wrote:
> > >>
> > >> Thanks Yaron for the pointer to that PR.
> > >>
> > >> On Tue, Sep 13, 2022 at 4:43 PM Yaron Gvili <rt...@hotmail.com>
> wrote:
> > >>
> > >> > If you can wrap the flight reader as a RecordBatchReader, then
> another
> > >> > possibility is using an upcoming PR (
> > >> > https://github.com/apache/arrow/pull/14041) that enables
> SourceNode to
> > >> > accept it. You would need to know the schema when configuring the
> > >> > SourceNode, but you won't need to derived from SourceNode.
> > >> >
> > >> >
> > >> > Yaron.
> > >> > ________________________________
> > >> > From: Li Jin <ice.xell...@gmail.com>
> > >> > Sent: Tuesday, September 13, 2022 3:58 PM
> > >> > To: dev@arrow.apache.org <dev@arrow.apache.org>
> > >> > Subject: Re: Integration between Flight and Acero
> > >> >
> > >> > Update:
> > >> >
> > >> > I am going to try what David Li suggested here:
> > >> > https://lists.apache.org/thread/8yfvvyyc79m11z9wql0gzdr25x4b3g7v
> > >> >
> > >> > This seems to be the least amount of code. This does require calling
> > >> > "DoGet" at Acero plan/node creation time rather than execution time
> but I
> > >> > don't think it's a big deal for now.
> > >> >
> > >> > The alternative path of subclassing SourceNode and having
> ExecNode::Init or
> > >> > ExecNode::StartProducing seems quite a bit of change (also I don't
> think
> > >> > SourceNode is exposed via public header). But let me know if you
> think I am
> > >> > missing something.
> > >> >
> > >> > Li
> > >> >
> > >> > On Tue, Sep 6, 2022 at 4:57 AM Yaron Gvili <rt...@hotmail.com>
> wrote:
> > >> >
> > >> > > Hi Li,
> > >> > >
> > >> > > Here's my 2 cents about the Ibis/Substrait part of this.
> > >> > >
> > >> > > An Ibis expression carries a schema. If you're planning to create
> an
> > >> > > integrated Ibis/Substrait/Arrow solution, then you'll need the
> schema to
> > >> > be
> > >> > > available to Ibis in Python. So, you'll need a Python wrapper for
> the C++
> > >> > > implementation you have in mind for the GetSchema method. I think
> you
> > >> > > should pass the schema obtained by (the wrapped) GetSchema to an
> Ibis
> > >> > node,
> > >> > > rather than defining a new Ibis node that would have to access the
> > >> > network
> > >> > > to get the schema on its own.
> > >> > >
> > >> > > Given the above, I agree with you that when the Acero node is
> created its
> > >> > > schema would already be known.
> > >> > >
> > >> > >
> > >> > > Yaron.
> > >> > > ________________________________
> > >> > > From: Li Jin <ice.xell...@gmail.com>
> > >> > > Sent: Thursday, September 1, 2022 2:49 PM
> > >> > > To: dev@arrow.apache.org <dev@arrow.apache.org>
> > >> > > Subject: Re: Integration between Flight and Acero
> > >> > >
> > >> > > Thanks David. I think my original question might not have been
> accurate
> > >> > so
> > >> > > I will try to rephrase my question:
> > >> > >
> > >> > > My ultimate goal is to add an ibis source node:
> > >> > >
> > >> > > class MyStorageTable(ibis.TableNode, sch.HasSchema):
> > >> > >     url = ... # e.g. "my_storage://my_path"
> > >> > >     begin = ... # e.g. "20220101"
> > >> > >     end = ... # e.g. "20220201"
> > >> > >
> > >> > > and pass it to Acero and have Acero create a source node that
> knows how
> > >> > to
> > >> > > read from my_storage. Currently, I have a C++ class that looks
> like this
> > >> > > that knows how to read/write data:
> > >> > >
> > >> > > class MyStorageClient {
> > >> > >
> > >> > >     public:
> > >> > >
> > >> > >         /// \brief Construct a client
> > >> > >
> > >> > >         MyStorageClient(const std::string& service_location);
> > >> > >
> > >> > >
> > >> > >
> > >> > >         /// \brief Read data from a table streamingly
> > >> > >
> > >> > >         /// \param[in] table_uri
> > >> > >
> > >> > >         /// \param[in] start_time The start time (inclusive),
> e.g.,
> > >> > > '20100101'
> > >> > >
> > >> > >         /// \param[in] end_time The end time (exclusive), e.g.,
> > >> > '20100110'
> > >> > >
> > >> > >
>  arrow::Result<std::unique_ptr<arrow::flight::FlightStreamReader>>
> > >> > > ReadStream(const std::string& table_uri, const std::string&
> start_time,
> > >> > > const std::string& end_time);
> > >> > >
> > >> > >
> > >> > >
> > >> > >         /// \brief Write data to a table streamingly
> > >> > >
> > >> > >         /// This method will return a FlightStreamWriter that can
> be used
> > >> > > for streaming data into
> > >> > >
> > >> > >         /// \param[in] table_uri
> > >> > >
> > >> > >         /// \param[in] start_time The start time (inclusive),
> e.g.,
> > >> > > '20100101'
> > >> > >
> > >> > >         /// \param[in] end_time The end time (exclusive), e.g.,
> > >> > '20100110'
> > >> > >
> > >> > >         arrow::Result<DoPutResult> WriteStream(const std::string&
> > >> > > table_uri, const std::shared_ptr<arrow::Schema> &schema, const
> > >> > std::string
> > >> > > &start_time, const std::string &end_time);
> > >> > >
> > >> > >
> > >> > >
> > >> > >         /// \brief Get schema of a table.
> > >> > >
> > >> > >         /// \param[in] table The Smooth table name, e.g.,
> > >> > > smooth:/research/user/ljin/test
> > >> > >
> > >> > >         arrow::Result<std::shared_ptr<arrow::Schema>>
> GetSchema(const
> > >> > > std::string& table_uri);
> > >> > >     };
> > >> > >
> > >> > > I think Acero node's schema must be known when the node is
> created, I'd
> > >> > > imagine I would implement MyStorageExecNode that gets created by
> > >> > > SubstraitConsumer (via some registration mechanism in
> SubstraitConsumer):
> > >> > >
> > >> > > (1) GetSchema is called in SubstraitConsumer when creating the
> node
> > >> > > (network call to the storage backend to get schema)
> > >> > > (2) ReadStream is called in either ExecNode::Init or
> > >> > > ExecNode::StartProducing
> > >> > > to create the FlightStreamReader (3) Some thread (either the
> Plan's
> > >> > > execution thread or the thread owned by MyStorageExecNode) will
> read from
> > >> > > FlightStreamReader and send data downstream.
> > >> > >
> > >> > > Does that sound like the right approach or is there some other
> way I
> > >> > should
> > >> > > do this?
> > >> > >
> > >> > > On Wed, Aug 31, 2022 at 6:16 PM David Li <lidav...@apache.org>
> wrote:
> > >> > >
> > >> > > > Hi Li,
> > >> > > >
> > >> > > > It'd depend on how exactly you expect everything to fit
> together, and I
> > >> > > > think the way you'd go about it would depend on what exactly the
> > >> > > > application is. For instance, you could have the application
> code do
> > >> > > > everything up through DoGet and get a reader, then create a
> SourceNode
> > >> > > from
> > >> > > > the reader and continue from there.
> > >> > > >
> > >> > > > Otherwise, I would think the way to go would be to be able to
> create a
> > >> > > > node from a FlightDescriptor (which would contain the
> URL/parameters in
> > >> > > > your example). In that case, I think it'd fit into Arrow
> Dataset, under
> > >> > > > ARROW-10524 [1]. In that case, I'd equate GetFlightInfo to
> dataset
> > >> > > > discovery, and each FlightEndpoint in the FlightInfo to a
> Fragment. As
> > >> > a
> > >> > > > bonus, there's already good integration between Dataset and
> Acero and
> > >> > > this
> > >> > > > should naturally do things like read the FlightEndpoints in
> parallel
> > >> > with
> > >> > > > readahead and so on.
> > >> > > >
> > >> > > > That means: you'd start with the FlightDescriptor, and create a
> Dataset
> > >> > > > from it. This will call GetFlightInfo under the hood. (There's
> a minor
> > >> > > > catch here: this assumes the service that returns the
> FlightInfo can
> > >> > > embed
> > >> > > > an accurate schema into it. If that's not true, there'll have
> to be
> > >> > some
> > >> > > > finagling with various ways of getting the actual schema,
> depending on
> > >> > > what
> > >> > > > exactly your service supports.) Once you have a Dataset, you
> can create
> > >> > > an
> > >> > > > ExecPlan and proceed like normal.
> > >> > > >
> > >> > > > Of course, if you then want to get things into Python, R,
> Substrait,
> > >> > > > etc... that requires some more work - especially for Substrait
> where
> > >> > I'm
> > >> > > > not sure how best to encode a custom source like that.
> > >> > > >
> > >> > > > [1]: https://issues.apache.org/jira/browse/ARROW-10524
> > >> > > >
> > >> > > > -David
> > >> > > >
> > >> > > > On Wed, Aug 31, 2022, at 17:09, Li Jin wrote:
> > >> > > > > Hello!
> > >> > > > >
> > >> > > > > I have recently started to look into integrating Flight RPC
> with
> > >> > Acero
> > >> > > > > source/sink node.
> > >> > > > >
> > >> > > > > In Flight, the life cycle of a "read" request looks sth like:
> > >> > > > >
> > >> > > > >    - User specifies a URL (e.g. my_storage://my_path) and
> parameter
> > >> > > > (e.g.,
> > >> > > > >    begin = "20220101", end = "20220201")
> > >> > > > >    - Client issue GetFlightInfo and get FlightInfo from server
> > >> > > > >    - Client issue DoGet with the FlightInfo and get a stream
> reader
> > >> > > > >    - Client calls Nextuntil stream is exhausted
> > >> > > > >
> > >> > > > > My question is, how does the above life cycle fit in an Acero
> node?
> > >> > In
> > >> > > > > other words, what are the proper places in Acero node
> lifecycle to
> > >> > > issue
> > >> > > > > the corresponding flight RPC?
> > >> > > > >
> > >> > > > > Appreciate any thoughts,
> > >> > > > > Li
> > >> > > >
> > >> > >
> > >> >
>

Reply via email to