westonpace commented on a change in pull request #11426:
URL: https://github.com/apache/arrow/pull/11426#discussion_r739573619



##########
File path: cpp/src/arrow/compute/memory_resources.cc
##########
@@ -0,0 +1,307 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/memory_resources.h"
+#include "arrow/compute/exec.h"
+#include "arrow/record_batch.h"
+#include "arrow/table.h"
+#include "arrow/util/make_unique.h"
+
+#include <memory>
+#include <mutex>
+#include <random>
+#include <unordered_map>
+
+#include <arrow/filesystem/filesystem.h>
+#include <arrow/ipc/feather.h>
+#include <arrow/ipc/reader.h>
+#include <arrow/ipc/writer.h>
+#include "arrow/io/file.h"
+
+#ifdef __APPLE__
+#include <sys/sysctl.h>
+#include <sys/types.h>
+#endif
+
+#ifdef __linux__
+#include <sys/statvfs.h>
+#include <sys/sysinfo.h>
+#endif
+
+// Windows APIs
+#include "arrow/util/windows_compatibility.h"
+
+namespace arrow {
+
+namespace compute {
+
+/// \brief Return the spelled-out enumerator name of `memory_level`.
+///
+/// A switch is used rather than an index into a name table so that each name
+/// stays tied to its enumerator no matter how MemoryLevel is ordered.
+///
+/// \return e.g. "MemoryLevel::kCPULevel"; "MemoryLevel::<unknown>" for a value
+///         that is not a declared enumerator
+std::string MemoryLevelName(MemoryLevel memory_level) {
+  switch (memory_level) {
+    case MemoryLevel::kGPULevel:
+      return ARROW_STRINGIFY(MemoryLevel::kGPULevel);
+    case MemoryLevel::kCPULevel:
+      return ARROW_STRINGIFY(MemoryLevel::kCPULevel);
+    case MemoryLevel::kDiskLevel:
+      return ARROW_STRINGIFY(MemoryLevel::kDiskLevel);
+  }
+  // Only reachable when an out-of-range integer was cast to MemoryLevel.
+  return "MemoryLevel::<unknown>";
+}
+
+/// Describe this resource by the name of the memory level it belongs to.
+std::string MemoryResource::ToString() const {
+  const auto level = memory_level_;
+  return MemoryLevelName(level);
+}
+
+/// \brief DataHolder that keeps a shared reference to its record batch.
+class CPUDataHolder : public DataHolder {
+ public:
+  /// Share ownership of `record_batch`.
+  ///
+  /// The argument is a const reference, so it is copied; wrapping it in
+  /// std::move() would still copy and only suggest a move that never happens.
+  explicit CPUDataHolder(const std::shared_ptr<RecordBatch>& record_batch)
+      : DataHolder(MemoryLevel::kCPULevel), record_batch_(record_batch) {}
+
+  /// Expose the held record batch as an ExecBatch.
+  Result<ExecBatch> Get() override { return ExecBatch(*record_batch_); }
+
+ private:
+  std::shared_ptr<RecordBatch> record_batch_;
+};
+
+namespace {
+
+/// \brief Build a string of `length` characters drawn uniformly from [0-9A-Za-z].
+///
+/// A new std::mt19937 is seeded from std::random_device on every call, so the
+/// result is not meant to be reproducible.
+///
+/// \param length number of characters to generate; 0 yields an empty string
+std::string RandomString(std::size_t length) {
+  static constexpr char kAlphabet[] =
+      "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+  // sizeof counts the terminating NUL, which must never be picked.
+  constexpr std::size_t kAlphabetSize = sizeof(kAlphabet) - 1;
+
+  std::random_device random_device;
+  std::mt19937 generator(random_device());
+  // Index type matches the array index type, so no narrowing conversion occurs.
+  std::uniform_int_distribution<std::size_t> distribution(0, kAlphabetSize - 1);
+
+  std::string random_string;
+  random_string.reserve(length);
+  for (std::size_t i = 0; i < length; ++i) {
+    random_string += kAlphabet[distribution(generator)];
+  }
+  return random_string;
+}
+
+}  // namespace
+
+/// \brief Write `record_batch` to `file_path` on `filesystem` as an Arrow IPC file.
+///
+/// \return the first error met while opening the stream, creating the writer,
+///         writing the batch or closing; Status::OK() otherwise
+Status StoreRecordBatch(const std::shared_ptr<RecordBatch>& record_batch,
+                        const std::shared_ptr<fs::FileSystem>& filesystem,
+                        const std::string& file_path) {
+  // Errors are returned rather than aborting through ValueOrDie(): a path that
+  // cannot be written is a runtime condition the caller has to be able to see.
+  ARROW_ASSIGN_OR_RAISE(auto output, filesystem->OpenOutputStream(file_path));
+  ARROW_ASSIGN_OR_RAISE(
+      auto writer, arrow::ipc::MakeFileWriter(output.get(), record_batch->schema()));
+  ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*record_batch));
+  ARROW_RETURN_NOT_OK(writer->Close());
+  // Closing the writer leaves the sink open; close it here so that a failed
+  // flush is reported instead of being swallowed when the stream is destroyed.
+  return output->Close();
+}
+/// \brief Read back the first record batch of the Arrow IPC file at `file_path`.
+///
+/// The batch is read directly from the IPC file instead of going through a
+/// Table and a TableBatchReader: that reader yields a null batch for a table
+/// with zero rows, which would lose an empty batch on the way back.
+///
+/// \return the stored batch, or an error if the file cannot be opened, is not a
+///         valid IPC file, or contains no record batch
+Result<std::shared_ptr<RecordBatch>> RecoverRecordBatch(
+    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& file_path) {
+  ARROW_ASSIGN_OR_RAISE(auto input, filesystem->OpenInputFile(file_path));
+  ARROW_ASSIGN_OR_RAISE(auto reader, arrow::ipc::RecordBatchFileReader::Open(input));
+  if (reader->num_record_batches() < 1) {
+    return Status::Invalid("No record batch stored in file: ", file_path);
+  }
+  return reader->ReadRecordBatch(0);
+}
+
+/// \brief DataHolder that writes its record batch to a temporary file and reads
+/// it back on demand.
+///
+/// The file is written once, in the constructor. A failure there is remembered
+/// and returned by every later Get() call. The file is removed again, on a
+/// best-effort basis, when the holder is destroyed.
+class DiskDataHolder : public DataHolder {
+ public:
+  /// \param record_batch batch to persist; no reference to it is kept
+  /// \param memory_pool stored, but not otherwise used by this class
+  DiskDataHolder(const std::shared_ptr<RecordBatch>& record_batch,
+                 MemoryPool* memory_pool)
+      : DataHolder(MemoryLevel::kDiskLevel), memory_pool_(memory_pool) {
+    std::string root_path;
+    auto maybe_filesystem =
+        arrow::fs::FileSystemFromUri(cache_storage_root_path, &root_path);
+    if (!maybe_filesystem.ok()) {
+      // A constructor cannot return a Status, so keep the error for Get().
+      status_ = maybe_filesystem.status();
+      return;
+    }
+    filesystem_ = maybe_filesystem.ValueOrDie();
+
+    // The random component keeps separate holders from sharing a file name.
+    file_path_ = root_path + "data-holder-temp-" + RandomString(64) + ".feather";
+    status_ = StoreRecordBatch(record_batch, filesystem_, file_path_);
+  }
+
+  /// Remove the temporary file so spilled batches do not pile up on disk.
+  ~DiskDataHolder() {
+    if (filesystem_ != nullptr && !file_path_.empty()) {
+      // Best effort: a destructor has no way to report a failed delete.
+      ARROW_UNUSED(filesystem_->DeleteFile(file_path_));
+    }
+  }
+
+  /// Read the batch back from disk.
+  ///
+  /// \return the stored batch as an ExecBatch, the error recorded while the
+  ///         file was being written, or the error met while reading it back
+  Result<ExecBatch> Get() override {
+    ARROW_RETURN_NOT_OK(status_);
+    ARROW_ASSIGN_OR_RAISE(auto record_batch,
+                          RecoverRecordBatch(filesystem_, file_path_));
+    // A null batch means the reader had nothing to hand back; report that
+    // instead of dereferencing it.
+    if (record_batch == nullptr) {
+      return Status::Invalid("No record batch could be read back from ", file_path_);
+    }
+    return ExecBatch(*record_batch);
+  }
+
+ private:
+  std::string file_path_;
+  Status status_;
+  MemoryPool* memory_pool_;
+  std::shared_ptr<arrow::fs::FileSystem> filesystem_;
+  const std::string cache_storage_root_path = "file:///tmp/";
+};
+
+class MemoryResources::MemoryResourcesImpl {
+ public:
+  Status AddMemoryResource(std::unique_ptr<MemoryResource> resource) {
+    std::lock_guard<std::mutex> mutation_guard(lock_);

Review comment:
       This might be a bit of a premature optimization.  I don't think 
`AddMemoryResource` is going to be called a lot.  It might be simpler to do 
away with the locking (and the mutex) and then you could even get rid of the 
pimpl at that point.

##########
File path: cpp/src/arrow/compute/exec/data_holder_node.cc
##########
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <mutex>
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+
+#include "arrow/compute/memory_resources.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/future.h"
+#include "arrow/util/logging.h"
+
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/future.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+using internal::checked_cast;
+
+namespace compute {
+
+class DataHolderManager {
+ public:
+  DataHolderManager(ExecContext* context)
+      : context_(context), gen_(), producer_(gen_.producer()) {}
+
+  Status Push(const std::shared_ptr<RecordBatch>& batch) {
+    static const MemoryLevel all_memory_levels[] = {
+        MemoryLevel::kGPULevel, MemoryLevel::kCPULevel, 
MemoryLevel::kDiskLevel};
+
+    for (auto id : all_memory_levels) {
+      auto resources = context_->memory_resources();
+
+      auto memory_resource_result = resources->memory_resource(id);
+      if (memory_resource_result.ok()) {
+        auto memory_resource = memory_resource_result.ValueOrDie();
+        auto memory_to_use = memory_resource->memory_used();

Review comment:
       Nit: Name this variable `memory_used`?

##########
File path: cpp/src/arrow/compute/exec/data_holder_node.cc
##########
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <mutex>
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+
+#include "arrow/compute/memory_resources.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/future.h"
+#include "arrow/util/logging.h"
+
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/future.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+using internal::checked_cast;
+
+namespace compute {
+
+class DataHolderManager {
+ public:
+  DataHolderManager(ExecContext* context)
+      : context_(context), gen_(), producer_(gen_.producer()) {}
+
+  Status Push(const std::shared_ptr<RecordBatch>& batch) {
+    static const MemoryLevel all_memory_levels[] = {
+        MemoryLevel::kGPULevel, MemoryLevel::kCPULevel, 
MemoryLevel::kDiskLevel};
+
+    for (auto id : all_memory_levels) {
+      auto resources = context_->memory_resources();

Review comment:
       Maybe it would make more sense for `MemoryResources` to return a vector 
of `MemoryResource` instances so you could iterate over that.

##########
File path: cpp/src/arrow/compute/exec/data_holder_node_test.cc
##########
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-matchers.h>
+#include <random>
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/matchers.h"
+#include "arrow/testing/random.h"
+
+using testing::UnorderedElementsAreArray;
+
+namespace arrow {
+namespace compute {
+
+/// Fixture that pushes randomly generated batches through a
+/// union -> data_holder -> sink plan and checks that the sink receives the
+/// expected batches.
+struct TestDataHolderNode : public ::testing::Test {
+  static constexpr int kNumBatches = 10;
+
+  TestDataHolderNode() : rng_(0) {}
+
+  /// Build a schema with `num_inputs` fields whose types are picked from a
+  /// fixed list. The engine is seeded with a constant, so the picks do not vary
+  /// between calls.
+  std::shared_ptr<Schema> GenerateRandomSchema(size_t num_inputs) {
+    static const std::vector<std::shared_ptr<DataType>> some_arrow_types = {
+        arrow::null(),    arrow::boolean(), arrow::int8(),    arrow::int16(),
+        arrow::int32(),   arrow::int64(),   arrow::float16(), arrow::float32(),
+        arrow::float64(), arrow::utf8(),    arrow::binary(),  arrow::date32()};
+
+    std::vector<std::shared_ptr<Field>> fields(num_inputs);
+    std::default_random_engine gen(42);
+    std::uniform_int_distribution<int> types_dist(
+        0, static_cast<int>(some_arrow_types.size()) - 1);
+    for (size_t i = 0; i < num_inputs; i++) {
+      int random_index = types_dist(gen);
+      auto col_type = some_arrow_types.at(random_index);
+      fields[i] =
+          field("column_" + std::to_string(i) + "_" + col_type->ToString(), col_type);
+    }
+    return schema(fields);
+  }
+
+  /// Append batches matching `schema` to `out_batches` and set its schema.
+  ///
+  /// `num_batches == 0` appends a single zero-row batch. Otherwise
+  /// `num_batches` batches of `batch_size` rows are appended. The batches
+  /// present afterwards are then appended again `multiplicity - 1` times.
+  void GenerateBatchesFromSchema(const std::shared_ptr<Schema>& schema,
+                                 size_t num_batches, BatchesWithSchema* out_batches,
+                                 int multiplicity = 1, int64_t batch_size = 4) {
+    if (num_batches == 0) {
+      auto empty_record_batch = ExecBatch(*rng_.BatchOf(schema->fields(), 0));
+      out_batches->batches.push_back(empty_record_batch);
+    } else {
+      for (size_t j = 0; j < num_batches; j++) {
+        out_batches->batches.push_back(
+            ExecBatch(*rng_.BatchOf(schema->fields(), batch_size)));
+      }
+    }
+
+    // Captured before the loop so the copies appended below are not re-copied.
+    size_t batch_count = out_batches->batches.size();
+    for (int repeat = 1; repeat < multiplicity; ++repeat) {
+      for (size_t i = 0; i < batch_count; ++i) {
+        out_batches->batches.push_back(out_batches->batches[i]);
+      }
+    }
+    out_batches->schema = schema;
+  }
+
+  /// Run one source per element of `batches` through union -> data_holder ->
+  /// sink and assert that the collected output equals `exp_batches`, in any
+  /// order. With no inputs, assert instead that building the plan fails with
+  /// Invalid.
+  void CheckRunOutput(const std::vector<BatchesWithSchema>& batches,
+                      const BatchesWithSchema& exp_batches) {
+    ExecContext exec_context(default_memory_pool(),
+                             ::arrow::internal::GetCpuThreadPool());
+
+    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(&exec_context));
+
+    Declaration union_decl{"union", ExecNodeOptions{}};
+
+    for (const auto& batch : batches) {
+      union_decl.inputs.emplace_back(Declaration{
+          "source", SourceNodeOptions{batch.schema, batch.gen(/*parallel=*/true,
+                                                              /*slow=*/false)}});
+    }
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+    if (batches.size() == 0) {
+      ASSERT_RAISES(Invalid, Declaration::Sequence({union_decl,
+                                                    {"data_holder", ExecNodeOptions{}},
+                                                    {"sink", SinkNodeOptions{&sink_gen}}})
+                                 .AddToPlan(plan.get()));
+      return;
+    } else {
+      ASSERT_OK(Declaration::Sequence({union_decl,
+                                       {"data_holder", ExecNodeOptions{}},
+                                       {"sink", SinkNodeOptions{&sink_gen}}})
+                    .AddToPlan(plan.get()));
+    }
+    Future<std::vector<ExecBatch>> actual = StartAndCollect(plan.get(), sink_gen);
+
+    auto expected_matcher =
+        Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches)));
+    ASSERT_THAT(actual, expected_matcher);
+  }
+
+  /// Generate `num_batches` batches for each of `num_input_nodes` inputs and
+  /// check that the plan emits all of them. `num_input_nodes` is also used as
+  /// the number of fields of the generated schema.
+  void CheckDataHolderExecNode(size_t num_input_nodes, size_t num_batches) {
+    auto random_schema = GenerateRandomSchema(num_input_nodes);
+
+    std::vector<BatchesWithSchema> input_batches(num_input_nodes);
+    BatchesWithSchema exp_batches;
+    exp_batches.schema = random_schema;
+    for (size_t i = 0; i < num_input_nodes; i++) {
+      GenerateBatchesFromSchema(random_schema, num_batches, &input_batches[i]);
+      for (const auto& batch : input_batches[i].batches) {
+        exp_batches.batches.push_back(batch);
+      }
+    }
+    CheckRunOutput(input_batches, exp_batches);
+  }
+
+  ::arrow::random::RandomArrayGenerator rng_;
+};
+
+TEST_F(TestDataHolderNode, TestNonEmpty) {

Review comment:
       This will eventually need tests that cover the case where limits are hit 
and batches are actually persisted.

##########
File path: cpp/src/arrow/compute/exec/data_holder_node.cc
##########
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <mutex>
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+
+#include "arrow/compute/memory_resources.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/future.h"
+#include "arrow/util/logging.h"
+
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/future.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+using internal::checked_cast;
+
+namespace compute {
+
+/// \brief Wraps record batches in DataHolders and queues them for a consumer.
+///
+/// Holders are published through a PushGenerator; generator() hands out the
+/// consuming side of that queue.
+class DataHolderManager {
+ public:
+  explicit DataHolderManager(ExecContext* context)
+      : context_(context), gen_(), producer_(gen_.producer()) {}
+
+  /// \brief Store `batch` in the first memory level that still has room.
+  ///
+  /// Levels are tried in the order GPU, CPU, disk. A level counts as having
+  /// room while its memory_used() is below its memory_limit(); levels for which
+  /// no resource can be looked up are skipped.
+  ///
+  /// \return an error from creating the holder, or CapacityError when no level
+  ///         accepted the batch (returning OK there would silently drop it)
+  Status Push(const std::shared_ptr<RecordBatch>& batch) {
+    static const MemoryLevel all_memory_levels[] = {
+        MemoryLevel::kGPULevel, MemoryLevel::kCPULevel, MemoryLevel::kDiskLevel};
+
+    // The registry does not depend on the level, so look it up once.
+    auto resources = context_->memory_resources();
+
+    for (auto level : all_memory_levels) {
+      auto maybe_resource = resources->memory_resource(level);
+      if (!maybe_resource.ok()) {
+        continue;
+      }
+      auto memory_resource = maybe_resource.ValueOrDie();
+      if (memory_resource->memory_used() < memory_resource->memory_limit()) {
+        ARROW_ASSIGN_OR_RAISE(auto data_holder, memory_resource->GetDataHolder(batch));
+        this->producer_.Push(std::move(data_holder));
+        return Status::OK();
+      }
+    }
+    return Status::CapacityError(
+        "No memory resource has capacity left to hold the record batch");
+  }
+
+  /// Consuming side of the queue filled by Push().
+  AsyncGenerator<std::shared_ptr<DataHolder>> generator() { return gen_; }
+
+ public:
+  ExecContext* context_;
+  PushGenerator<std::shared_ptr<DataHolder>> gen_;
+  PushGenerator<std::shared_ptr<DataHolder>>::Producer producer_;
+};
+
+class DataHolderNode : public ExecNode {
+ public:
+  DataHolderNode(ExecPlan* plan, NodeVector inputs, std::vector<std::string> 
input_labels,
+                 std::shared_ptr<Schema> output_schema, int num_outputs)
+      : ExecNode(plan, std::move(inputs), input_labels, 
std::move(output_schema),
+                 /*num_outputs=*/num_outputs) {
+    executor_ = plan->exec_context()->executor();
+
+    data_holder_manager_ =
+        
::arrow::internal::make_unique<DataHolderManager>(plan->exec_context());
+
+    auto status = task_group_.AddTask([this]() -> Result<Future<>> {
+      ARROW_DCHECK(executor_ != nullptr);
+      return executor_->Submit(this->stop_source_.token(), [this] {
+        auto generator = this->data_holder_manager_->generator();
+        auto iterator = MakeGeneratorIterator(std::move(generator));
+        while (true) {
+          ARROW_ASSIGN_OR_RAISE(auto result, iterator.Next());
+          if (IsIterationEnd(result)) {
+            break;
+          }
+          ARROW_ASSIGN_OR_RAISE(ExecBatch batch, result->Get());
+          this->outputs_[0]->InputReceived(this, batch);
+        }
+        return Status::OK();
+      });
+    });
+    if (!status.ok()) {
+      if (input_counter_.Cancel()) {
+        this->Finish(status);
+      }
+      inputs_[0]->StopProducing(this);
+    }
+  }
+
+  void ErrorReceived(ExecNode* input, Status error) override {
+    DCHECK_EQ(input, inputs_[0]);
+    outputs_[0]->ErrorReceived(this, std::move(error));
+  }
+
+  void InputFinished(ExecNode* input, int total_batches) override {
+    DCHECK_EQ(input, inputs_[0]);
+    outputs_[0]->InputFinished(this, total_batches);
+    if (input_counter_.SetTotal(total_batches)) {
+      this->Finish();
+    }
+  }
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    auto schema = inputs[0]->output_schema();
+    return plan->EmplaceNode<DataHolderNode>(plan, std::move(inputs),
+                                             
std::vector<std::string>{"target"},
+                                             std::move(schema), 
/*num_outputs=*/1);
+  }
+
+  const char* kind_name() const override { return "DataHolderNode"; }
+
+  void InputReceived(ExecNode* input, ExecBatch batch) override {
+    if (finished_.is_finished()) {
+      return;
+    }
+    auto status = task_group_.AddTask([this, batch]() -> Result<Future<>> {
+      return this->executor_->Submit(this->stop_source_.token(), [this, 
batch]() {
+        auto pool = this->plan()->exec_context()->memory_pool();
+        ARROW_ASSIGN_OR_RAISE(auto record_batch,
+                              batch.ToRecordBatch(this->output_schema(), 
pool));

Review comment:
       Converting every `ExecBatch` to `RecordBatch` sort of defeats the 
purpose of having `ExecBatch` in the first place.  Ideally this would only have 
to happen if we needed to store to disk.

##########
File path: cpp/src/arrow/compute/memory_resources.h
##########
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/result.h"
+#include "arrow/util/macros.h"
+
+#include <iterator>
+#include <memory>
+#include <string>
+
+namespace arrow {
+
+namespace compute {
+
+struct ExecBatch;
+
+/// \brief Kind of storage a memory resource is associated with.
+///
+/// Enumerator values are implicit: they follow declaration order, starting at 0.
+enum class MemoryLevel : int {
+  kGPULevel,
+  kCPULevel,
+  kDiskLevel,
+};

Review comment:
       I think these should be kGpuLevel and kCpuLevel according to the style 
guide (not that we are terribly consistent):
   
   https://google.github.io/styleguide/cppguide.html#General_Naming_Rules

##########
File path: cpp/src/arrow/compute/memory_resources.cc
##########
@@ -0,0 +1,307 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/memory_resources.h"
+#include "arrow/compute/exec.h"
+#include "arrow/record_batch.h"
+#include "arrow/table.h"
+#include "arrow/util/make_unique.h"
+
+#include <memory>
+#include <mutex>
+#include <random>
+#include <unordered_map>
+
+#include <arrow/filesystem/filesystem.h>
+#include <arrow/ipc/feather.h>
+#include <arrow/ipc/reader.h>
+#include <arrow/ipc/writer.h>
+#include "arrow/io/file.h"
+
+#ifdef __APPLE__
+#include <sys/sysctl.h>
+#include <sys/types.h>
+#endif
+
+#ifdef __linux__
+#include <sys/statvfs.h>
+#include <sys/sysinfo.h>
+#endif
+
+// Windows APIs
+#include "arrow/util/windows_compatibility.h"
+
+namespace arrow {
+
+namespace compute {
+
+/// \brief Return the spelled-out enumerator name of `memory_level`.
+///
+/// A switch is used rather than an index into a name table so that each name
+/// stays tied to its enumerator no matter how MemoryLevel is ordered.
+///
+/// \return e.g. "MemoryLevel::kCPULevel"; "MemoryLevel::<unknown>" for a value
+///         that is not a declared enumerator
+std::string MemoryLevelName(MemoryLevel memory_level) {
+  switch (memory_level) {
+    case MemoryLevel::kGPULevel:
+      return ARROW_STRINGIFY(MemoryLevel::kGPULevel);
+    case MemoryLevel::kCPULevel:
+      return ARROW_STRINGIFY(MemoryLevel::kCPULevel);
+    case MemoryLevel::kDiskLevel:
+      return ARROW_STRINGIFY(MemoryLevel::kDiskLevel);
+  }
+  // Only reachable when an out-of-range integer was cast to MemoryLevel.
+  return "MemoryLevel::<unknown>";
+}
+
+/// Describe this resource by the name of the memory level it belongs to.
+std::string MemoryResource::ToString() const {
+  const auto level = memory_level_;
+  return MemoryLevelName(level);
+}
+
+/// \brief DataHolder that keeps a shared reference to its record batch.
+class CPUDataHolder : public DataHolder {
+ public:
+  /// Share ownership of `record_batch`.
+  ///
+  /// The argument is a const reference, so it is copied; wrapping it in
+  /// std::move() would still copy and only suggest a move that never happens.
+  explicit CPUDataHolder(const std::shared_ptr<RecordBatch>& record_batch)
+      : DataHolder(MemoryLevel::kCPULevel), record_batch_(record_batch) {}
+
+  /// Expose the held record batch as an ExecBatch.
+  Result<ExecBatch> Get() override { return ExecBatch(*record_batch_); }
+
+ private:
+  std::shared_ptr<RecordBatch> record_batch_;
+};
+
+namespace {
+
+/// \brief Build a string of `length` characters drawn uniformly from [0-9A-Za-z].
+///
+/// A new std::mt19937 is seeded from std::random_device on every call, so the
+/// result is not meant to be reproducible.
+///
+/// \param length number of characters to generate; 0 yields an empty string
+std::string RandomString(std::size_t length) {
+  static constexpr char kAlphabet[] =
+      "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+  // sizeof counts the terminating NUL, which must never be picked.
+  constexpr std::size_t kAlphabetSize = sizeof(kAlphabet) - 1;
+
+  std::random_device random_device;
+  std::mt19937 generator(random_device());
+  // Index type matches the array index type, so no narrowing conversion occurs.
+  std::uniform_int_distribution<std::size_t> distribution(0, kAlphabetSize - 1);
+
+  std::string random_string;
+  random_string.reserve(length);
+  for (std::size_t i = 0; i < length; ++i) {
+    random_string += kAlphabet[distribution(generator)];
+  }
+  return random_string;
+}
+
+}  // namespace
+
+/// \brief Write `record_batch` to `file_path` on `filesystem` as an Arrow IPC file.
+///
+/// \return the first error met while opening the stream, creating the writer,
+///         writing the batch or closing; Status::OK() otherwise
+Status StoreRecordBatch(const std::shared_ptr<RecordBatch>& record_batch,
+                        const std::shared_ptr<fs::FileSystem>& filesystem,
+                        const std::string& file_path) {
+  // Errors are returned rather than aborting through ValueOrDie(): a path that
+  // cannot be written is a runtime condition the caller has to be able to see.
+  ARROW_ASSIGN_OR_RAISE(auto output, filesystem->OpenOutputStream(file_path));
+  ARROW_ASSIGN_OR_RAISE(
+      auto writer, arrow::ipc::MakeFileWriter(output.get(), record_batch->schema()));
+  ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*record_batch));
+  ARROW_RETURN_NOT_OK(writer->Close());
+  // Closing the writer leaves the sink open; close it here so that a failed
+  // flush is reported instead of being swallowed when the stream is destroyed.
+  return output->Close();
+}
+/// \brief Read back the first record batch of the Arrow IPC file at `file_path`.
+///
+/// The batch is read directly from the IPC file instead of going through a
+/// Table and a TableBatchReader: that reader yields a null batch for a table
+/// with zero rows, which would lose an empty batch on the way back.
+///
+/// \return the stored batch, or an error if the file cannot be opened, is not a
+///         valid IPC file, or contains no record batch
+Result<std::shared_ptr<RecordBatch>> RecoverRecordBatch(
+    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& file_path) {
+  ARROW_ASSIGN_OR_RAISE(auto input, filesystem->OpenInputFile(file_path));
+  ARROW_ASSIGN_OR_RAISE(auto reader, arrow::ipc::RecordBatchFileReader::Open(input));
+  if (reader->num_record_batches() < 1) {
+    return Status::Invalid("No record batch stored in file: ", file_path);
+  }
+  return reader->ReadRecordBatch(0);
+}
+
+/// \brief DataHolder that writes its record batch to a temporary file and reads
+/// it back on demand.
+///
+/// The file is written once, in the constructor. A failure there is remembered
+/// and returned by every later Get() call. The file is removed again, on a
+/// best-effort basis, when the holder is destroyed.
+class DiskDataHolder : public DataHolder {
+ public:
+  /// \param record_batch batch to persist; no reference to it is kept
+  /// \param memory_pool stored, but not otherwise used by this class
+  DiskDataHolder(const std::shared_ptr<RecordBatch>& record_batch,
+                 MemoryPool* memory_pool)
+      : DataHolder(MemoryLevel::kDiskLevel), memory_pool_(memory_pool) {
+    std::string root_path;
+    auto maybe_filesystem =
+        arrow::fs::FileSystemFromUri(cache_storage_root_path, &root_path);
+    if (!maybe_filesystem.ok()) {
+      // A constructor cannot return a Status, so keep the error for Get().
+      status_ = maybe_filesystem.status();
+      return;
+    }
+    filesystem_ = maybe_filesystem.ValueOrDie();
+
+    // The random component keeps separate holders from sharing a file name.
+    file_path_ = root_path + "data-holder-temp-" + RandomString(64) + ".feather";
+    status_ = StoreRecordBatch(record_batch, filesystem_, file_path_);
+  }
+
+  /// Remove the temporary file so spilled batches do not pile up on disk.
+  ~DiskDataHolder() {
+    if (filesystem_ != nullptr && !file_path_.empty()) {
+      // Best effort: a destructor has no way to report a failed delete.
+      ARROW_UNUSED(filesystem_->DeleteFile(file_path_));
+    }
+  }
+
+  /// Read the batch back from disk.
+  ///
+  /// \return the stored batch as an ExecBatch, the error recorded while the
+  ///         file was being written, or the error met while reading it back
+  Result<ExecBatch> Get() override {
+    ARROW_RETURN_NOT_OK(status_);
+    ARROW_ASSIGN_OR_RAISE(auto record_batch,
+                          RecoverRecordBatch(filesystem_, file_path_));
+    // A null batch means the reader had nothing to hand back; report that
+    // instead of dereferencing it.
+    if (record_batch == nullptr) {
+      return Status::Invalid("No record batch could be read back from ", file_path_);
+    }
+    return ExecBatch(*record_batch);
+  }
+
+ private:
+  std::string file_path_;
+  Status status_;
+  MemoryPool* memory_pool_;
+  std::shared_ptr<arrow::fs::FileSystem> filesystem_;
+  const std::string cache_storage_root_path = "file:///tmp/";
+};
+
+class MemoryResources::MemoryResourcesImpl {
+ public:
+  Status AddMemoryResource(std::unique_ptr<MemoryResource> resource) {
+    std::lock_guard<std::mutex> mutation_guard(lock_);
+    auto level = resource->memory_level();
+    auto it = stats_.find(level);
+    if (it != stats_.end()) {
+      return Status::KeyError("Already have a resource type registered with 
name: ",
+                              resource->ToString());
+    }
+    stats_[level] = std::move(resource);
+    return Status::OK();
+  }
+
+  size_t size() const { return stats_.size(); }
+
+  Result<int64_t> memory_limit(MemoryLevel level) const {
+    auto it = stats_.find(level);
+    if (it == stats_.end()) {
+      return Status::KeyError("No memory resource registered with level: ",
+                              MemoryLevelName(level));
+    }
+    return it->second->memory_limit();
+  }
+
+  Result<int64_t> memory_used(MemoryLevel level) const {
+    auto it = stats_.find(level);
+    if (it == stats_.end()) {
+      return Status::KeyError("No memory resource registered with level: ",
+                              MemoryLevelName(level));
+    }
+    return it->second->memory_used();
+  }
+
+  Result<MemoryResource*> memory_resource(MemoryLevel level) const {
+    auto it = stats_.find(level);
+    if (it == stats_.end()) {
+      return Status::KeyError("No memory resource registered with level: ",
+                              MemoryLevelName(level));
+    }
+    return it->second.get();
+  }
+
+ private:
+  std::mutex lock_;
+
+  std::unordered_map<MemoryLevel, std::unique_ptr<MemoryResource>> stats_;

Review comment:
       Given that `MemoryLevel` is an enum, could this just be a fixed array?

##########
File path: cpp/src/arrow/compute/exec/data_holder_node.cc
##########
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <mutex>
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+
+#include "arrow/compute/memory_resources.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/future.h"
+#include "arrow/util/logging.h"
+
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/future.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+using internal::checked_cast;
+
+namespace compute {
+
+class DataHolderManager {
+ public:
+  DataHolderManager(ExecContext* context)
+      : context_(context), gen_(), producer_(gen_.producer()) {}
+
+  Status Push(const std::shared_ptr<RecordBatch>& batch) {
+    static const MemoryLevel all_memory_levels[] = {
+        MemoryLevel::kGPULevel, MemoryLevel::kCPULevel, 
MemoryLevel::kDiskLevel};
+
+    for (auto id : all_memory_levels) {
+      auto resources = context_->memory_resources();
+
+      auto memory_resource_result = resources->memory_resource(id);
+      if (memory_resource_result.ok()) {
+        auto memory_resource = memory_resource_result.ValueOrDie();
+        auto memory_to_use = memory_resource->memory_used();
+        if (memory_to_use < memory_resource->memory_limit()) {

Review comment:
       What happens if this is false?  It seems to me the batch would be lost.

##########
File path: cpp/src/arrow/compute/exec/data_holder_node.cc
##########
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <mutex>
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+
+#include "arrow/compute/memory_resources.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/future.h"
+#include "arrow/util/logging.h"
+
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/future.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+using internal::checked_cast;
+
+namespace compute {
+
+class DataHolderManager {
+ public:
+  DataHolderManager(ExecContext* context)
+      : context_(context), gen_(), producer_(gen_.producer()) {}
+
+  Status Push(const std::shared_ptr<RecordBatch>& batch) {
+    static const MemoryLevel all_memory_levels[] = {
+        MemoryLevel::kGPULevel, MemoryLevel::kCPULevel, 
MemoryLevel::kDiskLevel};
+
+    for (auto id : all_memory_levels) {
+      auto resources = context_->memory_resources();
+
+      auto memory_resource_result = resources->memory_resource(id);
+      if (memory_resource_result.ok()) {
+        auto memory_resource = memory_resource_result.ValueOrDie();
+        auto memory_to_use = memory_resource->memory_used();
+        if (memory_to_use < memory_resource->memory_limit()) {
+          ARROW_ASSIGN_OR_RAISE(auto data_holder, 
memory_resource->GetDataHolder(batch));
+          this->producer_.Push(std::move(data_holder));
+          break;
+        }
+      }
+    }
+    return Status::OK();
+  }
+  AsyncGenerator<std::shared_ptr<DataHolder>> generator() { return gen_; }
+
+ public:
+  ExecContext* context_;
+  PushGenerator<std::shared_ptr<DataHolder>> gen_;
+  PushGenerator<std::shared_ptr<DataHolder>>::Producer producer_;
+};
+
+class DataHolderNode : public ExecNode {
+ public:
+  DataHolderNode(ExecPlan* plan, NodeVector inputs, std::vector<std::string> 
input_labels,
+                 std::shared_ptr<Schema> output_schema, int num_outputs)
+      : ExecNode(plan, std::move(inputs), input_labels, 
std::move(output_schema),
+                 /*num_outputs=*/num_outputs) {
+    executor_ = plan->exec_context()->executor();
+
+    data_holder_manager_ =
+        
::arrow::internal::make_unique<DataHolderManager>(plan->exec_context());
+
+    auto status = task_group_.AddTask([this]() -> Result<Future<>> {
+      ARROW_DCHECK(executor_ != nullptr);
+      return executor_->Submit(this->stop_source_.token(), [this] {
+        auto generator = this->data_holder_manager_->generator();
+        auto iterator = MakeGeneratorIterator(std::move(generator));
+        while (true) {
+          ARROW_ASSIGN_OR_RAISE(auto result, iterator.Next());
+          if (IsIterationEnd(result)) {
+            break;
+          }
+          ARROW_ASSIGN_OR_RAISE(ExecBatch batch, result->Get());
+          this->outputs_[0]->InputReceived(this, batch);
+        }
+        return Status::OK();
+      });
+    });
+    if (!status.ok()) {
+      if (input_counter_.Cancel()) {
+        this->Finish(status);
+      }
+      inputs_[0]->StopProducing(this);
+    }

Review comment:
       It's a bit odd to be calling these methods in the constructor.  
Admittedly it's a rather odd error case (`AddTask` probably shouldn't fail 
unless maybe the underlying thread pool has been shut down).  I'm not sure if 
this will work or if the next call to `StopProducing` will clear things out.

##########
File path: cpp/src/arrow/compute/exec.h
##########
@@ -78,6 +80,11 @@ class ARROW_EXPORT ExecContext {
   /// registry provided by GetFunctionRegistry.
   FunctionRegistry* func_registry() const { return func_registry_; }
 
+  /// \brief The MemoryResources for looking up memory resources by memory 
level
+  /// and getting data holders to enable out of core processing. Defaults to 
the
+  /// library-global function registry provided by GetMemoryResources.

Review comment:
       ```suggestion
     /// and getting data holders to enable out of core processing. Defaults to 
the
     /// instance provided by GetMemoryResources.
   ```

##########
File path: cpp/src/arrow/compute/memory_resources.cc
##########
@@ -0,0 +1,307 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/memory_resources.h"
+#include "arrow/compute/exec.h"
+#include "arrow/record_batch.h"
+#include "arrow/table.h"
+#include "arrow/util/make_unique.h"
+
+#include <memory>
+#include <mutex>
+#include <random>
+#include <unordered_map>
+
+#include <arrow/filesystem/filesystem.h>
+#include <arrow/ipc/feather.h>
+#include <arrow/ipc/reader.h>
+#include <arrow/ipc/writer.h>
+#include "arrow/io/file.h"
+
+#ifdef __APPLE__
+#include <sys/sysctl.h>
+#include <sys/types.h>
+#endif
+
+#ifdef __linux__
+#include <sys/statvfs.h>
+#include <sys/sysinfo.h>
+#endif
+
+// Windows APIs
+#include "arrow/util/windows_compatibility.h"
+
+namespace arrow {
+
+namespace compute {
+
+std::string MemoryLevelName(MemoryLevel memory_level) {
+  static const char* MemoryLevelNames[] = 
{ARROW_STRINGIFY(MemoryLevel::kDiskLevel),
+                                           
ARROW_STRINGIFY(MemoryLevel::kCPULevel),
+                                           
ARROW_STRINGIFY(MemoryLevel::kGPULevel)};
+
+  return MemoryLevelNames[static_cast<int>(memory_level)];
+}
+
+std::string MemoryResource::ToString() const { return 
MemoryLevelName(memory_level_); }
+
+class CPUDataHolder : public DataHolder {
+ public:
+  explicit CPUDataHolder(const std::shared_ptr<RecordBatch>& record_batch)
+      : DataHolder(MemoryLevel::kCPULevel), 
record_batch_(std::move(record_batch)) {}
+
+  Result<ExecBatch> Get() override { return ExecBatch(*record_batch_); }
+
+ private:
+  std::shared_ptr<RecordBatch> record_batch_;
+};
+
+namespace {
+
+std::string RandomString(std::size_t length) {
+  const std::string characters =
+      "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+  std::random_device random_device;
+  std::mt19937 generator(random_device());
+  std::uniform_int_distribution<> distribution(0, characters.size() - 1);
+  std::string random_string;
+  for (std::size_t i = 0; i < length; ++i) {
+    random_string += characters[distribution(generator)];
+  }
+  return random_string;
+}

Review comment:
       This probably fits better in a utility class somewhere so it can be 
shared.  It's pretty similar to the existing function in `bloom_filter_test.cc`.

##########
File path: cpp/src/arrow/compute/exec/data_holder_node.cc
##########
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <mutex>
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+
+#include "arrow/compute/memory_resources.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/future.h"
+#include "arrow/util/logging.h"
+
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/future.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+using internal::checked_cast;
+
+namespace compute {
+
+class DataHolderManager {
+ public:
+  DataHolderManager(ExecContext* context)
+      : context_(context), gen_(), producer_(gen_.producer()) {}
+
+  Status Push(const std::shared_ptr<RecordBatch>& batch) {
+    static const MemoryLevel all_memory_levels[] = {
+        MemoryLevel::kGPULevel, MemoryLevel::kCPULevel, 
MemoryLevel::kDiskLevel};
+
+    for (auto id : all_memory_levels) {
+      auto resources = context_->memory_resources();
+
+      auto memory_resource_result = resources->memory_resource(id);
+      if (memory_resource_result.ok()) {
+        auto memory_resource = memory_resource_result.ValueOrDie();
+        auto memory_to_use = memory_resource->memory_used();
+        if (memory_to_use < memory_resource->memory_limit()) {
+          ARROW_ASSIGN_OR_RAISE(auto data_holder, 
memory_resource->GetDataHolder(batch));
+          this->producer_.Push(std::move(data_holder));
+          break;
+        }
+      }
+    }
+    return Status::OK();
+  }
+  AsyncGenerator<std::shared_ptr<DataHolder>> generator() { return gen_; }
+
+ public:
+  ExecContext* context_;
+  PushGenerator<std::shared_ptr<DataHolder>> gen_;
+  PushGenerator<std::shared_ptr<DataHolder>>::Producer producer_;
+};
+
+class DataHolderNode : public ExecNode {
+ public:
+  DataHolderNode(ExecPlan* plan, NodeVector inputs, std::vector<std::string> 
input_labels,
+                 std::shared_ptr<Schema> output_schema, int num_outputs)
+      : ExecNode(plan, std::move(inputs), input_labels, 
std::move(output_schema),
+                 /*num_outputs=*/num_outputs) {
+    executor_ = plan->exec_context()->executor();
+
+    data_holder_manager_ =
+        
::arrow::internal::make_unique<DataHolderManager>(plan->exec_context());
+
+    auto status = task_group_.AddTask([this]() -> Result<Future<>> {
+      ARROW_DCHECK(executor_ != nullptr);
+      return executor_->Submit(this->stop_source_.token(), [this] {
+        auto generator = this->data_holder_manager_->generator();
+        auto iterator = MakeGeneratorIterator(std::move(generator));

Review comment:
       This won't work.  You need to avoid blocking on the CPU thread pool.  
This basically takes one of the CPU pool's threads and dedicates it to pulling 
from the data holder output.  If the data holder fills up, then this thread 
will be useless, and since our thread pool is a fixed-size thread pool, this is 
a bad thing.  Even if the data holder hasn't filled up, this thread will spend 
a lot of time idle if the scanner is slow.

##########
File path: cpp/src/arrow/compute/memory_resources.cc
##########
@@ -0,0 +1,307 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/memory_resources.h"
+#include "arrow/compute/exec.h"
+#include "arrow/record_batch.h"
+#include "arrow/table.h"
+#include "arrow/util/make_unique.h"
+
+#include <memory>
+#include <mutex>
+#include <random>
+#include <unordered_map>
+
+#include <arrow/filesystem/filesystem.h>
+#include <arrow/ipc/feather.h>
+#include <arrow/ipc/reader.h>
+#include <arrow/ipc/writer.h>
+#include "arrow/io/file.h"
+
+#ifdef __APPLE__
+#include <sys/sysctl.h>
+#include <sys/types.h>
+#endif
+
+#ifdef __linux__
+#include <sys/statvfs.h>
+#include <sys/sysinfo.h>
+#endif
+
+// Windows APIs
+#include "arrow/util/windows_compatibility.h"
+
+namespace arrow {
+
+namespace compute {
+
+std::string MemoryLevelName(MemoryLevel memory_level) {
+  static const char* MemoryLevelNames[] = 
{ARROW_STRINGIFY(MemoryLevel::kDiskLevel),
+                                           
ARROW_STRINGIFY(MemoryLevel::kCPULevel),
+                                           
ARROW_STRINGIFY(MemoryLevel::kGPULevel)};
+
+  return MemoryLevelNames[static_cast<int>(memory_level)];
+}
+
+std::string MemoryResource::ToString() const { return 
MemoryLevelName(memory_level_); }
+
+class CPUDataHolder : public DataHolder {
+ public:
+  explicit CPUDataHolder(const std::shared_ptr<RecordBatch>& record_batch)
+      : DataHolder(MemoryLevel::kCPULevel), 
record_batch_(std::move(record_batch)) {}
+
+  Result<ExecBatch> Get() override { return ExecBatch(*record_batch_); }
+
+ private:
+  std::shared_ptr<RecordBatch> record_batch_;
+};
+
+namespace {
+
+std::string RandomString(std::size_t length) {
+  const std::string characters =
+      "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+  std::random_device random_device;
+  std::mt19937 generator(random_device());
+  std::uniform_int_distribution<> distribution(0, characters.size() - 1);
+  std::string random_string;
+  for (std::size_t i = 0; i < length; ++i) {
+    random_string += characters[distribution(generator)];
+  }
+  return random_string;
+}
+
+}  // namespace
+
+Status StoreRecordBatch(const std::shared_ptr<RecordBatch>& record_batch,
+                        const std::shared_ptr<fs::FileSystem>& filesystem,
+                        const std::string& file_path) {
+  auto output = filesystem->OpenOutputStream(file_path).ValueOrDie();
+  auto writer =
+      arrow::ipc::MakeFileWriter(output.get(), 
record_batch->schema()).ValueOrDie();
+  ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*record_batch));
+  return writer->Close();
+}
+Result<std::shared_ptr<RecordBatch>> RecoverRecordBatch(
+    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& 
file_path) {
+  ARROW_ASSIGN_OR_RAISE(auto input, filesystem->OpenInputFile(file_path));
+  ARROW_ASSIGN_OR_RAISE(auto reader, arrow::ipc::feather::Reader::Open(input));
+  std::shared_ptr<Table> table;
+  ARROW_RETURN_NOT_OK(reader->Read(&table));
+  TableBatchReader batch_iter(*table);
+  ARROW_ASSIGN_OR_RAISE(auto batch, batch_iter.Next());
+  return batch;
+}
+
+class DiskDataHolder : public DataHolder {

Review comment:
       Nit: These `DataHolder` implementations might fit better elsewhere.  I'm 
not sure we want the `compute` module to depend on the `ipc` module.

##########
File path: cpp/src/arrow/compute/memory_resources.cc
##########
@@ -0,0 +1,307 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/memory_resources.h"
+#include "arrow/compute/exec.h"
+#include "arrow/record_batch.h"
+#include "arrow/table.h"
+#include "arrow/util/make_unique.h"
+
+#include <memory>
+#include <mutex>
+#include <random>
+#include <unordered_map>
+
+#include <arrow/filesystem/filesystem.h>
+#include <arrow/ipc/feather.h>
+#include <arrow/ipc/reader.h>
+#include <arrow/ipc/writer.h>
+#include "arrow/io/file.h"
+
+#ifdef __APPLE__
+#include <sys/sysctl.h>
+#include <sys/types.h>
+#endif
+
+#ifdef __linux__
+#include <sys/statvfs.h>
+#include <sys/sysinfo.h>
+#endif
+
+// Windows APIs
+#include "arrow/util/windows_compatibility.h"
+
+namespace arrow {
+
+namespace compute {
+
+std::string MemoryLevelName(MemoryLevel memory_level) {
+  static const char* MemoryLevelNames[] = 
{ARROW_STRINGIFY(MemoryLevel::kDiskLevel),
+                                           
ARROW_STRINGIFY(MemoryLevel::kCPULevel),
+                                           
ARROW_STRINGIFY(MemoryLevel::kGPULevel)};
+
+  return MemoryLevelNames[static_cast<int>(memory_level)];
+}
+
+std::string MemoryResource::ToString() const { return 
MemoryLevelName(memory_level_); }
+
+class CPUDataHolder : public DataHolder {
+ public:
+  explicit CPUDataHolder(const std::shared_ptr<RecordBatch>& record_batch)
+      : DataHolder(MemoryLevel::kCPULevel), 
record_batch_(std::move(record_batch)) {}
+
+  Result<ExecBatch> Get() override { return ExecBatch(*record_batch_); }
+
+ private:
+  std::shared_ptr<RecordBatch> record_batch_;
+};
+
+namespace {
+
+std::string RandomString(std::size_t length) {
+  const std::string characters =
+      "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+  std::random_device random_device;
+  std::mt19937 generator(random_device());
+  std::uniform_int_distribution<> distribution(0, characters.size() - 1);
+  std::string random_string;
+  for (std::size_t i = 0; i < length; ++i) {
+    random_string += characters[distribution(generator)];
+  }
+  return random_string;
+}
+
+}  // namespace
+
+Status StoreRecordBatch(const std::shared_ptr<RecordBatch>& record_batch,
+                        const std::shared_ptr<fs::FileSystem>& filesystem,
+                        const std::string& file_path) {
+  auto output = filesystem->OpenOutputStream(file_path).ValueOrDie();
+  auto writer =
+      arrow::ipc::MakeFileWriter(output.get(), 
record_batch->schema()).ValueOrDie();
+  ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*record_batch));
+  return writer->Close();
+}
+Result<std::shared_ptr<RecordBatch>> RecoverRecordBatch(
+    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& 
file_path) {
+  ARROW_ASSIGN_OR_RAISE(auto input, filesystem->OpenInputFile(file_path));
+  ARROW_ASSIGN_OR_RAISE(auto reader, arrow::ipc::feather::Reader::Open(input));
+  std::shared_ptr<Table> table;
+  ARROW_RETURN_NOT_OK(reader->Read(&table));
+  TableBatchReader batch_iter(*table);
+  ARROW_ASSIGN_OR_RAISE(auto batch, batch_iter.Next());
+  return batch;
+}
+
+class DiskDataHolder : public DataHolder {
+ public:
+  DiskDataHolder(const std::shared_ptr<RecordBatch>& record_batch,
+                 MemoryPool* memory_pool)
+      : DataHolder(MemoryLevel::kDiskLevel), memory_pool_(memory_pool) {
+    std::string root_path;
+    std::string file_name = "data-holder-temp-" + RandomString(64) + 
".feather";
+
+    filesystem_ =
+        arrow::fs::FileSystemFromUri(cache_storage_root_path, 
&root_path).ValueOrDie();
+
+    file_path_ = root_path + file_name;
+    status_ = StoreRecordBatch(record_batch, filesystem_, file_path_);

Review comment:
       Complex logic shouldn't happen in constructors.  Maybe there needs to be 
a `Put` method or something.

##########
File path: cpp/src/arrow/compute/exec/data_holder_node_test.cc
##########
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-matchers.h>
+#include <random>
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/matchers.h"
+#include "arrow/testing/random.h"
+
+using testing::UnorderedElementsAreArray;
+
+namespace arrow {
+namespace compute {
+
+struct TestDataHolderNode : public ::testing::Test {
+  static constexpr int kNumBatches = 10;
+
+  TestDataHolderNode() : rng_(0) {}
+
+  std::shared_ptr<Schema> GenerateRandomSchema(size_t num_inputs) {
+    static std::vector<std::shared_ptr<DataType>> some_arrow_types = {
+        arrow::null(),    arrow::boolean(), arrow::int8(),    arrow::int16(),
+        arrow::int32(),   arrow::int64(),   arrow::float16(), arrow::float32(),
+        arrow::float64(), arrow::utf8(),    arrow::binary(),  arrow::date32()};
+
+    std::vector<std::shared_ptr<Field>> fields(num_inputs);
+    std::default_random_engine gen(42);
+    std::uniform_int_distribution<int> types_dist(
+        0, static_cast<int>(some_arrow_types.size()) - 1);
+    for (size_t i = 0; i < num_inputs; i++) {
+      int random_index = types_dist(gen);
+      auto col_type = some_arrow_types.at(random_index);
+      fields[i] =
+          field("column_" + std::to_string(i) + "_" + col_type->ToString(), 
col_type);
+    }
+    return schema(fields);
+  }
+
+  void GenerateBatchesFromSchema(const std::shared_ptr<Schema>& schema,
+                                 size_t num_batches, BatchesWithSchema* 
out_batches,
+                                 int multiplicity = 1, int64_t batch_size = 4) 
{
+    if (num_batches == 0) {
+      auto empty_record_batch = ExecBatch(*rng_.BatchOf(schema->fields(), 0));
+      out_batches->batches.push_back(empty_record_batch);
+    } else {
+      for (size_t j = 0; j < num_batches; j++) {
+        out_batches->batches.push_back(
+            ExecBatch(*rng_.BatchOf(schema->fields(), batch_size)));
+      }
+    }
+
+    size_t batch_count = out_batches->batches.size();
+    for (int repeat = 1; repeat < multiplicity; ++repeat) {
+      for (size_t i = 0; i < batch_count; ++i) {
+        out_batches->batches.push_back(out_batches->batches[i]);
+      }
+    }
+    out_batches->schema = schema;
+  }

Review comment:
       While this is an interesting concept, I don't see why we need random 
batches to test a data holder.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to