westonpace commented on code in PR #13401:
URL: https://github.com/apache/arrow/pull/13401#discussion_r965395555


##########
cpp/src/arrow/engine/substrait/plan_internal.h:
##########
@@ -51,5 +53,17 @@ Result<ExtensionSet> GetExtensionSetFromPlan(
     const substrait::Plan& plan,
     const ExtensionIdRegistry* registry = default_extension_id_registry());
 
+/// \brief Serialize a declaration and into a substrait::Plan.

Review Comment:
   ```suggestion
   /// \brief Serialize a declaration into a substrait::Plan.
   ```



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -421,5 +433,144 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& 
rel, const ExtensionSet&
       rel.DebugString());
 }
 
+namespace {
+
+Result<std::shared_ptr<Schema>> ExtractSchemaToBind(const 
compute::Declaration& declr) {
+  std::shared_ptr<Schema> bind_schema;
+  if (declr.factory_name == "scan") {
+    const auto& opts = checked_cast<const 
dataset::ScanNodeOptions&>(*(declr.options));
+    bind_schema = opts.dataset->schema();
+  } else if (declr.factory_name == "filter") {
+    auto input_declr = util::get<compute::Declaration>(declr.inputs[0]);
+    ARROW_ASSIGN_OR_RAISE(bind_schema, ExtractSchemaToBind(input_declr));
+  } else if (declr.factory_name == "sink") {
+    // Note that the sink has no output_schema
+    return bind_schema;
+  } else {
+    return Status::Invalid("Schema extraction failed, unsupported factory ",
+                           declr.factory_name);
+  }
+  return bind_schema;
+}
+
+Result<std::unique_ptr<substrait::ReadRel>> ScanRelationConverter(
+    const std::shared_ptr<Schema>& schema, const compute::Declaration& 
declaration,
+    ExtensionSet* ext_set, const ConversionOptions& conversion_options) {
+  auto read_rel = make_unique<substrait::ReadRel>();
+  const auto& scan_node_options =
+      checked_cast<const dataset::ScanNodeOptions&>(*declaration.options);
+  auto dataset =
+      
dynamic_cast<dataset::FileSystemDataset*>(scan_node_options.dataset.get());
+  if (dataset == nullptr) {
+    return Status::Invalid(
+        "Can only convert scan node with FileSystemDataset to a Substrait 
plan.");
+  }
+  // set schema
+  ARROW_ASSIGN_OR_RAISE(auto named_struct,
+                        ToProto(*dataset->schema(), ext_set, 
conversion_options));
+  read_rel->set_allocated_base_schema(named_struct.release());
+
+  // set local files
+  auto read_rel_lfs = make_unique<substrait::ReadRel_LocalFiles>();
+  for (const auto& file : dataset->files()) {
+    auto read_rel_lfs_ffs = 
make_unique<substrait::ReadRel_LocalFiles_FileOrFiles>();
+    read_rel_lfs_ffs->set_uri_path(UriFromAbsolutePath(file));
+    // set file format
+    auto format_type_name = dataset->format()->type_name();
+    if (format_type_name == "parquet") {
+      auto parquet_fmt =
+          
make_unique<substrait::ReadRel_LocalFiles_FileOrFiles_ParquetReadOptions>();
+      read_rel_lfs_ffs->set_allocated_parquet(parquet_fmt.release());

Review Comment:
   Minor nit, but creating a unique pointer only to release it on the next line
   seems a bit odd.  Maybe just:
   
   ```
   read_rel_lfs_ffs->set_allocated_parquet(
       new substrait::ReadRel_LocalFiles_FileOrFiles_ParquetReadOptions());
   ```



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -162,36 +170,40 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& 
rel, const ExtensionSet&
         }
 
         path = path.substr(7);
-        if (item.path_type_case() ==
-            substrait::ReadRel_LocalFiles_FileOrFiles::kUriPath) {
-          ARROW_ASSIGN_OR_RAISE(auto file, filesystem->GetFileInfo(path));
-          if (file.type() == fs::FileType::File) {
-            files.push_back(std::move(file));
-          } else if (file.type() == fs::FileType::Directory) {
+        switch (item.path_type_case()) {
+          case substrait::ReadRel_LocalFiles_FileOrFiles::kUriPath: {
+            ARROW_ASSIGN_OR_RAISE(auto file, filesystem->GetFileInfo(path));
+            if (file.type() == fs::FileType::File) {
+              files.push_back(std::move(file));
+            } else if (file.type() == fs::FileType::Directory) {
+              fs::FileSelector selector;
+              selector.base_dir = path;
+              selector.recursive = true;
+              ARROW_ASSIGN_OR_RAISE(auto discovered_files,
+                                    filesystem->GetFileInfo(selector));
+              std::move(files.begin(), files.end(), 
std::back_inserter(discovered_files));
+            }
+            break;
+          }
+          case substrait::ReadRel_LocalFiles_FileOrFiles::kUriFile: {
+            files.emplace_back(path, fs::FileType::File);
+            break;
+          }
+          case substrait::ReadRel_LocalFiles_FileOrFiles::kUriFolder: {
             fs::FileSelector selector;
             selector.base_dir = path;
             selector.recursive = true;
             ARROW_ASSIGN_OR_RAISE(auto discovered_files,
                                   filesystem->GetFileInfo(selector));
-            std::move(files.begin(), files.end(), 
std::back_inserter(discovered_files));
+            std::move(discovered_files.begin(), discovered_files.end(),
+                      std::back_inserter(files));
+            break;
           }
-        }
-        if (item.path_type_case() ==
-            substrait::ReadRel_LocalFiles_FileOrFiles::kUriFile) {
-          files.emplace_back(path, fs::FileType::File);
-        } else if (item.path_type_case() ==
-                   substrait::ReadRel_LocalFiles_FileOrFiles::kUriFolder) {
-          fs::FileSelector selector;
-          selector.base_dir = path;
-          selector.recursive = true;
-          ARROW_ASSIGN_OR_RAISE(auto discovered_files, 
filesystem->GetFileInfo(selector));
-          std::move(discovered_files.begin(), discovered_files.end(),
-                    std::back_inserter(files));
-        } else {
-          ARROW_ASSIGN_OR_RAISE(auto discovered_files,
-                                fs::internal::GlobFiles(filesystem, path));
-          std::move(discovered_files.begin(), discovered_files.end(),
-                    std::back_inserter(files));
+          default:

Review Comment:
   Don't use `default` for the glob case.  Instead do something like...
   
   ```
   case substrait::ReadRel_LocalFiles_FileOrFiles::kUriGlob: {
     ...
     break;
   }
   default: {
     return Status::Invalid("Unrecognized file type in LocalFiles");
   }
   ```
   
   This will future-proof us against additions to the enum.



##########
cpp/src/arrow/util/io_util.cc:
##########
@@ -1867,7 +1867,16 @@ Result<std::unique_ptr<TemporaryDir>> 
TemporaryDir::Make(const std::string& pref
       [&](const NativePathString& base_dir) -> 
Result<std::unique_ptr<TemporaryDir>> {
     Status st;
     for (int attempt = 0; attempt < 3; ++attempt) {
-      PlatformFilename fn(base_dir + kNativeSep + base_name + kNativeSep);
+      // Note: certain temporary directories of MacOS contains a trailing slash
+      // Handling the base_dir with trailing slash
+      PlatformFilename fn;
+      if (base_dir.back() == kNativeSep) {
+        PlatformFilename fn_base_dir(base_dir);
+        PlatformFilename fn_base_name(base_name + kNativeSep);
+        fn = fn_base_dir.Join(fn_base_name);
+      } else {
+        fn = PlatformFilename(base_dir + kNativeSep + base_name + kNativeSep);
+      }

Review Comment:
   ```suggestion
         PlatformFilename fn_base_dir(base_dir);
         fn = fn_base_dir.Join(base_name) + kNativeSep;
   ```
   
   Does this work?  `Join` already has logic in place to test for a trailing
   separator, so I'm not sure why we need to also test for one here.



##########
cpp/src/arrow/engine/substrait/plan_internal.h:
##########
@@ -51,5 +53,17 @@ Result<ExtensionSet> GetExtensionSetFromPlan(
     const substrait::Plan& plan,
     const ExtensionIdRegistry* registry = default_extension_id_registry());
 
+/// \brief Serialize a declaration and into a substrait::Plan.
+///
+/// Note that, this is a part of roundtripping test API and not

Review Comment:
   ```suggestion
   /// Note that, this is a part of a roundtripping test API and not
   ```



##########
cpp/src/arrow/util/uri.h:
##########
@@ -104,5 +104,10 @@ std::string UriEncodeHost(const std::string& host);
 ARROW_EXPORT
 bool IsValidUriScheme(const arrow::util::string_view s);
 
+/// Create a file uri from a given URI

Review Comment:
   ```suggestion
   /// Create a file uri from a given absolute path
   ```



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -162,36 +170,40 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& 
rel, const ExtensionSet&
         }
 
         path = path.substr(7);
-        if (item.path_type_case() ==
-            substrait::ReadRel_LocalFiles_FileOrFiles::kUriPath) {
-          ARROW_ASSIGN_OR_RAISE(auto file, filesystem->GetFileInfo(path));
-          if (file.type() == fs::FileType::File) {
-            files.push_back(std::move(file));
-          } else if (file.type() == fs::FileType::Directory) {
+        switch (item.path_type_case()) {
+          case substrait::ReadRel_LocalFiles_FileOrFiles::kUriPath: {

Review Comment:
   I kind of prefer `substrait::ReadRel::LocalFiles::FileOrFiles` over
   `substrait::ReadRel_LocalFiles_FileOrFiles`, but I see we have some precedent
   for this case, so I don't think you have to fix it for this PR.



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -421,5 +433,144 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& 
rel, const ExtensionSet&
       rel.DebugString());
 }
 
+namespace {
+
+Result<std::shared_ptr<Schema>> ExtractSchemaToBind(const 
compute::Declaration& declr) {
+  std::shared_ptr<Schema> bind_schema;
+  if (declr.factory_name == "scan") {
+    const auto& opts = checked_cast<const 
dataset::ScanNodeOptions&>(*(declr.options));
+    bind_schema = opts.dataset->schema();
+  } else if (declr.factory_name == "filter") {
+    auto input_declr = util::get<compute::Declaration>(declr.inputs[0]);
+    ARROW_ASSIGN_OR_RAISE(bind_schema, ExtractSchemaToBind(input_declr));
+  } else if (declr.factory_name == "sink") {
+    // Note that the sink has no output_schema
+    return bind_schema;
+  } else {
+    return Status::Invalid("Schema extraction failed, unsupported factory ",
+                           declr.factory_name);
+  }
+  return bind_schema;
+}
+
+Result<std::unique_ptr<substrait::ReadRel>> ScanRelationConverter(
+    const std::shared_ptr<Schema>& schema, const compute::Declaration& 
declaration,
+    ExtensionSet* ext_set, const ConversionOptions& conversion_options) {
+  auto read_rel = make_unique<substrait::ReadRel>();
+  const auto& scan_node_options =
+      checked_cast<const dataset::ScanNodeOptions&>(*declaration.options);
+  auto dataset =
+      
dynamic_cast<dataset::FileSystemDataset*>(scan_node_options.dataset.get());
+  if (dataset == nullptr) {
+    return Status::Invalid(
+        "Can only convert scan node with FileSystemDataset to a Substrait 
plan.");
+  }
+  // set schema
+  ARROW_ASSIGN_OR_RAISE(auto named_struct,
+                        ToProto(*dataset->schema(), ext_set, 
conversion_options));
+  read_rel->set_allocated_base_schema(named_struct.release());
+
+  // set local files
+  auto read_rel_lfs = make_unique<substrait::ReadRel_LocalFiles>();
+  for (const auto& file : dataset->files()) {
+    auto read_rel_lfs_ffs = 
make_unique<substrait::ReadRel_LocalFiles_FileOrFiles>();
+    read_rel_lfs_ffs->set_uri_path(UriFromAbsolutePath(file));
+    // set file format
+    auto format_type_name = dataset->format()->type_name();
+    if (format_type_name == "parquet") {
+      auto parquet_fmt =
+          
make_unique<substrait::ReadRel_LocalFiles_FileOrFiles_ParquetReadOptions>();
+      read_rel_lfs_ffs->set_allocated_parquet(parquet_fmt.release());
+    } else if (format_type_name == "ipc") {
+      auto arrow_fmt =
+          
make_unique<substrait::ReadRel_LocalFiles_FileOrFiles_ArrowReadOptions>();
+      read_rel_lfs_ffs->set_allocated_arrow(arrow_fmt.release());
+    } else if (format_type_name == "orc") {
+      auto orc_fmt =
+          
make_unique<substrait::ReadRel_LocalFiles_FileOrFiles_OrcReadOptions>();
+      read_rel_lfs_ffs->set_allocated_orc(orc_fmt.release());
+    } else {
+      return Status::NotImplemented("Unsupported file type: ", 
format_type_name);
+    }
+    read_rel_lfs->mutable_items()->AddAllocated(read_rel_lfs_ffs.release());
+  }
+  read_rel->set_allocated_local_files(read_rel_lfs.release());

Review Comment:
   Can we have a follow-up JIRA to add support for scan options projection &
   filter?  I don't think it should be done as part of this JIRA since that
   area is still changing.



##########
cpp/src/arrow/engine/substrait/relation_internal.h:
##########
@@ -40,9 +40,19 @@ struct DeclarationInfo {
   int num_columns;
 };
 
+/// \brief Convert a Substrait Rel object to an Acero declaration
 ARROW_ENGINE_EXPORT
 Result<DeclarationInfo> FromProto(const substrait::Rel&, const ExtensionSet&,
                                   const ConversionOptions&);
 
+/// \brief Convert an Acero Declaration to a Substrait Rel
+///
+/// Note that in order to provide a generic interface for ToProto for
+/// declaration. The ExecNode or ExecPlan is not used in this context as 
Declaration

Review Comment:
   ```suggestion
   /// Note that, in order to provide a generic interface for ToProto,
   /// the ExecNode or ExecPlan are not used in this context as Declaration
   ```



##########
cpp/src/arrow/util/uri.h:
##########
@@ -104,5 +104,10 @@ std::string UriEncodeHost(const std::string& host);
 ARROW_EXPORT
 bool IsValidUriScheme(const arrow::util::string_view s);
 
+/// Create a file uri from a given URI
+/// file:///<some_path>

Review Comment:
   ```suggestion
   ```



##########
cpp/src/arrow/engine/substrait/plan_internal.h:
##########
@@ -51,5 +53,17 @@ Result<ExtensionSet> GetExtensionSetFromPlan(
     const substrait::Plan& plan,
     const ExtensionIdRegistry* registry = default_extension_id_registry());
 
+/// \brief Serialize a declaration and into a substrait::Plan.
+///
+/// Note that, this is a part of roundtripping test API and not
+/// designed to use in production

Review Comment:
   ```suggestion
   /// designed for use in production
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to