vibhatha commented on code in PR #13401:
URL: https://github.com/apache/arrow/pull/13401#discussion_r907461716
##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -316,5 +323,97 @@ Result<compute::Declaration> FromProto(const
substrait::Rel& rel,
rel.DebugString());
}
+namespace {
+// Relation kinds that ExtractRelation::AddRelation dispatches on; a
+// declaration's factory name is translated to one of these through `enum_map`.
+// TODO: add other types
+enum ArrowRelationType : uint8_t {
+  SCAN = 0,
+  FILTER = 1,
+  PROJECT = 2,
+  JOIN = 3,
+  AGGREGATE = 4,
+};
+
+// Lookup table from a declaration's factory name to the relation kind that
+// ExtractRelation::AddRelation switches on.
+const std::map<std::string, ArrowRelationType> enum_map = {
+    {"scan", ArrowRelationType::SCAN},
+    {"filter", ArrowRelationType::FILTER},
+    {"project", ArrowRelationType::PROJECT},
+    {"join", ArrowRelationType::JOIN},
+    {"aggregate", ArrowRelationType::AGGREGATE},
+};
+
+struct ExtractRelation {
+ explicit ExtractRelation(substrait::Rel* rel, ExtensionSet* ext_set)
+ : rel_(rel), ext_set_(ext_set) {}
+
+ /// Dispatch `declaration` to the handler matching its factory name.
+ ///
+ /// \param declaration the declaration whose `factory_name` selects the relation
+ /// \return the result of AddReadRelation for "scan"; Status::NotImplemented for
+ ///   "filter", "project", "join" and "aggregate"; Status::Invalid when the
+ ///   factory name has no entry in `enum_map`
+ Status AddRelation(const compute::Declaration& declaration) {
+   const std::string& rel_name = declaration.factory_name;
+   // Check the lookup result before using it: dereferencing the iterator that
+   // find() returns for a missing key is undefined behaviour, and it kept the
+   // `default` branch below from ever reporting an unknown factory name.
+   const auto relation_it = enum_map.find(rel_name);
+   if (relation_it == enum_map.end()) {
+     return Status::Invalid("Unsupported factory name :", rel_name);
+   }
+   switch (relation_it->second) {
+     case ArrowRelationType::SCAN:
+       return AddReadRelation(declaration);
+     case ArrowRelationType::FILTER:
+       return Status::NotImplemented("Filter operator not supported.");
+     case ArrowRelationType::PROJECT:
+       return Status::NotImplemented("Project operator not supported.");
+     case ArrowRelationType::JOIN:
+       return Status::NotImplemented("Join operator not supported.");
+     case ArrowRelationType::AGGREGATE:
+       return Status::NotImplemented("Aggregate operator not supported.");
+     default:
+       // Kept for enum values that are in `enum_map` but not handled above.
+       return Status::Invalid("Unsupported factory name :", rel_name);
+   }
+ }
+
+ Status AddReadRelation(const compute::Declaration& declaration) {
+ auto read_rel = internal::make_unique<substrait::ReadRel>();
+ const auto& scan_node_options =
+ internal::checked_cast<const
dataset::ScanNodeOptions&>(*declaration.options);
+
+ const auto& fds = internal::checked_cast<const
dataset::FileSystemDataset&>(
+ *scan_node_options.dataset);
+
+ // set schema
+ ARROW_ASSIGN_OR_RAISE(auto named_struct, ToProto(*fds.schema(), ext_set_));
+ read_rel->set_allocated_base_schema(named_struct.release());
+
+ // set local files
+ auto read_rel_lfs = internal::make_unique<substrait::ReadRel_LocalFiles>();
+ for (const auto& file : fds.files()) {
+ auto read_rel_lfs_ffs =
+ internal::make_unique<substrait::ReadRel_LocalFiles_FileOrFiles>();
+ read_rel_lfs_ffs->set_uri_path("file://" + file);
+
+ // set file format
+ auto format_type_name = fds.format()->type_name();
+ if (format_type_name == "parquet" || format_type_name == "arrow" ||
+ format_type_name == "feather") {
+ read_rel_lfs_ffs->set_format(
+ substrait::ReadRel::LocalFiles::FileOrFiles::FILE_FORMAT_PARQUET);
+ } else {
+ return Status::Invalid("Unsupported file type : ", format_type_name);
+ }
+ read_rel_lfs->mutable_items()->AddAllocated(read_rel_lfs_ffs.release());
+ }
+ *read_rel->mutable_local_files() = *read_rel_lfs.get();
+
+ rel_->set_allocated_read(read_rel.release());
Review Comment:
JIRA created here: https://issues.apache.org/jira/browse/ARROW-16909
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]