drin commented on code in PR #14071:
URL: https://github.com/apache/arrow/pull/14071#discussion_r966533636


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -169,45 +157,90 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&
               "non-default substrait::ReadRel::LocalFiles::FileOrFiles::length");
         }
 
-        path = path.substr(7);
+        // Extract and parse the read relation's source URI
+        ::arrow::internal::Uri item_uri;
         switch (item.path_type_case()) {
-          case substrait::ReadRel_LocalFiles_FileOrFiles::kUriPath: {
-            ARROW_ASSIGN_OR_RAISE(auto file, filesystem->GetFileInfo(path));
-            if (file.type() == fs::FileType::File) {
-              files.push_back(std::move(file));
-            } else if (file.type() == fs::FileType::Directory) {
-              fs::FileSelector selector;
-              selector.base_dir = path;
-              selector.recursive = true;
-              ARROW_ASSIGN_OR_RAISE(auto discovered_files,
-                                    filesystem->GetFileInfo(selector));
-              std::move(files.begin(), files.end(), std::back_inserter(discovered_files));
-            }
+          case substrait::ReadRel::LocalFiles::FileOrFiles::kUriPath:
+            RETURN_NOT_OK(item_uri.Parse(item.uri_path()));
             break;
-          }
-          case substrait::ReadRel_LocalFiles_FileOrFiles::kUriFile: {
-            files.emplace_back(path, fs::FileType::File);
+
+          case substrait::ReadRel::LocalFiles::FileOrFiles::kUriFile:
+            RETURN_NOT_OK(item_uri.Parse(item.uri_file()));
+            break;
+
+          case substrait::ReadRel::LocalFiles::FileOrFiles::kUriFolder:
+            RETURN_NOT_OK(item_uri.Parse(item.uri_folder()));
+            break;
+
+          default:
+            RETURN_NOT_OK(item_uri.Parse(item.uri_path_glob()));
+            break;
+        }
+
+        // Validate the URI before processing
+        if (!item_uri.is_file_scheme()) {
+          return Status::NotImplemented("substrait::ReadRel::LocalFiles item (",
+                                        item_uri.ToString(),
+                                        ") with other than local filesystem (file:///)");
+        }
+
+        if (item_uri.port() != -1) {
+          return Status::NotImplemented("substrait::ReadRel::LocalFiles item (",
+                                        item_uri.ToString(),
+                                        ") should not have a port number in path");
+        }
+
+        if (!item_uri.query_string().empty()) {
+          return Status::NotImplemented("substrait::ReadRel::LocalFiles item (",
+                                        item_uri.ToString(),
+                                        ") should not have a query string in path");
+        }
+
+        switch (item.file_format_case()) {
+          case substrait::ReadRel_LocalFiles_FileOrFiles::kParquet:
+            format = std::make_shared<dataset::ParquetFileFormat>();
+            break;
+          case substrait::ReadRel_LocalFiles_FileOrFiles::kArrow:
+            format = std::make_shared<dataset::IpcFileFormat>();
+            break;
+          default:
+            return Status::NotImplemented(
+                "unknown substrait::ReadRel::LocalFiles::FileOrFiles::file_format");
+        }
+
+        // Handle the URI as appropriate
+        switch (item.path_type_case()) {
+          case substrait::ReadRel::LocalFiles::FileOrFiles::kUriFile: {
+            files.emplace_back(item_uri.path(), fs::FileType::File);
             break;
           }
-          case substrait::ReadRel_LocalFiles_FileOrFiles::kUriFolder: {
-            fs::FileSelector selector;
-            selector.base_dir = path;
-            selector.recursive = true;
-            ARROW_ASSIGN_OR_RAISE(auto discovered_files,
-                                  filesystem->GetFileInfo(selector));
-            std::move(discovered_files.begin(), discovered_files.end(),
-                      std::back_inserter(files));
+
+          case substrait::ReadRel::LocalFiles::FileOrFiles::kUriFolder: {
+            RETURN_NOT_OK(DiscoverFilesFromDir(filesystem, item_uri.path(), files));
             break;
           }
-          case substrait::ReadRel_LocalFiles_FileOrFiles::kUriPathGlob: {
-            ARROW_ASSIGN_OR_RAISE(auto discovered_files,
-                                  fs::internal::GlobFiles(filesystem, path));
-            std::move(discovered_files.begin(), discovered_files.end(),
-                      std::back_inserter(files));
+
+          case substrait::ReadRel::LocalFiles::FileOrFiles::kUriPath: {
+            // Let the filesystem API decide for us if the URI is a file or a directory
+            ARROW_ASSIGN_OR_RAISE(auto file_info,
+                                  filesystem->GetFileInfo(item_uri.path()));
+
+            // push the FileInfo if it's a file; else, recurse into the directory
+            if (file_info.type() == fs::FileType::File) {
+              files.push_back(std::move(file_info));
+            } else if (file_info.type() == fs::FileType::Directory) {
+              RETURN_NOT_OK(DiscoverFilesFromDir(filesystem, item_uri.path(), files));
+            }
+

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to