vibhatha commented on code in PR #13401: URL: https://github.com/apache/arrow/pull/13401#discussion_r956912961
########## cpp/src/arrow/engine/substrait/relation_internal.cc: ########## @@ -421,5 +431,171 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet& rel.DebugString()); } +Result<std::unique_ptr<substrait::Rel>> ToProto( + const compute::Declaration& declr, ExtensionSet* ext_set, + const ConversionOptions& conversion_options) { + auto rel = make_unique<substrait::Rel>(); + RETURN_NOT_OK(SerializeAndCombineRelations(declr, ext_set, rel, conversion_options)); + return std::move(rel); +} + +Status SetRelation(const std::unique_ptr<substrait::Rel>& plan, + const std::unique_ptr<substrait::Rel>& partial_plan, + const std::string& factory_name) { + if (factory_name == "scan" && partial_plan->has_read()) { + plan->set_allocated_read(partial_plan->release_read()); + } else if (factory_name == "filter" && partial_plan->has_filter()) { + plan->set_allocated_filter(partial_plan->release_filter()); + } else { + return Status::NotImplemented("Substrait converter ", factory_name, + " not supported."); + } + return Status::OK(); +} + +Result<std::shared_ptr<Schema>> ExtractSchemaToBind(const compute::Declaration& declr) { + std::shared_ptr<Schema> bind_schema; + if (declr.factory_name == "scan") { + const auto& opts = checked_cast<const dataset::ScanNodeOptions&>(*(declr.options)); + bind_schema = opts.dataset->schema(); + } else if (declr.factory_name == "filter") { + auto input_declr = util::get<compute::Declaration>(declr.inputs[0]); + ARROW_ASSIGN_OR_RAISE(bind_schema, ExtractSchemaToBind(input_declr)); + } else if (declr.factory_name == "sink") { + // Note that the sink has no output_schema + return bind_schema; + } else { + return Status::Invalid("Schema extraction failed, unsupported factory ", + declr.factory_name); + } + return bind_schema; +} + +Status SerializeAndCombineRelations(const compute::Declaration& declaration, + ExtensionSet* ext_set, + std::unique_ptr<substrait::Rel>& rel, + const ConversionOptions& conversion_options) { + std::vector<compute::Declaration::Input> inputs = declaration.inputs; + for (auto& input : inputs) { + auto input_decl = util::get<compute::Declaration>(input); + RETURN_NOT_OK( + SerializeAndCombineRelations(input_decl, ext_set, rel, conversion_options)); + } + const auto& factory_name = declaration.factory_name; + ARROW_ASSIGN_OR_RAISE(auto schema, ExtractSchemaToBind(declaration)); + // Note that the sink declaration factory doesn't exist for serialization as + // Substrait doesn't deal with a sink node definition + std::unique_ptr<substrait::Rel> factory_rel; + if (factory_name == "scan") { + ARROW_ASSIGN_OR_RAISE(factory_rel, ScanRelationConverter(schema, declaration, ext_set, + conversion_options)); + } else if (factory_name == "filter") { + ARROW_ASSIGN_OR_RAISE( + factory_rel, + FilterRelationConverter(schema, declaration, ext_set, conversion_options)); + } else { + return Status::NotImplemented("Factory ", factory_name, + " not implemented for roundtripping."); + } + + if (factory_rel != nullptr) { + RETURN_NOT_OK(SetRelation(rel, factory_rel, factory_name)); + } else { + return Status::Invalid("Conversion on factory ", factory_name, + " returned an invalid relation"); + } + return Status::OK(); +} + +Result<std::unique_ptr<substrait::Rel>> GetRelationFromDeclaration( + const compute::Declaration& declaration, ExtensionSet* ext_set, + const ConversionOptions& conversion_options) { + auto declr_input = declaration.inputs[0]; + // Note that the input is expected in declaration. + // ExecNode inputs are not accepted + if (util::get_if<compute::ExecNode*>(&declr_input)) { + return Status::NotImplemented("Only support Plans written in Declaration format."); + } + return ToProto(util::get<compute::Declaration>(declr_input), ext_set, + conversion_options); +} + +Result<std::unique_ptr<substrait::Rel>> ScanRelationConverter( + const std::shared_ptr<Schema>& schema, const compute::Declaration& declaration, + ExtensionSet* ext_set, const ConversionOptions& conversion_options) { + auto rel = make_unique<substrait::Rel>(); + auto read_rel = make_unique<substrait::ReadRel>(); + const auto& scan_node_options = + checked_cast<const dataset::ScanNodeOptions&>(*declaration.options); + auto dataset = + dynamic_cast<dataset::FileSystemDataset*>(scan_node_options.dataset.get()); + if (dataset == nullptr) { + return Status::Invalid("Can only convert file system datasets to a Substrait plan."); + } + // set schema + ARROW_ASSIGN_OR_RAISE(auto named_struct, + ToProto(*dataset->schema(), ext_set, conversion_options)); + read_rel->set_allocated_base_schema(named_struct.release()); + + // set local files + auto read_rel_lfs = make_unique<substrait::ReadRel_LocalFiles>(); + for (const auto& file : dataset->files()) { + auto read_rel_lfs_ffs = make_unique<substrait::ReadRel_LocalFiles_FileOrFiles>(); + read_rel_lfs_ffs->set_uri_path("file://" + file); + // set file format + // arrow and feather are temporarily handled via the Parquet format until + // upgraded to the latest Substrait version. Review Comment: You're correct. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org