bkietz commented on code in PR #13401:
URL: https://github.com/apache/arrow/pull/13401#discussion_r937881009


##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -1731,5 +1770,265 @@ TEST(Substrait, AggregateWithFilter) {
  ASSERT_RAISES(NotImplemented, DeserializePlans(*buf, [] { return kNullConsumer; }));
 }
 
+TEST(Substrait, BasicPlanRoundTripping) {
+#ifdef _WIN32
+  GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#else
+  compute::ExecContext exec_context;
+  ExtensionSet ext_set;
+  auto dummy_schema = schema(
+      {field("key", int32()), field("shared", int32()), field("distinct", int32())});
+
+  // creating a dummy dataset using a dummy table
+  auto table = TableFromJSON(dummy_schema, {R"([
+      [1, 1, 10],
+      [3, 4, 20]
+    ])",
+                                            R"([
+      [0, 2, 1],
+      [1, 3, 2],
+      [4, 1, 3],
+      [3, 1, 3],
+      [1, 2, 5]
+    ])",
+                                            R"([
+      [2, 2, 12],
+      [5, 3, 12],
+      [1, 3, 12]
+    ])"});
+
+  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
+  auto filesystem = std::make_shared<fs::LocalFileSystem>();
+  const std::string file_name = "serde_test.parquet";
+
+  ASSERT_OK_AND_ASSIGN(auto tempdir,
+                       arrow::internal::TemporaryDir::Make("substrait_tempdir"));
+  ASSERT_OK_AND_ASSIGN(auto file_path, tempdir->path().Join(file_name));
+  std::string file_path_str = file_path.ToString();
+
+  // Note: the tempdir introduces an additional forward slash, which must be
+  // replaced so that the file can be read properly
+  // TODO(review): a Jira needs to be filed to handle this properly
+  std::string toReplace("/T//");
+  size_t pos = file_path_str.find(toReplace);
+  file_path_str.replace(pos, toReplace.length(), "/T/");
+
+  ARROW_EXPECT_OK(WriteParquetData(file_path_str, filesystem, table));
+
+  std::vector<fs::FileInfo> files;
+  const std::vector<std::string> f_paths = {file_path_str};
+
+  for (const auto& f_path : f_paths) {
+    ASSERT_OK_AND_ASSIGN(auto f_file, filesystem->GetFileInfo(f_path));
+    files.push_back(std::move(f_file));
+  }
+
+  ASSERT_OK_AND_ASSIGN(auto ds_factory, dataset::FileSystemDatasetFactory::Make(
+                                            filesystem, std::move(files), format, {}));
+  ASSERT_OK_AND_ASSIGN(auto dataset, ds_factory->Finish(dummy_schema));
+
+  auto scan_options = std::make_shared<dataset::ScanOptions>();
+  scan_options->projection = compute::project({}, {});
+  const std::string filter_col = "shared";
+  auto filter = compute::equal(compute::field_ref(filter_col), compute::literal(3));
+
+  arrow::AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen;
+
+  auto scan_node_options = dataset::ScanNodeOptions{dataset, scan_options};
+  auto filter_node_options = compute::FilterNodeOptions{filter};
+  auto sink_node_options = compute::SinkNodeOptions{&sink_gen};
+
+  auto scan_declaration = compute::Declaration({"scan", scan_node_options, "s"});
+  auto filter_declaration = compute::Declaration({"filter", filter_node_options, "f"});
+  auto sink_declaration = compute::Declaration({"sink", sink_node_options, "e"});
+
+  auto declarations = compute::Declaration::Sequence(
+      {scan_declaration, filter_declaration, sink_declaration});
+
+  ASSERT_OK_AND_ASSIGN(auto plan, compute::ExecPlan::Make(&exec_context));
+
+  ASSERT_OK_AND_ASSIGN(auto serialized_plan, SerializePlan(declarations, &ext_set));
+
+  for (auto sp_ext_id_reg :
+       {std::shared_ptr<ExtensionIdRegistry>(), substrait::MakeExtensionIdRegistry()}) {
+    ExtensionIdRegistry* ext_id_reg = sp_ext_id_reg.get();
+    ExtensionSet ext_set(ext_id_reg);
+    ASSERT_OK_AND_ASSIGN(
+        auto sink_decls,
+        DeserializePlans(
+            *serialized_plan, [] { return kNullConsumer; }, ext_id_reg, &ext_set));
+    // filter declaration
+    auto roundtripped_filter = sink_decls[0].inputs[0].get<compute::Declaration>();
+    const auto& filter_opts =
+        checked_cast<const compute::FilterNodeOptions&>(*(roundtripped_filter->options));
+    auto roundtripped_expr = filter_opts.filter_expression;
+
+    if (auto* call = roundtripped_expr.call()) {
+      EXPECT_EQ(call->function_name, "equal");
+      auto args = call->arguments;
+      auto index = args[0].field_ref()->field_path()->indices()[0];
+      EXPECT_EQ(dummy_schema->field_names()[index], filter_col);
+      EXPECT_EQ(args[1], compute::literal(3));
+    }
+    // scan declaration
+    auto roundtripped_scan = roundtripped_filter->inputs[0].get<compute::Declaration>();
+    const auto& dataset_opts =
+        checked_cast<const dataset::ScanNodeOptions&>(*(roundtripped_scan->options));
+    const auto& roundtripped_ds = dataset_opts.dataset;
+    EXPECT_TRUE(roundtripped_ds->schema()->Equals(*dummy_schema));
+    ASSERT_OK_AND_ASSIGN(auto roundtripped_frgs, roundtripped_ds->GetFragments());
+    ASSERT_OK_AND_ASSIGN(auto expected_frgs, dataset->GetFragments());
+
+    auto roundtrip_frg_vec = IteratorToVector(std::move(roundtripped_frgs));
+    auto expected_frg_vec = IteratorToVector(std::move(expected_frgs));
+    EXPECT_EQ(expected_frg_vec.size(), roundtrip_frg_vec.size());
+    int64_t idx = 0;
+    for (auto fragment : expected_frg_vec) {
+      const auto* l_frag = checked_cast<const dataset::FileFragment*>(fragment.get());
+      const auto* r_frag =
+          checked_cast<const dataset::FileFragment*>(roundtrip_frg_vec[idx++].get());
+      EXPECT_TRUE(l_frag->Equals(*r_frag));
+    }
+  }
+#endif
+}
+
+TEST(Substrait, BasicPlanRoundTrippingEndToEnd) {
+#ifdef _WIN32
+  GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#else
+  compute::ExecContext exec_context;
+  ExtensionSet ext_set;
+  auto dummy_schema = schema(
+      {field("key", int32()), field("shared", int32()), field("distinct", int32())});
+
+  // creating a dummy dataset using a dummy table
+  auto table = TableFromJSON(dummy_schema, {R"([
+      [1, 1, 10],
+      [3, 4, 20]
+    ])",
+                                            R"([
+      [0, 2, 1],
+      [1, 3, 2],
+      [4, 1, 3],
+      [3, 1, 3],
+      [1, 2, 5]
+    ])",
+                                            R"([
+      [2, 2, 12],
+      [5, 3, 12],
+      [1, 3, 12]
+    ])"});
+
+  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
+  auto filesystem = std::make_shared<fs::LocalFileSystem>();
+  const std::string file_name = "serde_test.parquet";
+
+  ASSERT_OK_AND_ASSIGN(auto tempdir,
+                       arrow::internal::TemporaryDir::Make("substrait_tempdir"));
+  ASSERT_OK_AND_ASSIGN(auto file_path, tempdir->path().Join(file_name));
+  std::string file_path_str = file_path.ToNative();
+
+  // Note: the tempdir introduces an additional forward slash, which must be
+  // replaced so that the file can be read properly
+  // TODO(review): a Jira needs to be filed to handle this properly
+  std::string toReplace("/T//");
+  size_t pos = file_path_str.find(toReplace);
+  file_path_str.replace(pos, toReplace.length(), "/T/");
+
+  ARROW_EXPECT_OK(WriteParquetData(file_path_str, filesystem, table));
+
+  std::vector<fs::FileInfo> files;
+  const std::vector<std::string> f_paths = {file_path_str};
+
+  for (const auto& f_path : f_paths) {
+    ASSERT_OK_AND_ASSIGN(auto f_file, filesystem->GetFileInfo(f_path));
+    files.push_back(std::move(f_file));
+  }
+
+  ASSERT_OK_AND_ASSIGN(auto ds_factory, dataset::FileSystemDatasetFactory::Make(
+                                            filesystem, std::move(files), format, {}));
+  ASSERT_OK_AND_ASSIGN(auto dataset, ds_factory->Finish(dummy_schema));
+
+  auto scan_options = std::make_shared<dataset::ScanOptions>();
+  scan_options->projection = compute::project({}, {});
+  const std::string filter_col = "shared";
+  auto filter = compute::equal(compute::field_ref(filter_col), compute::literal(3));
+
+  arrow::AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen;
+
+  auto scan_node_options = dataset::ScanNodeOptions{dataset, scan_options};
+  auto filter_node_options = compute::FilterNodeOptions{filter};
+  auto sink_node_options = compute::SinkNodeOptions{&sink_gen};
+
+  auto scan_declaration = compute::Declaration({"scan", scan_node_options, "s"});
+  auto filter_declaration = compute::Declaration({"filter", filter_node_options, "f"});
+  auto sink_declaration = compute::Declaration({"sink", sink_node_options, "e"});
+
+  auto declarations = compute::Declaration::Sequence(
+      {scan_declaration, filter_declaration, sink_declaration});

Review Comment:
   Nit: instead of naming each of these, could we fold them into a single sequence?
   ```suggestion
     auto declarations = compute::Declaration::Sequence({
         {"scan", dataset::ScanNodeOptions{dataset, scan_options}, "s"},
         {"filter", compute::FilterNodeOptions{filter}, "f"},
         {"sink", compute::SinkNodeOptions{&sink_gen}, "e"},
     });
   ```
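   (`Declaration::Sequence` chains the declarations, feeding each one in as the input of the next, so the named temporaries aren't needed.)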



##########
cpp/src/arrow/engine/substrait/registry.h:
##########
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// NOTE: API is EXPERIMENTAL and will change without going through a
+// deprecation cycle
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/engine/substrait/extension_set.h"
+#include "arrow/engine/substrait/extension_types.h"
+#include "arrow/engine/substrait/options.h"
+#include "arrow/engine/substrait/relation_internal.h"
+#include "arrow/engine/substrait/serde.h"
+#include "arrow/engine/substrait/visibility.h"
+#include "arrow/type_fwd.h"
+
+#include "substrait/algebra.pb.h"  // IWYU pragma: export
+
+namespace arrow {
+
+namespace engine {
+
+/// \brief The Acero-Substrait integration contains converters which enable
+/// converting Acero ExecPlan-related entities to the corresponding Substrait
+/// entities.
+///
+/// Note that the current registry definition only holds converters that
+/// convert an Acero plan to a Substrait plan.
+class ARROW_ENGINE_EXPORT SubstraitConversionRegistry {
+ public:
+  virtual ~SubstraitConversionRegistry() = default;
+
+  /// \brief Alias for Acero-to-Substrait converter
+  using SubstraitConverter = std::function<Result<std::unique_ptr<substrait::Rel>>(
+      const std::shared_ptr<Schema>&, const compute::Declaration&, ExtensionSet*,
+      const ConversionOptions&)>;

Review Comment:
   Since this class references the protobuf message classes, it should be confined to `_internal.h` and `.cc` files to ensure it is not installed along with the other arrow headers; the protobuf message classes are currently private/internal.
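   For illustration, a minimal sketch of that split (the `registry_internal.h` file name and the exact interface boundary are assumptions, not necessarily this PR's final layout):
   ```cpp
   // registry.h (installed): exposes only Arrow types; no protobuf includes,
   // so the private message classes never leak into installed headers.
   namespace arrow {
   namespace engine {

   class ARROW_ENGINE_EXPORT SubstraitConversionRegistry {
    public:
     virtual ~SubstraitConversionRegistry() = default;
     // ... public API expressed in terms of Declaration/ExtensionSet only ...
   };

   }  // namespace engine
   }  // namespace arrow

   // registry_internal.h (not installed) and .cc files: free to reference the
   // private/internal protobuf message classes such as substrait::Rel.
   #include "substrait/algebra.pb.h"

   namespace arrow {
   namespace engine {

   using SubstraitConverter = std::function<Result<std::unique_ptr<substrait::Rel>>(
       const std::shared_ptr<Schema>&, const compute::Declaration&, ExtensionSet*,
       const ConversionOptions&)>;

   }  // namespace engine
   }  // namespace arrow
   ```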



##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -1731,5 +1770,265 @@ TEST(Substrait, AggregateWithFilter) {
  ASSERT_RAISES(NotImplemented, DeserializePlans(*buf, [] { return kNullConsumer; }));
 }
 
+TEST(Substrait, BasicPlanRoundTripping) {
+#ifdef _WIN32
+  GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#else
+  compute::ExecContext exec_context;
+  ExtensionSet ext_set;
+  auto dummy_schema = schema(
+      {field("key", int32()), field("shared", int32()), field("distinct", int32())});
+
+  // creating a dummy dataset using a dummy table
+  auto table = TableFromJSON(dummy_schema, {R"([
+      [1, 1, 10],
+      [3, 4, 20]
+    ])",
+                                            R"([
+      [0, 2, 1],
+      [1, 3, 2],
+      [4, 1, 3],
+      [3, 1, 3],
+      [1, 2, 5]
+    ])",
+                                            R"([
+      [2, 2, 12],
+      [5, 3, 12],
+      [1, 3, 12]
+    ])"});
+
+  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
+  auto filesystem = std::make_shared<fs::LocalFileSystem>();
+  const std::string file_name = "serde_test.parquet";
+
+  ASSERT_OK_AND_ASSIGN(auto tempdir,
+                       arrow::internal::TemporaryDir::Make("substrait_tempdir"));
+  ASSERT_OK_AND_ASSIGN(auto file_path, tempdir->path().Join(file_name));
+  std::string file_path_str = file_path.ToString();
+
+  // Note: the tempdir introduces an additional forward slash, which must be
+  // replaced so that the file can be read properly
+  // TODO(review): a Jira needs to be filed to handle this properly
+  std::string toReplace("/T//");
+  size_t pos = file_path_str.find(toReplace);
+  file_path_str.replace(pos, toReplace.length(), "/T/");
+
+  ARROW_EXPECT_OK(WriteParquetData(file_path_str, filesystem, table));
+
+  std::vector<fs::FileInfo> files;
+  const std::vector<std::string> f_paths = {file_path_str};
+
+  for (const auto& f_path : f_paths) {
+    ASSERT_OK_AND_ASSIGN(auto f_file, filesystem->GetFileInfo(f_path));
+    files.push_back(std::move(f_file));
+  }
+
+  ASSERT_OK_AND_ASSIGN(auto ds_factory, dataset::FileSystemDatasetFactory::Make(
+                                            filesystem, std::move(files), format, {}));
+  ASSERT_OK_AND_ASSIGN(auto dataset, ds_factory->Finish(dummy_schema));
+
+  auto scan_options = std::make_shared<dataset::ScanOptions>();
+  scan_options->projection = compute::project({}, {});
+  const std::string filter_col = "shared";
+  auto filter = compute::equal(compute::field_ref(filter_col), compute::literal(3));
+
+  arrow::AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen;
+
+  auto scan_node_options = dataset::ScanNodeOptions{dataset, scan_options};
+  auto filter_node_options = compute::FilterNodeOptions{filter};
+  auto sink_node_options = compute::SinkNodeOptions{&sink_gen};
+
+  auto scan_declaration = compute::Declaration({"scan", scan_node_options, "s"});
+  auto filter_declaration = compute::Declaration({"filter", filter_node_options, "f"});
+  auto sink_declaration = compute::Declaration({"sink", sink_node_options, "e"});
+
+  auto declarations = compute::Declaration::Sequence(
+      {scan_declaration, filter_declaration, sink_declaration});
+
+  ASSERT_OK_AND_ASSIGN(auto plan, compute::ExecPlan::Make(&exec_context));
+
+  ASSERT_OK_AND_ASSIGN(auto serialized_plan, SerializePlan(declarations, &ext_set));
+
+  for (auto sp_ext_id_reg :
+       {std::shared_ptr<ExtensionIdRegistry>(), substrait::MakeExtensionIdRegistry()}) {
+    ExtensionIdRegistry* ext_id_reg = sp_ext_id_reg.get();
+    ExtensionSet ext_set(ext_id_reg);
+    ASSERT_OK_AND_ASSIGN(
+        auto sink_decls,
+        DeserializePlans(
+            *serialized_plan, [] { return kNullConsumer; }, ext_id_reg, &ext_set));
+    // filter declaration
+    auto roundtripped_filter = sink_decls[0].inputs[0].get<compute::Declaration>();
+    const auto& filter_opts =
+        checked_cast<const compute::FilterNodeOptions&>(*(roundtripped_filter->options));
+    auto roundtripped_expr = filter_opts.filter_expression;
+
+    if (auto* call = roundtripped_expr.call()) {
+      EXPECT_EQ(call->function_name, "equal");
+      auto args = call->arguments;
+      auto index = args[0].field_ref()->field_path()->indices()[0];
+      EXPECT_EQ(dummy_schema->field_names()[index], filter_col);
+      EXPECT_EQ(args[1], compute::literal(3));
+    }
+    // scan declaration
+    auto roundtripped_scan = roundtripped_filter->inputs[0].get<compute::Declaration>();
+    const auto& dataset_opts =
+        checked_cast<const dataset::ScanNodeOptions&>(*(roundtripped_scan->options));
+    const auto& roundtripped_ds = dataset_opts.dataset;
+    EXPECT_TRUE(roundtripped_ds->schema()->Equals(*dummy_schema));
+    ASSERT_OK_AND_ASSIGN(auto roundtripped_frgs, roundtripped_ds->GetFragments());
+    ASSERT_OK_AND_ASSIGN(auto expected_frgs, dataset->GetFragments());
+
+    auto roundtrip_frg_vec = IteratorToVector(std::move(roundtripped_frgs));
+    auto expected_frg_vec = IteratorToVector(std::move(expected_frgs));
+    EXPECT_EQ(expected_frg_vec.size(), roundtrip_frg_vec.size());
+    int64_t idx = 0;
+    for (auto fragment : expected_frg_vec) {
+      const auto* l_frag = checked_cast<const dataset::FileFragment*>(fragment.get());
+      const auto* r_frag =
+          checked_cast<const dataset::FileFragment*>(roundtrip_frg_vec[idx++].get());
+      EXPECT_TRUE(l_frag->Equals(*r_frag));
+    }
+  }
+#endif
+}
+
+TEST(Substrait, BasicPlanRoundTrippingEndToEnd) {
+#ifdef _WIN32
+  GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#else
+  compute::ExecContext exec_context;
+  ExtensionSet ext_set;
+  auto dummy_schema = schema(
+      {field("key", int32()), field("shared", int32()), field("distinct", int32())});
+
+  // creating a dummy dataset using a dummy table
+  auto table = TableFromJSON(dummy_schema, {R"([
+      [1, 1, 10],
+      [3, 4, 20]
+    ])",
+                                            R"([
+      [0, 2, 1],
+      [1, 3, 2],
+      [4, 1, 3],
+      [3, 1, 3],
+      [1, 2, 5]
+    ])",
+                                            R"([
+      [2, 2, 12],
+      [5, 3, 12],
+      [1, 3, 12]
+    ])"});
+
+  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
+  auto filesystem = std::make_shared<fs::LocalFileSystem>();
+  const std::string file_name = "serde_test.parquet";
+
+  ASSERT_OK_AND_ASSIGN(auto tempdir,
+                       arrow::internal::TemporaryDir::Make("substrait_tempdir"));
+  ASSERT_OK_AND_ASSIGN(auto file_path, tempdir->path().Join(file_name));
+  std::string file_path_str = file_path.ToNative();
+
+  // Note: the tempdir introduces an additional forward slash, which must be
+  // replaced so that the file can be read properly
+  // TODO(review): a Jira needs to be filed to handle this properly
+  std::string toReplace("/T//");
+  size_t pos = file_path_str.find(toReplace);
+  file_path_str.replace(pos, toReplace.length(), "/T/");
+
+  ARROW_EXPECT_OK(WriteParquetData(file_path_str, filesystem, table));
+
+  std::vector<fs::FileInfo> files;
+  const std::vector<std::string> f_paths = {file_path_str};
+
+  for (const auto& f_path : f_paths) {
+    ASSERT_OK_AND_ASSIGN(auto f_file, filesystem->GetFileInfo(f_path));
+    files.push_back(std::move(f_file));
+  }
+
+  ASSERT_OK_AND_ASSIGN(auto ds_factory, dataset::FileSystemDatasetFactory::Make(
+                                            filesystem, std::move(files), format, {}));
+  ASSERT_OK_AND_ASSIGN(auto dataset, ds_factory->Finish(dummy_schema));
+
+  auto scan_options = std::make_shared<dataset::ScanOptions>();
+  scan_options->projection = compute::project({}, {});
+  const std::string filter_col = "shared";
+  auto filter = compute::equal(compute::field_ref(filter_col), compute::literal(3));
+
+  arrow::AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen;
+
+  auto scan_node_options = dataset::ScanNodeOptions{dataset, scan_options};
+  auto filter_node_options = compute::FilterNodeOptions{filter};
+  auto sink_node_options = compute::SinkNodeOptions{&sink_gen};
+
+  auto scan_declaration = compute::Declaration({"scan", scan_node_options, "s"});
+  auto filter_declaration = compute::Declaration({"filter", filter_node_options, "f"});
+  auto sink_declaration = compute::Declaration({"sink", sink_node_options, "e"});
+
+  auto declarations = compute::Declaration::Sequence(
+      {scan_declaration, filter_declaration, sink_declaration});
+
+  ASSERT_OK_AND_ASSIGN(auto plan, compute::ExecPlan::Make(&exec_context));

Review Comment:
   Instead of declaring here the plan that will be populated with the declarations, please move it into GetTableFromPlan and be explicit there about finishing the plan's execution.
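   For reference, a hedged sketch of what such a helper could look like (the name `GetTableFromPlan` comes from this review; the body below is an assumption based on the ExecPlan/sink-generator idioms, not this PR's actual helper):
   ```cpp
   // Hypothetical sketch: build the plan inside the helper and be explicit
   // about finishing its execution before returning the collected table.
   Result<std::shared_ptr<Table>> GetTableFromPlan(
       const compute::Declaration& declarations, compute::ExecContext& exec_context,
       const std::shared_ptr<Schema>& output_schema,
       arrow::AsyncGenerator<util::optional<compute::ExecBatch>>& sink_gen) {
     ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(&exec_context));
     // Wire the scan/filter/sink declarations into the plan.
     ARROW_RETURN_NOT_OK(declarations.AddToPlan(plan.get()));
     ARROW_RETURN_NOT_OK(plan->Validate());
     ARROW_RETURN_NOT_OK(plan->StartProducing());
     // Drain the sink generator through a RecordBatchReader into a Table.
     std::shared_ptr<RecordBatchReader> reader = compute::MakeGeneratorReader(
         output_schema, std::move(sink_gen), exec_context.memory_pool());
     ARROW_ASSIGN_OR_RAISE(auto table, Table::FromRecordBatchReader(reader.get()));
     // Be explicit about finishing the plan's execution.
     ARROW_RETURN_NOT_OK(plan->finished().status());
     return table;
   }
   ```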



##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -1731,5 +1770,265 @@ TEST(Substrait, AggregateWithFilter) {
  ASSERT_RAISES(NotImplemented, DeserializePlans(*buf, [] { return kNullConsumer; }));
 }
 
+TEST(Substrait, BasicPlanRoundTripping) {
+#ifdef _WIN32
+  GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#else
+  compute::ExecContext exec_context;
+  ExtensionSet ext_set;
+  auto dummy_schema = schema(
+      {field("key", int32()), field("shared", int32()), field("distinct", int32())});
+
+  // creating a dummy dataset using a dummy table
+  auto table = TableFromJSON(dummy_schema, {R"([
+      [1, 1, 10],
+      [3, 4, 20]
+    ])",
+                                            R"([
+      [0, 2, 1],
+      [1, 3, 2],
+      [4, 1, 3],
+      [3, 1, 3],
+      [1, 2, 5]
+    ])",
+                                            R"([
+      [2, 2, 12],
+      [5, 3, 12],
+      [1, 3, 12]
+    ])"});
+
+  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
+  auto filesystem = std::make_shared<fs::LocalFileSystem>();
+  const std::string file_name = "serde_test.parquet";
+
+  ASSERT_OK_AND_ASSIGN(auto tempdir,
+                       arrow::internal::TemporaryDir::Make("substrait_tempdir"));
+  ASSERT_OK_AND_ASSIGN(auto file_path, tempdir->path().Join(file_name));
+  std::string file_path_str = file_path.ToString();
+
+  // Note: the tempdir introduces an additional forward slash, which must be
+  // replaced so that the file can be read properly
+  // TODO(review): a Jira needs to be filed to handle this properly
+  std::string toReplace("/T//");
+  size_t pos = file_path_str.find(toReplace);
+  file_path_str.replace(pos, toReplace.length(), "/T/");
+
+  ARROW_EXPECT_OK(WriteParquetData(file_path_str, filesystem, table));
+
+  std::vector<fs::FileInfo> files;
+  const std::vector<std::string> f_paths = {file_path_str};
+
+  for (const auto& f_path : f_paths) {
+    ASSERT_OK_AND_ASSIGN(auto f_file, filesystem->GetFileInfo(f_path));
+    files.push_back(std::move(f_file));
+  }
+
+  ASSERT_OK_AND_ASSIGN(auto ds_factory, dataset::FileSystemDatasetFactory::Make(
+                                            filesystem, std::move(files), format, {}));
+  ASSERT_OK_AND_ASSIGN(auto dataset, ds_factory->Finish(dummy_schema));
+
+  auto scan_options = std::make_shared<dataset::ScanOptions>();
+  scan_options->projection = compute::project({}, {});
+  const std::string filter_col = "shared";
+  auto filter = compute::equal(compute::field_ref(filter_col), compute::literal(3));
+
+  arrow::AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen;
+
+  auto scan_node_options = dataset::ScanNodeOptions{dataset, scan_options};
+  auto filter_node_options = compute::FilterNodeOptions{filter};
+  auto sink_node_options = compute::SinkNodeOptions{&sink_gen};
+
+  auto scan_declaration = compute::Declaration({"scan", scan_node_options, "s"});
+  auto filter_declaration = compute::Declaration({"filter", filter_node_options, "f"});
+  auto sink_declaration = compute::Declaration({"sink", sink_node_options, "e"});
+
+  auto declarations = compute::Declaration::Sequence(
+      {scan_declaration, filter_declaration, sink_declaration});
+
+  ASSERT_OK_AND_ASSIGN(auto plan, compute::ExecPlan::Make(&exec_context));
+
+  ASSERT_OK_AND_ASSIGN(auto serialized_plan, SerializePlan(declarations, &ext_set));
+
+  for (auto sp_ext_id_reg :
+       {std::shared_ptr<ExtensionIdRegistry>(), substrait::MakeExtensionIdRegistry()}) {
+    ExtensionIdRegistry* ext_id_reg = sp_ext_id_reg.get();
+    ExtensionSet ext_set(ext_id_reg);
+    ASSERT_OK_AND_ASSIGN(
+        auto sink_decls,
+        DeserializePlans(
+            *serialized_plan, [] { return kNullConsumer; }, ext_id_reg, &ext_set));
+    // filter declaration
+    auto roundtripped_filter = sink_decls[0].inputs[0].get<compute::Declaration>();
+    const auto& filter_opts =
+        checked_cast<const compute::FilterNodeOptions&>(*(roundtripped_filter->options));
+    auto roundtripped_expr = filter_opts.filter_expression;
+
+    if (auto* call = roundtripped_expr.call()) {
+      EXPECT_EQ(call->function_name, "equal");
+      auto args = call->arguments;
+      auto index = args[0].field_ref()->field_path()->indices()[0];
+      EXPECT_EQ(dummy_schema->field_names()[index], filter_col);
+      EXPECT_EQ(args[1], compute::literal(3));
+    }
+    // scan declaration
+    auto roundtripped_scan = roundtripped_filter->inputs[0].get<compute::Declaration>();
+    const auto& dataset_opts =
+        checked_cast<const dataset::ScanNodeOptions&>(*(roundtripped_scan->options));
+    const auto& roundtripped_ds = dataset_opts.dataset;
+    EXPECT_TRUE(roundtripped_ds->schema()->Equals(*dummy_schema));
+    ASSERT_OK_AND_ASSIGN(auto roundtripped_frgs, roundtripped_ds->GetFragments());
+    ASSERT_OK_AND_ASSIGN(auto expected_frgs, dataset->GetFragments());
+
+    auto roundtrip_frg_vec = IteratorToVector(std::move(roundtripped_frgs));
+    auto expected_frg_vec = IteratorToVector(std::move(expected_frgs));
+    EXPECT_EQ(expected_frg_vec.size(), roundtrip_frg_vec.size());
+    int64_t idx = 0;
+    for (auto fragment : expected_frg_vec) {
+      const auto* l_frag = checked_cast<const dataset::FileFragment*>(fragment.get());
+      const auto* r_frag =
+          checked_cast<const dataset::FileFragment*>(roundtrip_frg_vec[idx++].get());
+      EXPECT_TRUE(l_frag->Equals(*r_frag));
+    }
+  }
+#endif
+}
+
+TEST(Substrait, BasicPlanRoundTrippingEndToEnd) {
+#ifdef _WIN32
+  GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#else

Review Comment:
   ```suggestion
   #endif
   ```
   Even if the file URI is not supported on Windows, it's still worthwhile to ensure the code below compiles on all platforms.
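   That is, the test would read (a sketch; `GTEST_SKIP()` returns from the test body at runtime, so only the skip itself needs the platform guard):
   ```cpp
   TEST(Substrait, BasicPlanRoundTrippingEndToEnd) {
   #ifdef _WIN32
     GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
   #endif
     // ... test body: compiled on every platform, skipped at runtime on Windows
   }
   ```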



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
