vibhatha commented on code in PR #13401: URL: https://github.com/apache/arrow/pull/13401#discussion_r965463633
########## cpp/src/arrow/engine/substrait/relation_internal.cc: ########## @@ -421,5 +433,144 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet& rel.DebugString()); } +namespace { + +Result<std::shared_ptr<Schema>> ExtractSchemaToBind(const compute::Declaration& declr) { + std::shared_ptr<Schema> bind_schema; + if (declr.factory_name == "scan") { + const auto& opts = checked_cast<const dataset::ScanNodeOptions&>(*(declr.options)); + bind_schema = opts.dataset->schema(); + } else if (declr.factory_name == "filter") { + auto input_declr = util::get<compute::Declaration>(declr.inputs[0]); + ARROW_ASSIGN_OR_RAISE(bind_schema, ExtractSchemaToBind(input_declr)); + } else if (declr.factory_name == "sink") { + // Note that the sink has no output_schema + return bind_schema; + } else { + return Status::Invalid("Schema extraction failed, unsupported factory ", + declr.factory_name); + } + return bind_schema; +} + +Result<std::unique_ptr<substrait::ReadRel>> ScanRelationConverter( + const std::shared_ptr<Schema>& schema, const compute::Declaration& declaration, + ExtensionSet* ext_set, const ConversionOptions& conversion_options) { + auto read_rel = make_unique<substrait::ReadRel>(); + const auto& scan_node_options = + checked_cast<const dataset::ScanNodeOptions&>(*declaration.options); + auto dataset = + dynamic_cast<dataset::FileSystemDataset*>(scan_node_options.dataset.get()); + if (dataset == nullptr) { + return Status::Invalid( + "Can only convert scan node with FileSystemDataset to a Substrait plan."); + } + // set schema + ARROW_ASSIGN_OR_RAISE(auto named_struct, + ToProto(*dataset->schema(), ext_set, conversion_options)); + read_rel->set_allocated_base_schema(named_struct.release()); + + // set local files + auto read_rel_lfs = make_unique<substrait::ReadRel_LocalFiles>(); + for (const auto& file : dataset->files()) { + auto read_rel_lfs_ffs = make_unique<substrait::ReadRel_LocalFiles_FileOrFiles>(); + read_rel_lfs_ffs->set_uri_path(UriFromAbsolutePath(file)); + // set file format + auto format_type_name = dataset->format()->type_name(); + if (format_type_name == "parquet") { + auto parquet_fmt = + make_unique<substrait::ReadRel_LocalFiles_FileOrFiles_ParquetReadOptions>(); + read_rel_lfs_ffs->set_allocated_parquet(parquet_fmt.release()); + } else if (format_type_name == "ipc") { + auto arrow_fmt = + make_unique<substrait::ReadRel_LocalFiles_FileOrFiles_ArrowReadOptions>(); + read_rel_lfs_ffs->set_allocated_arrow(arrow_fmt.release()); + } else if (format_type_name == "orc") { + auto orc_fmt = + make_unique<substrait::ReadRel_LocalFiles_FileOrFiles_OrcReadOptions>(); + read_rel_lfs_ffs->set_allocated_orc(orc_fmt.release()); + } else { + return Status::NotImplemented("Unsupported file type: ", format_type_name); + } + read_rel_lfs->mutable_items()->AddAllocated(read_rel_lfs_ffs.release()); + } + read_rel->set_allocated_local_files(read_rel_lfs.release()); Review Comment: Nice catch. Jira created: https://issues.apache.org/jira/browse/ARROW-17647 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org