This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 74756051c4 ARROW-16855: [C++] Adding Read Relation ToProto (#13401)
74756051c4 is described below
commit 74756051c4f6a8b13a40057f586817d56198d4ba
Author: Vibhatha Lakmal Abeykoon <[email protected]>
AuthorDate: Thu Sep 8 20:41:48 2022 +0530
ARROW-16855: [C++] Adding Read Relation ToProto (#13401)
This is the initial PR to set up the utility functions and structure needed
to add `ToProto` functionality for relations.
The objective is to create a Substrait relation by interpreting what is
included in an Acero declaration (the inverse of `FromProto`).
In this PR, `ToProto` is added for the `read` relation (and for `filter`,
which the round-trip tests use).
Authored-by: Vibhatha Abeykoon <[email protected]>
Signed-off-by: Weston Pace <[email protected]>
---
cpp/src/arrow/engine/substrait/extension_set.cc | 19 ++
cpp/src/arrow/engine/substrait/plan_internal.cc | 16 ++
cpp/src/arrow/engine/substrait/plan_internal.h | 14 +
.../arrow/engine/substrait/relation_internal.cc | 201 ++++++++++++--
cpp/src/arrow/engine/substrait/relation_internal.h | 10 +
cpp/src/arrow/engine/substrait/serde.cc | 17 ++
cpp/src/arrow/engine/substrait/serde.h | 24 ++
cpp/src/arrow/engine/substrait/serde_test.cc | 291 ++++++++++++++++++++-
cpp/src/arrow/filesystem/localfs_test.cc | 14 +-
cpp/src/arrow/filesystem/util_internal.cc | 1 +
cpp/src/arrow/util/io_util.cc | 4 +-
cpp/src/arrow/util/uri.cc | 10 +
cpp/src/arrow/util/uri.h | 4 +
13 files changed, 588 insertions(+), 37 deletions(-)
diff --git a/cpp/src/arrow/engine/substrait/extension_set.cc
b/cpp/src/arrow/engine/substrait/extension_set.cc
index 6e8522897e..0e1f5ebc66 100644
--- a/cpp/src/arrow/engine/substrait/extension_set.cc
+++ b/cpp/src/arrow/engine/substrait/extension_set.cc
@@ -698,6 +698,20 @@ ExtensionIdRegistry::ArrowToSubstraitCall
EncodeOptionlessOverflowableArithmetic
};
}
+/// Build an encoder that maps an option-less Arrow comparison call onto the
+/// Substrait function `substrait_fn_id`, forwarding every argument of the call
+/// as a value argument in the same position.
+ExtensionIdRegistry::ArrowToSubstraitCall EncodeOptionlessComparison(Id substrait_fn_id) {
+  auto encode = [substrait_fn_id](
+                    const compute::Expression::Call& call) -> Result<SubstraitCall> {
+    // The nullability of the inputs is not known here, so the output is
+    // marked nullable even though that is not always accurate.
+    SubstraitCall encoded(substrait_fn_id, call.type.GetSharedPtr(),
+                          /*nullable=*/true);
+    const std::size_t num_args = call.arguments.size();
+    for (std::size_t arg_index = 0; arg_index < num_args; ++arg_index) {
+      encoded.SetValueArg(static_cast<uint32_t>(arg_index), call.arguments[arg_index]);
+    }
+    return std::move(encoded);
+  };
+  return encode;
+}
+
ExtensionIdRegistry::SubstraitCallToArrow DecodeOptionlessBasicMapping(
const std::string& function_name, uint32_t max_args) {
return [function_name,
@@ -873,6 +887,11 @@ struct DefaultExtensionIdRegistry :
ExtensionIdRegistryImpl {
AddArrowToSubstraitCall(std::string(fn_name) + "_checked",
EncodeOptionlessOverflowableArithmetic<true>(fn_id)));
}
+ // Comparison operators
+ for (const auto& fn_name : {"equal", "is_not_distinct_from"}) {
+ Id fn_id{kSubstraitComparisonFunctionsUri, fn_name};
+ DCHECK_OK(AddArrowToSubstraitCall(fn_name,
EncodeOptionlessComparison(fn_id)));
+ }
}
};
diff --git a/cpp/src/arrow/engine/substrait/plan_internal.cc
b/cpp/src/arrow/engine/substrait/plan_internal.cc
index b0fdb9bdc2..1efd4e1a0a 100644
--- a/cpp/src/arrow/engine/substrait/plan_internal.cc
+++ b/cpp/src/arrow/engine/substrait/plan_internal.cc
@@ -17,6 +17,8 @@
#include "arrow/engine/substrait/plan_internal.h"
+#include "arrow/dataset/plan.h"
+#include "arrow/engine/substrait/relation_internal.h"
#include "arrow/result.h"
#include "arrow/util/hashing.h"
#include "arrow/util/logging.h"
@@ -133,5 +135,19 @@ Result<ExtensionSet> GetExtensionSetFromPlan(const
substrait::Plan& plan,
registry);
}
+Result<std::unique_ptr<substrait::Plan>> PlanToProto(
+    const compute::Declaration& declr, ExtensionSet* ext_set,
+    const ConversionOptions& conversion_options) {
+  // Convert the declaration first: this is what records the extensions used
+  // in ext_set, which are written into the plan afterwards.
+  ARROW_ASSIGN_OR_RAISE(auto root_input, ToProto(declr, ext_set, conversion_options));
+  auto plan = internal::make_unique<substrait::Plan>();
+  // The plan holds a single PlanRel whose RelRoot wraps the converted relation.
+  substrait::RelRoot* root = plan->add_relations()->mutable_root();
+  root->set_allocated_input(root_input.release());
+  RETURN_NOT_OK(AddExtensionSetToPlan(*ext_set, plan.get()));
+  return std::move(plan);
+}
+
} // namespace engine
} // namespace arrow
diff --git a/cpp/src/arrow/engine/substrait/plan_internal.h
b/cpp/src/arrow/engine/substrait/plan_internal.h
index dce23cdceb..e1ced549ce 100644
--- a/cpp/src/arrow/engine/substrait/plan_internal.h
+++ b/cpp/src/arrow/engine/substrait/plan_internal.h
@@ -19,7 +19,9 @@
#pragma once
+#include "arrow/compute/exec/exec_plan.h"
#include "arrow/engine/substrait/extension_set.h"
+#include "arrow/engine/substrait/options.h"
#include "arrow/engine/substrait/visibility.h"
#include "arrow/type_fwd.h"
@@ -51,5 +53,17 @@ Result<ExtensionSet> GetExtensionSetFromPlan(
const substrait::Plan& plan,
const ExtensionIdRegistry* registry = default_extension_id_registry());
+/// \brief Serialize an Acero declaration into a substrait::Plan.
+///
+/// The declaration is converted with ToProto and wrapped as the input of a
+/// single RelRoot; the extensions recorded in `ext_set` are then added to the
+/// plan.
+///
+/// Note that this is part of a round-tripping test API and is not
+/// designed for production use.
+///
+/// \param[in] declr the declaration to serialize (typically the sink
+///            declaration at the end of a declaration sequence)
+/// \param[in,out] ext_set the extension set; updated with any extensions used
+/// \param[in] conversion_options options to control serialization behavior
+/// \return the serialized plan
+ARROW_ENGINE_EXPORT Result<std::unique_ptr<substrait::Plan>> PlanToProto(
+    const compute::Declaration& declr, ExtensionSet* ext_set,
+    const ConversionOptions& conversion_options = {});
+
} // namespace engine
} // namespace arrow
diff --git a/cpp/src/arrow/engine/substrait/relation_internal.cc
b/cpp/src/arrow/engine/substrait/relation_internal.cc
index c5c02f5155..c5d212c8c2 100644
--- a/cpp/src/arrow/engine/substrait/relation_internal.cc
+++ b/cpp/src/arrow/engine/substrait/relation_internal.cc
@@ -29,8 +29,16 @@
#include "arrow/filesystem/localfs.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/filesystem/util_internal.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/uri.h"
namespace arrow {
+
+using ::arrow::internal::UriFromAbsolutePath;
+using internal::checked_cast;
+using internal::make_unique;
+
namespace engine {
template <typename RelMessage>
@@ -162,36 +170,45 @@ Result<DeclarationInfo> FromProto(const substrait::Rel&
rel, const ExtensionSet&
}
path = path.substr(7);
- if (item.path_type_case() ==
- substrait::ReadRel_LocalFiles_FileOrFiles::kUriPath) {
- ARROW_ASSIGN_OR_RAISE(auto file, filesystem->GetFileInfo(path));
- if (file.type() == fs::FileType::File) {
- files.push_back(std::move(file));
- } else if (file.type() == fs::FileType::Directory) {
+ switch (item.path_type_case()) {
+ case substrait::ReadRel_LocalFiles_FileOrFiles::kUriPath: {
+ ARROW_ASSIGN_OR_RAISE(auto file, filesystem->GetFileInfo(path));
+          if (file.type() == fs::FileType::File) {
+            files.push_back(std::move(file));
+          } else if (file.type() == fs::FileType::Directory) {
+            // A uri_path may name a directory: expand it recursively and append
+            // everything found to `files`. (Moving in the other direction would
+            // drop the directory's contents and leave the files collected so
+            // far in a moved-from state.)
+            fs::FileSelector selector;
+            selector.base_dir = path;
+            selector.recursive = true;
+            ARROW_ASSIGN_OR_RAISE(auto discovered_files,
+                                  filesystem->GetFileInfo(selector));
+            std::move(discovered_files.begin(), discovered_files.end(),
+                      std::back_inserter(files));
+          }
+ break;
+ }
+ case substrait::ReadRel_LocalFiles_FileOrFiles::kUriFile: {
+ files.emplace_back(path, fs::FileType::File);
+ break;
+ }
+ case substrait::ReadRel_LocalFiles_FileOrFiles::kUriFolder: {
fs::FileSelector selector;
selector.base_dir = path;
selector.recursive = true;
ARROW_ASSIGN_OR_RAISE(auto discovered_files,
filesystem->GetFileInfo(selector));
- std::move(files.begin(), files.end(),
std::back_inserter(discovered_files));
+ std::move(discovered_files.begin(), discovered_files.end(),
+ std::back_inserter(files));
+ break;
+ }
+ case substrait::ReadRel_LocalFiles_FileOrFiles::kUriPathGlob: {
+ ARROW_ASSIGN_OR_RAISE(auto discovered_files,
+ fs::internal::GlobFiles(filesystem, path));
+ std::move(discovered_files.begin(), discovered_files.end(),
+ std::back_inserter(files));
+ break;
+ }
+ default: {
+ return Status::Invalid("Unrecognized file type in LocalFiles");
}
- }
- if (item.path_type_case() ==
- substrait::ReadRel_LocalFiles_FileOrFiles::kUriFile) {
- files.emplace_back(path, fs::FileType::File);
- } else if (item.path_type_case() ==
- substrait::ReadRel_LocalFiles_FileOrFiles::kUriFolder) {
- fs::FileSelector selector;
- selector.base_dir = path;
- selector.recursive = true;
- ARROW_ASSIGN_OR_RAISE(auto discovered_files,
filesystem->GetFileInfo(selector));
- std::move(discovered_files.begin(), discovered_files.end(),
- std::back_inserter(files));
- } else {
- ARROW_ASSIGN_OR_RAISE(auto discovered_files,
- fs::internal::GlobFiles(filesystem, path));
- std::move(discovered_files.begin(), discovered_files.end(),
- std::back_inserter(files));
}
}
@@ -421,5 +438,141 @@ Result<DeclarationInfo> FromProto(const substrait::Rel&
rel, const ExtensionSet&
rel.DebugString());
}
+namespace {
+
+/// Return the schema that expressions attached to `declr` should be bound
+/// against: the dataset schema for a "scan", the (recursively extracted)
+/// schema of the input for a "filter", and null for a "sink", which has no
+/// output schema of its own. Other factories are rejected with Invalid.
+Result<std::shared_ptr<Schema>> ExtractSchemaToBind(const compute::Declaration& declr) {
+  if (declr.factory_name == "scan") {
+    const auto& opts = checked_cast<const dataset::ScanNodeOptions&>(*(declr.options));
+    return opts.dataset->schema();
+  }
+  if (declr.factory_name == "filter") {
+    // A filter is bound against the schema of its input.
+    if (declr.inputs.empty()) {
+      return Status::Invalid("Filter node doesn't have an input.");
+    }
+    // Borrow the input instead of copying the whole declaration tree.
+    const auto* input_declr = declr.inputs[0].get<compute::Declaration>();
+    if (input_declr == nullptr) {
+      return Status::NotImplemented(
+          "Schema extraction failed, filter input is not a Declaration");
+    }
+    return ExtractSchemaToBind(*input_declr);
+  }
+  if (declr.factory_name == "sink") {
+    // Note that the sink has no output_schema
+    return std::shared_ptr<Schema>();
+  }
+  return Status::Invalid("Schema extraction failed, unsupported factory ",
+                         declr.factory_name);
+}
+
+/// Convert a "scan" declaration over a FileSystemDataset into a ReadRel whose
+/// LocalFiles lists every dataset file (as a file URI) with its file format.
+///
+/// `schema` is currently unused; the base schema is taken from the dataset.
+Result<std::unique_ptr<substrait::ReadRel>> ScanRelationConverter(
+    const std::shared_ptr<Schema>& schema, const compute::Declaration& declaration,
+    ExtensionSet* ext_set, const ConversionOptions& conversion_options) {
+  const auto& scan_node_options =
+      checked_cast<const dataset::ScanNodeOptions&>(*declaration.options);
+  auto dataset =
+      dynamic_cast<dataset::FileSystemDataset*>(scan_node_options.dataset.get());
+  if (dataset == nullptr) {
+    return Status::Invalid(
+        "Can only convert scan node with FileSystemDataset to a Substrait plan.");
+  }
+
+  // Every file in the dataset shares one format: resolve it once and reject
+  // unsupported formats up front (even when the dataset has no files).
+  const std::string format_type_name = dataset->format()->type_name();
+  if (format_type_name != "parquet" && format_type_name != "ipc" &&
+      format_type_name != "orc") {
+    return Status::NotImplemented("Unsupported file type: ", format_type_name);
+  }
+
+  auto read_rel = make_unique<substrait::ReadRel>();
+  // set schema
+  ARROW_ASSIGN_OR_RAISE(auto named_struct,
+                        ToProto(*dataset->schema(), ext_set, conversion_options));
+  read_rel->set_allocated_base_schema(named_struct.release());
+
+  // set local files
+  substrait::ReadRel_LocalFiles* local_files = read_rel->mutable_local_files();
+  for (const auto& file : dataset->files()) {
+    substrait::ReadRel_LocalFiles_FileOrFiles* item = local_files->add_items();
+    item->set_uri_path(UriFromAbsolutePath(file));
+    // mutable_<format>() selects the format with a default (empty) options message
+    if (format_type_name == "parquet") {
+      item->mutable_parquet();
+    } else if (format_type_name == "ipc") {
+      item->mutable_arrow();
+    } else {
+      item->mutable_orc();
+    }
+  }
+  return std::move(read_rel);
+}
+
+/// Convert a "filter" declaration into a FilterRel whose input is the
+/// converted input declaration and whose condition is the filter expression,
+/// bound against `schema` if it is not already bound.
+Result<std::unique_ptr<substrait::FilterRel>> FilterRelationConverter(
+    const std::shared_ptr<Schema>& schema, const compute::Declaration& declaration,
+    ExtensionSet* ext_set, const ConversionOptions& conversion_options) {
+  if (declaration.inputs.empty()) {
+    return Status::Invalid("Filter node doesn't have an input.");
+  }
+  // Borrow the input instead of copying the whole declaration tree.
+  const auto* input_declr = declaration.inputs[0].get<compute::Declaration>();
+  if (input_declr == nullptr) {
+    return Status::NotImplemented(
+        "Filter node input must be a Declaration to be serialized.");
+  }
+
+  const auto& filter_node_options =
+      checked_cast<const compute::FilterNodeOptions&>(*(declaration.options));
+  // Start from the user's expression so an already-bound filter is serialized
+  // as-is (previously it was silently replaced by an empty Expression).
+  compute::Expression bound_expression = filter_node_options.filter_expression;
+  if (!bound_expression.IsBound()) {
+    if (schema == nullptr) {
+      return Status::Invalid("Cannot bind filter expression: input schema is unknown.");
+    }
+    ARROW_ASSIGN_OR_RAISE(bound_expression, bound_expression.Bind(*schema));
+  }
+
+  // handling input (converted before the condition, as before)
+  ARROW_ASSIGN_OR_RAISE(auto input_rel,
+                        ToProto(*input_declr, ext_set, conversion_options));
+  ARROW_ASSIGN_OR_RAISE(auto subs_expr,
+                        ToProto(bound_expression, ext_set, conversion_options));
+
+  auto filter_rel = make_unique<substrait::FilterRel>();
+  filter_rel->set_allocated_input(input_rel.release());
+  filter_rel->set_allocated_condition(subs_expr.release());
+  return std::move(filter_rel);
+}
+
+} // namespace
+
+/// Serialize `declaration` into `*rel`, recursing into inputs as needed.
+///
+/// Substrait has no sink relation, so a "sink" declaration is skipped and its
+/// input is serialized into the same `rel`. Factories other than "scan",
+/// "filter" and "sink" yield NotImplemented.
+Status SerializeAndCombineRelations(const compute::Declaration& declaration,
+                                    ExtensionSet* ext_set,
+                                    std::unique_ptr<substrait::Rel>* rel,
+                                    const ConversionOptions& conversion_options) {
+  const auto& factory_name = declaration.factory_name;
+
+  if (factory_name == "sink") {
+    // Generally when a plan is serialized the top-level declaration is a sink
+    // declaration. Since there is no Sink relation in Substrait, this function
+    // is called recursively on the input of the sink declaration.
+    if (declaration.inputs.empty()) {
+      return Status::Invalid("Sink node doesn't have an input.");
+    }
+    const auto* sink_input_decl = declaration.inputs[0].get<compute::Declaration>();
+    if (sink_input_decl == nullptr) {
+      return Status::NotImplemented(
+          "Sink node input must be a Declaration to be serialized.");
+    }
+    return SerializeAndCombineRelations(*sink_input_decl, ext_set, rel,
+                                        conversion_options);
+  }
+
+  // Check the factory before extracting a schema: ExtractSchemaToBind rejects
+  // unknown factories with Invalid, which previously made this error
+  // unreachable.
+  if (factory_name != "scan" && factory_name != "filter") {
+    return Status::NotImplemented("Factory ", factory_name,
+                                  " not implemented for roundtripping.");
+  }
+
+  ARROW_ASSIGN_OR_RAISE(auto schema, ExtractSchemaToBind(declaration));
+  if (factory_name == "scan") {
+    ARROW_ASSIGN_OR_RAISE(
+        auto read_rel,
+        ScanRelationConverter(schema, declaration, ext_set, conversion_options));
+    (*rel)->set_allocated_read(read_rel.release());
+  } else {
+    ARROW_ASSIGN_OR_RAISE(
+        auto filter_rel,
+        FilterRelationConverter(schema, declaration, ext_set, conversion_options));
+    (*rel)->set_allocated_filter(filter_rel.release());
+  }
+  return Status::OK();
+}
+
+Result<std::unique_ptr<substrait::Rel>> ToProto(
+    const compute::Declaration& declr, ExtensionSet* ext_set,
+    const ConversionOptions& conversion_options) {
+  // Start from an empty Rel; the recursive serializer fills in the variant
+  // that matches the declaration's factory.
+  std::unique_ptr<substrait::Rel> out_rel = make_unique<substrait::Rel>();
+  ARROW_RETURN_NOT_OK(
+      SerializeAndCombineRelations(declr, ext_set, &out_rel, conversion_options));
+  return std::move(out_rel);
+}
+
} // namespace engine
} // namespace arrow
diff --git a/cpp/src/arrow/engine/substrait/relation_internal.h
b/cpp/src/arrow/engine/substrait/relation_internal.h
index 3699d1f657..778d1e5bc0 100644
--- a/cpp/src/arrow/engine/substrait/relation_internal.h
+++ b/cpp/src/arrow/engine/substrait/relation_internal.h
@@ -40,9 +40,19 @@ struct DeclarationInfo {
int num_columns;
};
+/// \brief Convert a Substrait Rel object to an Acero declaration
ARROW_ENGINE_EXPORT
Result<DeclarationInfo> FromProto(const substrait::Rel&, const ExtensionSet&,
const ConversionOptions&);
+/// \brief Convert an Acero Declaration to a Substrait Rel
+///
+/// ToProto operates on a Declaration rather than on an ExecNode or ExecPlan so
+/// that serialization depends only on the declarative description of a plan,
+/// not on internal components of the Acero execution engine.
+///
+/// Only the "scan" (backed by a FileSystemDataset), "filter" and "sink"
+/// factories are handled. A "sink" has no Substrait counterpart, so its input
+/// is serialized in its place. Other factories produce an error status.
+///
+/// \param[in] declaration the declaration to convert
+/// \param[in,out] ext_set extension set; may be updated with extensions used
+/// \param[in] conversion_options options controlling the conversion
+/// \return the converted Rel
+ARROW_ENGINE_EXPORT Result<std::unique_ptr<substrait::Rel>> ToProto(
+    const compute::Declaration& declaration, ExtensionSet* ext_set,
+    const ConversionOptions& conversion_options);
+
} // namespace engine
} // namespace arrow
diff --git a/cpp/src/arrow/engine/substrait/serde.cc
b/cpp/src/arrow/engine/substrait/serde.cc
index 9f7d979e2f..c629767549 100644
--- a/cpp/src/arrow/engine/substrait/serde.cc
+++ b/cpp/src/arrow/engine/substrait/serde.cc
@@ -52,6 +52,23 @@ Result<Message> ParseFromBuffer(const Buffer& buf) {
return message;
}
+Result<std::shared_ptr<Buffer>> SerializePlan(
+    const compute::Declaration& declaration, ExtensionSet* ext_set,
+    const ConversionOptions& conversion_options) {
+  // Build the protobuf Plan, then copy its wire-format bytes into a Buffer.
+  ARROW_ASSIGN_OR_RAISE(auto plan_message,
+                        PlanToProto(declaration, ext_set, conversion_options));
+  return Buffer::FromString(plan_message->SerializeAsString());
+}
+
+Result<std::shared_ptr<Buffer>> SerializeRelation(
+    const compute::Declaration& declaration, ExtensionSet* ext_set,
+    const ConversionOptions& conversion_options) {
+  // Build the protobuf Rel, then copy its wire-format bytes into a Buffer.
+  ARROW_ASSIGN_OR_RAISE(auto rel_message,
+                        ToProto(declaration, ext_set, conversion_options));
+  return Buffer::FromString(rel_message->SerializeAsString());
+}
+
Result<compute::Declaration> DeserializeRelation(
const Buffer& buf, const ExtensionSet& ext_set,
const ConversionOptions& conversion_options) {
diff --git a/cpp/src/arrow/engine/substrait/serde.h
b/cpp/src/arrow/engine/substrait/serde.h
index 6c2083fb56..2a14ca6757 100644
--- a/cpp/src/arrow/engine/substrait/serde.h
+++ b/cpp/src/arrow/engine/substrait/serde.h
@@ -36,6 +36,19 @@
namespace arrow {
namespace engine {
+/// \brief Serialize an Acero plan to a binary protobuf Substrait Plan message
+///
+/// \param[in] declaration the Acero declaration to serialize. This is normally
+/// the sink declaration at the end of the Acero plan; the sink itself has no
+/// Substrait counterpart, so only its input is serialized.
+/// \param[in,out] ext_set the extension mapping to use; may be updated to add
+/// any extensions used by the plan
+/// \param[in] conversion_options options to control how the conversion is done
+///
+/// \return a buffer containing the protobuf serialization of the Substrait plan
+ARROW_ENGINE_EXPORT
+Result<std::shared_ptr<Buffer>> SerializePlan(
+    const compute::Declaration& declaration, ExtensionSet* ext_set,
+    const ConversionOptions& conversion_options = {});
+
/// Factory function type for generating the node that consumes the batches
produced by
/// each toplevel Substrait relation when deserializing a Substrait Plan.
using ConsumerFactory =
std::function<std::shared_ptr<compute::SinkNodeConsumer>()>;
@@ -202,6 +215,17 @@ Result<std::shared_ptr<Buffer>> SerializeExpression(
const compute::Expression& expr, ExtensionSet* ext_set,
const ConversionOptions& conversion_options = {});
+/// \brief Serialize an Acero Declaration to a binary protobuf Substrait Rel
+/// message
+///
+/// \param[in] declaration the Acero declaration to serialize
+/// \param[in,out] ext_set the extension mapping to use; may be updated to add
+/// any extensions used by the declaration
+/// \param[in] conversion_options options to control how the conversion is done
+///
+/// \return a buffer containing the protobuf serialization of the resulting
+/// Substrait relation
+ARROW_ENGINE_EXPORT Result<std::shared_ptr<Buffer>> SerializeRelation(
+    const compute::Declaration& declaration, ExtensionSet* ext_set,
+    const ConversionOptions& conversion_options = {});
+
/// \brief Deserializes a Substrait Rel (relation) message to an ExecNode
declaration
///
/// \param[in] buf a buffer containing the protobuf serialization of a
Substrait
diff --git a/cpp/src/arrow/engine/substrait/serde_test.cc
b/cpp/src/arrow/engine/substrait/serde_test.cc
index 04405b3168..9b6c3f715f 100644
--- a/cpp/src/arrow/engine/substrait/serde_test.cc
+++ b/cpp/src/arrow/engine/substrait/serde_test.cc
@@ -23,17 +23,30 @@
#include "arrow/compute/exec/expression_internal.h"
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/file_parquet.h"
+
#include "arrow/dataset/plan.h"
#include "arrow/dataset/scanner.h"
#include "arrow/engine/substrait/extension_types.h"
#include "arrow/engine/substrait/serde.h"
+
#include "arrow/engine/substrait/util.h"
+
+#include "arrow/filesystem/localfs.h"
#include "arrow/filesystem/mockfs.h"
#include "arrow/filesystem/test_util.h"
+#include "arrow/io/compressed.h"
+#include "arrow/io/memory.h"
+#include "arrow/ipc/writer.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/matchers.h"
#include "arrow/util/key_value_metadata.h"
+#include "parquet/arrow/writer.h"
+
+#include "arrow/util/hash_util.h"
+#include "arrow/util/hashing.h"
+
using testing::ElementsAre;
using testing::Eq;
using testing::HasSubstr;
@@ -42,9 +55,46 @@ using testing::UnorderedElementsAre;
namespace arrow {
using internal::checked_cast;
-
+using internal::hash_combine;
namespace engine {
+/// Write `input` as a new Arrow IPC file at `path` on `file_system`.
+///
+/// Every failure, including closing the output stream, is returned as a
+/// Status instead of aborting the test process.
+Status WriteIpcData(const std::string& path,
+                    const std::shared_ptr<fs::FileSystem>& file_system,
+                    const std::shared_ptr<Table>& input) {
+  ARROW_ASSIGN_OR_RAISE(auto out_stream, file_system->OpenOutputStream(path));
+  ARROW_ASSIGN_OR_RAISE(auto file_writer,
+                        ipc::MakeFileWriter(out_stream, input->schema(),
+                                            ipc::IpcWriteOptions::Defaults()));
+  RETURN_NOT_OK(file_writer->WriteTable(*input));
+  RETURN_NOT_OK(file_writer->Close());
+  // NOTE(review): the IPC writer is not assumed to close its sink; close it
+  // explicitly so that flush/close errors are reported to the caller.
+  return out_stream->Close();
+}
+
+/// Run the plan described by `declarations` and collect its output into a
+/// Table. The sink in `declarations` is expected to feed `sink_gen`, and its
+/// batches are read back using `output_schema`.
+Result<std::shared_ptr<Table>> GetTableFromPlan(
+    compute::Declaration& declarations,
+    arrow::AsyncGenerator<util::optional<compute::ExecBatch>>& sink_gen,
+    compute::ExecContext& exec_context, std::shared_ptr<Schema>& output_schema) {
+  ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(&exec_context));
+  ARROW_ASSIGN_OR_RAISE(auto sink_node, declarations.AddToPlan(plan.get()));
+
+  RETURN_NOT_OK(sink_node->Validate());
+
+  std::shared_ptr<arrow::RecordBatchReader> sink_reader = compute::MakeGeneratorReader(
+      output_schema, std::move(sink_gen), exec_context.memory_pool());
+
+  RETURN_NOT_OK(plan->Validate());
+  RETURN_NOT_OK(plan->StartProducing());
+  ARROW_ASSIGN_OR_RAISE(auto table,
+                        arrow::Table::FromRecordBatchReader(sink_reader.get()));
+  // Wait for the plan to shut down completely, and surface any error it
+  // reports, before it is destroyed at the end of this scope.
+  RETURN_NOT_OK(plan->finished().status());
+  return table;
+}
+
class NullSinkNodeConsumer : public compute::SinkNodeConsumer {
public:
Status Init(const std::shared_ptr<Schema>&, compute::BackpressureControl*)
override {
@@ -866,6 +916,7 @@ Result<std::string> GetSubstraitJSON() {
auto file_name =
arrow::internal::PlatformFilename::FromString(dir_string)->Join("binary.parquet");
auto file_path = file_name->ToString();
+
std::string substrait_json = R"({
"relations": [
{"rel": {
@@ -1814,5 +1865,243 @@ TEST(Substrait, AggregateBadPhase) {
ASSERT_RAISES(NotImplemented, DeserializePlans(*buf, [] { return
kNullConsumer; }));
}
+TEST(Substrait, BasicPlanRoundTripping) {
+#ifdef _WIN32
+  GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#endif
+  arrow::dataset::internal::Initialize();
+
+  auto dummy_schema = schema(
+      {field("key", int32()), field("shared", int32()), field("distinct", int32())});
+
+  // creating a dummy dataset using a dummy table
+  auto table = TableFromJSON(dummy_schema, {R"([
+      [1, 1, 10],
+      [3, 4, 20]
+    ])",
+                                            R"([
+      [0, 2, 1],
+      [1, 3, 2],
+      [4, 1, 3],
+      [3, 1, 3],
+      [1, 2, 5]
+    ])",
+                                            R"([
+      [2, 2, 12],
+      [5, 3, 12],
+      [1, 3, 12]
+    ])"});
+
+  auto format = std::make_shared<arrow::dataset::IpcFileFormat>();
+  auto filesystem = std::make_shared<fs::LocalFileSystem>();
+  const std::string file_name = "serde_test.arrow";
+
+  ASSERT_OK_AND_ASSIGN(auto tempdir,
+                       arrow::internal::TemporaryDir::Make("substrait-tempdir-"));
+  ASSERT_OK_AND_ASSIGN(auto file_path, tempdir->path().Join(file_name));
+  std::string file_path_str = file_path.ToString();
+
+  // Nothing below is meaningful if the input file could not be written.
+  ASSERT_OK(WriteIpcData(file_path_str, filesystem, table));
+
+  std::vector<fs::FileInfo> files;
+  ASSERT_OK_AND_ASSIGN(auto file_info, filesystem->GetFileInfo(file_path_str));
+  files.push_back(std::move(file_info));
+
+  ASSERT_OK_AND_ASSIGN(auto ds_factory, dataset::FileSystemDatasetFactory::Make(
+                                            filesystem, std::move(files), format, {}));
+  ASSERT_OK_AND_ASSIGN(auto dataset, ds_factory->Finish(dummy_schema));
+
+  auto scan_options = std::make_shared<dataset::ScanOptions>();
+  scan_options->projection = compute::project({}, {});
+  const std::string filter_col_left = "shared";
+  const std::string filter_col_right = "distinct";
+  auto comp_left_value = compute::field_ref(filter_col_left);
+  auto comp_right_value = compute::field_ref(filter_col_right);
+  auto filter = compute::equal(comp_left_value, comp_right_value);
+
+  arrow::AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen;
+
+  auto declarations = compute::Declaration::Sequence(
+      {compute::Declaration(
+           {"scan", dataset::ScanNodeOptions{dataset, scan_options}, "s"}),
+       compute::Declaration({"filter", compute::FilterNodeOptions{filter}, "f"}),
+       compute::Declaration({"sink", compute::SinkNodeOptions{&sink_gen}, "e"})});
+
+  std::shared_ptr<ExtensionIdRegistry> sp_ext_id_reg = MakeExtensionIdRegistry();
+  ExtensionIdRegistry* ext_id_reg = sp_ext_id_reg.get();
+  ExtensionSet ext_set(ext_id_reg);
+
+  ASSERT_OK_AND_ASSIGN(auto serialized_plan, SerializePlan(declarations, &ext_set));
+
+  ASSERT_OK_AND_ASSIGN(
+      auto sink_decls,
+      DeserializePlans(
+          *serialized_plan, [] { return kNullConsumer; }, ext_id_reg, &ext_set));
+  ASSERT_EQ(sink_decls.size(), 1u);
+
+  // filter declaration
+  ASSERT_FALSE(sink_decls[0].inputs.empty());
+  const auto* roundtripped_filter = sink_decls[0].inputs[0].get<compute::Declaration>();
+  ASSERT_NE(roundtripped_filter, nullptr);
+  const auto& filter_opts =
+      checked_cast<const compute::FilterNodeOptions&>(*(roundtripped_filter->options));
+  const auto& roundtripped_expr = filter_opts.filter_expression;
+
+  // The condition must still be equal(shared, distinct); fail instead of
+  // silently skipping these checks when it is not a call.
+  const auto* call = roundtripped_expr.call();
+  ASSERT_NE(call, nullptr);
+  EXPECT_EQ(call->function_name, "equal");
+  ASSERT_EQ(call->arguments.size(), 2u);
+  const FieldRef* left_ref = call->arguments[0].field_ref();
+  const FieldRef* right_ref = call->arguments[1].field_ref();
+  ASSERT_NE(left_ref, nullptr);
+  ASSERT_NE(right_ref, nullptr);
+  ASSERT_NE(left_ref->field_path(), nullptr);
+  ASSERT_NE(right_ref->field_path(), nullptr);
+  auto left_index = left_ref->field_path()->indices()[0];
+  EXPECT_EQ(dummy_schema->field_names()[left_index], filter_col_left);
+  auto right_index = right_ref->field_path()->indices()[0];
+  EXPECT_EQ(dummy_schema->field_names()[right_index], filter_col_right);
+
+  // scan declaration
+  ASSERT_FALSE(roundtripped_filter->inputs.empty());
+  const auto* roundtripped_scan =
+      roundtripped_filter->inputs[0].get<compute::Declaration>();
+  ASSERT_NE(roundtripped_scan, nullptr);
+  const auto& dataset_opts =
+      checked_cast<const dataset::ScanNodeOptions&>(*(roundtripped_scan->options));
+  const auto& roundtripped_ds = dataset_opts.dataset;
+  EXPECT_TRUE(roundtripped_ds->schema()->Equals(*dummy_schema));
+  ASSERT_OK_AND_ASSIGN(auto roundtripped_frgs, roundtripped_ds->GetFragments());
+  ASSERT_OK_AND_ASSIGN(auto expected_frgs, dataset->GetFragments());
+
+  auto roundtrip_frg_vec = IteratorToVector(std::move(roundtripped_frgs));
+  auto expected_frg_vec = IteratorToVector(std::move(expected_frgs));
+  // ASSERT (not EXPECT) so that the indexed comparison below stays in bounds.
+  ASSERT_EQ(expected_frg_vec.size(), roundtrip_frg_vec.size());
+  for (size_t idx = 0; idx < expected_frg_vec.size(); ++idx) {
+    const auto* l_frag =
+        checked_cast<const dataset::FileFragment*>(expected_frg_vec[idx].get());
+    const auto* r_frag =
+        checked_cast<const dataset::FileFragment*>(roundtrip_frg_vec[idx].get());
+    EXPECT_TRUE(l_frag->Equals(*r_frag));
+  }
+}
+
+TEST(Substrait, BasicPlanRoundTrippingEndToEnd) {
+#ifdef _WIN32
+  GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#endif
+  compute::ExecContext exec_context;
+  arrow::dataset::internal::Initialize();
+
+  auto dummy_schema = schema(
+      {field("key", int32()), field("shared", int32()), field("distinct", int32())});
+
+  // creating a dummy dataset using a dummy table
+  auto table = TableFromJSON(dummy_schema, {R"([
+      [1, 1, 10],
+      [3, 4, 4]
+    ])",
+                                            R"([
+      [0, 2, 1],
+      [1, 3, 2],
+      [4, 1, 1],
+      [3, 1, 3],
+      [1, 2, 2]
+    ])",
+                                            R"([
+      [2, 2, 12],
+      [5, 3, 12],
+      [1, 3, 3]
+    ])"});
+
+  auto format = std::make_shared<arrow::dataset::IpcFileFormat>();
+  auto filesystem = std::make_shared<fs::LocalFileSystem>();
+  const std::string file_name = "serde_test.arrow";
+
+  ASSERT_OK_AND_ASSIGN(auto tempdir,
+                       arrow::internal::TemporaryDir::Make("substrait-tempdir-"));
+  ASSERT_OK_AND_ASSIGN(auto file_path, tempdir->path().Join(file_name));
+  std::string file_path_str = file_path.ToString();
+
+  // Nothing below is meaningful if the input file could not be written.
+  ASSERT_OK(WriteIpcData(file_path_str, filesystem, table));
+
+  std::vector<fs::FileInfo> files;
+  ASSERT_OK_AND_ASSIGN(auto file_info, filesystem->GetFileInfo(file_path_str));
+  files.push_back(std::move(file_info));
+
+  ASSERT_OK_AND_ASSIGN(auto ds_factory, dataset::FileSystemDatasetFactory::Make(
+                                            filesystem, std::move(files), format, {}));
+  ASSERT_OK_AND_ASSIGN(auto dataset, ds_factory->Finish(dummy_schema));
+
+  auto scan_options = std::make_shared<dataset::ScanOptions>();
+  scan_options->projection = compute::project({}, {});
+  const std::string filter_col_left = "shared";
+  const std::string filter_col_right = "distinct";
+  auto comp_left_value = compute::field_ref(filter_col_left);
+  auto comp_right_value = compute::field_ref(filter_col_right);
+  auto filter = compute::equal(comp_left_value, comp_right_value);
+
+  arrow::AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen;
+
+  auto declarations = compute::Declaration::Sequence(
+      {compute::Declaration(
+           {"scan", dataset::ScanNodeOptions{dataset, scan_options}, "s"}),
+       compute::Declaration({"filter", compute::FilterNodeOptions{filter}, "f"}),
+       compute::Declaration({"sink", compute::SinkNodeOptions{&sink_gen}, "e"})});
+
+  ASSERT_OK_AND_ASSIGN(auto expected_table, GetTableFromPlan(declarations, sink_gen,
+                                                             exec_context, dummy_schema));
+
+  std::shared_ptr<ExtensionIdRegistry> sp_ext_id_reg = MakeExtensionIdRegistry();
+  ExtensionIdRegistry* ext_id_reg = sp_ext_id_reg.get();
+  ExtensionSet ext_set(ext_id_reg);
+
+  ASSERT_OK_AND_ASSIGN(auto serialized_plan, SerializePlan(declarations, &ext_set));
+
+  ASSERT_OK_AND_ASSIGN(
+      auto sink_decls,
+      DeserializePlans(
+          *serialized_plan, [] { return kNullConsumer; }, ext_id_reg, &ext_set));
+  ASSERT_EQ(sink_decls.size(), 1u);
+
+  // filter declaration
+  ASSERT_FALSE(sink_decls[0].inputs.empty());
+  const auto* roundtripped_filter = sink_decls[0].inputs[0].get<compute::Declaration>();
+  ASSERT_NE(roundtripped_filter, nullptr);
+  const auto& filter_opts =
+      checked_cast<const compute::FilterNodeOptions&>(*(roundtripped_filter->options));
+  const auto& roundtripped_expr = filter_opts.filter_expression;
+
+  // The condition must still be equal(shared, distinct); fail instead of
+  // silently skipping these checks when it is not a call.
+  const auto* call = roundtripped_expr.call();
+  ASSERT_NE(call, nullptr);
+  EXPECT_EQ(call->function_name, "equal");
+  ASSERT_EQ(call->arguments.size(), 2u);
+  const FieldRef* left_ref = call->arguments[0].field_ref();
+  const FieldRef* right_ref = call->arguments[1].field_ref();
+  ASSERT_NE(left_ref, nullptr);
+  ASSERT_NE(right_ref, nullptr);
+  ASSERT_NE(left_ref->field_path(), nullptr);
+  ASSERT_NE(right_ref->field_path(), nullptr);
+  auto left_index = left_ref->field_path()->indices()[0];
+  EXPECT_EQ(dummy_schema->field_names()[left_index], filter_col_left);
+  auto right_index = right_ref->field_path()->indices()[0];
+  EXPECT_EQ(dummy_schema->field_names()[right_index], filter_col_right);
+
+  // scan declaration
+  ASSERT_FALSE(roundtripped_filter->inputs.empty());
+  const auto* roundtripped_scan =
+      roundtripped_filter->inputs[0].get<compute::Declaration>();
+  ASSERT_NE(roundtripped_scan, nullptr);
+  const auto& dataset_opts =
+      checked_cast<const dataset::ScanNodeOptions&>(*(roundtripped_scan->options));
+  const auto& roundtripped_ds = dataset_opts.dataset;
+  EXPECT_TRUE(roundtripped_ds->schema()->Equals(*dummy_schema));
+  ASSERT_OK_AND_ASSIGN(auto roundtripped_frgs, roundtripped_ds->GetFragments());
+  ASSERT_OK_AND_ASSIGN(auto expected_frgs, dataset->GetFragments());
+
+  auto roundtrip_frg_vec = IteratorToVector(std::move(roundtripped_frgs));
+  auto expected_frg_vec = IteratorToVector(std::move(expected_frgs));
+  // ASSERT (not EXPECT) so that the indexed comparison below stays in bounds.
+  ASSERT_EQ(expected_frg_vec.size(), roundtrip_frg_vec.size());
+  for (size_t idx = 0; idx < expected_frg_vec.size(); ++idx) {
+    const auto* l_frag =
+        checked_cast<const dataset::FileFragment*>(expected_frg_vec[idx].get());
+    const auto* r_frag =
+        checked_cast<const dataset::FileFragment*>(roundtrip_frg_vec[idx].get());
+    EXPECT_TRUE(l_frag->Equals(*r_frag));
+  }
+
+  // Execute the round-tripped plan behind a fresh sink and compare its output
+  // with the output of the original plan.
+  arrow::AsyncGenerator<util::optional<compute::ExecBatch>> rnd_trp_sink_gen;
+  auto rnd_trp_sink_node_options = compute::SinkNodeOptions{&rnd_trp_sink_gen};
+  auto rnd_trp_sink_declaration =
+      compute::Declaration({"sink", rnd_trp_sink_node_options, "e"});
+  auto rnd_trp_declarations =
+      compute::Declaration::Sequence({*roundtripped_filter, rnd_trp_sink_declaration});
+  ASSERT_OK_AND_ASSIGN(auto rnd_trp_table,
+                       GetTableFromPlan(rnd_trp_declarations, rnd_trp_sink_gen,
+                                        exec_context, dummy_schema));
+  // Table::Equals semantics (chunk layout ignored), with a readable diff on failure.
+  AssertTablesEqual(*expected_table, *rnd_trp_table, /*same_chunk_layout=*/false);
+}
+
} // namespace engine
} // namespace arrow
diff --git a/cpp/src/arrow/filesystem/localfs_test.cc
b/cpp/src/arrow/filesystem/localfs_test.cc
index 0078a59393..fd36faf30f 100644
--- a/cpp/src/arrow/filesystem/localfs_test.cc
+++ b/cpp/src/arrow/filesystem/localfs_test.cc
@@ -32,6 +32,7 @@
#include "arrow/filesystem/util_internal.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/io_util.h"
+#include "arrow/util/uri.h"
namespace arrow {
namespace fs {
@@ -40,6 +41,7 @@ namespace internal {
using ::arrow::internal::FileDescriptor;
using ::arrow::internal::PlatformFilename;
using ::arrow::internal::TemporaryDir;
+using ::arrow::internal::UriFromAbsolutePath;
class LocalFSTestMixin : public ::testing::Test {
public:
@@ -173,16 +175,6 @@ class TestLocalFS : public LocalFSTestMixin {
fs_ = std::make_shared<SubTreeFileSystem>(local_path_, local_fs_);
}
- std::string UriFromAbsolutePath(const std::string& path) {
-#ifdef _WIN32
- // Path is supposed to start with "X:/..."
- return "file:///" + path;
-#else
- // Path is supposed to start with "/..."
- return "file://" + path;
-#endif
- }
-
template <typename FileSystemFromUriFunc>
void CheckFileSystemFromUriFunc(const std::string& uri,
FileSystemFromUriFunc&& fs_from_uri) {
@@ -307,7 +299,7 @@ TYPED_TEST(TestLocalFS, NormalizePathThroughSubtreeFS) {
TYPED_TEST(TestLocalFS, FileSystemFromUriFile) {
// Concrete test with actual file
- const auto uri_string = this->UriFromAbsolutePath(this->local_path_);
+ const auto uri_string = UriFromAbsolutePath(this->local_path_);
this->TestFileSystemFromUri(uri_string);
this->TestFileSystemFromUriOrPath(uri_string);
diff --git a/cpp/src/arrow/filesystem/util_internal.cc
b/cpp/src/arrow/filesystem/util_internal.cc
index 0d2ad70902..e6f301bdbf 100644
--- a/cpp/src/arrow/filesystem/util_internal.cc
+++ b/cpp/src/arrow/filesystem/util_internal.cc
@@ -78,6 +78,7 @@ Status InvalidDeleteDirContents(util::string_view path) {
Result<FileInfoVector> GlobFiles(const std::shared_ptr<FileSystem>& filesystem,
const std::string& glob) {
+ // TODO: ARROW-17640
// The candidate entries at the current depth level.
// We start with the filesystem root.
FileInfoVector results{FileInfo("", FileType::Directory)};
diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc
index 11ae80d03e..a62040f3a7 100644
--- a/cpp/src/arrow/util/io_util.cc
+++ b/cpp/src/arrow/util/io_util.cc
@@ -1867,7 +1867,9 @@ Result<std::unique_ptr<TemporaryDir>>
TemporaryDir::Make(const std::string& pref
[&](const NativePathString& base_dir) ->
Result<std::unique_ptr<TemporaryDir>> {
Status st;
for (int attempt = 0; attempt < 3; ++attempt) {
- PlatformFilename fn(base_dir + kNativeSep + base_name + kNativeSep);
+ PlatformFilename fn_base_dir(base_dir);
+ PlatformFilename fn_base_name(base_name + kNativeSep);
+ PlatformFilename fn = fn_base_dir.Join(fn_base_name);
auto result = CreateDir(fn);
if (!result.ok()) {
// Probably a permissions error or a non-existing base_dir
diff --git a/cpp/src/arrow/util/uri.cc b/cpp/src/arrow/util/uri.cc
index 7a8484ce51..abfc9de8b4 100644
--- a/cpp/src/arrow/util/uri.cc
+++ b/cpp/src/arrow/util/uri.cc
@@ -304,5 +304,15 @@ Status Uri::Parse(const std::string& uri_string) {
return Status::OK();
}
+std::string UriFromAbsolutePath(const std::string& path) {
+  // Windows absolute paths look like "X:/...", so they need an extra slash to
+  // form an empty-authority file URI; POSIX absolute paths already begin with
+  // "/". The path is appended verbatim.
+#ifdef _WIN32
+  constexpr char kFilePrefix[] = "file:///";
+#else
+  constexpr char kFilePrefix[] = "file://";
+#endif
+  return kFilePrefix + path;
+}
+
} // namespace internal
} // namespace arrow
diff --git a/cpp/src/arrow/util/uri.h b/cpp/src/arrow/util/uri.h
index eae1956eaf..50d9eccf82 100644
--- a/cpp/src/arrow/util/uri.h
+++ b/cpp/src/arrow/util/uri.h
@@ -104,5 +104,9 @@ std::string UriEncodeHost(const std::string& host);
ARROW_EXPORT
bool IsValidUriScheme(const arrow::util::string_view s);
+/// \brief Create a "file" URI from an absolute local path
+///
+/// The path is appended verbatim after "file://" ("file:///" on Windows, where
+/// the path is expected to look like "X:/..."). No percent-encoding is
+/// applied, so paths containing characters reserved in URIs are not escaped.
+ARROW_EXPORT
+std::string UriFromAbsolutePath(const std::string& path);
+
} // namespace internal
} // namespace arrow