westonpace commented on code in PR #14071:
URL: https://github.com/apache/arrow/pull/14071#discussion_r981976488


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -96,6 +96,25 @@ Status CheckRelCommon(const RelMessage& rel) {
   return Status::OK();
 }
 
+// Other helper functions

Review Comment:
   ```suggestion
   ```
   Not sure how useful this comment is



##########
cpp/src/arrow/util/uri.h:
##########
@@ -45,6 +45,9 @@ class ARROW_EXPORT Uri {
   /// explicit scheme.
   std::string scheme() const;
 
+  /// Convenience function that returns true if the scheme() is "file"
+  bool is_file_scheme() const;

Review Comment:
   If someone doesn't know the internals of Uri (e.g. knows that there is an 
`is_file_uri_` property) then this seems like a rather odd convenience.  I 
think it's ok, it just kind of caught me off guard.



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -212,43 +204,102 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& 
rel, const ExtensionSet&
               "non-default 
substrait::ReadRel::LocalFiles::FileOrFiles::length");
         }
 
-        path = path.substr(7);
+        // Extract and parse the read relation's source URI
+        ::arrow::internal::Uri item_uri;
         switch (item.path_type_case()) {
-          case substrait::ReadRel::LocalFiles::FileOrFiles::kUriPath: {
-            ARROW_ASSIGN_OR_RAISE(auto file, filesystem->GetFileInfo(path));
-            if (file.type() == fs::FileType::File) {
-              files.push_back(std::move(file));
-            } else if (file.type() == fs::FileType::Directory) {
-              fs::FileSelector selector;
-              selector.base_dir = path;
-              selector.recursive = true;
-              ARROW_ASSIGN_OR_RAISE(auto discovered_files,
-                                    filesystem->GetFileInfo(selector));
-              std::move(files.begin(), files.end(), 
std::back_inserter(discovered_files));
-            }
+          case substrait::ReadRel::LocalFiles::FileOrFiles::kUriPath:
+            RETURN_NOT_OK(item_uri.Parse(item.uri_path()));
             break;
-          }
+
+          case substrait::ReadRel::LocalFiles::FileOrFiles::kUriFile:
+            RETURN_NOT_OK(item_uri.Parse(item.uri_file()));
+            break;
+
+          case substrait::ReadRel::LocalFiles::FileOrFiles::kUriFolder:
+            RETURN_NOT_OK(item_uri.Parse(item.uri_folder()));
+            break;
+
+          default:
+            RETURN_NOT_OK(item_uri.Parse(item.uri_path_glob()));
+            break;
+        }
+
+        // Validate the URI before processing
+        if (!item_uri.is_file_scheme()) {
+          return Status::NotImplemented("substrait::ReadRel::LocalFiles item 
(",
+                                        item_uri.ToString(),
+                                        ") with non-filesystem scheme 
(file:///)");

Review Comment:
   ```suggestion
                                           ") with non-file scheme (file:///)");
   ```
   Technically, `s3://` is an S3 "filesystem".



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -212,43 +204,102 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& 
rel, const ExtensionSet&
               "non-default 
substrait::ReadRel::LocalFiles::FileOrFiles::length");
         }
 
-        path = path.substr(7);
+        // Extract and parse the read relation's source URI
+        ::arrow::internal::Uri item_uri;
         switch (item.path_type_case()) {
-          case substrait::ReadRel::LocalFiles::FileOrFiles::kUriPath: {
-            ARROW_ASSIGN_OR_RAISE(auto file, filesystem->GetFileInfo(path));
-            if (file.type() == fs::FileType::File) {
-              files.push_back(std::move(file));
-            } else if (file.type() == fs::FileType::Directory) {
-              fs::FileSelector selector;
-              selector.base_dir = path;
-              selector.recursive = true;
-              ARROW_ASSIGN_OR_RAISE(auto discovered_files,
-                                    filesystem->GetFileInfo(selector));
-              std::move(files.begin(), files.end(), 
std::back_inserter(discovered_files));
-            }
+          case substrait::ReadRel::LocalFiles::FileOrFiles::kUriPath:
+            RETURN_NOT_OK(item_uri.Parse(item.uri_path()));
             break;
-          }
+
+          case substrait::ReadRel::LocalFiles::FileOrFiles::kUriFile:
+            RETURN_NOT_OK(item_uri.Parse(item.uri_file()));
+            break;
+
+          case substrait::ReadRel::LocalFiles::FileOrFiles::kUriFolder:
+            RETURN_NOT_OK(item_uri.Parse(item.uri_folder()));
+            break;
+
+          default:
+            RETURN_NOT_OK(item_uri.Parse(item.uri_path_glob()));
+            break;
+        }
+
+        // Validate the URI before processing
+        if (!item_uri.is_file_scheme()) {
+          return Status::NotImplemented("substrait::ReadRel::LocalFiles item 
(",
+                                        item_uri.ToString(),
+                                        ") with non-filesystem scheme 
(file:///)");
+        }
+
+        if (item_uri.port() != -1) {
+          return Status::NotImplemented("substrait::ReadRel::LocalFiles item 
(",
+                                        item_uri.ToString(),
+                                        ") should not have a port number in 
path");
+        }
+
+        if (!item_uri.query_string().empty()) {
+          return Status::NotImplemented("substrait::ReadRel::LocalFiles item 
(",
+                                        item_uri.ToString(),
+                                        ") should not have a query string in 
path");
+        }
+
+        switch (item.file_format_case()) {
+          case substrait::ReadRel::LocalFiles::FileOrFiles::kParquet:
+            format = std::make_shared<dataset::ParquetFileFormat>();
+            break;
+          case substrait::ReadRel::LocalFiles::FileOrFiles::kArrow:
+            format = std::make_shared<dataset::IpcFileFormat>();
+            break;
+          default:
+            // TODO: maybe check for ".feather" or ".arrows"?

Review Comment:
   ```suggestion
   ```
   We aren't checking file extensions here so no need.



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -120,9 +140,8 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& 
rel, const ExtensionSet&
                               FromProto(read.filter(), ext_set, 
conversion_options));
       }
 
+      // NOTE: scan_options->projection is not used by the scanner and thus 
can't be used

Review Comment:
   ```suggestion
   ```
   This isn't really true



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -96,6 +96,25 @@ Status CheckRelCommon(const RelMessage& rel) {
   return Status::OK();
 }
 
+// Other helper functions
+Status DiscoverFilesFromDir(const std::shared_ptr<fs::LocalFileSystem>& 
local_fs,
+                            const std::string& dirpath,
+                            std::vector<fs::FileInfo>* rel_fpaths) {
+  // Define a selector for a recursive descent
+  fs::FileSelector selector;
+  selector.base_dir = dirpath;
+  selector.recursive = true;
+
+  ARROW_ASSIGN_OR_RAISE(auto file_infos, local_fs->GetFileInfo(selector));
+  for (auto& file_info : file_infos) {
+    if (file_info.IsFile()) {
+      rel_fpaths->push_back(file_info);

Review Comment:
   ```suggestion
         rel_fpaths->push_back(std::move(file_info));
   ```



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -52,159 +55,239 @@ Status CheckRelCommon(const RelMessage& rel) {
   return Status::OK();
 }
 
-Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const 
ExtensionSet& ext_set,
-                                  const ConversionOptions& conversion_options) 
{
-  static bool dataset_init = false;
-  if (!dataset_init) {
-    dataset_init = true;
-    dataset::internal::Initialize();
+Status CheckReadRelation(const substrait::ReadRel& rel,
+                         const ConversionOptions& conversion_options) {
+  // NOTE: scan_options->projection is not used by the scanner and thus can't 
be used

Review Comment:
   ```suggestion
     // NOTE: scan_options->projection is not yet handled by the scanner and 
thus can't be used
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to