jonkeane commented on a change in pull request #12537:
URL: https://github.com/apache/arrow/pull/12537#discussion_r818750831



##########
File path: r/src/compute-exec.cpp
##########
@@ -277,4 +287,135 @@ std::shared_ptr<compute::ExecNode> ExecNode_ReadFromRecordBatchReader(
   return MakeExecNodeOrStop("source", plan.get(), {}, options);
 }
 
+// [[arrow::export]]
+std::shared_ptr<arrow::RecordBatchReader> Tpch_Dbgen(  // Adds a TpchGen table node and a sink to `plan`, starts the plan, and returns a reader over the sink's output.
+    const std::shared_ptr<compute::ExecPlan>& plan,  // plan that receives the generator and sink nodes; it is validated and started here
+    int scale_factor,  // forwarded unchanged to TpchGen::Make
+    std::string table_name  // one of: part, supplier, partsupp, customer, nation, lineitem, region, orders
+    ) {
+
+  auto gen = ValueOrStop(arrow::compute::TpchGen::Make(plan.get(), 
scale_factor));
+
+  compute::ExecNode *table;  // assigned in every branch below except the final else, which calls cpp11::stop
+  if (table_name == "part") {
+    table = ValueOrStop(gen.Part());
+  } else if (table_name == "supplier") {
+    table = ValueOrStop(gen.Supplier());
+  } else if (table_name == "partsupp") {
+    table = ValueOrStop(gen.PartSupp());
+  } else if (table_name == "customer") {
+    table = ValueOrStop(gen.Customer());
+  } else if (table_name == "nation") {
+    table = ValueOrStop(gen.Nation());
+  } else if (table_name == "lineitem") {
+    table = ValueOrStop(gen.Lineitem());
+  } else if (table_name == "region") {
+    table = ValueOrStop(gen.Region());
+  } else if (table_name == "orders") {
+    table = ValueOrStop(gen.Orders());
+  } else {
+    cpp11::stop("That's not a valid table name");  // any other name is rejected
+  }
+
+  arrow::AsyncGenerator<arrow::util::optional<compute::ExecBatch>> sink_gen;  // its address is handed to SinkNodeOptions below
+
+  MakeExecNodeOrStop("sink", plan.get(), {table},  // wires the selected table node into a "sink" node
+                     compute::SinkNodeOptions{&sink_gen});
+
+  StopIfNotOk(plan->Validate());
+  StopIfNotOk(plan->StartProducing());
+
+  // If the generator is destroyed before being completely drained, inform plan
+  std::shared_ptr<void> stop_producing{nullptr, [plan](...) {  // null pointer with a custom deleter: the lambda runs when the last copy is released
+    bool not_finished_yet =
+      plan->finished().TryAddCallback([&plan] {  // NOTE(review): presumably returns false when the future has already completed - confirm against arrow::Future
+        return [plan](const arrow::Status&) {};  // no-op callback that holds its own copy of plan
+      });
+
+    if (not_finished_yet) {
+      plan->StopProducing();
+    }
+  }};
+
+  return compute::MakeGeneratorReader(
+    table->output_schema(),
+    [stop_producing, plan, sink_gen] { return sink_gen(); }, gc_memory_pool());  // the closure copies stop_producing and plan, so both live as long as the closure does
+}
+
+// [[arrow::export]]
+void Tpch_Dbgen_Write(
+    const std::shared_ptr<compute::ExecPlan>& plan,
+    int scale_factor,
+    std::string table_name,
+    const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir,
+    arrow::dataset::ExistingDataBehavior existing_data_behavior, int 
max_partitions
+) {
+  auto gen = ValueOrStop(arrow::compute::TpchGen::Make(plan.get(), 
scale_factor));
+
+  compute::ExecNode *table;
+  if (table_name == "part") {
+    table = ValueOrStop(gen.Part());
+  } else if (table_name == "supplier") {
+    table = ValueOrStop(gen.Supplier());
+  } else if (table_name == "partsupp") {
+    table = ValueOrStop(gen.PartSupp());
+  } else if (table_name == "customer") {
+    table = ValueOrStop(gen.Customer());
+  } else if (table_name == "nation") {
+    table = ValueOrStop(gen.Nation());
+  } else if (table_name == "lineitem") {
+    table = ValueOrStop(gen.Lineitem());
+  } else if (table_name == "region") {
+    table = ValueOrStop(gen.Region());
+  } else if (table_name == "orders") {
+    table = ValueOrStop(gen.Orders());
+  } else {
+    cpp11::stop("That's not a valid table name");
+  }
+
+  // TODO: unhardcode this once it's working
+  auto base_path =  base_dir + "/parquet_dataset";
+  filesystem->CreateDir(base_path);
+
+  auto format = std::make_shared<ds::ParquetFileFormat>();
+
+  ds::FileSystemDatasetWriteOptions write_options;
+  write_options.file_write_options = format->DefaultWriteOptions();
+  write_options.existing_data_behavior = 
ds::ExistingDataBehavior::kDeleteMatchingPartitions;
+  write_options.filesystem = filesystem;
+  write_options.base_dir = base_path;
+  write_options.partitioning = arrow::dataset::Partitioning::Default();
+  write_options.basename_template = "part{i}.parquet";
+  write_options.max_partitions = 1024;
+
+  // TODO: this had a checked_cast in front of it in the code I adapted it from,
+  // but I ran into namespace issues when doing it, so I took it out to see if it
+  // worked. Maybe that's what's causing the segfault?
+  const ds::WriteNodeOptions options =
+    ds::WriteNodeOptions{write_options, table->output_schema()};

Review comment:
       This might be exactly what's causing the segfault; the code I saw that 
did this had a `checked_cast` in front of it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to