bkietz commented on issue #40583:
URL: https://github.com/apache/arrow/issues/40583#issuecomment-2007632423
The basic features of skyhook are a configurable server side scan/compute
and efficient transport of the resulting buffers to the client. This does not
correspond to a file format; file formats compartmentalize reading data files
independent of I/O. A CSV-formatted file might reside in a string literal or an
S3 bucket, but skyhook's files are actually IPC or parquet and must reside on a
ceph server. Writing skyhook as a file format therefore breaks conventions and
contracts relied on by the dataset API.
However, in the context of acero, skyhook has a very natural structure:
- A source `ExecNode` on the client which requests specific scan/compute
from the server, forwards returned batches into an `ExecPlan`, and mediates
back pressure.
- A ceph CLS which maintains a catalog of data files for scanning,
instantiates server side `ExecPlan`s to fulfil client requests, and responds to
back pressure from the client.
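To make the back pressure mediation above concrete, here is a minimal, Arrow-free sketch (all names invented for illustration, not the real skyhook or acero API): a bounded batch queue the client side source node could use, asking the server to pause when batches accumulate and to resume once the downstream plan drains them.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical sketch: a bounded queue mediating back pressure between the
// server (producer) and the client's ExecPlan (consumer). When the queue
// fills past `high_water` it signals pause; when it drains to `low_water`
// it signals resume.
class BatchQueue {
 public:
  BatchQueue(size_t high_water, size_t low_water,
             std::function<void()> pause, std::function<void()> resume)
      : high_water_(high_water), low_water_(low_water),
        pause_(std::move(pause)), resume_(std::move(resume)) {}

  // Called when a batch arrives from the server.
  void Push(int batch) {
    queue_.push_back(batch);
    if (!paused_ && queue_.size() >= high_water_) {
      paused_ = true;
      pause_();  // would be forwarded to the server as a pause request
    }
  }

  // Called when the downstream plan consumes a batch.
  bool Pop(int* out) {
    if (queue_.empty()) return false;
    *out = queue_.front();
    queue_.pop_front();
    if (paused_ && queue_.size() <= low_water_) {
      paused_ = false;
      resume_();  // would be forwarded to the server as a resume request
    }
    return true;
  }

  bool paused() const { return paused_; }

 private:
  size_t high_water_, low_water_;
  bool paused_ = false;
  std::function<void()> pause_, resume_;
  std::deque<int> queue_;
};
```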
Some design notes which follow from the above:
Currently skyhook supports only predicate and projection pushdown, but
there's no reason to forbid specifying arbitrary computation to the server. For
example, an aggregation performed on the server could save significant network
overhead when the result is small. Although Arrow's `Expression`s are
serializable, this feature isn't designed for stability. Substrait is a more
capable and stable tool intended for transmitting arbitrary execution plans.
It'd be worthwhile to replace projection/filter `Expression` serialization with
a substrait subplan to be executed on the server. This should actually simplify
skyhook's server side code since it can reuse
`arrow::engine::SerializePlan/DeserializePlan`.
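The shape of that simplification can be sketched without Arrow at all (the struct and function names below are invented for illustration): instead of a request carrying separately serialized filter and projection `Expression`s that the server must reassemble piecemeal, the request carries one opaque substrait blob that the server hands to a single deserialization entry point (in real code, `arrow::engine::DeserializePlan`).

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical wire structs, for illustration only.
// Before: filter and projection travel as ad hoc Expression serializations,
// a format not designed for stability.
struct LegacyScanRequest {
  std::string filter_expr;              // serialized Expression
  std::vector<std::string> projection;  // serialized Expressions
};

// After: the whole server side subplan travels as one substrait blob
// (e.g. produced by SerializePlan on the client).
struct ScanRequest {
  std::vector<uint8_t> substrait_plan;
};

// The server needs just one entry point instead of per-field reassembly;
// the returned string stands in for "DeserializePlan, then run an ExecPlan".
std::string HandleRequest(const ScanRequest& req) {
  if (req.substrait_plan.empty()) return "error: empty plan";
  return "deserialize-and-run";
}
```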
Furthermore, skyhook need not be bound to a single file format or to output
small enough to be materialized as a single `Table`. On the server an arbitrary
collection of data files can be wrapped into a catalog of datasets. For
example, a collection of csv and parquet data files could be wrapped in a
`UnionDataset` and cataloged as a single table, `nyc-taxi`. The server side
complexity of a dataset can be invisible to a client configuring a scan; clients
should only need to reference these datasets by name and schema, using a
substrait `NamedTable` message.
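A minimal sketch of such a catalog, with invented names and schema summaries stood in by strings: the server registers arbitrarily complex datasets under plain names, and a `NamedTable` reference from a client amounts to a lookup by name.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>
#include <utility>

// Hypothetical catalog entry: a schema summary plus a description of the
// backing dataset, whose internal structure the client never sees.
struct CatalogEntry {
  std::string schema_summary;  // e.g. "year: int32, cost: float64, tag: utf8"
  std::string dataset_kind;    // e.g. "UnionDataset(csv, parquet)"
};

class Catalog {
 public:
  void Register(std::string name, CatalogEntry entry) {
    entries_[std::move(name)] = std::move(entry);
  }

  // Roughly what a NamedTableProvider lookup would amount to.
  std::optional<CatalogEntry> Lookup(const std::string& name) const {
    auto it = entries_.find(name);
    if (it == entries_.end()) return std::nullopt;
    return it->second;
  }

 private:
  std::map<std::string, CatalogEntry> entries_;
};
```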
Instead of having each skyhook scan negotiate a new connection, the
connection to a ceph server should be factored out into an independent client
side object. This way validation of the connection, checking for registration
of a compatible CLS, and caching of the server's catalog can all be
compartmentalized into a single construction.
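A sketch of what that single construction could look like, with all names and checks invented for illustration (the real constructor would take `config_path`, `user_name`, `cluster_name`, and so on): validation and catalog caching happen once, and every subsequent scan borrows the connection.

```cpp
#include <cassert>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical connection object: constructed once, reused by every scan.
class SkyhookConnection {
 public:
  // Stand-in for connection negotiation: validate reachability, check that
  // a compatible CLS is registered, then cache the server's catalog.
  static SkyhookConnection Make(bool server_reachable, bool cls_registered,
                                std::map<std::string, std::string> catalog) {
    if (!server_reachable) {
      throw std::runtime_error("cannot reach ceph server");
    }
    if (!cls_registered) {
      throw std::runtime_error("compatible CLS not registered");
    }
    return SkyhookConnection(std::move(catalog));
  }

  // Catalog fetched once at construction; scans reuse it.
  const std::map<std::string, std::string>& catalog() const { return catalog_; }

 private:
  explicit SkyhookConnection(std::map<std::string, std::string> catalog)
      : catalog_(std::move(catalog)) {}

  std::map<std::string, std::string> catalog_;
};
```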
The purpose of the arrow monorepo is cross platform library code, whereas
skyhook comprises a specialized service and client pair. Skyhook should
therefore probably be relocated outside the arrow monorepo. Moreover, skyhook is
compelling as a demonstration of both the extensibility and usability of acero
from third party code, which will be emphasized by keeping it in an independent
project.
Some example code illustrating what using a skyhook source node on the
client could look like:
```c++
// This only needs to be done once:
ARROW_ASSIGN_OR_RAISE(auto skyhook_connection,
                      MakeSkyhookConnection(config_path, data_pool,
                                            user_name, cluster_name));

// Display the catalog:
for (const auto& [name, schema] : skyhook_connection.catalog()) {
  std::cout << name << ": " << schema->ToString() << std::endl;
}

Result<std::shared_ptr<Table>> GetTripCostsForYear(int year) {
  // Declare a server side plan which will aggregate trip costs
  // grouped by tag in the specified year:
  auto server_side_plan = Declaration::Sequence({
      {"named_table", skyhook_connection.MakeNamedTableOptions("nyc-taxi")},
      {"filter", FilterNodeOptions(equal(field_ref("year"), literal(year)))},
      {"aggregate",
       AggregateNodeOptions({{"hash_sum", "cost", "cost_sum"}}, {"tag"})},
  });

  // The client side plan then uses the server side plan to configure its
  // source node:
  std::shared_ptr<Table> output_table;
  auto client_side_plan = Declaration::Sequence({
      {"skyhook_source",
       SkyhookSourceNodeOptions(server_side_plan, skyhook_connection)},
      {"table_sink", TableSinkNodeOptions(&output_table)},
  });

  // Run the plan and return the resulting table:
  ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make());
  ARROW_RETURN_NOT_OK(client_side_plan.AddToPlan(plan.get()));
  ARROW_RETURN_NOT_OK(plan->Validate());
  plan->StartProducing();
  ARROW_RETURN_NOT_OK(plan->finished().status());
  return output_table;
}
```
A summary of the additions necessary to make that work, client side:
- `SkyhookSourceNode` (subclass of `ExecNode`)
  - in the `StartProducing()` override: serialize `server_side_plan` to substrait and send it to the server
  - receives `ceph::bufferlist`s asynchronously, converts them to `ExecBatch`es, then pushes each to the next node with `this->output()->InputReceived()`
  - forwards `PauseProducing()` and `ResumeProducing()` calls to the server
- `SkyhookSourceNodeOptions` (subclass of `ExecNodeOptions`)
- in client initialization: register the `SkyhookSourceNode` factory named `"skyhook_source"`

... and on the server side:
- Maintain a catalog mapping table names to datasets, accessible as a `NamedTableProvider`
- Respond to new connections with a summary of the catalog, mapping names to schemas
- Receive and execute `server_side_plan`s from clients:
  - deserialize from substrait
  - construct and run an `ExecPlan`
  - convert each generated `ExecBatch` to a `ceph::bufferlist` and transmit it to the client
  - pause and resume when indicated by the client
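The batch-to-`bufferlist` conversion in the last group can be sketched without ceph or Arrow (the framing scheme and names here are invented for illustration, standing in for whatever IPC encoding skyhook actually uses): the server frames each batch payload into a contiguous byte buffer with a length prefix, and the client inverts the framing before converting to an `ExecBatch`.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical server side transmit path: frame one batch payload (stood in
// by a string) into a contiguous byte buffer, the role a ceph::bufferlist
// plays in skyhook, using a 4-byte little-endian length prefix.
std::vector<uint8_t> FrameBatch(const std::string& payload) {
  std::vector<uint8_t> out;
  uint32_t len = static_cast<uint32_t>(payload.size());
  for (int i = 0; i < 4; ++i) {
    out.push_back(static_cast<uint8_t>(len >> (8 * i)));
  }
  out.insert(out.end(), payload.begin(), payload.end());
  return out;
}

// Hypothetical client side inverse: recover the payload from one frame,
// after which it would be converted into an ExecBatch and pushed downstream.
std::string UnframeBatch(const std::vector<uint8_t>& frame) {
  uint32_t len = 0;
  for (int i = 0; i < 4; ++i) {
    len |= static_cast<uint32_t>(frame[i]) << (8 * i);
  }
  return std::string(frame.begin() + 4, frame.begin() + 4 + len);
}
```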