westonpace commented on code in PR #13040:
URL: https://github.com/apache/arrow/pull/13040#discussion_r865534886
##########
cpp/src/arrow/dataset/file_base.cc:
##########
@@ -413,9 +428,108 @@ Result<compute::ExecNode*>
MakeWriteNode(compute::ExecPlan* plan,
return node;
}
+namespace {
+
+class TeeNode : public compute::MapNode, public compute::BackpressureControl {
Review Comment:
You don't need to extend `compute::BackpressureControl` here (and you don't
need `backpressure_control_`). That's only needed for the `ConsumingSink` to
give the `SinkNodeConsumer` something it can hold onto to pause/resume.
Since you're calling pause/resume from the node itself, you can just use
`this` for your calls to `Pause`/`Resume`.
##########
cpp/src/arrow/dataset/file_base.cc:
##########
@@ -413,9 +428,108 @@ Result<compute::ExecNode*>
MakeWriteNode(compute::ExecPlan* plan,
return node;
}
+namespace {
+
+class TeeNode : public compute::MapNode, public compute::BackpressureControl {
+ public:
+ TeeNode(compute::ExecPlan* plan, std::vector<compute::ExecNode*> inputs,
+ std::shared_ptr<Schema> output_schema,
+ std::unique_ptr<internal::DatasetWriter> dataset_writer,
+ FileSystemDatasetWriteOptions write_options, bool async_mode)
+ : MapNode(plan, std::move(inputs), std::move(output_schema), async_mode),
+ dataset_writer_(std::move(dataset_writer)),
+ write_options_(std::move(write_options)),
+ backpressure_control_(this) {}
+
+ static Result<compute::ExecNode*> Make(compute::ExecPlan* plan,
+ std::vector<compute::ExecNode*>
inputs,
+ const compute::ExecNodeOptions&
options) {
+ RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "TeeNode"));
+
+ const WriteNodeOptions write_node_options =
+ checked_cast<const WriteNodeOptions&>(options);
+ const FileSystemDatasetWriteOptions& write_options =
write_node_options.write_options;
+ const std::shared_ptr<Schema> schema = inputs[0]->output_schema();
+
+ ARROW_ASSIGN_OR_RAISE(auto dataset_writer,
+ internal::DatasetWriter::Make(write_options));
+
+ return plan->EmplaceNode<TeeNode>(plan, std::move(inputs),
std::move(schema),
+ std::move(dataset_writer),
std::move(write_options),
+ /*async_mode=*/true);
+ }
+
+ const char* kind_name() const override { return "TeeNode"; }
+
+ Result<compute::ExecBatch> DoTee(const compute::ExecBatch& batch) {
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> record_batch,
+ batch.ToRecordBatch(output_schema()));
+ ARROW_RETURN_NOT_OK(WriteNextBatch(std::move(record_batch),
batch.guarantee));
+ return batch;
+ }
+
+ Status WriteNextBatch(std::shared_ptr<RecordBatch> batch,
+ compute::Expression guarantee) {
+ return WriteBatch(
+ batch, guarantee, write_options_,
+ [this](std::shared_ptr<RecordBatch> next_batch,
+ const Partitioning::PartitionPathFormat& destination) {
+ return task_group_.AddTask([this, next_batch, destination] {
+ util::tracing::Span span;
+ START_COMPUTE_SPAN(span, "Tee",
+ {{"tee.base_dir", ToStringExtra()},
+ {"tee.length", next_batch->num_rows()}});
Review Comment:
As far as I can tell, this span isn't measuring anything terribly useful.
`WriteRecordBatch` returns immediately after it queues the batch into the
appropriate file queue, so this span won't capture the actual time it took to
write the batch. Also, the timing here will be included as part of
`InputReceived` anyway. The `base_dir` might be slightly useful as an event
on the current parent span, but I'm not sure that is what `ToStringExtra()` is
going to return.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]