jonkeane commented on a change in pull request #12537:
URL: https://github.com/apache/arrow/pull/12537#discussion_r818743722



##########
File path: r/src/compute-exec.cpp
##########
@@ -277,4 +287,135 @@ std::shared_ptr<compute::ExecNode> 
ExecNode_ReadFromRecordBatchReader(
   return MakeExecNodeOrStop("source", plan.get(), {}, options);
 }
 
+// [[arrow::export]]
+std::shared_ptr<arrow::RecordBatchReader> Tpch_Dbgen(  // Returns a reader over one table produced by TpchGen inside `plan`.
+    const std::shared_ptr<compute::ExecPlan>& plan,  // plan that receives the generator node and the sink node
+    int scale_factor,  // forwarded unchanged to TpchGen::Make
+    std::string table_name  // must equal one of the eight names compared below
+    ) {
+
+  auto gen = ValueOrStop(arrow::compute::TpchGen::Make(plan.get(), 
scale_factor));
+
+  compute::ExecNode *table;  // set by exactly one branch below; the final else calls cpp11::stop instead
+  if (table_name == "part") {
+    table = ValueOrStop(gen.Part());
+  } else if (table_name == "supplier") {
+    table = ValueOrStop(gen.Supplier());
+  } else if (table_name == "partsupp") {
+    table = ValueOrStop(gen.PartSupp());
+  } else if (table_name == "customer") {
+    table = ValueOrStop(gen.Customer());
+  } else if (table_name == "nation") {
+    table = ValueOrStop(gen.Nation());
+  } else if (table_name == "lineitem") {
+    table = ValueOrStop(gen.Lineitem());
+  } else if (table_name == "region") {
+    table = ValueOrStop(gen.Region());
+  } else if (table_name == "orders") {
+    table = ValueOrStop(gen.Orders());
+  } else {
+    cpp11::stop("That's not a valid table name");  // NOTE(review): presumably raises an R error and never returns - confirm
+  }
+
+  arrow::AsyncGenerator<arrow::util::optional<compute::ExecBatch>> sink_gen;  // its address is handed to the sink node options below
+
+  MakeExecNodeOrStop("sink", plan.get(), {table},  // the selected table node is the sink's only input
+                     compute::SinkNodeOptions{&sink_gen});
+
+  StopIfNotOk(plan->Validate());
+  StopIfNotOk(plan->StartProducing());
+
+  // If the generator is destroyed before being completely drained, inform plan
+  std::shared_ptr<void> stop_producing{nullptr, [plan](...) {  // null pointer with a custom deleter: only the deleter's side effect is used
+    bool not_finished_yet =
+      plan->finished().TryAddCallback([&plan] {  // NOTE(review): assumes this returns false once the future has already finished - confirm
+        return [plan](const arrow::Status&) {};  // the callback does nothing except hold its own copy of `plan`
+      });
+
+    if (not_finished_yet) {
+      plan->StopProducing();
+    }
+  }};
+
+  return compute::MakeGeneratorReader(
+    table->output_schema(),  // the reader reports the schema of the selected table node
+    [stop_producing, plan, sink_gen] { return sink_gen(); }, gc_memory_pool());  // by-value captures live as long as this lambda does
+}
+
+// [[arrow::export]]
+void Tpch_Dbgen_Write(

Review comment:
       This is not the ultimate shape that this function will take, but it was my 
first attempt at using the write node. It is currently segfaulting, and I've 
commented inline on some of the silly things I've done. 
   
   This function is a bit separate from the rest of the PR, so if getting it 
working will delay merging the larger PR, I'm happy to pull it into a separate one.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to