vibhatha commented on code in PR #13401:
URL: https://github.com/apache/arrow/pull/13401#discussion_r956915382


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -421,5 +431,171 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& 
rel, const ExtensionSet&
       rel.DebugString());
 }
 
+// Serialize the declaration tree rooted at `declr` into a newly allocated
+// Substrait relation. Any failure from SerializeAndCombineRelations is
+// propagated unchanged.
+Result<std::unique_ptr<substrait::Rel>> ToProto(
+    const compute::Declaration& declr, ExtensionSet* ext_set,
+    const ConversionOptions& conversion_options) {
+  std::unique_ptr<substrait::Rel> out_rel = make_unique<substrait::Rel>();
+  RETURN_NOT_OK(
+      SerializeAndCombineRelations(declr, ext_set, out_rel, conversion_options));
+  return std::move(out_rel);
+}
+
+// Move the relation built for `factory_name` out of `partial_plan` and install
+// it on `plan`. Only "scan" (read relation) and "filter" (filter relation) are
+// handled. Any other factory, or a factory whose expected relation kind is not
+// present in `partial_plan`, yields NotImplemented.
+Status SetRelation(const std::unique_ptr<substrait::Rel>& plan,
+                   const std::unique_ptr<substrait::Rel>& partial_plan,
+                   const std::string& factory_name) {
+  const bool is_read = factory_name == "scan" && partial_plan->has_read();
+  if (is_read) {
+    plan->set_allocated_read(partial_plan->release_read());
+    return Status::OK();
+  }
+  const bool is_filter = factory_name == "filter" && partial_plan->has_filter();
+  if (is_filter) {
+    plan->set_allocated_filter(partial_plan->release_filter());
+    return Status::OK();
+  }
+  return Status::NotImplemented("Substrait converter ", factory_name,
+                                " not supported.");
+}
+
+// Determine the schema that expressions in `declr` are bound against.
+//
+//  * "scan":   the schema of the dataset held by the node's ScanNodeOptions.
+//  * "filter": recursively, the bind schema of the filter's first input.
+//  * "sink":   a null schema; the sink has no output schema of its own.
+//
+// Errors:
+//  * Invalid for an unsupported factory, for a scan without options or a
+//    dataset, and for a filter with no input.
+//  * NotImplemented for a filter whose input is an ExecNode* rather than a
+//    Declaration.
+Result<std::shared_ptr<Schema>> ExtractSchemaToBind(const compute::Declaration& declr) {
+  const auto& factory_name = declr.factory_name;
+  if (factory_name == "scan") {
+    if (declr.options == nullptr) {
+      return Status::Invalid("Scan declaration is missing ScanNodeOptions");
+    }
+    const auto& opts = checked_cast<const dataset::ScanNodeOptions&>(*declr.options);
+    if (opts.dataset == nullptr) {
+      return Status::Invalid("Scan declaration has no dataset");
+    }
+    return opts.dataset->schema();
+  }
+  if (factory_name == "filter") {
+    // Guard before indexing: inputs[0] on an empty vector is undefined behavior.
+    if (declr.inputs.empty()) {
+      return Status::Invalid("Filter declaration has no input");
+    }
+    // Inputs are a variant of Declaration and ExecNode*. Only a Declaration can
+    // be recursed into, so report an ExecNode* instead of calling util::get
+    // with the wrong alternative. get_if also avoids copying the Declaration.
+    const auto* input_declr = util::get_if<compute::Declaration>(&declr.inputs[0]);
+    if (input_declr == nullptr) {
+      return Status::NotImplemented("Only support Plans written in Declaration format.");
+    }
+    return ExtractSchemaToBind(*input_declr);
+  }
+  if (factory_name == "sink") {
+    // Note that the sink has no output_schema
+    return std::shared_ptr<Schema>();
+  }
+  return Status::Invalid("Schema extraction failed, unsupported factory ",
+                         factory_name);
+}
+
+// Serialize `declaration` and all of its inputs into `rel`.
+//
+// Inputs are processed first (depth-first, post-order), then the declaration
+// itself. Each one converts its own relation and installs it into the shared
+// `rel` via SetRelation.
+//
+// Only the "scan" and "filter" factories are supported. Any other factory
+// yields NotImplemented, and an ExecNode* input (as opposed to a Declaration)
+// also yields NotImplemented.
+//
+// NOTE(review): every level writes into the same top-level `rel`. If
+// substrait::Rel keeps read/filter in a protobuf oneof, a later SetRelation
+// replaces the earlier one. A filter would then keep its scan only if
+// FilterRelationConverter embeds its input itself. Confirm.
+Status SerializeAndCombineRelations(const compute::Declaration& declaration,
+                                    ExtensionSet* ext_set,
+                                    std::unique_ptr<substrait::Rel>& rel,
+                                    const ConversionOptions& conversion_options) {
+  // Iterate by const reference. Copying the input vector and every nested
+  // Declaration (options, inputs, ...) is unnecessary.
+  for (const auto& input : declaration.inputs) {
+    const auto* input_decl = util::get_if<compute::Declaration>(&input);
+    if (input_decl == nullptr) {
+      return Status::NotImplemented("Only support Plans written in Declaration format.");
+    }
+    RETURN_NOT_OK(
+        SerializeAndCombineRelations(*input_decl, ext_set, rel, conversion_options));
+  }
+  const auto& factory_name = declaration.factory_name;
+  ARROW_ASSIGN_OR_RAISE(auto schema, ExtractSchemaToBind(declaration));
+  // Note that the sink declaration factory doesn't exist for serialization as
+  // Substrait doesn't deal with a sink node definition
+  std::unique_ptr<substrait::Rel> factory_rel;
+  if (factory_name == "scan") {
+    ARROW_ASSIGN_OR_RAISE(factory_rel, ScanRelationConverter(schema, declaration, ext_set,
+                                                             conversion_options));
+  } else if (factory_name == "filter") {
+    ARROW_ASSIGN_OR_RAISE(
+        factory_rel,
+        FilterRelationConverter(schema, declaration, ext_set, conversion_options));
+  } else {
+    return Status::NotImplemented("Factory ", factory_name,
+                                  " not implemented for roundtripping.");
+  }
+
+  if (factory_rel == nullptr) {
+    return Status::Invalid("Conversion on factory ", factory_name,
+                           " returned an invalid relation");
+  }
+  RETURN_NOT_OK(SetRelation(rel, factory_rel, factory_name));
+  return Status::OK();
+}
+
+// Convert the plan feeding `declaration` into a Substrait relation.
+//
+// `declaration` itself is not serialized. Presumably it is the plan's terminal
+// sink, which Substrait has no relation for. Instead, its single input is
+// handed to ToProto.
+//
+// Errors:
+//  * Invalid if the declaration has no input.
+//  * NotImplemented if it has more than one input (rather than silently
+//    converting only the first).
+//  * NotImplemented if the input is an already-constructed ExecNode* instead
+//    of a Declaration.
+Result<std::unique_ptr<substrait::Rel>> GetRelationFromDeclaration(
+    const compute::Declaration& declaration, ExtensionSet* ext_set,
+    const ConversionOptions& conversion_options) {
+  // Guard before indexing: inputs[0] on an empty vector is undefined behavior.
+  if (declaration.inputs.empty()) {
+    return Status::Invalid("Declaration '", declaration.factory_name,
+                           "' has no input to convert");
+  }
+  if (declaration.inputs.size() > 1) {
+    return Status::NotImplemented("Declaration '", declaration.factory_name,
+                                  "' has multiple inputs; only a single input "
+                                  "is supported for conversion");
+  }
+  // Note that the input is expected in declaration.
+  // ExecNode inputs are not accepted
+  const auto* input_declr =
+      util::get_if<compute::Declaration>(&declaration.inputs[0]);
+  if (input_declr == nullptr) {
+    return Status::NotImplemented("Only support Plans written in Declaration format.");
+  }
+  return ToProto(*input_declr, ext_set, conversion_options);
+}

Review Comment:
   What if the inputs of the `Declaration` contain both `ExecNode` and 
`Declaration` entries? That was my concern. 
   Since `Declaration` uses a `util::Variant` to hold its inputs, I 
added this method to make sure the input is a `Declaration`. 
   
   I also have doubts about taking only `declaration.inputs[0]`. Maybe we 
should iterate over the inputs and cover all of them, but I saw that in many 
nodes this index is hardcoded to 0. Still, do you think my argument about the 
possibility of an `Input` being an `ExecNode` is valid?
   
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to