jonkeane commented on a change in pull request #12537:
URL: https://github.com/apache/arrow/pull/12537#discussion_r818746754
##########
File path: r/src/compute-exec.cpp
##########
@@ -277,4 +287,135 @@ std::shared_ptr<compute::ExecNode>
ExecNode_ReadFromRecordBatchReader(
return MakeExecNodeOrStop("source", plan.get(), {}, options);
}
+// [[arrow::export]]
+std::shared_ptr<arrow::RecordBatchReader> Tpch_Dbgen(
+ const std::shared_ptr<compute::ExecPlan>& plan,
+ int scale_factor,
+ std::string table_name
+ ) {
+ // Adds a TpchGen node for `table_name` to `plan`, feeds it into a sink node, starts the plan, and returns a reader over the sink's batches.
+ auto gen = ValueOrStop(arrow::compute::TpchGen::Make(plan.get(),
scale_factor));
+ // Pick the generator node matching table_name; any other name reaches the final else and calls cpp11::stop.
+ compute::ExecNode *table;  // assigned by exactly one branch of the chain below
+ if (table_name == "part") {
+ table = ValueOrStop(gen.Part());
+ } else if (table_name == "supplier") {
+ table = ValueOrStop(gen.Supplier());
+ } else if (table_name == "partsupp") {
+ table = ValueOrStop(gen.PartSupp());
+ } else if (table_name == "customer") {
+ table = ValueOrStop(gen.Customer());
+ } else if (table_name == "nation") {
+ table = ValueOrStop(gen.Nation());
+ } else if (table_name == "lineitem") {
+ table = ValueOrStop(gen.Lineitem());
+ } else if (table_name == "region") {
+ table = ValueOrStop(gen.Region());
+ } else if (table_name == "orders") {
+ table = ValueOrStop(gen.Orders());
+ } else {
+ cpp11::stop("That's not a valid table name");
+ }
+ // sink_gen is handed to the sink node by address below and is later invoked by the lambda given to the reader.
+ arrow::AsyncGenerator<arrow::util::optional<compute::ExecBatch>> sink_gen;
+ // Add a "sink" node to the plan with the selected table node as its only input.
+ MakeExecNodeOrStop("sink", plan.get(), {table},
+ compute::SinkNodeOptions{&sink_gen});
+ // Validate the plan and then start it; each returned status goes through StopIfNotOk.
+ StopIfNotOk(plan->Validate());
+ StopIfNotOk(plan->StartProducing());
+ // stop_producing manages no object (nullptr); it exists only for its custom deleter.
+ // If the generator is destroyed before being completely drained, inform plan
+ std::shared_ptr<void> stop_producing{nullptr, [plan](...) {
+ bool not_finished_yet =
+ plan->finished().TryAddCallback([&plan] {
+ return [plan](const arrow::Status&) {};
+ });
+ // Only call StopProducing when the TryAddCallback result above is true (read here as "plan not finished yet").
+ if (not_finished_yet) {
+ plan->StopProducing();
+ }
+ }};
+ // The lambda below captures stop_producing, plan and sink_gen by value, so copies of all three travel with it.
+ return compute::MakeGeneratorReader(
+ table->output_schema(),
+ [stop_producing, plan, sink_gen] { return sink_gen(); }, gc_memory_pool());
+}
+
+// [[arrow::export]]
+void Tpch_Dbgen_Write(
+ const std::shared_ptr<compute::ExecPlan>& plan,
+ int scale_factor,
+ std::string table_name,
+ const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir,
+ arrow::dataset::ExistingDataBehavior existing_data_behavior, int
max_partitions
+) {
+ auto gen = ValueOrStop(arrow::compute::TpchGen::Make(plan.get(),
scale_factor));
+
+ compute::ExecNode *table;
+ if (table_name == "part") {
+ table = ValueOrStop(gen.Part());
+ } else if (table_name == "supplier") {
+ table = ValueOrStop(gen.Supplier());
+ } else if (table_name == "partsupp") {
+ table = ValueOrStop(gen.PartSupp());
+ } else if (table_name == "customer") {
+ table = ValueOrStop(gen.Customer());
+ } else if (table_name == "nation") {
+ table = ValueOrStop(gen.Nation());
+ } else if (table_name == "lineitem") {
+ table = ValueOrStop(gen.Lineitem());
+ } else if (table_name == "region") {
+ table = ValueOrStop(gen.Region());
+ } else if (table_name == "orders") {
+ table = ValueOrStop(gen.Orders());
+ } else {
+ cpp11::stop("That's not a valid table name");
+ }
+
+ // TODO: unhardcode this once it's working
+ auto base_path = base_dir + "/parquet_dataset";
+ filesystem->CreateDir(base_path);
+
+ auto format = std::make_shared<ds::ParquetFileFormat>();
+
+ ds::FileSystemDatasetWriteOptions write_options;
+ write_options.file_write_options = format->DefaultWriteOptions();
+ write_options.existing_data_behavior =
ds::ExistingDataBehavior::kDeleteMatchingPartitions;
+ write_options.filesystem = filesystem;
+ write_options.base_dir = base_path;
+ write_options.partitioning = arrow::dataset::Partitioning::Default();
+ write_options.basename_template = "part{i}.parquet";
+ write_options.max_partitions = 1024;
Review comment:
A lot of this is hard-coded for now to get it working — one thing that
surprises me a little bit is that we have slightly different write options for
this than we do for datasets. I think I need to wire up
`FileSystemDatasetWriteOptions` (or figure out whether it gets translated in the
right way to get to that point with our R wiring of `FileWriteOptions`).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]