ianmcook commented on a change in pull request #9615:
URL: https://github.com/apache/arrow/pull/9615#discussion_r618566919



##########
File path: r/src/r_to_arrow.cpp
##########
@@ -1023,12 +1062,134 @@ std::shared_ptr<arrow::Array> vec_to_arrow(SEXP x,
       options.type, options, gc_memory_pool()));
 
   StopIfNotOk(converter->Extend(x, options.size));
+
   return ValueOrStop(converter->ToArray());
 }
 
 }  // namespace r
 }  // namespace arrow
 
+arrow::Status check_consistent_column_length(
+    const std::vector<std::shared_ptr<arrow::ChunkedArray>>& columns) {
+  if (columns.size()) {
+    int64_t num_rows = columns[0]->length();
+
+    for (const auto& column : columns) {
+      if (column->length() != num_rows) {
+        return arrow::Status::Invalid("All columns must have the same length");
+      }
+    }
+  }
+
+  return arrow::Status::OK();
+}
+
+// [[arrow::export]]
+std::shared_ptr<arrow::Table> Table__from_dots(SEXP lst, SEXP schema_sxp) {
+  bool infer_schema = !Rf_inherits(schema_sxp, "Schema");
+
+  int num_fields;
+  StopIfNotOk(arrow::r::count_fields(lst, &num_fields));
+
+  // schema + metadata
+  std::shared_ptr<arrow::Schema> schema;
+  StopIfNotOk(arrow::r::InferSchemaFromDots(lst, schema_sxp, num_fields, schema));
+  StopIfNotOk(arrow::r::AddMetadataFromDots(lst, num_fields, schema));
+
+  if (!infer_schema && schema->num_fields() != num_fields) {
+    cpp11::stop("incompatible. schema has %d fields, and %d columns are supplied",
+                schema->num_fields(), num_fields);
+  }
+
+  // table
+  std::vector<std::shared_ptr<arrow::ChunkedArray>> columns(num_fields);
+
+  if (!infer_schema) {
+    auto check_name = [&](int j, SEXP, cpp11::r_string name) {
+      std::string cpp_name(name);
+      if (schema->field(j)->name() != cpp_name) {
+        cpp11::stop("field at index %d has name '%s' != '%s'", j + 1,
+                    schema->field(j)->name().c_str(), cpp_name.c_str());
+      }
+    };
+    arrow::r::TraverseDots(lst, num_fields, check_name);
+  }
+
+  auto parallel_tasks =
+      arrow::internal::TaskGroup::MakeThreaded(arrow::internal::GetCpuThreadPool());
+  std::vector<std::function<arrow::Status()>> delayed_serial_tasks;
+
+  arrow::Status status = arrow::Status::OK();
+
+  auto flatten_lst = arrow::r::FlattenDots(lst, num_fields);
+  std::vector<std::unique_ptr<arrow::r::RConverter>> converters(num_fields);
+
+  // init converters
+  for (int j = 0; j < num_fields && status.ok(); j++) {
+    SEXP x = flatten_lst[j];
+
+    if (Rf_inherits(x, "ChunkedArray")) {
+      columns[j] = cpp11::as_cpp<std::shared_ptr<arrow::ChunkedArray>>(x);
+    } else if (Rf_inherits(x, "Array")) {
+      columns[j] = std::make_shared<arrow::ChunkedArray>(
+          cpp11::as_cpp<std::shared_ptr<arrow::Array>>(x));
+    } else {
+      arrow::r::RConversionOptions options;
+      options.strict = !infer_schema;
+      options.type = schema->field(j)->type();
+      options.size = vctrs::short_vec_size(x);
+
+      // maybe short circuit when zero-copy is possible
+      if (arrow::r::can_reuse_memory(x, options.type)) {
+        columns[j] = std::make_shared<arrow::ChunkedArray>(
+            arrow::r::vec_to_arrow__reuse_memory(x));
+      } else {
+        auto converter_result =
+            arrow::MakeConverter<arrow::r::RConverter, arrow::r::RConverterTrait>(
+                options.type, options, gc_memory_pool());
+        if (!converter_result.ok()) {
+          status = converter_result.status();
+          break;
+        }
+        converters[j] = std::move(converter_result.ValueUnsafe());
+      }
+    }
+  }
+  StopIfNotOk(status);
+
+  for (int j = 0; j < num_fields; j++) {
+    auto& converter = converters[j];
+    if (converter != nullptr) {
+      auto task = [j, &converters, &flatten_lst, &columns] {
+        auto& converter = converters[j];
+
+        SEXP x = flatten_lst[j];
+        RETURN_NOT_OK(converter->Extend(x, converter->options().size));
+        ARROW_ASSIGN_OR_RAISE(auto array, converter->ToArray());
+        columns[j] = std::make_shared<arrow::ChunkedArray>(array);
+        return arrow::Status::OK();
+      };
+
+      if (converter->Parallel()) {
+        parallel_tasks->Append(task);
+      } else {
+        delayed_serial_tasks.push_back(std::move(task));
+      }
+    }
+  }
+
+  for (auto& task : delayed_serial_tasks) {
+    status &= task();
+  }
+
+  status &= parallel_tasks->Finish();
+  status &= check_consistent_column_length(columns);

Review comment:
       Regarding my comment at https://github.com/apache/arrow/pull/9851#discussion_r605919879 — it looks like you already took care of this. Thanks!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to