westonpace commented on a change in pull request #12110:
URL: https://github.com/apache/arrow/pull/12110#discussion_r788045868



##########
File path: cpp/examples/arrow/join_example.cc
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This example showcases various ways to work with Datasets. It's
+// intended to be paired with the documentation.
+
+#include <arrow/api.h>
+#include <arrow/compute/api.h>
+#include <arrow/compute/exec/exec_plan.h>
+#include <arrow/compute/exec/expression.h>
+#include <arrow/csv/api.h>
+
+#include <arrow/dataset/dataset.h>
+#include <arrow/dataset/plan.h>
+#include <arrow/dataset/scanner.h>
+
+#include <arrow/io/interfaces.h>
+#include <arrow/io/memory.h>
+#include <arrow/io/stdio.h>
+
+#include <arrow/filesystem/filesystem.h>
+
+#include <arrow/result.h>
+#include <arrow/status.h>
+
+#include <arrow/util/vector.h>
+
+#include <iostream>
+#include <vector>
+
+namespace ds = arrow::dataset;
+namespace cp = arrow::compute;
+
+#define ABORT_ON_FAILURE(expr)                     \
+  do {                                             \
+    arrow::Status status_ = (expr);                \
+    if (!status_.ok()) {                           \
+      std::cerr << status_.message() << std::endl; \
+      abort();                                     \
+    }                                              \
+  } while (0);
+
+std::string GetDataAsCsvString(std::string relation) {

Review comment:
       Nit: This might be slightly more readable if it were just two string 
constants.  For example:
   
   ```
   const std::string kLeftRelationCsvData = R"csv(lkey,shared,ldistinct
   1,4,7
   2,5,8
   11,20,21
   3,6,9)csv";
   ```

##########
File path: cpp/examples/arrow/join_example.cc
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This example showcases various ways to work with Datasets. It's
+// intended to be paired with the documentation.
+
+#include <arrow/api.h>
+#include <arrow/compute/api.h>
+#include <arrow/compute/exec/exec_plan.h>
+#include <arrow/compute/exec/expression.h>
+#include <arrow/csv/api.h>
+
+#include <arrow/dataset/dataset.h>
+#include <arrow/dataset/plan.h>
+#include <arrow/dataset/scanner.h>
+
+#include <arrow/io/interfaces.h>
+#include <arrow/io/memory.h>
+#include <arrow/io/stdio.h>
+
+#include <arrow/filesystem/filesystem.h>
+
+#include <arrow/result.h>
+#include <arrow/status.h>
+
+#include <arrow/util/vector.h>
+
+#include <iostream>
+#include <vector>
+
+namespace ds = arrow::dataset;
+namespace cp = arrow::compute;
+
+#define ABORT_ON_FAILURE(expr)                     \
+  do {                                             \
+    arrow::Status status_ = (expr);                \
+    if (!status_.ok()) {                           \
+      std::cerr << status_.message() << std::endl; \
+      abort();                                     \
+    }                                              \
+  } while (0);
+
+std::string GetDataAsCsvString(std::string relation) {
+  std::string data_str = "";
+  if (relation == "l") {
+    data_str = R"csv(lkey,shared,ldistinct
+1,4,7
+2,5,8
+11,20,21
+3,6,9)csv";
+  } else if (relation == "r") {
+    data_str = R"csv(rkey,shared,rdistinct
+1,10,13
+124,10,11
+2,11,14
+3,12,15)csv";
+  } else {
+    return data_str;
+  }
+  return data_str;
+}
+
+arrow::Result<std::shared_ptr<arrow::Table>> GetTableFromExecBatches(
+    const std::shared_ptr<arrow::Schema>& schema,
+    const std::vector<arrow::compute::ExecBatch>& exec_batches) {
+  arrow::RecordBatchVector batches;
+  for (const auto& batch : exec_batches) {
+    ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(schema));
+    batches.push_back(std::move(rb));
+  }
+  return arrow::Table::FromRecordBatches(schema, batches);
+}
+
+arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> 
CreateDataSetFromCSVData(
+    std::string relation) {
+  arrow::io::IOContext io_context = arrow::io::default_io_context();
+  std::shared_ptr<arrow::io::InputStream> input;
+  std::string csv_data = GetDataAsCsvString(relation);
+  std::cout << "CSV DATA : " << relation << std::endl;
+  std::cout << csv_data << std::endl;
+  arrow::util::string_view sv = csv_data;
+  input = std::make_shared<arrow::io::BufferReader>(sv);
+  auto read_options = arrow::csv::ReadOptions::Defaults();
+  auto parse_options = arrow::csv::ParseOptions::Defaults();
+  auto convert_options = arrow::csv::ConvertOptions::Defaults();
+
+  // Instantiate TableReader from input stream and options
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::csv::TableReader> table_reader,
+                        arrow::csv::TableReader::Make(io_context, input, 
read_options,
+                                                      parse_options, 
convert_options));
+
+  std::shared_ptr<arrow::csv::TableReader> reader = table_reader;
+
+  // Read table from CSV file
+  ARROW_ASSIGN_OR_RAISE(auto maybe_table, reader->Read());
+  auto ds = std::make_shared<arrow::dataset::InMemoryDataset>(maybe_table);
+  arrow::Result<std::shared_ptr<arrow::dataset::InMemoryDataset>> 
result(std::move(ds));
+  return result;
+}
+
+cp::Expression Materialize(std::vector<std::string> names) {
+  std::vector<cp::Expression> exprs;
+  for (const auto& name : names) {
+    exprs.push_back(cp::field_ref(name));
+  }
+
+  return cp::project(exprs, names);
+}
+
+arrow::Status DoHashJoin() {
+  cp::ExecContext exec_context(arrow::default_memory_pool(),
+                               ::arrow::internal::GetCpuThreadPool());
+
+  arrow::dataset::internal::Initialize();
+
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
+                        cp::ExecPlan::Make(&exec_context));
+
+  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
+
+  cp::ExecNode* left_source;
+  cp::ExecNode* right_source;
+
+  ARROW_ASSIGN_OR_RAISE(auto l_dataset, CreateDataSetFromCSVData("l"));
+  ARROW_ASSIGN_OR_RAISE(auto r_dataset, CreateDataSetFromCSVData("r"));
+
+  auto l_options = std::make_shared<arrow::dataset::ScanOptions>();
+  l_options->projection = Materialize({});  // create empty projection
+
+  auto r_options = std::make_shared<arrow::dataset::ScanOptions>();
+  r_options->projection = Materialize({});  // create empty projection
+
+  // construct the scan node
+  auto l_scan_node_options = arrow::dataset::ScanNodeOptions{l_dataset, 
l_options};
+  auto r_scan_node_options = arrow::dataset::ScanNodeOptions{r_dataset, 
r_options};
+
+  ARROW_ASSIGN_OR_RAISE(left_source,
+                        cp::MakeExecNode("scan", plan.get(), {}, 
l_scan_node_options));
+  ARROW_ASSIGN_OR_RAISE(right_source,
+                        cp::MakeExecNode("scan", plan.get(), {}, 
r_scan_node_options));
+
+  arrow::compute::HashJoinNodeOptions join_opts{
+      arrow::compute::JoinType::INNER,
+      /*left_keys=*/{"lkey"},
+      /*right_keys=*/{"rkey"},         arrow::compute::literal(true), "_l", 
"_r"};
+
+  ARROW_ASSIGN_OR_RAISE(
+      auto hashjoin,
+      cp::MakeExecNode("hashjoin", plan.get(), {left_source, right_source}, 
join_opts));
+
+  ARROW_ASSIGN_OR_RAISE(std::ignore, cp::MakeExecNode("sink", plan.get(), 
{hashjoin},
+                                                      
cp::SinkNodeOptions{&sink_gen}));
+  // expected columns l_a, l_b
+  std::shared_ptr<arrow::RecordBatchReader> sink_reader = 
cp::MakeGeneratorReader(
+      hashjoin->output_schema(), std::move(sink_gen), 
exec_context.memory_pool());
+
+  // // validate the ExecPlan
+  ABORT_ON_FAILURE(plan->Validate());
+  // // // start the ExecPlan

Review comment:
       ```suggestion
     // start the ExecPlan
   ```

##########
File path: cpp/examples/arrow/join_example.cc
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This example showcases various ways to work with Datasets. It's
+// intended to be paired with the documentation.
+
+#include <arrow/api.h>
+#include <arrow/compute/api.h>
+#include <arrow/compute/exec/exec_plan.h>
+#include <arrow/compute/exec/expression.h>
+#include <arrow/csv/api.h>
+
+#include <arrow/dataset/dataset.h>
+#include <arrow/dataset/plan.h>
+#include <arrow/dataset/scanner.h>
+
+#include <arrow/io/interfaces.h>
+#include <arrow/io/memory.h>
+#include <arrow/io/stdio.h>
+
+#include <arrow/filesystem/filesystem.h>
+
+#include <arrow/result.h>
+#include <arrow/status.h>
+
+#include <arrow/util/vector.h>
+
+#include <iostream>
+#include <vector>
+
+namespace ds = arrow::dataset;
+namespace cp = arrow::compute;
+
+#define ABORT_ON_FAILURE(expr)                     \
+  do {                                             \
+    arrow::Status status_ = (expr);                \
+    if (!status_.ok()) {                           \
+      std::cerr << status_.message() << std::endl; \
+      abort();                                     \
+    }                                              \
+  } while (0);
+
+std::string GetDataAsCsvString(std::string relation) {
+  std::string data_str = "";
+  if (relation == "l") {
+    data_str = R"csv(lkey,shared,ldistinct
+1,4,7
+2,5,8
+11,20,21
+3,6,9)csv";
+  } else if (relation == "r") {
+    data_str = R"csv(rkey,shared,rdistinct
+1,10,13
+124,10,11
+2,11,14
+3,12,15)csv";
+  } else {
+    return data_str;
+  }
+  return data_str;
+}
+
+arrow::Result<std::shared_ptr<arrow::Table>> GetTableFromExecBatches(
+    const std::shared_ptr<arrow::Schema>& schema,
+    const std::vector<arrow::compute::ExecBatch>& exec_batches) {
+  arrow::RecordBatchVector batches;
+  for (const auto& batch : exec_batches) {
+    ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(schema));
+    batches.push_back(std::move(rb));
+  }
+  return arrow::Table::FromRecordBatches(schema, batches);
+}
+
+arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> 
CreateDataSetFromCSVData(
+    std::string relation) {
+  arrow::io::IOContext io_context = arrow::io::default_io_context();
+  std::shared_ptr<arrow::io::InputStream> input;
+  std::string csv_data = GetDataAsCsvString(relation);
+  std::cout << "CSV DATA : " << relation << std::endl;
+  std::cout << csv_data << std::endl;
+  arrow::util::string_view sv = csv_data;
+  input = std::make_shared<arrow::io::BufferReader>(sv);
+  auto read_options = arrow::csv::ReadOptions::Defaults();
+  auto parse_options = arrow::csv::ParseOptions::Defaults();
+  auto convert_options = arrow::csv::ConvertOptions::Defaults();
+
+  // Instantiate TableReader from input stream and options
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::csv::TableReader> table_reader,
+                        arrow::csv::TableReader::Make(io_context, input, 
read_options,
+                                                      parse_options, 
convert_options));
+
+  std::shared_ptr<arrow::csv::TableReader> reader = table_reader;
+
+  // Read table from CSV file
+  ARROW_ASSIGN_OR_RAISE(auto maybe_table, reader->Read());
+  auto ds = std::make_shared<arrow::dataset::InMemoryDataset>(maybe_table);
+  arrow::Result<std::shared_ptr<arrow::dataset::InMemoryDataset>> 
result(std::move(ds));
+  return result;
+}
+
+cp::Expression Materialize(std::vector<std::string> names) {
+  std::vector<cp::Expression> exprs;
+  for (const auto& name : names) {
+    exprs.push_back(cp::field_ref(name));
+  }
+
+  return cp::project(exprs, names);
+}
+
+arrow::Status DoHashJoin() {
+  cp::ExecContext exec_context(arrow::default_memory_pool(),
+                               ::arrow::internal::GetCpuThreadPool());
+
+  arrow::dataset::internal::Initialize();
+
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
+                        cp::ExecPlan::Make(&exec_context));
+
+  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
+
+  cp::ExecNode* left_source;
+  cp::ExecNode* right_source;
+
+  ARROW_ASSIGN_OR_RAISE(auto l_dataset, CreateDataSetFromCSVData("l"));
+  ARROW_ASSIGN_OR_RAISE(auto r_dataset, CreateDataSetFromCSVData("r"));
+
+  auto l_options = std::make_shared<arrow::dataset::ScanOptions>();
+  l_options->projection = Materialize({});  // create empty projection
+
+  auto r_options = std::make_shared<arrow::dataset::ScanOptions>();
+  r_options->projection = Materialize({});  // create empty projection
+
+  // construct the scan node
+  auto l_scan_node_options = arrow::dataset::ScanNodeOptions{l_dataset, 
l_options};
+  auto r_scan_node_options = arrow::dataset::ScanNodeOptions{r_dataset, 
r_options};
+
+  ARROW_ASSIGN_OR_RAISE(left_source,
+                        cp::MakeExecNode("scan", plan.get(), {}, 
l_scan_node_options));
+  ARROW_ASSIGN_OR_RAISE(right_source,
+                        cp::MakeExecNode("scan", plan.get(), {}, 
r_scan_node_options));
+
+  arrow::compute::HashJoinNodeOptions join_opts{
+      arrow::compute::JoinType::INNER,
+      /*left_keys=*/{"lkey"},
+      /*right_keys=*/{"rkey"},         arrow::compute::literal(true), "_l", 
"_r"};
+
+  ARROW_ASSIGN_OR_RAISE(
+      auto hashjoin,
+      cp::MakeExecNode("hashjoin", plan.get(), {left_source, right_source}, 
join_opts));
+
+  ARROW_ASSIGN_OR_RAISE(std::ignore, cp::MakeExecNode("sink", plan.get(), 
{hashjoin},
+                                                      
cp::SinkNodeOptions{&sink_gen}));
+  // expected columns l_a, l_b
+  std::shared_ptr<arrow::RecordBatchReader> sink_reader = 
cp::MakeGeneratorReader(
+      hashjoin->output_schema(), std::move(sink_gen), 
exec_context.memory_pool());
+
+  // // validate the ExecPlan
+  ABORT_ON_FAILURE(plan->Validate());
+  // // // start the ExecPlan
+  ABORT_ON_FAILURE(plan->StartProducing());
+
+  // // // collect sink_reader into a Table

Review comment:
       ```suggestion
     // collect sink_reader into a Table
   ```

##########
File path: cpp/examples/arrow/join_example.cc
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This example showcases various ways to work with Datasets. It's
+// intended to be paired with the documentation.
+
+#include <arrow/api.h>
+#include <arrow/compute/api.h>
+#include <arrow/compute/exec/exec_plan.h>
+#include <arrow/compute/exec/expression.h>
+#include <arrow/csv/api.h>
+
+#include <arrow/dataset/dataset.h>
+#include <arrow/dataset/plan.h>
+#include <arrow/dataset/scanner.h>
+
+#include <arrow/io/interfaces.h>
+#include <arrow/io/memory.h>
+#include <arrow/io/stdio.h>
+
+#include <arrow/filesystem/filesystem.h>
+
+#include <arrow/result.h>
+#include <arrow/status.h>
+
+#include <arrow/util/vector.h>
+
+#include <iostream>
+#include <vector>
+
+namespace ds = arrow::dataset;
+namespace cp = arrow::compute;
+
+#define ABORT_ON_FAILURE(expr)                     \
+  do {                                             \
+    arrow::Status status_ = (expr);                \
+    if (!status_.ok()) {                           \
+      std::cerr << status_.message() << std::endl; \
+      abort();                                     \
+    }                                              \
+  } while (0);
+
+std::string GetDataAsCsvString(std::string relation) {
+  std::string data_str = "";
+  if (relation == "l") {
+    data_str = R"csv(lkey,shared,ldistinct
+1,4,7
+2,5,8
+11,20,21
+3,6,9)csv";
+  } else if (relation == "r") {
+    data_str = R"csv(rkey,shared,rdistinct
+1,10,13
+124,10,11
+2,11,14
+3,12,15)csv";
+  } else {
+    return data_str;
+  }
+  return data_str;
+}
+
+arrow::Result<std::shared_ptr<arrow::Table>> GetTableFromExecBatches(
+    const std::shared_ptr<arrow::Schema>& schema,
+    const std::vector<arrow::compute::ExecBatch>& exec_batches) {
+  arrow::RecordBatchVector batches;
+  for (const auto& batch : exec_batches) {
+    ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(schema));
+    batches.push_back(std::move(rb));
+  }
+  return arrow::Table::FromRecordBatches(schema, batches);
+}
+

Review comment:
       ```suggestion
   ```
   
   I don't think this method is used.

##########
File path: cpp/examples/arrow/join_example.cc
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This example showcases various ways to work with Datasets. It's
+// intended to be paired with the documentation.
+
+#include <arrow/api.h>
+#include <arrow/compute/api.h>
+#include <arrow/compute/exec/exec_plan.h>
+#include <arrow/compute/exec/expression.h>
+#include <arrow/csv/api.h>
+
+#include <arrow/dataset/dataset.h>
+#include <arrow/dataset/plan.h>
+#include <arrow/dataset/scanner.h>
+
+#include <arrow/io/interfaces.h>
+#include <arrow/io/memory.h>
+#include <arrow/io/stdio.h>
+
+#include <arrow/filesystem/filesystem.h>
+
+#include <arrow/result.h>
+#include <arrow/status.h>
+
+#include <arrow/util/vector.h>
+
+#include <iostream>
+#include <vector>
+
+namespace ds = arrow::dataset;
+namespace cp = arrow::compute;
+
+#define ABORT_ON_FAILURE(expr)                     \
+  do {                                             \
+    arrow::Status status_ = (expr);                \
+    if (!status_.ok()) {                           \
+      std::cerr << status_.message() << std::endl; \
+      abort();                                     \
+    }                                              \
+  } while (0);
+
+std::string GetDataAsCsvString(std::string relation) {

Review comment:
       ```suggestion
   std::string GetDataAsCsvString(const std::string& relation) {
   ```

##########
File path: cpp/src/arrow/compute/exec/hash_join_node.cc
##########
@@ -275,30 +275,56 @@ Status HashJoinSchema::ValidateSchemas(JoinType 
join_type, const Schema& left_sc
 }
 
 std::shared_ptr<Schema> HashJoinSchema::MakeOutputSchema(
-    const std::string& left_field_name_prefix,
-    const std::string& right_field_name_prefix) {
+    const std::string& left_field_name_suffix,
+    const std::string& right_field_name_suffix) {
   std::vector<std::shared_ptr<Field>> fields;
   int left_size = proj_maps[0].num_cols(HashJoinProjection::OUTPUT);
   int right_size = proj_maps[1].num_cols(HashJoinProjection::OUTPUT);
   fields.resize(left_size + right_size);
 
-  for (int i = 0; i < left_size + right_size; ++i) {
-    bool is_left = (i < left_size);
-    int side = (is_left ? 0 : 1);
-    int input_field_id = proj_maps[side]
-                             .map(HashJoinProjection::OUTPUT, 
HashJoinProjection::INPUT)
-                             .get(is_left ? i : i - left_size);
+  std::unordered_multimap<std::string, int> left_field_map;

Review comment:
       At first I was a little confused why you used a `multimap` instead of a 
`set`.  Then I remembered that we might have multiple columns with the same 
name.
   
   However, in order for that case to work, you would need to use `equal_range` 
instead of `find` down below when you actually search for the field.
   
   In fact, I don't think we have any test cases for this in the hash join 
tests (and I'm not 100% sure we'd completely support it elsewhere) but it might 
be an interesting test case.
   
   Either way, can you change the below "finding" code to use `equal_range` 
instead of `find`?

##########
File path: cpp/examples/arrow/join_example.cc
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This example showcases various ways to work with Datasets. It's
+// intended to be paired with the documentation.
+
+#include <arrow/api.h>
+#include <arrow/compute/api.h>
+#include <arrow/compute/exec/exec_plan.h>
+#include <arrow/compute/exec/expression.h>
+#include <arrow/csv/api.h>
+
+#include <arrow/dataset/dataset.h>
+#include <arrow/dataset/plan.h>
+#include <arrow/dataset/scanner.h>
+
+#include <arrow/io/interfaces.h>
+#include <arrow/io/memory.h>
+#include <arrow/io/stdio.h>
+
+#include <arrow/filesystem/filesystem.h>
+
+#include <arrow/result.h>
+#include <arrow/status.h>
+
+#include <arrow/util/vector.h>
+
+#include <iostream>
+#include <vector>
+
+namespace ds = arrow::dataset;
+namespace cp = arrow::compute;
+
+#define ABORT_ON_FAILURE(expr)                     \
+  do {                                             \
+    arrow::Status status_ = (expr);                \
+    if (!status_.ok()) {                           \
+      std::cerr << status_.message() << std::endl; \
+      abort();                                     \
+    }                                              \
+  } while (0);
+
+std::string GetDataAsCsvString(std::string relation) {
+  std::string data_str = "";
+  if (relation == "l") {
+    data_str = R"csv(lkey,shared,ldistinct
+1,4,7
+2,5,8
+11,20,21
+3,6,9)csv";
+  } else if (relation == "r") {
+    data_str = R"csv(rkey,shared,rdistinct
+1,10,13
+124,10,11
+2,11,14
+3,12,15)csv";
+  } else {
+    return data_str;
+  }
+  return data_str;
+}
+
+arrow::Result<std::shared_ptr<arrow::Table>> GetTableFromExecBatches(
+    const std::shared_ptr<arrow::Schema>& schema,
+    const std::vector<arrow::compute::ExecBatch>& exec_batches) {
+  arrow::RecordBatchVector batches;
+  for (const auto& batch : exec_batches) {
+    ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(schema));
+    batches.push_back(std::move(rb));
+  }
+  return arrow::Table::FromRecordBatches(schema, batches);
+}
+
+arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> 
CreateDataSetFromCSVData(
+    std::string relation) {

Review comment:
       ```suggestion
       const std::string& relation) {
   ```

##########
File path: cpp/src/arrow/compute/exec/options.h
##########
@@ -245,9 +245,9 @@ class ARROW_EXPORT HashJoinNodeOptions : public 
ExecNodeOptions {
   // prefix added to names of output fields coming from left input (used to 
distinguish,
   // if necessary, between fields of the same name in left and right input and 
can be left
   // empty if there are no name collisions)
-  std::string output_prefix_for_left;
+  std::string output_suffix_for_left;
   // prefix added to names of output fields coming from right input

Review comment:
       ```suggestion
     // suffix added to names of output fields coming from right input
   ```

##########
File path: cpp/examples/arrow/join_example.cc
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This example showcases various ways to work with Datasets. It's
+// intended to be paired with the documentation.
+
+#include <arrow/api.h>
+#include <arrow/compute/api.h>
+#include <arrow/compute/exec/exec_plan.h>
+#include <arrow/compute/exec/expression.h>
+#include <arrow/csv/api.h>
+
+#include <arrow/dataset/dataset.h>
+#include <arrow/dataset/plan.h>
+#include <arrow/dataset/scanner.h>
+
+#include <arrow/io/interfaces.h>
+#include <arrow/io/memory.h>
+#include <arrow/io/stdio.h>
+
+#include <arrow/filesystem/filesystem.h>
+
+#include <arrow/result.h>
+#include <arrow/status.h>
+
+#include <arrow/util/vector.h>
+
+#include <iostream>
+#include <vector>
+
+namespace ds = arrow::dataset;
+namespace cp = arrow::compute;
+
+#define ABORT_ON_FAILURE(expr)                     \
+  do {                                             \
+    arrow::Status status_ = (expr);                \
+    if (!status_.ok()) {                           \
+      std::cerr << status_.message() << std::endl; \
+      abort();                                     \
+    }                                              \
+  } while (0);
+
+std::string GetDataAsCsvString(std::string relation) {
+  std::string data_str = "";
+  if (relation == "l") {
+    data_str = R"csv(lkey,shared,ldistinct
+1,4,7
+2,5,8
+11,20,21
+3,6,9)csv";
+  } else if (relation == "r") {
+    data_str = R"csv(rkey,shared,rdistinct
+1,10,13
+124,10,11
+2,11,14
+3,12,15)csv";
+  } else {
+    return data_str;
+  }
+  return data_str;
+}
+
+arrow::Result<std::shared_ptr<arrow::Table>> GetTableFromExecBatches(
+    const std::shared_ptr<arrow::Schema>& schema,
+    const std::vector<arrow::compute::ExecBatch>& exec_batches) {
+  arrow::RecordBatchVector batches;
+  for (const auto& batch : exec_batches) {
+    ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(schema));
+    batches.push_back(std::move(rb));
+  }
+  return arrow::Table::FromRecordBatches(schema, batches);
+}
+
+arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> 
CreateDataSetFromCSVData(
+    std::string relation) {
+  arrow::io::IOContext io_context = arrow::io::default_io_context();
+  std::shared_ptr<arrow::io::InputStream> input;
+  std::string csv_data = GetDataAsCsvString(relation);
+  std::cout << "CSV DATA : " << relation << std::endl;
+  std::cout << csv_data << std::endl;
+  arrow::util::string_view sv = csv_data;
+  input = std::make_shared<arrow::io::BufferReader>(sv);
+  auto read_options = arrow::csv::ReadOptions::Defaults();
+  auto parse_options = arrow::csv::ParseOptions::Defaults();
+  auto convert_options = arrow::csv::ConvertOptions::Defaults();
+
+  // Instantiate TableReader from input stream and options
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::csv::TableReader> table_reader,
+                        arrow::csv::TableReader::Make(io_context, input, 
read_options,
+                                                      parse_options, 
convert_options));
+
+  std::shared_ptr<arrow::csv::TableReader> reader = table_reader;

Review comment:
       ```suggestion
     const std::shared_ptr<arrow::csv::TableReader>& reader = table_reader;
   ```
   
   Do you need this alias?  It is only used in one spot below and 
`table_reader` is a pretty reasonable name already.

##########
File path: cpp/examples/arrow/join_example.cc
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This example showcases various ways to work with Datasets. It's
+// intended to be paired with the documentation.
+
+#include <arrow/api.h>
+#include <arrow/compute/api.h>
+#include <arrow/compute/exec/exec_plan.h>
+#include <arrow/compute/exec/expression.h>
+#include <arrow/csv/api.h>
+
+#include <arrow/dataset/dataset.h>
+#include <arrow/dataset/plan.h>
+#include <arrow/dataset/scanner.h>
+
+#include <arrow/io/interfaces.h>
+#include <arrow/io/memory.h>
+#include <arrow/io/stdio.h>
+
+#include <arrow/filesystem/filesystem.h>
+
+#include <arrow/result.h>
+#include <arrow/status.h>
+
+#include <arrow/util/vector.h>
+
+#include <iostream>
+#include <vector>
+
+namespace ds = arrow::dataset;
+namespace cp = arrow::compute;
+
+#define ABORT_ON_FAILURE(expr)                     \
+  do {                                             \
+    arrow::Status status_ = (expr);                \
+    if (!status_.ok()) {                           \
+      std::cerr << status_.message() << std::endl; \
+      abort();                                     \
+    }                                              \
+  } while (0);
+
+std::string GetDataAsCsvString(std::string relation) {
+  std::string data_str = "";
+  if (relation == "l") {
+    data_str = R"csv(lkey,shared,ldistinct
+1,4,7
+2,5,8
+11,20,21
+3,6,9)csv";
+  } else if (relation == "r") {
+    data_str = R"csv(rkey,shared,rdistinct
+1,10,13
+124,10,11
+2,11,14
+3,12,15)csv";
+  } else {
+    return data_str;
+  }
+  return data_str;
+}
+
+arrow::Result<std::shared_ptr<arrow::Table>> GetTableFromExecBatches(
+    const std::shared_ptr<arrow::Schema>& schema,
+    const std::vector<arrow::compute::ExecBatch>& exec_batches) {
+  arrow::RecordBatchVector batches;
+  for (const auto& batch : exec_batches) {
+    ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(schema));
+    batches.push_back(std::move(rb));
+  }
+  return arrow::Table::FromRecordBatches(schema, batches);
+}
+
+arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> 
CreateDataSetFromCSVData(
+    std::string relation) {
+  arrow::io::IOContext io_context = arrow::io::default_io_context();
+  std::shared_ptr<arrow::io::InputStream> input;
+  std::string csv_data = GetDataAsCsvString(relation);
+  std::cout << "CSV DATA : " << relation << std::endl;
+  std::cout << csv_data << std::endl;
+  arrow::util::string_view sv = csv_data;
+  input = std::make_shared<arrow::io::BufferReader>(sv);
+  auto read_options = arrow::csv::ReadOptions::Defaults();
+  auto parse_options = arrow::csv::ParseOptions::Defaults();
+  auto convert_options = arrow::csv::ConvertOptions::Defaults();
+
+  // Instantiate TableReader from input stream and options
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::csv::TableReader> table_reader,
+                        arrow::csv::TableReader::Make(io_context, input, 
read_options,
+                                                      parse_options, 
convert_options));
+
+  std::shared_ptr<arrow::csv::TableReader> reader = table_reader;
+
+  // Read table from CSV file
+  ARROW_ASSIGN_OR_RAISE(auto maybe_table, reader->Read());
+  auto ds = std::make_shared<arrow::dataset::InMemoryDataset>(maybe_table);
+  arrow::Result<std::shared_ptr<arrow::dataset::InMemoryDataset>> 
result(std::move(ds));
+  return result;
+}
+
+cp::Expression Materialize(std::vector<std::string> names) {

Review comment:
       ```suggestion
   cp::Expression Materialize(const std::vector<std::string>& names) {
   ```

##########
File path: cpp/examples/arrow/join_example.cc
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This example showcases various ways to work with Datasets. It's
+// intended to be paired with the documentation.
+
+#include <arrow/api.h>
+#include <arrow/compute/api.h>
+#include <arrow/compute/exec/exec_plan.h>
+#include <arrow/compute/exec/expression.h>
+#include <arrow/csv/api.h>
+
+#include <arrow/dataset/dataset.h>
+#include <arrow/dataset/plan.h>
+#include <arrow/dataset/scanner.h>
+
+#include <arrow/io/interfaces.h>
+#include <arrow/io/memory.h>
+#include <arrow/io/stdio.h>
+
+#include <arrow/filesystem/filesystem.h>
+
+#include <arrow/result.h>
+#include <arrow/status.h>
+
+#include <arrow/util/vector.h>
+
+#include <iostream>
+#include <vector>
+
+namespace ds = arrow::dataset;
+namespace cp = arrow::compute;
+
+#define ABORT_ON_FAILURE(expr)                     \
+  do {                                             \
+    arrow::Status status_ = (expr);                \
+    if (!status_.ok()) {                           \
+      std::cerr << status_.message() << std::endl; \
+      abort();                                     \
+    }                                              \
+  } while (0);
+
+std::string GetDataAsCsvString(std::string relation) {
+  std::string data_str = "";
+  if (relation == "l") {
+    data_str = R"csv(lkey,shared,ldistinct
+1,4,7
+2,5,8
+11,20,21
+3,6,9)csv";
+  } else if (relation == "r") {
+    data_str = R"csv(rkey,shared,rdistinct
+1,10,13
+124,10,11
+2,11,14
+3,12,15)csv";
+  } else {
+    return data_str;
+  }
+  return data_str;
+}
+
+arrow::Result<std::shared_ptr<arrow::Table>> GetTableFromExecBatches(
+    const std::shared_ptr<arrow::Schema>& schema,
+    const std::vector<arrow::compute::ExecBatch>& exec_batches) {
+  arrow::RecordBatchVector batches;
+  for (const auto& batch : exec_batches) {
+    ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(schema));
+    batches.push_back(std::move(rb));
+  }
+  return arrow::Table::FromRecordBatches(schema, batches);
+}
+
+arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> 
CreateDataSetFromCSVData(
+    std::string relation) {
+  arrow::io::IOContext io_context = arrow::io::default_io_context();

Review comment:
       ```suggestion
     const arrow::io::IOContext& io_context = arrow::io::default_io_context();
   ```

##########
File path: cpp/examples/arrow/join_example.cc
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This example showcases various ways to work with Datasets. It's
+// intended to be paired with the documentation.
+
+#include <arrow/api.h>
+#include <arrow/compute/api.h>
+#include <arrow/compute/exec/exec_plan.h>
+#include <arrow/compute/exec/expression.h>
+#include <arrow/csv/api.h>
+
+#include <arrow/dataset/dataset.h>
+#include <arrow/dataset/plan.h>
+#include <arrow/dataset/scanner.h>
+
+#include <arrow/io/interfaces.h>
+#include <arrow/io/memory.h>
+#include <arrow/io/stdio.h>
+
+#include <arrow/filesystem/filesystem.h>
+
+#include <arrow/result.h>
+#include <arrow/status.h>
+
+#include <arrow/util/vector.h>
+
+#include <iostream>
+#include <vector>
+
+namespace ds = arrow::dataset;
+namespace cp = arrow::compute;
+
+#define ABORT_ON_FAILURE(expr)                     \
+  do {                                             \
+    arrow::Status status_ = (expr);                \
+    if (!status_.ok()) {                           \
+      std::cerr << status_.message() << std::endl; \
+      abort();                                     \
+    }                                              \
+  } while (0);
+
+std::string GetDataAsCsvString(std::string relation) {
+  std::string data_str = "";
+  if (relation == "l") {
+    data_str = R"csv(lkey,shared,ldistinct
+1,4,7
+2,5,8
+11,20,21
+3,6,9)csv";
+  } else if (relation == "r") {
+    data_str = R"csv(rkey,shared,rdistinct
+1,10,13
+124,10,11
+2,11,14
+3,12,15)csv";
+  } else {
+    return data_str;
+  }
+  return data_str;
+}
+
+arrow::Result<std::shared_ptr<arrow::Table>> GetTableFromExecBatches(
+    const std::shared_ptr<arrow::Schema>& schema,
+    const std::vector<arrow::compute::ExecBatch>& exec_batches) {
+  arrow::RecordBatchVector batches;
+  for (const auto& batch : exec_batches) {
+    ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(schema));
+    batches.push_back(std::move(rb));
+  }
+  return arrow::Table::FromRecordBatches(schema, batches);
+}
+
+arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> 
CreateDataSetFromCSVData(
+    std::string relation) {
+  arrow::io::IOContext io_context = arrow::io::default_io_context();
+  std::shared_ptr<arrow::io::InputStream> input;
+  std::string csv_data = GetDataAsCsvString(relation);
+  std::cout << "CSV DATA : " << relation << std::endl;
+  std::cout << csv_data << std::endl;
+  arrow::util::string_view sv = csv_data;
+  input = std::make_shared<arrow::io::BufferReader>(sv);
+  auto read_options = arrow::csv::ReadOptions::Defaults();
+  auto parse_options = arrow::csv::ParseOptions::Defaults();
+  auto convert_options = arrow::csv::ConvertOptions::Defaults();
+
+  // Instantiate TableReader from input stream and options
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::csv::TableReader> table_reader,
+                        arrow::csv::TableReader::Make(io_context, input, 
read_options,
+                                                      parse_options, 
convert_options));
+
+  std::shared_ptr<arrow::csv::TableReader> reader = table_reader;
+
+  // Read table from CSV file
+  ARROW_ASSIGN_OR_RAISE(auto maybe_table, reader->Read());
+  auto ds = std::make_shared<arrow::dataset::InMemoryDataset>(maybe_table);
+  arrow::Result<std::shared_ptr<arrow::dataset::InMemoryDataset>> 
result(std::move(ds));
+  return result;
+}
+
+cp::Expression Materialize(std::vector<std::string> names) {
+  std::vector<cp::Expression> exprs;
+  for (const auto& name : names) {
+    exprs.push_back(cp::field_ref(name));
+  }
+
+  return cp::project(exprs, names);
+}
+
+arrow::Status DoHashJoin() {
+  cp::ExecContext exec_context(arrow::default_memory_pool(),
+                               ::arrow::internal::GetCpuThreadPool());
+
+  arrow::dataset::internal::Initialize();
+
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
+                        cp::ExecPlan::Make(&exec_context));
+
+  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
+
+  cp::ExecNode* left_source;
+  cp::ExecNode* right_source;
+
+  ARROW_ASSIGN_OR_RAISE(auto l_dataset, CreateDataSetFromCSVData("l"));
+  ARROW_ASSIGN_OR_RAISE(auto r_dataset, CreateDataSetFromCSVData("r"));
+
+  auto l_options = std::make_shared<arrow::dataset::ScanOptions>();
+  l_options->projection = Materialize({});  // create empty projection
+
+  auto r_options = std::make_shared<arrow::dataset::ScanOptions>();
+  r_options->projection = Materialize({});  // create empty projection
+
+  // construct the scan node
+  auto l_scan_node_options = arrow::dataset::ScanNodeOptions{l_dataset, 
l_options};
+  auto r_scan_node_options = arrow::dataset::ScanNodeOptions{r_dataset, 
r_options};
+
+  ARROW_ASSIGN_OR_RAISE(left_source,
+                        cp::MakeExecNode("scan", plan.get(), {}, 
l_scan_node_options));
+  ARROW_ASSIGN_OR_RAISE(right_source,
+                        cp::MakeExecNode("scan", plan.get(), {}, 
r_scan_node_options));
+
+  arrow::compute::HashJoinNodeOptions join_opts{
+      arrow::compute::JoinType::INNER,
+      /*left_keys=*/{"lkey"},
+      /*right_keys=*/{"rkey"},         arrow::compute::literal(true), "_l", 
"_r"};
+
+  ARROW_ASSIGN_OR_RAISE(
+      auto hashjoin,
+      cp::MakeExecNode("hashjoin", plan.get(), {left_source, right_source}, 
join_opts));
+
+  ARROW_ASSIGN_OR_RAISE(std::ignore, cp::MakeExecNode("sink", plan.get(), 
{hashjoin},
+                                                      
cp::SinkNodeOptions{&sink_gen}));
+  // expected columns l_a, l_b
+  std::shared_ptr<arrow::RecordBatchReader> sink_reader = 
cp::MakeGeneratorReader(
+      hashjoin->output_schema(), std::move(sink_gen), 
exec_context.memory_pool());
+
+  // // validate the ExecPlan

Review comment:
       ```suggestion
     // validate the ExecPlan
   ```

##########
File path: cpp/src/arrow/compute/exec/options.h
##########
@@ -245,9 +245,9 @@ class ARROW_EXPORT HashJoinNodeOptions : public 
ExecNodeOptions {
   // prefix added to names of output fields coming from left input (used to 
distinguish,

Review comment:
       ```suggestion
     // suffix added to names of output fields coming from left input (used to 
distinguish,
   ```

##########
File path: cpp/examples/arrow/join_example.cc
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This example showcases various ways to work with Datasets. It's
+// intended to be paired with the documentation.
+
+#include <arrow/api.h>
+#include <arrow/compute/api.h>
+#include <arrow/compute/exec/exec_plan.h>
+#include <arrow/compute/exec/expression.h>
+#include <arrow/csv/api.h>
+
+#include <arrow/dataset/dataset.h>
+#include <arrow/dataset/plan.h>
+#include <arrow/dataset/scanner.h>
+
+#include <arrow/io/interfaces.h>
+#include <arrow/io/memory.h>
+#include <arrow/io/stdio.h>
+
+#include <arrow/filesystem/filesystem.h>
+
+#include <arrow/result.h>
+#include <arrow/status.h>
+
+#include <arrow/util/vector.h>
+
+#include <iostream>
+#include <vector>
+
+namespace ds = arrow::dataset;
+namespace cp = arrow::compute;
+
+#define ABORT_ON_FAILURE(expr)                     \
+  do {                                             \
+    arrow::Status status_ = (expr);                \
+    if (!status_.ok()) {                           \
+      std::cerr << status_.message() << std::endl; \
+      abort();                                     \
+    }                                              \
+  } while (0);
+
+std::string GetDataAsCsvString(std::string relation) {
+  std::string data_str = "";
+  if (relation == "l") {
+    data_str = R"csv(lkey,shared,ldistinct
+1,4,7
+2,5,8
+11,20,21
+3,6,9)csv";
+  } else if (relation == "r") {
+    data_str = R"csv(rkey,shared,rdistinct
+1,10,13
+124,10,11
+2,11,14
+3,12,15)csv";
+  } else {
+    return data_str;
+  }
+  return data_str;
+}
+
+arrow::Result<std::shared_ptr<arrow::Table>> GetTableFromExecBatches(
+    const std::shared_ptr<arrow::Schema>& schema,
+    const std::vector<arrow::compute::ExecBatch>& exec_batches) {
+  arrow::RecordBatchVector batches;
+  for (const auto& batch : exec_batches) {
+    ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(schema));
+    batches.push_back(std::move(rb));
+  }
+  return arrow::Table::FromRecordBatches(schema, batches);
+}
+
+arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> 
CreateDataSetFromCSVData(
+    std::string relation) {
+  arrow::io::IOContext io_context = arrow::io::default_io_context();
+  std::shared_ptr<arrow::io::InputStream> input;
+  std::string csv_data = GetDataAsCsvString(relation);
+  std::cout << "CSV DATA : " << relation << std::endl;
+  std::cout << csv_data << std::endl;
+  arrow::util::string_view sv = csv_data;
+  input = std::make_shared<arrow::io::BufferReader>(sv);
+  auto read_options = arrow::csv::ReadOptions::Defaults();
+  auto parse_options = arrow::csv::ParseOptions::Defaults();
+  auto convert_options = arrow::csv::ConvertOptions::Defaults();
+
+  // Instantiate TableReader from input stream and options
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::csv::TableReader> table_reader,
+                        arrow::csv::TableReader::Make(io_context, input, 
read_options,
+                                                      parse_options, 
convert_options));
+
+  std::shared_ptr<arrow::csv::TableReader> reader = table_reader;
+
+  // Read table from CSV file
+  ARROW_ASSIGN_OR_RAISE(auto maybe_table, reader->Read());
+  auto ds = std::make_shared<arrow::dataset::InMemoryDataset>(maybe_table);
+  arrow::Result<std::shared_ptr<arrow::dataset::InMemoryDataset>> 
result(std::move(ds));
+  return result;
+}
+
+cp::Expression Materialize(std::vector<std::string> names) {

Review comment:
       `Materialize` is a bit of a confusing name.  Perhaps 
`MakeDefaultProjection` would be better?  Ideally this entire method will go 
away at some point in the future as the default projection shouldn't really 
need to be specified.
   
   Also, why take a vector of names as an argument when you only ever pass an 
empty vector?  It might be simpler to get rid of this function entirely and 
just call `cp::project({}, {})`.
   
   Also, there should be a comment somewhere explaining that passing in an 
empty vector of names/expressions to `cp::project` will result in a "default" 
projection where each field is mapped to a field_ref.  That comment may belong 
on `cp::project`'s docs though.

##########
File path: cpp/examples/arrow/join_example.cc
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This example showcases various ways to work with Datasets. It's
+// intended to be paired with the documentation.
+
+#include <arrow/api.h>
+#include <arrow/compute/api.h>
+#include <arrow/compute/exec/exec_plan.h>
+#include <arrow/compute/exec/expression.h>
+#include <arrow/csv/api.h>
+
+#include <arrow/dataset/dataset.h>
+#include <arrow/dataset/plan.h>
+#include <arrow/dataset/scanner.h>
+
+#include <arrow/io/interfaces.h>
+#include <arrow/io/memory.h>
+#include <arrow/io/stdio.h>
+
+#include <arrow/filesystem/filesystem.h>
+
+#include <arrow/result.h>
+#include <arrow/status.h>
+
+#include <arrow/util/vector.h>
+
+#include <iostream>
+#include <vector>
+
+namespace ds = arrow::dataset;
+namespace cp = arrow::compute;
+
+#define ABORT_ON_FAILURE(expr)                     \
+  do {                                             \
+    arrow::Status status_ = (expr);                \
+    if (!status_.ok()) {                           \
+      std::cerr << status_.message() << std::endl; \
+      abort();                                     \
+    }                                              \
+  } while (0);
+
+std::string GetDataAsCsvString(std::string relation) {
+  std::string data_str = "";
+  if (relation == "l") {
+    data_str = R"csv(lkey,shared,ldistinct
+1,4,7
+2,5,8
+11,20,21
+3,6,9)csv";
+  } else if (relation == "r") {
+    data_str = R"csv(rkey,shared,rdistinct
+1,10,13
+124,10,11
+2,11,14
+3,12,15)csv";
+  } else {
+    return data_str;
+  }
+  return data_str;
+}
+
+arrow::Result<std::shared_ptr<arrow::Table>> GetTableFromExecBatches(
+    const std::shared_ptr<arrow::Schema>& schema,
+    const std::vector<arrow::compute::ExecBatch>& exec_batches) {
+  arrow::RecordBatchVector batches;
+  for (const auto& batch : exec_batches) {
+    ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(schema));
+    batches.push_back(std::move(rb));
+  }
+  return arrow::Table::FromRecordBatches(schema, batches);
+}
+
+arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> 
CreateDataSetFromCSVData(
+    std::string relation) {
+  arrow::io::IOContext io_context = arrow::io::default_io_context();
+  std::shared_ptr<arrow::io::InputStream> input;
+  std::string csv_data = GetDataAsCsvString(relation);
+  std::cout << "CSV DATA : " << relation << std::endl;
+  std::cout << csv_data << std::endl;
+  arrow::util::string_view sv = csv_data;
+  input = std::make_shared<arrow::io::BufferReader>(sv);
+  auto read_options = arrow::csv::ReadOptions::Defaults();
+  auto parse_options = arrow::csv::ParseOptions::Defaults();
+  auto convert_options = arrow::csv::ConvertOptions::Defaults();
+
+  // Instantiate TableReader from input stream and options
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::csv::TableReader> table_reader,
+                        arrow::csv::TableReader::Make(io_context, input, 
read_options,
+                                                      parse_options, 
convert_options));
+
+  std::shared_ptr<arrow::csv::TableReader> reader = table_reader;
+
+  // Read table from CSV file
+  ARROW_ASSIGN_OR_RAISE(auto maybe_table, reader->Read());
+  auto ds = std::make_shared<arrow::dataset::InMemoryDataset>(maybe_table);
+  arrow::Result<std::shared_ptr<arrow::dataset::InMemoryDataset>> 
result(std::move(ds));
+  return result;
+}
+
+cp::Expression Materialize(std::vector<std::string> names) {
+  std::vector<cp::Expression> exprs;
+  for (const auto& name : names) {
+    exprs.push_back(cp::field_ref(name));
+  }
+
+  return cp::project(exprs, names);
+}
+
+arrow::Status DoHashJoin() {
+  cp::ExecContext exec_context(arrow::default_memory_pool(),
+                               ::arrow::internal::GetCpuThreadPool());
+
+  arrow::dataset::internal::Initialize();
+
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
+                        cp::ExecPlan::Make(&exec_context));
+
+  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
+
+  cp::ExecNode* left_source;
+  cp::ExecNode* right_source;
+
+  ARROW_ASSIGN_OR_RAISE(auto l_dataset, CreateDataSetFromCSVData("l"));
+  ARROW_ASSIGN_OR_RAISE(auto r_dataset, CreateDataSetFromCSVData("r"));
+
+  auto l_options = std::make_shared<arrow::dataset::ScanOptions>();
+  l_options->projection = Materialize({});  // create empty projection
+
+  auto r_options = std::make_shared<arrow::dataset::ScanOptions>();
+  r_options->projection = Materialize({});  // create empty projection
+
+  // construct the scan node
+  auto l_scan_node_options = arrow::dataset::ScanNodeOptions{l_dataset, 
l_options};
+  auto r_scan_node_options = arrow::dataset::ScanNodeOptions{r_dataset, 
r_options};
+
+  ARROW_ASSIGN_OR_RAISE(left_source,
+                        cp::MakeExecNode("scan", plan.get(), {}, 
l_scan_node_options));
+  ARROW_ASSIGN_OR_RAISE(right_source,
+                        cp::MakeExecNode("scan", plan.get(), {}, 
r_scan_node_options));
+
+  arrow::compute::HashJoinNodeOptions join_opts{
+      arrow::compute::JoinType::INNER,
+      /*left_keys=*/{"lkey"},
+      /*right_keys=*/{"rkey"},         arrow::compute::literal(true), "_l", 
"_r"};

Review comment:
       ```suggestion
         /*in_left_keys=*/{"lkey"},
         /*in_right_keys=*/{"rkey"},         arrow::compute::literal(true), 
"_l", "_r"};
   ```
   Also, if you're going to specify the names for these parameters you should 
probably specify the names for the rest.  None of them are intuitive.

##########
File path: cpp/src/arrow/compute/exec/hash_join_node_test.cc
##########
@@ -937,6 +937,73 @@ void HashJoinWithExecPlan(Random64Bit& rng, bool parallel,
   ASSERT_OK_AND_ASSIGN(*output, TableFromExecBatches(output_schema, res));
 }
 
+TEST(HashJoin, Suffix) {
+  BatchesWithSchema input_left;
+  input_left.batches = {ExecBatchFromJSON({int32(), int32(), int32()}, R"([
+                   [1, 4, 7],
+                   [2, 5, 8],
+                   [3, 6, 9]
+                 ])")};
+  input_left.schema = schema(
+      {field("lkey", int32()), field("shared", int32()), field("ldistinct", 
int32())});
+
+  BatchesWithSchema input_right;
+  input_right.batches = {ExecBatchFromJSON({int32(), int32(), int32()}, R"([
+                   [1, 10, 13],
+                   [2, 11, 14],
+                   [3, 12, 15]
+                 ])")};
+  input_right.schema = schema(
+      {field("rkey", int32()), field("shared", int32()), field("rdistinct", 
int32())});
+
+  BatchesWithSchema expected;
+  expected.batches = {
+      ExecBatchFromJSON({int32(), int32(), int32(), int32(), int32(), 
int32()}, R"([
+    [1, 4, 7, 1, 10, 13],
+    [2, 5, 8, 2, 11, 14],
+    [3, 6, 9, 3, 12, 15]
+  ])")};
+
+  expected.schema = schema({field("lkey", int32()), field("shared_l", int32()),
+                            field("ldistinct", int32()), field("rkey", 
int32()),
+                            field("shared_r", int32()), field("rdistinct", 
int32())});
+
+  ExecContext exec_ctx;
+
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(&exec_ctx));
+  AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+  ExecNode* left_source;
+  ExecNode* right_source;
+  ASSERT_OK_AND_ASSIGN(
+      left_source,
+      MakeExecNode("source", plan.get(), {},
+                   SourceNodeOptions{input_left.schema, 
input_left.gen(/*parallel=*/false,
+                                                                       
/*slow=*/false)}));
+
+  ASSERT_OK_AND_ASSIGN(right_source,
+                       MakeExecNode("source", plan.get(), {},
+                                    SourceNodeOptions{input_right.schema,
+                                                      
input_right.gen(/*parallel=*/false,
+                                                                      
/*slow=*/false)}))
+
+  HashJoinNodeOptions join_opts{JoinType::INNER,
+                                /*left_keys=*/{"lkey"},
+                                /*right_keys=*/{"rkey"}, literal(true), "_l", 
"_r"};

Review comment:
       ```suggestion
                                   /*in_left_keys=*/{"lkey"},
                                   /*in_right_keys=*/{"rkey"}, literal(true), 
"_l", "_r"};
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to