westonpace commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r907431339


##########
cpp/src/arrow/engine/substrait/serde.h:
##########
@@ -40,22 +41,81 @@ using ConsumerFactory = std::function<std::shared_ptr<compute::SinkNodeConsumer>
 
 /// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
 ///
+/// The output of each top-level Substrait relation will be sent to a caller supplied
+/// consumer function provided by consumer_factory
+///
 /// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
 /// message
 /// \param[in] consumer_factory factory function for generating the node that consumes
 /// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
 /// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
 /// Plan is returned here.
 /// \return a vector of ExecNode declarations, one for each toplevel relation in the
 /// Substrait Plan
 ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
     const Buffer& buf, const ConsumerFactory& consumer_factory,
-    ExtensionSet* ext_set_out = NULLPTR);
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);
 
+/// \brief Deserializes a single-relation Substrait Plan message to an execution plan
+///
+/// The output of each top-level Substrait relation will be sent to a caller supplied
+/// consumer function provided by consumer_factory
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] consumer_factory factory function for generating the node that consumes
+/// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return an ExecNode corresponding to the single toplevel relation in the Substrait
+/// Plan
 Result<compute::ExecPlan> DeserializePlan(const Buffer& buf,
                                           const ConsumerFactory& consumer_factory,
+                                          const ExtensionIdRegistry* registry = NULLPTR,
                                           ExtensionSet* ext_set_out = NULLPTR);
 
+/// Factory function type for generating the write options of a node consuming the batches
+/// produced by each toplevel Substrait relation when deserializing a Substrait Plan.
+using WriteOptionsFactory = std::function<std::shared_ptr<dataset::WriteNodeOptions>()>;
+
+/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
+///
+/// The output of each top-level Substrait relation will be written to a filesystem.
+/// `write_options_factory` can be used to control write behavior.
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] write_options_factory factory function for generating the write options of
+/// a node consuming the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
+    const Buffer& buf, const WriteOptionsFactory& write_options_factory,
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);
+
+/// \brief Deserializes a single-relation Substrait Plan message to an execution plan
+///
+/// The output of the single Substrait relation will be written to a filesystem.
+/// `write_options_factory` can be used to control write behavior.
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] write_options_factory factory function for generating the write options of
+/// a node consuming the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<compute::ExecPlan> DeserializePlan(
+    const Buffer& buf, const WriteOptionsFactory& write_options_factory,
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);

Review Comment:
   For both this and the other `DeserializePlan` method, it doesn't make much sense for the user to have to provide an entire factory: with a single top-level relation, the factory would only ever be called once.
   
   Instead, could we change `const WriteOptionsFactory& write_options_factory` to `dataset::WriteNodeOptions write_options`?
   
   We should also change the other `DeserializePlan` to take a `std::shared_ptr<compute::SinkNodeConsumer>` instead of a `const ConsumerFactory&`, but that could be part of a future cleanup.
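   A minimal sketch of why a plain `write_options` value loses nothing, assuming hypothetical stand-in types: `WriteNodeOptionsSketch` stands in for `dataset::WriteNodeOptions` (the real struct carries dataset write options, not two strings), and `FactoryFromOptions` is a hypothetical helper, not an existing Arrow function. A value-taking `DeserializePlan` could build the factory internally by copying the options and then reuse the factory-based code path.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for dataset::WriteNodeOptions; the real type holds
// FileSystemDatasetWriteOptions and more, not two strings.
struct WriteNodeOptionsSketch {
  std::string base_dir;
  std::string basename_template;
};

// Same shape as the WriteOptionsFactory alias introduced in this PR.
using WriteOptionsFactorySketch =
    std::function<std::shared_ptr<WriteNodeOptionsSketch>()>;

// Hypothetical helper: turns a plain options value into a factory so that a
// value-taking overload can forward to the factory-based implementation.
// The lambda captures by value, so the factory stays valid after the
// caller's `write_options` goes out of scope.
WriteOptionsFactorySketch FactoryFromOptions(WriteNodeOptionsSketch write_options) {
  return [write_options = std::move(write_options)]() {
    // Each call hands out an independent copy of the same options.
    return std::make_shared<WriteNodeOptionsSketch>(write_options);
  };
}
```

   Capturing by value also avoids any question of whether the options outlive the returned lambda, which a by-reference capture would have to guarantee.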



##########
cpp/src/arrow/engine/substrait/extension_set.cc:
##########
@@ -315,6 +315,11 @@ struct ExtensionIdRegistryImpl : ExtensionIdRegistry {
     return Status::OK();
   }
 
+  Status RegisterFunction(std::string uri, std::string name,

Review Comment:
   These id/string APIs confuse me, so I apologize if this is off base, but I think these two `RegisterFunction` methods are backwards. It seems to make more sense to change the primary `RegisterFunction` to take strings instead of an `Id`. The first thing the function does is make a value-copy of the `Id`, so I don't see any benefit to passing in an `Id`.
   
   Then, if we need it, we could add a helper overload that goes the other way, accepting an `Id` and calling `to_string` on each of its pieces.
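   A minimal sketch of the suggested overload direction, using a hypothetical `FunctionIdRegistrySketch` and a stand-in `Id` made of two `std::string_view`s. The real `ExtensionIdRegistry` returns `Status` and stores more than one map; here `RegisterFunction` simply returns `false` on a duplicate. The primary overload takes owned strings and moves them straight into storage; the `Id` overload only converts its pieces to strings (the `to_string` step) and forwards.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <string_view>
#include <utility>

// Hypothetical stand-in for the registry's Id: two non-owning views.
struct Id {
  std::string_view uri;
  std::string_view name;
};

class FunctionIdRegistrySketch {
 public:
  // Primary overload: takes owned strings, so no extra copy is needed
  // before storing them. Returns false if (uri, name) is already registered.
  bool RegisterFunction(std::string uri, std::string name,
                        std::string arrow_function_name) {
    auto key = std::make_pair(std::move(uri), std::move(name));
    return functions_.emplace(std::move(key), std::move(arrow_function_name)).second;
  }

  // Helper overload going the other way: convert each piece of the Id to a
  // string and forward to the primary overload.
  bool RegisterFunction(Id id, std::string arrow_function_name) {
    return RegisterFunction(std::string(id.uri), std::string(id.name),
                            std::move(arrow_function_name));
  }

  // Returns the registered Arrow function name, or nullptr if unknown.
  const std::string* Lookup(std::string_view uri, std::string_view name) const {
    auto it = functions_.find(std::make_pair(std::string(uri), std::string(name)));
    return it == functions_.end() ? nullptr : &it->second;
  }

 private:
  std::map<std::pair<std::string, std::string>, std::string> functions_;
};
```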



##########
cpp/src/arrow/engine/substrait/serde.cc:
##########
@@ -58,12 +58,57 @@ Result<compute::Declaration> DeserializeRelation(const Buffer& buf,
   return FromProto(rel, ext_set);
 }
 
-Result<std::vector<compute::Declaration>> DeserializePlans(
-    const Buffer& buf, const ConsumerFactory& consumer_factory,
+using DeclarationFactory = std::function<compute::Declaration(
+    compute::Declaration, std::vector<std::string> names)>;
+
+static DeclarationFactory MakeConsumingSinkDeclarationFactory(
+    const ConsumerFactory& consumer_factory) {
+  return [&consumer_factory](compute::Declaration input, std::vector<std::string> names) {
+    std::shared_ptr<compute::ExecNodeOptions> options =
+        std::make_shared<compute::ConsumingSinkNodeOptions>(
+            compute::ConsumingSinkNodeOptions{consumer_factory(), std::move(names)});
+    return compute::Declaration::Sequence(
+        {std::move(input), {"consuming_sink", options}});
+  };
+}
+
+namespace {
+
+compute::Declaration ProjectByNamesDeclaration(compute::Declaration input,
+                                               std::vector<std::string> names) {
+  int names_size = static_cast<int>(names.size());
+  if (names_size == 0) {
+    return input;
+  }
+  std::vector<compute::Expression> expressions;
+  for (int i = 0; i < names_size; i++) {
+    expressions.push_back(compute::field_ref(FieldRef(i)));
+  }
+  return compute::Declaration::Sequence(
+      {std::move(input),
+       {"project",
+        compute::ProjectNodeOptions{std::move(expressions), std::move(names)}}});
+}
+
+}  // namespace
+
+static DeclarationFactory MakeWriteDeclarationFactory(
+    const WriteOptionsFactory& write_options_factory) {
+  return [&write_options_factory](compute::Declaration input,
+                                  std::vector<std::string> names) {
+    compute::Declaration projected = ProjectByNamesDeclaration(input, names);
+    std::shared_ptr<compute::ExecNodeOptions> options = write_options_factory();
+    return compute::Declaration::Sequence({std::move(projected), {"write", options}});

Review Comment:
   I agree with you. Every "sink" node (including writes) should probably allow the user to provide a schema for the outgoing batches, since we convert from ExecBatch to RecordBatch at the edge. This would let the user supply custom names as well as custom field-level and schema-level metadata.
   
   The only thing they can't specify is the field types (those have to match the node's existing output schema). So we can either:
   
    * Use a schema and reject it with an error if the user-supplied schema's types don't match
    * Create a new type that holds only the names and metadata, without types
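   A minimal sketch of the first option, assuming hypothetical stand-in types (`FieldSketch`, `SchemaSketch`, with types identified by name strings rather than `arrow::DataType`) and a hypothetical `ValidateSinkSchema` helper; none of this is an existing Arrow API. The node's output types are fixed, and the user-supplied schema is accepted only when every field type matches, while names and field- or schema-level metadata are free to differ.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <optional>
#include <string>
#include <vector>

// Hypothetical stand-ins: a type is identified by its name (e.g. "int64").
using MetadataSketch = std::map<std::string, std::string>;

struct FieldSketch {
  std::string name;
  std::string type;
  MetadataSketch metadata;
};

struct SchemaSketch {
  std::vector<FieldSketch> fields;
  MetadataSketch metadata;
};

// Option 1: accept a full user schema for the sink's output, but reject it
// when its types differ from what the node actually produces. Returns an
// error message, or std::nullopt when the schema can be used as-is.
std::optional<std::string> ValidateSinkSchema(
    const std::vector<std::string>& output_types, const SchemaSketch& user_schema) {
  if (user_schema.fields.size() != output_types.size()) {
    return "expected " + std::to_string(output_types.size()) + " fields but got " +
           std::to_string(user_schema.fields.size());
  }
  for (std::size_t i = 0; i < output_types.size(); ++i) {
    const FieldSketch& field = user_schema.fields[i];
    if (field.type != output_types[i]) {
      return "field " + std::to_string(i) + " ('" + field.name + "') has type " +
             field.type + " but the node produces " + output_types[i];
    }
  }
  return std::nullopt;
}
```

   The second option would instead take only names and metadata and pair them with the node's own types, so the only remaining check would be that the field count matches.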



##########
cpp/src/arrow/engine/substrait/util.h:
##########
@@ -30,17 +31,45 @@ namespace substrait {
 
 /// \brief Retrieve a RecordBatchReader from a Substrait plan.
 ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
-    const Buffer& substrait_buffer);
+    const Buffer& substrait_buffer, const ExtensionIdRegistry* registry = NULLPTR,
+    compute::FunctionRegistry* func_registry = NULLPTR);
 
 /// \brief Get a Serialized Plan from a Substrait JSON plan.
 /// This is a helper method for Python tests.
 ARROW_ENGINE_EXPORT Result<std::shared_ptr<Buffer>> SerializeJsonPlan(
     const std::string& substrait_json);
 
+/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
+/// including a no-op consumer of the sink output
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
+    const Buffer& buf, const ExtensionIdRegistry* registry);
+
 /// \brief Make a nested registry with the default registry as parent.
 /// See arrow::engine::nested_extension_id_registry for details.
 ARROW_ENGINE_EXPORT std::shared_ptr<ExtensionIdRegistry> MakeExtensionIdRegistry();
 
+/// \brief Register a function manually.
+///
+/// Register an arrow function name by an ID, defined by a URI and a name, on a given
+/// extension-id-registry.
+///
+/// \param[in] registry an extension-id-registry to use
+/// \param[in] id_uri a URI of the ID to register by
+/// \param[in] id_name a name of the ID to register by
+/// \param[in] arrow_function_name name of arrow function to register
+ARROW_ENGINE_EXPORT Status RegisterFunction(ExtensionIdRegistry& registry,
+                                            const std::string& id_uri,
+                                            const std::string& id_name,
+                                            const std::string& arrow_function_name);

Review Comment:
   A future PR sounds fine.



##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -229,6 +229,20 @@ class ARROW_EXPORT SinkNodeConsumer {
   virtual Future<> Finish() = 0;
 };
 
+class ARROW_EXPORT NullSinkNodeConsumer : public SinkNodeConsumer {

Review Comment:
   > AFAIU, a Substrait plan must have some consumer for proper execution. Up to now, only a ConsumingSinkNodeOption was provided. A NullSinkNodeConsumer is useful with the writing capability added here.
   
   But you also added a custom `DeserializePlans` that takes in a `WriteOptionsFactory` and doesn't require a consumer. So if the user is only doing writes, they should use that method, and the null consumer shouldn't be needed.



##########
cpp/src/arrow/engine/substrait/extension_set.h:
##########
@@ -19,6 +19,7 @@
 
 #pragma once
 
+#include <set>

Review Comment:
   Is this needed?


