vibhatha commented on code in PR #13401:
URL: https://github.com/apache/arrow/pull/13401#discussion_r965463633


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -421,5 +433,144 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& 
rel, const ExtensionSet&
       rel.DebugString());
 }
 
+namespace {
+
+Result<std::shared_ptr<Schema>> ExtractSchemaToBind(const 
compute::Declaration& declr) {
+  std::shared_ptr<Schema> bind_schema;
+  if (declr.factory_name == "scan") {
+    const auto& opts = checked_cast<const 
dataset::ScanNodeOptions&>(*(declr.options));
+    bind_schema = opts.dataset->schema();
+  } else if (declr.factory_name == "filter") {
+    auto input_declr = util::get<compute::Declaration>(declr.inputs[0]);
+    ARROW_ASSIGN_OR_RAISE(bind_schema, ExtractSchemaToBind(input_declr));
+  } else if (declr.factory_name == "sink") {
+    // Note that the sink has no output_schema
+    return bind_schema;
+  } else {
+    return Status::Invalid("Schema extraction failed, unsupported factory ",
+                           declr.factory_name);
+  }
+  return bind_schema;
+}
+
+Result<std::unique_ptr<substrait::ReadRel>> ScanRelationConverter(
+    const std::shared_ptr<Schema>& schema, const compute::Declaration& 
declaration,
+    ExtensionSet* ext_set, const ConversionOptions& conversion_options) {
+  auto read_rel = make_unique<substrait::ReadRel>();
+  const auto& scan_node_options =
+      checked_cast<const dataset::ScanNodeOptions&>(*declaration.options);
+  auto dataset =
+      
dynamic_cast<dataset::FileSystemDataset*>(scan_node_options.dataset.get());
+  if (dataset == nullptr) {
+    return Status::Invalid(
+        "Can only convert scan node with FileSystemDataset to a Substrait 
plan.");
+  }
+  // set schema
+  ARROW_ASSIGN_OR_RAISE(auto named_struct,
+                        ToProto(*dataset->schema(), ext_set, 
conversion_options));
+  read_rel->set_allocated_base_schema(named_struct.release());
+
+  // set local files
+  auto read_rel_lfs = make_unique<substrait::ReadRel_LocalFiles>();
+  for (const auto& file : dataset->files()) {
+    auto read_rel_lfs_ffs = 
make_unique<substrait::ReadRel_LocalFiles_FileOrFiles>();
+    read_rel_lfs_ffs->set_uri_path(UriFromAbsolutePath(file));
+    // set file format
+    auto format_type_name = dataset->format()->type_name();
+    if (format_type_name == "parquet") {
+      auto parquet_fmt =
+          
make_unique<substrait::ReadRel_LocalFiles_FileOrFiles_ParquetReadOptions>();
+      read_rel_lfs_ffs->set_allocated_parquet(parquet_fmt.release());
+    } else if (format_type_name == "ipc") {
+      auto arrow_fmt =
+          
make_unique<substrait::ReadRel_LocalFiles_FileOrFiles_ArrowReadOptions>();
+      read_rel_lfs_ffs->set_allocated_arrow(arrow_fmt.release());
+    } else if (format_type_name == "orc") {
+      auto orc_fmt =
+          
make_unique<substrait::ReadRel_LocalFiles_FileOrFiles_OrcReadOptions>();
+      read_rel_lfs_ffs->set_allocated_orc(orc_fmt.release());
+    } else {
+      return Status::NotImplemented("Unsupported file type: ", 
format_type_name);
+    }
+    read_rel_lfs->mutable_items()->AddAllocated(read_rel_lfs_ffs.release());
+  }
+  read_rel->set_allocated_local_files(read_rel_lfs.release());

Review Comment:
   Nice catch. Jira created: https://issues.apache.org/jira/browse/ARROW-17647



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to