vibhatha commented on code in PR #13401: URL: https://github.com/apache/arrow/pull/13401#discussion_r957017608
########## cpp/src/arrow/engine/substrait/relation_internal.cc: ########## @@ -421,5 +431,171 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet& rel.DebugString()); } +Result<std::unique_ptr<substrait::Rel>> ToProto( + const compute::Declaration& declr, ExtensionSet* ext_set, + const ConversionOptions& conversion_options) { + auto rel = make_unique<substrait::Rel>(); + RETURN_NOT_OK(SerializeAndCombineRelations(declr, ext_set, rel, conversion_options)); + return std::move(rel); +} + +Status SetRelation(const std::unique_ptr<substrait::Rel>& plan, + const std::unique_ptr<substrait::Rel>& partial_plan, + const std::string& factory_name) { + if (factory_name == "scan" && partial_plan->has_read()) { + plan->set_allocated_read(partial_plan->release_read()); + } else if (factory_name == "filter" && partial_plan->has_filter()) { + plan->set_allocated_filter(partial_plan->release_filter()); + } else { + return Status::NotImplemented("Substrait converter ", factory_name, + " not supported."); + } + return Status::OK(); +} + +Result<std::shared_ptr<Schema>> ExtractSchemaToBind(const compute::Declaration& declr) { + std::shared_ptr<Schema> bind_schema; + if (declr.factory_name == "scan") { + const auto& opts = checked_cast<const dataset::ScanNodeOptions&>(*(declr.options)); + bind_schema = opts.dataset->schema(); + } else if (declr.factory_name == "filter") { + auto input_declr = util::get<compute::Declaration>(declr.inputs[0]); + ARROW_ASSIGN_OR_RAISE(bind_schema, ExtractSchemaToBind(input_declr)); + } else if (declr.factory_name == "sink") { + // Note that the sink has no output_schema + return bind_schema; + } else { + return Status::Invalid("Schema extraction failed, unsupported factory ", + declr.factory_name); + } + return bind_schema; +} + +Status SerializeAndCombineRelations(const compute::Declaration& declaration, + ExtensionSet* ext_set, + std::unique_ptr<substrait::Rel>& rel, + const ConversionOptions& conversion_options) { + 
std::vector<compute::Declaration::Input> inputs = declaration.inputs; + for (auto& input : inputs) { + auto input_decl = util::get<compute::Declaration>(input); + RETURN_NOT_OK( + SerializeAndCombineRelations(input_decl, ext_set, rel, conversion_options)); Review Comment: You're correct. I updated it so that each relation sets its own input depending on the nature of the node. For instance, `sink` has only a single input and `filter` also has a single input. That's handled in the updated logic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org