vibhatha commented on code in PR #14071: URL: https://github.com/apache/arrow/pull/14071#discussion_r965445985
########## cpp/src/arrow/engine/substrait/relation_internal.cc: ########## @@ -52,159 +55,239 @@ Status CheckRelCommon(const RelMessage& rel) { return Status::OK(); } -Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet& ext_set, - const ConversionOptions& conversion_options) { - static bool dataset_init = false; - if (!dataset_init) { - dataset_init = true; - dataset::internal::Initialize(); +Status CheckReadRelation(const substrait::ReadRel& rel, + const ConversionOptions& conversion_options) { + // NOTE: scan_options->projection is not used by the scanner and thus can't be used + if (rel.has_projection()) { + return Status::NotImplemented("substrait::ReadRel::projection"); } - switch (rel.rel_type_case()) { - case substrait::Rel::RelTypeCase::kRead: { - const auto& read = rel.read(); - RETURN_NOT_OK(CheckRelCommon(read)); + if (rel.has_named_table()) { + if (!conversion_options.named_table_provider) { + return Status::Invalid( + "plan contained a named table but a NamedTableProvider has not been " + "configured"); + } + + return Status::OK(); + } - ARROW_ASSIGN_OR_RAISE(auto base_schema, - FromProto(read.base_schema(), ext_set, conversion_options)); - auto num_columns = static_cast<int>(base_schema->fields().size()); + if (!rel.has_local_files()) { + return Status::NotImplemented( + "substrait::ReadRel with read_type other than LocalFiles"); + } - auto scan_options = std::make_shared<dataset::ScanOptions>(); - scan_options->use_threads = true; + if (rel.local_files().has_advanced_extension()) { + return Status::NotImplemented("substrait::ReadRel::LocalFiles::advanced_extension"); + } - if (read.has_filter()) { - ARROW_ASSIGN_OR_RAISE(scan_options->filter, - FromProto(read.filter(), ext_set, conversion_options)); - } + return Status::OK(); +} - if (read.has_projection()) { - // NOTE: scan_options->projection is not used by the scanner and thus can't be - // used for this - return 
Status::NotImplemented("substrait::ReadRel::projection"); - } +Status CheckFileItem(const substrait::ReadRel_LocalFiles_FileOrFiles& file_item) { + if (file_item.partition_index() != 0) { + return Status::NotImplemented( + "non-default substrait::ReadRel::LocalFiles::FileOrFiles::partition_index"); + } - if (read.has_named_table()) { - if (!conversion_options.named_table_provider) { - return Status::Invalid( - "plan contained a named table but a NamedTableProvider has not been " - "configured"); - } - const NamedTableProvider& named_table_provider = - conversion_options.named_table_provider; - const substrait::ReadRel::NamedTable& named_table = read.named_table(); - std::vector<std::string> table_names(named_table.names().begin(), - named_table.names().end()); - ARROW_ASSIGN_OR_RAISE(compute::Declaration source_decl, - named_table_provider(table_names)); - return DeclarationInfo{std::move(source_decl), num_columns}; - } + if (file_item.start() != 0) { + return Status::NotImplemented( + "non-default substrait::ReadRel::LocalFiles::FileOrFiles::start offset"); + } + + if (file_item.length() != 0) { + return Status::NotImplemented( + "non-default substrait::ReadRel::LocalFiles::FileOrFiles::length"); + } + + return Status::OK(); +} + +Status CheckFilePathUri(const ::arrow::internal::Uri& uri) { + if (!uri.is_file_scheme()) { + return Status::NotImplemented("substrait::ReadRel::LocalFiles item (", uri.ToString(), + ") with other than local filesystem (file:///)"); + } + + if (uri.port() != -1) { + return Status::NotImplemented("substrait::ReadRel::LocalFiles item (", uri.ToString(), + ") should not have a port number in path"); + } + + if (!uri.query_string().empty()) { + return Status::NotImplemented("substrait::ReadRel::LocalFiles item (", uri.ToString(), + ") should not have a query string in path"); + } + + return Status::OK(); +} + +// Other helper functions +Status DiscoverFilesFromDir(std::shared_ptr<fs::LocalFileSystem>& local_fs, + std::string dirpath, 
std::vector<fs::FileInfo>& rel_fpaths) { + // Define a selector for a recursive descent + fs::FileSelector selector; + selector.base_dir = dirpath; + selector.recursive = true; + + ARROW_ASSIGN_OR_RAISE(auto file_infos, local_fs->GetFileInfo(selector)); + for (auto& file_info : file_infos) { + if (file_info.type() == fs::FileType::File) { + rel_fpaths.push_back(file_info); + } + } + + return Status::OK(); +} + +// Function that implements "FromProto" for a substrait::ReadRel (read relation) +Result<DeclarationInfo> FromReadRelation(const substrait::ReadRel& rel, + const ExtensionSet& ext_set, + const ConversionOptions& conversion_options) { + // Validate the defined read relation + RETURN_NOT_OK(CheckReadRelation(rel, conversion_options)); + + // Get the base schema for the read relation + ARROW_ASSIGN_OR_RAISE(auto base_schema, + FromProto(rel.base_schema(), ext_set, conversion_options)); - if (!read.has_local_files()) { + auto num_columns = static_cast<int>(base_schema->fields().size()); + auto scan_options = std::make_shared<dataset::ScanOptions>(); + scan_options->use_threads = true; + + if (rel.has_filter()) { + ARROW_ASSIGN_OR_RAISE(scan_options->filter, + FromProto(rel.filter(), ext_set, conversion_options)); + } + + if (rel.has_named_table()) { + const NamedTableProvider& named_provider = conversion_options.named_table_provider; + const substrait::ReadRel::NamedTable& named_table = rel.named_table(); + std::vector<std::string> table_names(named_table.names().begin(), + named_table.names().end()); + + ARROW_ASSIGN_OR_RAISE(compute::Declaration source_decl, named_provider(table_names)); + return DeclarationInfo{std::move(source_decl), num_columns}; + } + + // Determine format based on the first FileOrFiles item + std::shared_ptr<dataset::FileFormat> format; + if (rel.local_files().items_size() > 0) { + const auto& first_file = rel.local_files().items(0); + + switch (first_file.file_format_case()) { + case substrait::ReadRel_LocalFiles_FileOrFiles::kParquet: + 
format = std::make_shared<dataset::ParquetFileFormat>(); + break; + + case substrait::ReadRel_LocalFiles_FileOrFiles::kArrow: + format = std::make_shared<dataset::IpcFileFormat>(); + break; + + default: + // TODO: maybe check for ".feather" or ".arrows"? return Status::NotImplemented( - "substrait::ReadRel with read_type other than LocalFiles"); + "unknown file format ", + "(see substrait::ReadRel::LocalFiles::FileOrFiles::file_format)"); + } + } + + // Use a filesystem instance to gather each file + auto filesystem = std::make_shared<fs::LocalFileSystem>(); + std::vector<fs::FileInfo> files; + for (const auto& item : rel.local_files().items()) { + // Validate properties of the `FileOrFiles` item + RETURN_NOT_OK(CheckFileItem(item)); + + // Extract and parse the read relation's source URI + ::arrow::internal::Uri item_uri; + switch (item.path_type_case()) { + case substrait::ReadRel_LocalFiles_FileOrFiles::kUriPath: + RETURN_NOT_OK(item_uri.Parse(item.uri_path())); + break; + + case substrait::ReadRel_LocalFiles_FileOrFiles::kUriFile: + RETURN_NOT_OK(item_uri.Parse(item.uri_file())); + break; + + case substrait::ReadRel_LocalFiles_FileOrFiles::kUriFolder: + RETURN_NOT_OK(item_uri.Parse(item.uri_folder())); + break; + + default: + RETURN_NOT_OK(item_uri.Parse(item.uri_path_glob())); + break; + } + + // Validate the URI before processing + RETURN_NOT_OK(CheckFilePathUri(item_uri)); + + // Handle the URI as appropriate + switch (item.path_type_case()) { + case substrait::ReadRel_LocalFiles_FileOrFiles::kUriFile: { + files.emplace_back(item_uri.path(), fs::FileType::File); + break; } - if (read.local_files().has_advanced_extension()) { - return Status::NotImplemented( - "substrait::ReadRel::LocalFiles::advanced_extension"); + case substrait::ReadRel_LocalFiles_FileOrFiles::kUriFolder: { Review Comment: Maybe use the nested type name here? ```suggestion case substrait::ReadRel::LocalFiles::FileOrFiles::kUriFolder: { ``` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org