westonpace commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r900418662
##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -116,7 +119,7 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
} else {
return Status::NotImplemented(
"substrait::ReadRel::LocalFiles::FileOrFiles::format "
- "other than FILE_FORMAT_PARQUET");
+ "other than FILE_FORMAT_PARQUET with an unrecognized file extension");
Review Comment:
```suggestion
"other than FILE_FORMAT_PARQUET");
```
Minor nit: In the near future I am hoping we will be dropping the
`.ends_with` checks and will not be using file extension to determine the
format. It's non-standard (i.e. not a documented behavior in the Substrait
spec) and a little hidden. The latest Substrait spec already has support for
specifying the IPC format.
So I'd rather not add a clarification that we will just be turning around
and removing.
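For illustration, here is a minimal, self-contained C++17 sketch of choosing the reader only from a format the plan declares explicitly, rather than from the file extension. `DeclaredFormat`, `ArrowFormat` and `FormatFromPlan` are hypothetical names, not Arrow or Substrait APIs, and `std::optional` stands in for Arrow's `Result`/`Status` error handling.

```cpp
#include <cassert>
#include <optional>
#include <string>

// Hypothetical stand-in for the format a Substrait LocalFiles entry declares;
// the real protobuf models this as a oneof, not as this enum.
enum class DeclaredFormat { kUnspecified, kParquet, kArrowIpc };

// Hypothetical set of Arrow-side readers the consumer can build.
enum class ArrowFormat { kParquet, kIpc };

// Picks the reader only from what the plan declares. The path is deliberately
// unused: a file named "data.arrow" with no declared format is rejected rather
// than guessed at with an ends_with(".arrow") check.
std::optional<ArrowFormat> FormatFromPlan(DeclaredFormat declared,
                                          const std::string& /*path*/) {
  switch (declared) {
    case DeclaredFormat::kParquet:
      return ArrowFormat::kParquet;
    case DeclaredFormat::kArrowIpc:
      return ArrowFormat::kIpc;
    case DeclaredFormat::kUnspecified:
      break;
  }
  // The real consumer would return Status::NotImplemented(...) here.
  return std::nullopt;
}
```

With this approach a path such as `data.arrow` that carries no declared format produces an error instead of silently selecting the IPC reader, so the behavior stays tied to what the Substrait spec documents.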
##########
cpp/src/arrow/engine/substrait/serde.cc:
##########
@@ -58,12 +58,57 @@ Result<compute::Declaration> DeserializeRelation(const Buffer& buf,
return FromProto(rel, ext_set);
}
-Result<std::vector<compute::Declaration>> DeserializePlans(
- const Buffer& buf, const ConsumerFactory& consumer_factory,
+using DeclarationFactory = std::function<compute::Declaration(
+ compute::Declaration, std::vector<std::string> names)>;
+
+static DeclarationFactory MakeConsumingSinkDeclarationFactory(
+ const ConsumerFactory& consumer_factory) {
+ return [&consumer_factory](compute::Declaration input, std::vector<std::string> names) {
+ std::shared_ptr<compute::ExecNodeOptions> options =
+ std::make_shared<compute::ConsumingSinkNodeOptions>(
+ compute::ConsumingSinkNodeOptions{consumer_factory(), std::move(names)});
+ return compute::Declaration::Sequence(
+ {std::move(input), {"consuming_sink", options}});
+ };
+}
+
+namespace {
+
+compute::Declaration ProjectByNamesDeclaration(compute::Declaration input,
+ std::vector<std::string> names) {
+ int names_size = static_cast<int>(names.size());
+ if (names_size == 0) {
+ return input;
+ }
+ std::vector<compute::Expression> expressions;
+ for (int i = 0; i < names_size; i++) {
+ expressions.push_back(compute::field_ref(FieldRef(i)));
+ }
+ return compute::Declaration::Sequence(
+ {std::move(input),
+ {"project",
+ compute::ProjectNodeOptions{std::move(expressions), std::move(names)}}});
+}
+
+} // namespace
+
+static DeclarationFactory MakeWriteDeclarationFactory(
+ const WriteOptionsFactory& write_options_factory) {
+ return [&write_options_factory](compute::Declaration input,
+ std::vector<std::string> names) {
+ compute::Declaration projected = ProjectByNamesDeclaration(input, names);
+ std::shared_ptr<compute::ExecNodeOptions> options = write_options_factory();
+ return compute::Declaration::Sequence({std::move(projected), {"write", options}});
+ };
+}
+
+static Result<std::vector<compute::Declaration>> DeserializePlans(
Review Comment:
Is this method internal? I don't see it in any .h file. If so, can we
include it in the anonymous namespace?
That being said, why not make it external and add it to serde.h? I've been
hoping for a while we can move away from forcing users to use the consuming
sink node so I think making this external would give flexibility to new users.
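One way to picture the "make it external" option is a minimal, self-contained C++17 sketch in which the caller supplies the `DeclarationFactory`. `Declaration`, `DeserializePlansWith` and `AppendPlainSink` are hypothetical toy names, not Arrow APIs; a real version would parse a Substrait `Buffer` rather than take already-converted relations, and would return `Result<>`.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for compute::Declaration: the ordered node factory
// names of a linear plan, e.g. {"scan", "filter", "consuming_sink"}.
struct Declaration {
  std::vector<std::string> nodes;
};

// Same shape as the DeclarationFactory alias in this diff: given one top-level
// relation and its output field names, return the relation plus its sink.
using DeclarationFactory =
    std::function<Declaration(Declaration, std::vector<std::string>)>;

// If exported (e.g. from serde.h), callers choose the sink themselves instead
// of being forced through a consuming sink node.
std::vector<Declaration> DeserializePlansWith(
    const std::vector<Declaration>& relations,
    const std::vector<std::string>& names, const DeclarationFactory& factory) {
  std::vector<Declaration> out;
  out.reserve(relations.size());
  for (const Declaration& relation : relations) {
    out.push_back(factory(relation, names));
  }
  return out;
}

// Example caller-supplied factory: end every relation in a plain "sink" node.
Declaration AppendPlainSink(Declaration input, std::vector<std::string> /*names*/) {
  input.nodes.push_back("sink");
  return input;
}
```

Keeping the function internal instead would simply mean moving the generic version into `namespace { ... }` and exposing only the consumer and write overloads.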
##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -229,6 +229,20 @@ class ARROW_EXPORT SinkNodeConsumer {
virtual Future<> Finish() = 0;
};
+class ARROW_EXPORT NullSinkNodeConsumer : public SinkNodeConsumer {
+ public:
+ Status Init(const std::shared_ptr<Schema>&, BackpressureControl*) override {
+ return Status::OK();
+ }
+ Status Consume(ExecBatch exec_batch) override { return Status::OK(); }
+ Future<> Finish() override { return Status::OK(); }
+
+ public:
+ static std::shared_ptr<NullSinkNodeConsumer> Make() {
+ return std::make_shared<NullSinkNodeConsumer>();
+ }
Review Comment:
Is this needed? Is it for consistency with other nodes? It seems callers
could just call `make_shared` themselves.
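For comparison, a consumer factory is still a one-liner without a static `Make()`; the util.cc `DeserializePlans` wrapper in this diff already calls `std::make_shared<compute::NullSinkNodeConsumer>()` inline. The sketch below uses hypothetical stand-ins (`Consumer`, `NullConsumer`, `NullConsumerFactory`) rather than the real `SinkNodeConsumer` interface, which returns `Status`/`Future<>`.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical, pared-down stand-in for compute::SinkNodeConsumer.
class Consumer {
 public:
  virtual ~Consumer() = default;
  virtual bool Consume(int batch) = 0;
};

// No-op consumer without a static Make(): it simply discards every batch.
class NullConsumer : public Consumer {
 public:
  bool Consume(int /*batch*/) override { return true; }
};

// Mirrors the ConsumerFactory alias in serde.h.
using ConsumerFactory = std::function<std::shared_ptr<Consumer>()>;

// Without Make(), the factory is built directly on std::make_shared.
ConsumerFactory NullConsumerFactory() {
  return [] { return std::make_shared<NullConsumer>(); };
}
```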
##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -229,6 +229,20 @@ class ARROW_EXPORT SinkNodeConsumer {
virtual Future<> Finish() = 0;
};
+class ARROW_EXPORT NullSinkNodeConsumer : public SinkNodeConsumer {
Review Comment:
Is this for benchmarking purposes? Or some other reason?
Would it instead make sense to have a no-op declaration factory that simply
didn't add any new nodes?
I'm not really against this approach, just pondering the options.
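To compare the two options side by side, here is a toy C++ sketch in which a declaration is just a list of node factory names. `Declaration`, `NoOpDeclarationFactory` and `NullSinkDeclarationFactory` are hypothetical names, assumed only for illustration.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for compute::Declaration (node factory names only).
struct Declaration {
  std::vector<std::string> nodes;
};

using DeclarationFactory =
    std::function<Declaration(Declaration, std::vector<std::string>)>;

// Option A: a no-op declaration factory. No node is added; whoever receives
// the declaration decides how (or whether) to consume its output.
Declaration NoOpDeclarationFactory(Declaration input,
                                   std::vector<std::string> /*names*/) {
  return input;
}

// Option B (this PR): append a consuming sink whose consumer discards batches,
// i.e. a NullSinkNodeConsumer in the real code.
Declaration NullSinkDeclarationFactory(Declaration input,
                                       std::vector<std::string> /*names*/) {
  input.nodes.push_back("consuming_sink");
  return input;
}
```

The no-op factory adds nothing, so the caller has to attach whatever consumes the output before running the plan; the null-sink variant yields complete declarations at the cost of a consumer class that exists only to discard batches.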
##########
cpp/src/arrow/engine/substrait/util.cc:
##########
@@ -108,25 +109,50 @@ class SubstraitExecutor {
} // namespace
Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
- const Buffer& substrait_buffer) {
- ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make());
+ const Buffer& substrait_buffer, const ExtensionIdRegistry* extid_registry,
+ compute::FunctionRegistry* func_registry) {
// TODO(ARROW-15732)
compute::ExecContext exec_context(arrow::default_memory_pool(),
- ::arrow::internal::GetCpuThreadPool());
+ ::arrow::internal::GetCpuThreadPool(), func_registry);
+ ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(&exec_context));
SubstraitExecutor executor(std::move(plan), exec_context);
- RETURN_NOT_OK(executor.Init(substrait_buffer));
+ RETURN_NOT_OK(executor.Init(substrait_buffer, extid_registry));
ARROW_ASSIGN_OR_RAISE(auto sink_reader, executor.Execute());
+ // check closing here, not in destructor, to expose error to caller
+ RETURN_NOT_OK(executor.Close());
return sink_reader;
}
Result<std::shared_ptr<Buffer>> SerializeJsonPlan(const std::string& substrait_json) {
return engine::internal::SubstraitFromJSON("Plan", substrait_json);
}
+Result<std::vector<compute::Declaration>> DeserializePlans(
+ const Buffer& buffer, const ExtensionIdRegistry* registry) {
+ return engine::DeserializePlans(
+ buffer, []() { return std::make_shared<compute::NullSinkNodeConsumer>(); },
+ registry);
+}
+
std::shared_ptr<ExtensionIdRegistry> MakeExtensionIdRegistry() {
return nested_extension_id_registry(default_extension_id_registry());
}
+Status RegisterFunction(ExtensionIdRegistry& registry, const std::string& id_uri,
+ const std::string& id_name,
+ const std::string& arrow_function_name) {
+ const std::string& id_uri_sym = registry.AddExternalSymbol(id_uri);
+ const std::string& id_name_sym = registry.AddExternalSymbol(id_name);
+ const std::string& arrow_function_name_sym =
+ registry.AddExternalSymbol(arrow_function_name);
Review Comment:
I'm pretty sure this is redundant because `RegisterFunction` already takes
ownership of the Arrow function name (in `function_names_`).
##########
cpp/src/arrow/engine/substrait/util.h:
##########
@@ -30,17 +31,45 @@ namespace substrait {
/// \brief Retrieve a RecordBatchReader from a Substrait plan.
ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
- const Buffer& substrait_buffer);
+ const Buffer& substrait_buffer, const ExtensionIdRegistry* registry = NULLPTR,
+ compute::FunctionRegistry* func_registry = NULLPTR);
/// \brief Get a Serialized Plan from a Substrait JSON plan.
/// This is a helper method for Python tests.
ARROW_ENGINE_EXPORT Result<std::shared_ptr<Buffer>> SerializeJsonPlan(
const std::string& substrait_json);
+/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
+/// including a no-op consumer of the sink output
+///
Review Comment:
Can you add a brief description here explaining why a user might want to do
this? It could be something like "this can be useful for testing or
benchmarking purposes".
##########
cpp/src/arrow/engine/substrait/util.h:
##########
@@ -30,17 +31,45 @@ namespace substrait {
/// \brief Retrieve a RecordBatchReader from a Substrait plan.
ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
- const Buffer& substrait_buffer);
+ const Buffer& substrait_buffer, const ExtensionIdRegistry* registry = NULLPTR,
+ compute::FunctionRegistry* func_registry = NULLPTR);
/// \brief Get a Serialized Plan from a Substrait JSON plan.
/// This is a helper method for Python tests.
ARROW_ENGINE_EXPORT Result<std::shared_ptr<Buffer>> SerializeJsonPlan(
const std::string& substrait_json);
+/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
+/// including a no-op consumer of the sink output
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
+ const Buffer& buf, const ExtensionIdRegistry* registry);
+
/// \brief Make a nested registry with the default registry as parent.
/// See arrow::engine::nested_extension_id_registry for details.
ARROW_ENGINE_EXPORT std::shared_ptr<ExtensionIdRegistry> MakeExtensionIdRegistry();
+/// \brief Register a function manually.
+///
+/// Register an arrow function name by an ID, defined by a URI and a name, on a given
+/// extension-id-registry.
+///
+/// \param[in] registry an extension-id-registry to use
+/// \param[in] id_uri a URI of the ID to register by
+/// \param[in] id_name a name of the ID to register by
+/// \param[in] arrow_function_name name of arrow function to register
+ARROW_ENGINE_EXPORT Status RegisterFunction(ExtensionIdRegistry& registry,
+ const std::string& id_uri,
+ const std::string& id_name,
+ const std::string& arrow_function_name);
+
+ARROW_ENGINE_EXPORT const std::string& default_extension_types_uri();
Review Comment:
Is the intention to also use this as the URI for extension functions? I
think I'm a fan of simpler URIs like:
`https://arrow.apache.org/substrait/v1/extensions.yaml`
Either way, you can probably safely ignore this comment until we fix the
Substrait picture here to allow for YAML extension/inheritance.
##########
cpp/src/arrow/engine/substrait/extension_set.h:
##########
@@ -95,6 +96,17 @@ class ARROW_ENGINE_EXPORT ExtensionIdRegistry {
virtual Status CanRegisterFunction(Id,
const std::string& arrow_function_name) const = 0;
virtual Status RegisterFunction(Id, std::string arrow_function_name) = 0;
+
+ /// \brief Add a symbol external to the plan yet used in an Id.
+ ///
+ /// This ensures the symbol, which is only viewed but not held by the Id, lives while
+ /// the extension set does. Symbols appearing in the Substrait plan are already held.
+ const std::string& AddExternalSymbol(const std::string& symbol) {
+ return *external_symbols.insert(symbol).first;
+ }
Review Comment:
I think we should have...
```
// Registers the function and takes ownership of uri and name
virtual Status RegisterFunction(std::string uri, std::string name,
                                std::string arrow_function_name) = 0;
// Registers the function without taking ownership of the id's strings
virtual Status RegisterFunction(Id id, std::string arrow_function_name) = 0;
```
Then `external_symbols` can be an implementation detail.
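A minimal sketch of how the two proposed overloads could fit together, assuming an `Id` that holds `std::string_view`s, as the existing `AddExternalSymbol` doc comment implies ("only viewed but not held by the Id"). `Registry` and `Lookup` are hypothetical, and `void` replaces the real `Status` return type to keep the example self-contained.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <string_view>
#include <tuple>
#include <unordered_set>
#include <utility>

// Hypothetical stand-in for the registry's Id: non-owning views of uri and name.
struct Id {
  std::string_view uri;
  std::string_view name;
  bool operator<(const Id& other) const {
    return std::tie(uri, name) < std::tie(other.uri, other.name);
  }
};

class Registry {
 public:
  // Takes ownership of uri and name. Rehashing an unordered_set does not move
  // its elements, so views into the stored strings stay valid.
  void RegisterFunction(std::string uri, std::string name,
                        std::string arrow_function_name) {
    std::string_view uri_view = *owned_symbols_.insert(std::move(uri)).first;
    std::string_view name_view = *owned_symbols_.insert(std::move(name)).first;
    RegisterFunction(Id{uri_view, name_view}, std::move(arrow_function_name));
  }

  // Does not take ownership: the caller keeps id's strings alive. The Arrow
  // function name is taken by value, so the registry always owns that copy.
  void RegisterFunction(Id id, std::string arrow_function_name) {
    function_names_[id] = std::move(arrow_function_name);
  }

  const std::string* Lookup(Id id) const {
    auto it = function_names_.find(id);
    return it == function_names_.end() ? nullptr : &it->second;
  }

 private:
  std::unordered_set<std::string> owned_symbols_;  // the "external_symbols" detail
  std::map<Id, std::string> function_names_;
};
```

Only the URI and name need the owned-symbol storage, and that storage never appears in the public interface.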
##########
cpp/src/arrow/engine/substrait/util.h:
##########
@@ -30,17 +31,45 @@ namespace substrait {
/// \brief Retrieve a RecordBatchReader from a Substrait plan.
ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
- const Buffer& substrait_buffer);
+ const Buffer& substrait_buffer, const ExtensionIdRegistry* registry = NULLPTR,
+ compute::FunctionRegistry* func_registry = NULLPTR);
/// \brief Get a Serialized Plan from a Substrait JSON plan.
/// This is a helper method for Python tests.
ARROW_ENGINE_EXPORT Result<std::shared_ptr<Buffer>> SerializeJsonPlan(
const std::string& substrait_json);
+/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
+/// including a no-op consumer of the sink output
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
+ const Buffer& buf, const ExtensionIdRegistry* registry);
+
/// \brief Make a nested registry with the default registry as parent.
/// See arrow::engine::nested_extension_id_registry for details.
ARROW_ENGINE_EXPORT std::shared_ptr<ExtensionIdRegistry> MakeExtensionIdRegistry();
+/// \brief Register a function manually.
+///
+/// Register an arrow function name by an ID, defined by a URI and a name, on a given
+/// extension-id-registry.
+///
+/// \param[in] registry an extension-id-registry to use
+/// \param[in] id_uri a URI of the ID to register by
+/// \param[in] id_name a name of the ID to register by
+/// \param[in] arrow_function_name name of arrow function to register
+ARROW_ENGINE_EXPORT Status RegisterFunction(ExtensionIdRegistry& registry,
+ const std::string& id_uri,
+ const std::string& id_name,
+ const std::string& arrow_function_name);
Review Comment:
I'm not a big fan of the fact that we'd have to call `RegisterFunction`
twice for every UDF (once for the Arrow registry and once for the Substrait
registry). I think I'd like to someday get to a point where there is a special
URI / convention so that anytime the consumer encounters a URI like
`https://arrow.apache.org/substrait/v1/custom.yaml` it will automatically
convert the Substrait args to Arrow args using some convention and then make a
call to the function registry.
However, that can be for the future. This seems like a reasonable approach
for the moment.
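The convention idea could be prototyped roughly as follows. This is a hypothetical sketch: `FunctionResolver` is not an Arrow class, the URI is the example from the comment above rather than anything Arrow recognizes, and only the name mapping is modeled; converting Substrait arguments to Arrow arguments is left out.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>
#include <utility>

// Convention URI from the comment above; Arrow does not recognize it today.
const std::string kArrowCustomUri =
    "https://arrow.apache.org/substrait/v1/custom.yaml";

// Hypothetical resolver from a Substrait (uri, name) pair to an Arrow function
// name. Explicit registrations win; under the convention URI the Substrait
// name is used directly as the Arrow function name, so a UDF would only have
// to be added to the Arrow function registry.
class FunctionResolver {
 public:
  void Register(std::string uri, std::string name, std::string arrow_name) {
    registered_[{std::move(uri), std::move(name)}] = std::move(arrow_name);
  }

  std::optional<std::string> Resolve(const std::string& uri,
                                     const std::string& name) const {
    auto it = registered_.find({uri, name});
    if (it != registered_.end()) return it->second;
    if (uri == kArrowCustomUri) return name;
    return std::nullopt;
  }

 private:
  std::map<std::pair<std::string, std::string>, std::string> registered_;
};
```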
##########
cpp/src/arrow/engine/substrait/serde.h:
##########
@@ -44,18 +45,50 @@ using ConsumerFactory = std::function<std::shared_ptr<compute::SinkNodeConsumer>
/// message
/// \param[in] consumer_factory factory function for generating the node that consumes
/// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
/// Plan is returned here.
/// \return a vector of ExecNode declarations, one for each toplevel relation in the
/// Substrait Plan
ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
const Buffer& buf, const ConsumerFactory& consumer_factory,
- ExtensionSet* ext_set_out = NULLPTR);
+ const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);
+/// \brief Deserializes a single-relation Substrait Plan message to an execution plan
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] consumer_factory factory function for generating the node that consumes
+/// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return an ExecNode corresponding to the single toplevel relation in the Substrait
+/// Plan
Result<compute::ExecPlan> DeserializePlan(const Buffer& buf,
const ConsumerFactory& consumer_factory,
+ const ExtensionIdRegistry* registry = NULLPTR,
ExtensionSet* ext_set_out = NULLPTR);
+/// Factory function type for generating the write options of a node consuming the batches
+/// produced by each toplevel Substrait relation when deserializing a Substrait Plan.
+using WriteOptionsFactory = std::function<std::shared_ptr<dataset::WriteNodeOptions>()>;
+
+/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] write_options_factory factory function for generating the write options of
+/// a node consuming the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
+ const Buffer& buf, const WriteOptionsFactory& write_options_factory,
+ const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);
+
Review Comment:
For symmetry it might be nice to have a `DeserializePlan` which takes a
single instance of `dataset::WriteNodeOptions` and returns an error if the
underlying plan has multiple top-level relations. I imagine this would be a
useful convenience to have as well for callers.
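A toy sketch of such a convenience, assuming simplified stand-ins: `Declaration`, `WriteOptions`, `SingleResult` and `DeserializeSinglePlan` are hypothetical, and a real version would presumably reuse the `WriteOptionsFactory`-based `DeserializePlans` and return `Result<>` with an error `Status` for a multi-relation plan.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-ins for compute::Declaration and dataset::WriteNodeOptions.
struct Declaration {
  std::vector<std::string> nodes;
};
struct WriteOptions {
  std::string base_dir;
};

// Hypothetical stand-in for Result<compute::Declaration>.
struct SingleResult {
  bool ok;
  std::string error;
  Declaration plan;
};

// Single-relation convenience: takes one WriteOptions instance (not a factory)
// and rejects plans that do not have exactly one top-level relation.
SingleResult DeserializeSinglePlan(const std::vector<Declaration>& relations,
                                   const WriteOptions& options) {
  if (relations.size() != 1) {
    return {false,
            "expected exactly 1 top-level relation, got " +
                std::to_string(relations.size()),
            {}};
  }
  Declaration plan = relations[0];
  plan.nodes.push_back("write:" + options.base_dir);  // the write sink
  return {true, "", plan};
}
```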
##########
cpp/src/arrow/engine/substrait/serde.h:
##########
@@ -44,18 +45,50 @@ using ConsumerFactory = std::function<std::shared_ptr<compute::SinkNodeConsumer>
/// message
/// \param[in] consumer_factory factory function for generating the node that consumes
/// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
/// Plan is returned here.
/// \return a vector of ExecNode declarations, one for each toplevel relation in the
/// Substrait Plan
ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
const Buffer& buf, const ConsumerFactory& consumer_factory,
- ExtensionSet* ext_set_out = NULLPTR);
+ const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);
+/// \brief Deserializes a single-relation Substrait Plan message to an execution plan
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] consumer_factory factory function for generating the node that consumes
+/// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return an ExecNode corresponding to the single toplevel relation in the Substrait
+/// Plan
Result<compute::ExecPlan> DeserializePlan(const Buffer& buf,
const ConsumerFactory& consumer_factory,
+ const ExtensionIdRegistry* registry = NULLPTR,
ExtensionSet* ext_set_out = NULLPTR);
+/// Factory function type for generating the write options of a node consuming the batches
+/// produced by each toplevel Substrait relation when deserializing a Substrait Plan.
+using WriteOptionsFactory = std::function<std::shared_ptr<dataset::WriteNodeOptions>()>;
+
+/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] write_options_factory factory function for generating the write options of
+/// a node consuming the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
Review Comment:
This will need some unit test coverage.
##########
cpp/src/arrow/engine/substrait/serde.h:
##########
@@ -44,18 +45,50 @@ using ConsumerFactory = std::function<std::shared_ptr<compute::SinkNodeConsumer>
/// message
/// \param[in] consumer_factory factory function for generating the node that consumes
/// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
/// Plan is returned here.
/// \return a vector of ExecNode declarations, one for each toplevel relation in the
/// Substrait Plan
ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
const Buffer& buf, const ConsumerFactory& consumer_factory,
- ExtensionSet* ext_set_out = NULLPTR);
+ const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);
+/// \brief Deserializes a single-relation Substrait Plan message to an execution plan
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] consumer_factory factory function for generating the node that consumes
+/// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return an ExecNode corresponding to the single toplevel relation in the Substrait
+/// Plan
Result<compute::ExecPlan> DeserializePlan(const Buffer& buf,
const ConsumerFactory& consumer_factory,
+ const ExtensionIdRegistry* registry = NULLPTR,
ExtensionSet* ext_set_out = NULLPTR);
+/// Factory function type for generating the write options of a node consuming the batches
+/// produced by each toplevel Substrait relation when deserializing a Substrait Plan.
+using WriteOptionsFactory = std::function<std::shared_ptr<dataset::WriteNodeOptions>()>;
+
+/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
Review Comment:
```suggestion
/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
///
/// The output of each top-level Substrait relation will be written to a filesystem.
/// `write_options_factory` can be used to control write behavior.
///
/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
```
These overloads are likely to be a little confusing to users. The docstring
should clearly explain the difference between the two.
We can expand on the docstring for the other overload a little too...
```
The output of each top-level Substrait relation will be sent to a caller-supplied
consumer function provided by consumer_factory.
```
##########
cpp/src/arrow/engine/substrait/serde.cc:
##########
@@ -58,12 +58,57 @@ Result<compute::Declaration> DeserializeRelation(const Buffer& buf,
return FromProto(rel, ext_set);
}
-Result<std::vector<compute::Declaration>> DeserializePlans(
- const Buffer& buf, const ConsumerFactory& consumer_factory,
+using DeclarationFactory = std::function<compute::Declaration(
+ compute::Declaration, std::vector<std::string> names)>;
+
+static DeclarationFactory MakeConsumingSinkDeclarationFactory(
+ const ConsumerFactory& consumer_factory) {
+ return [&consumer_factory](compute::Declaration input, std::vector<std::string> names) {
+ std::shared_ptr<compute::ExecNodeOptions> options =
+ std::make_shared<compute::ConsumingSinkNodeOptions>(
+ compute::ConsumingSinkNodeOptions{consumer_factory(), std::move(names)});
+ return compute::Declaration::Sequence(
+ {std::move(input), {"consuming_sink", options}});
+ };
+}
+
+namespace {
+
+compute::Declaration ProjectByNamesDeclaration(compute::Declaration input,
+ std::vector<std::string> names) {
+ int names_size = static_cast<int>(names.size());
+ if (names_size == 0) {
+ return input;
+ }
+ std::vector<compute::Expression> expressions;
+ for (int i = 0; i < names_size; i++) {
+ expressions.push_back(compute::field_ref(FieldRef(i)));
+ }
+ return compute::Declaration::Sequence(
+ {std::move(input),
+ {"project",
+ compute::ProjectNodeOptions{std::move(expressions), std::move(names)}}});
+}
+
+} // namespace
+
+static DeclarationFactory MakeWriteDeclarationFactory(
+ const WriteOptionsFactory& write_options_factory) {
+ return [&write_options_factory](compute::Declaration input,
+ std::vector<std::string> names) {
+ compute::Declaration projected = ProjectByNamesDeclaration(input, names);
+ std::shared_ptr<compute::ExecNodeOptions> options = write_options_factory();
+ return compute::Declaration::Sequence({std::move(projected), {"write", options}});
Review Comment:
Using a project node for this seems like overkill. Probably
`WriteNodeOptions` should just include a new field which is an array of names
(or maybe replace `custom_metadata` with `custom_schema`). However, this
change does not have to be part of this PR. I can create a JIRA for this
cleanup later.
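A rough sketch of the alternative, assuming a hypothetical `names` field on the write options (per the comment above, such a field would have to be added to `dataset::WriteNodeOptions`). Schemas are reduced to lists of field names, and `WriteOptionsWithNames` and `OutputFieldNames` are illustrative names only.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <vector>

using FieldNames = std::vector<std::string>;  // hypothetical: a schema as names only

// Hypothetical stand-in for a dataset::WriteNodeOptions that carries the
// desired output field names (or, alternatively, a whole custom_schema).
struct WriteOptionsWithNames {
  std::string base_dir;
  std::optional<FieldNames> names;  // unset: keep the input names
};

// What the write node could do itself: rename positionally, with no extra
// project node. This sketch rejects a count mismatch, whereas the project
// node in the diff keeps only the first names.size() fields.
std::optional<FieldNames> OutputFieldNames(const FieldNames& input,
                                           const WriteOptionsWithNames& options) {
  if (!options.names) return input;
  if (options.names->size() != input.size()) return std::nullopt;
  return options.names;
}
```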
--
This is an automated message from the Apache Git Service.