vibhatha commented on code in PR #13401:
URL: https://github.com/apache/arrow/pull/13401#discussion_r907408216


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -316,5 +323,97 @@ Result<compute::Declaration> FromProto(const 
substrait::Rel& rel,
       rel.DebugString());
 }
 
+namespace {
+// TODO: add other types
+enum ArrowRelationType : uint8_t {
+  SCAN,
+  FILTER,
+  PROJECT,
+  JOIN,
+  AGGREGATE,
+};
+
+const std::map<std::string, ArrowRelationType> enum_map{
+    {"scan", ArrowRelationType::SCAN},           {"filter", 
ArrowRelationType::FILTER},
+    {"project", ArrowRelationType::PROJECT},     {"join", 
ArrowRelationType::JOIN},
+    {"aggregate", ArrowRelationType::AGGREGATE},
+};
+
+struct ExtractRelation {
+  explicit ExtractRelation(substrait::Rel* rel, ExtensionSet* ext_set)
+      : rel_(rel), ext_set_(ext_set) {}
+
+  Status AddRelation(const compute::Declaration& declaration) {
+    const std::string& rel_name = declaration.factory_name;
+    switch (enum_map.find(rel_name)->second) {
+      case ArrowRelationType::SCAN:
+        return AddReadRelation(declaration);
+      case ArrowRelationType::FILTER:
+        return Status::NotImplemented("Filter operator not supported.");
+      case ArrowRelationType::PROJECT:
+        return Status::NotImplemented("Project operator not supported.");
+      case ArrowRelationType::JOIN:
+        return Status::NotImplemented("Join operator not supported.");
+      case ArrowRelationType::AGGREGATE:
+        return Status::NotImplemented("Aggregate operator not supported.");
+      default:
+        return Status::Invalid("Unsupported factory name :", rel_name);
+    }
+  }
+
+  Status AddReadRelation(const compute::Declaration& declaration) {
+    auto read_rel = internal::make_unique<substrait::ReadRel>();
+    const auto& scan_node_options =
+        internal::checked_cast<const 
dataset::ScanNodeOptions&>(*declaration.options);
+
+    const auto& fds = internal::checked_cast<const 
dataset::FileSystemDataset&>(
+        *scan_node_options.dataset);
+
+    // set schema
+    ARROW_ASSIGN_OR_RAISE(auto named_struct, ToProto(*fds.schema(), ext_set_));
+    read_rel->set_allocated_base_schema(named_struct.release());
+
+    // set local files
+    auto read_rel_lfs = internal::make_unique<substrait::ReadRel_LocalFiles>();
+    for (const auto& file : fds.files()) {
+      auto read_rel_lfs_ffs =
+          internal::make_unique<substrait::ReadRel_LocalFiles_FileOrFiles>();
+      read_rel_lfs_ffs->set_uri_path("file://" + file);
+
+      // set file format
+      auto format_type_name = fds.format()->type_name();
+      if (format_type_name == "parquet" || format_type_name == "arrow" ||
+          format_type_name == "feather") {
+        read_rel_lfs_ffs->set_format(
+            substrait::ReadRel::LocalFiles::FileOrFiles::FILE_FORMAT_PARQUET);
+      } else {
+        return Status::Invalid("Unsupported file type : ", format_type_name);
+      }
+      read_rel_lfs->mutable_items()->AddAllocated(read_rel_lfs_ffs.release());
+    }
+    *read_rel->mutable_local_files() = *read_rel_lfs.get();
+
+    rel_->set_allocated_read(read_rel.release());

Review Comment:
   Sure, I will add one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to