vibhatha commented on code in PR #13401:
URL: https://github.com/apache/arrow/pull/13401#discussion_r957017608


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -421,5 +431,171 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& 
rel, const ExtensionSet&
       rel.DebugString());
 }
 
+Result<std::unique_ptr<substrait::Rel>> ToProto(
+    const compute::Declaration& declr, ExtensionSet* ext_set,
+    const ConversionOptions& conversion_options) {
+  auto rel = make_unique<substrait::Rel>();
+  RETURN_NOT_OK(SerializeAndCombineRelations(declr, ext_set, rel, 
conversion_options));
+  return std::move(rel);
+}
+
+Status SetRelation(const std::unique_ptr<substrait::Rel>& plan,
+                   const std::unique_ptr<substrait::Rel>& partial_plan,
+                   const std::string& factory_name) {
+  if (factory_name == "scan" && partial_plan->has_read()) {
+    plan->set_allocated_read(partial_plan->release_read());
+  } else if (factory_name == "filter" && partial_plan->has_filter()) {
+    plan->set_allocated_filter(partial_plan->release_filter());
+  } else {
+    return Status::NotImplemented("Substrait converter ", factory_name,
+                                  " not supported.");
+  }
+  return Status::OK();
+}
+
+Result<std::shared_ptr<Schema>> ExtractSchemaToBind(const 
compute::Declaration& declr) {
+  std::shared_ptr<Schema> bind_schema;
+  if (declr.factory_name == "scan") {
+    const auto& opts = checked_cast<const 
dataset::ScanNodeOptions&>(*(declr.options));
+    bind_schema = opts.dataset->schema();
+  } else if (declr.factory_name == "filter") {
+    auto input_declr = util::get<compute::Declaration>(declr.inputs[0]);
+    ARROW_ASSIGN_OR_RAISE(bind_schema, ExtractSchemaToBind(input_declr));
+  } else if (declr.factory_name == "sink") {
+    // Note that the sink has no output_schema
+    return bind_schema;
+  } else {
+    return Status::Invalid("Schema extraction failed, unsupported factory ",
+                           declr.factory_name);
+  }
+  return bind_schema;
+}
+
+Status SerializeAndCombineRelations(const compute::Declaration& declaration,
+                                    ExtensionSet* ext_set,
+                                    std::unique_ptr<substrait::Rel>& rel,
+                                    const ConversionOptions& 
conversion_options) {
+  std::vector<compute::Declaration::Input> inputs = declaration.inputs;
+  for (auto& input : inputs) {
+    auto input_decl = util::get<compute::Declaration>(input);
+    RETURN_NOT_OK(
+        SerializeAndCombineRelations(input_decl, ext_set, rel, 
conversion_options));

Review Comment:
   You're correct. I updated such that each relation is setting it's own input 
depending on the nature of the node. For instance `sink` has only single input 
and `filter` also has a single input. That's handled in the updated logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to