vibhatha commented on code in PR #14071:
URL: https://github.com/apache/arrow/pull/14071#discussion_r965440777


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -52,159 +55,239 @@ Status CheckRelCommon(const RelMessage& rel) {
   return Status::OK();
 }
 
-Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const 
ExtensionSet& ext_set,
-                                  const ConversionOptions& conversion_options) 
{
-  static bool dataset_init = false;
-  if (!dataset_init) {
-    dataset_init = true;
-    dataset::internal::Initialize();
+Status CheckReadRelation(const substrait::ReadRel& rel,
+                         const ConversionOptions& conversion_options) {
+  // NOTE: scan_options->projection is not used by the scanner and thus can't 
be used
+  if (rel.has_projection()) {
+    return Status::NotImplemented("substrait::ReadRel::projection");
   }
 
-  switch (rel.rel_type_case()) {
-    case substrait::Rel::RelTypeCase::kRead: {
-      const auto& read = rel.read();
-      RETURN_NOT_OK(CheckRelCommon(read));
+  if (rel.has_named_table()) {
+    if (!conversion_options.named_table_provider) {
+      return Status::Invalid(
+          "plan contained a named table but a NamedTableProvider has not been "
+          "configured");
+    }
+
+    return Status::OK();
+  }
 
-      ARROW_ASSIGN_OR_RAISE(auto base_schema,
-                            FromProto(read.base_schema(), ext_set, 
conversion_options));
-      auto num_columns = static_cast<int>(base_schema->fields().size());
+  if (!rel.has_local_files()) {
+    return Status::NotImplemented(
+        "substrait::ReadRel with read_type other than LocalFiles");
+  }
 
-      auto scan_options = std::make_shared<dataset::ScanOptions>();
-      scan_options->use_threads = true;
+  if (rel.local_files().has_advanced_extension()) {
+    return 
Status::NotImplemented("substrait::ReadRel::LocalFiles::advanced_extension");
+  }
 
-      if (read.has_filter()) {
-        ARROW_ASSIGN_OR_RAISE(scan_options->filter,
-                              FromProto(read.filter(), ext_set, 
conversion_options));
-      }
+  return Status::OK();
+}
 
-      if (read.has_projection()) {
-        // NOTE: scan_options->projection is not used by the scanner and thus 
can't be
-        // used for this
-        return Status::NotImplemented("substrait::ReadRel::projection");
-      }
+Status CheckFileItem(const substrait::ReadRel_LocalFiles_FileOrFiles& 
file_item) {
+  if (file_item.partition_index() != 0) {
+    return Status::NotImplemented(
+        "non-default 
substrait::ReadRel::LocalFiles::FileOrFiles::partition_index");
+  }
 
-      if (read.has_named_table()) {
-        if (!conversion_options.named_table_provider) {
-          return Status::Invalid(
-              "plan contained a named table but a NamedTableProvider has not 
been "
-              "configured");
-        }
-        const NamedTableProvider& named_table_provider =
-            conversion_options.named_table_provider;
-        const substrait::ReadRel::NamedTable& named_table = read.named_table();
-        std::vector<std::string> table_names(named_table.names().begin(),
-                                             named_table.names().end());
-        ARROW_ASSIGN_OR_RAISE(compute::Declaration source_decl,
-                              named_table_provider(table_names));
-        return DeclarationInfo{std::move(source_decl), num_columns};
-      }
+  if (file_item.start() != 0) {
+    return Status::NotImplemented(
+        "non-default substrait::ReadRel::LocalFiles::FileOrFiles::start 
offset");
+  }
+
+  if (file_item.length() != 0) {
+    return Status::NotImplemented(
+        "non-default substrait::ReadRel::LocalFiles::FileOrFiles::length");
+  }
+
+  return Status::OK();
+}
+
+Status CheckFilePathUri(const ::arrow::internal::Uri& uri) {
+  if (!uri.is_file_scheme()) {
+    return Status::NotImplemented("substrait::ReadRel::LocalFiles item (", 
uri.ToString(),
+                                  ") with other than local filesystem 
(file:///)");
+  }
+
+  if (uri.port() != -1) {
+    return Status::NotImplemented("substrait::ReadRel::LocalFiles item (", 
uri.ToString(),
+                                  ") should not have a port number in path");
+  }
+
+  if (!uri.query_string().empty()) {
+    return Status::NotImplemented("substrait::ReadRel::LocalFiles item (", 
uri.ToString(),
+                                  ") should not have a query string in path");
+  }
+
+  return Status::OK();
+}
+
+// Other helper functions
+Status DiscoverFilesFromDir(std::shared_ptr<fs::LocalFileSystem>& local_fs,
+                            std::string dirpath, std::vector<fs::FileInfo>& 
rel_fpaths) {
+  // Define a selector for a recursive descent
+  fs::FileSelector selector;
+  selector.base_dir = dirpath;
+  selector.recursive = true;
+
+  ARROW_ASSIGN_OR_RAISE(auto file_infos, local_fs->GetFileInfo(selector));
+  for (auto& file_info : file_infos) {
+    if (file_info.type() == fs::FileType::File) {
+      rel_fpaths.push_back(file_info);
+    }
+  }
+
+  return Status::OK();
+}
+
+// Function that implements "FromProto" for a substrait::ReadRel (read 
relation)
+Result<DeclarationInfo> FromReadRelation(const substrait::ReadRel& rel,

Review Comment:
   I think your approach is better; maybe we can standardize this across other 
relations too. But I believe it would be better to do it in another PR. I see 
these components in general:
   
   1. validation
   2. excluding what is not supported via checks
   3. logic to populate the relational entities
   4. maybe helper functions (file discovery, etc.)
   
   At the moment we have all of these within the switch case, which is not very 
easy to go through. 
   
   Shall we create a separate JIRA and work on it there? cc @westonpace 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to