This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new ee2e9448c8 ARROW-17115: [C++] HashJoin fails if it encounters a batch 
with more than 32Ki rows (#13679)
ee2e9448c8 is described below

commit ee2e9448c8565820ba38a2df9e44ab6055e5df1d
Author: Weston Pace <[email protected]>
AuthorDate: Fri Jul 22 11:21:02 2022 -1000

    ARROW-17115: [C++] HashJoin fails if it encounters a batch with more than 
32Ki rows (#13679)
    
    The swiss join was correctly breaking up probe side batches but build side 
batches would get partitioned as-is before any breaking up happened.  That 
partitioning assumed 16-bit addressable indices and this failed if a build side 
batch was too large.
    
    Rather than break batches up in the hash-join node I went ahead and started 
breaking batches up in the source node.  This matches the morsel / batch model 
and is basically a small precursor for future scheduler changes.
    
    This will have some small end-user impact as the output for larger queries 
is going to be batched more finely.  However, we were already slicing batches 
up into 128Ki chunks in the scanner starting with 8.0.0 and so I don't think 
this is a significant difference.
    
    Authored-by: Weston Pace <[email protected]>
    Signed-off-by: Weston Pace <[email protected]>
---
 cpp/src/arrow/compute/exec/exec_plan.cc      |  2 ++
 cpp/src/arrow/compute/exec/exec_plan.h       | 16 ++++++++++++
 cpp/src/arrow/compute/exec/hash_join_node.cc |  5 ++++
 cpp/src/arrow/compute/exec/plan_test.cc      | 23 +++++++++++++++++
 cpp/src/arrow/compute/exec/source_node.cc    | 38 ++++++++++++++++++++++------
 cpp/src/arrow/dataset/scanner.cc             | 16 +++++++-----
 python/pyarrow/tests/test_dataset.py         |  2 +-
 7 files changed, 87 insertions(+), 15 deletions(-)

diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc 
b/cpp/src/arrow/compute/exec/exec_plan.cc
index a7cd3472be..15d9569007 100644
--- a/cpp/src/arrow/compute/exec/exec_plan.cc
+++ b/cpp/src/arrow/compute/exec/exec_plan.cc
@@ -348,6 +348,8 @@ util::optional<int> GetNodeIndex(const 
std::vector<ExecNode*>& nodes,
 
 }  // namespace
 
+const uint32_t ExecPlan::kMaxBatchSize;
+
 Result<std::shared_ptr<ExecPlan>> ExecPlan::Make(
     ExecContext* ctx, std::shared_ptr<const KeyValueMetadata> metadata) {
   return std::shared_ptr<ExecPlan>(new ExecPlanImpl{ctx, metadata});
diff --git a/cpp/src/arrow/compute/exec/exec_plan.h 
b/cpp/src/arrow/compute/exec/exec_plan.h
index d2663972f2..5e52f606a6 100644
--- a/cpp/src/arrow/compute/exec/exec_plan.h
+++ b/cpp/src/arrow/compute/exec/exec_plan.h
@@ -40,6 +40,8 @@ namespace compute {
 
 class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
  public:
+  // This allows operators to rely on signed 16-bit indices
+  static const uint32_t kMaxBatchSize = 1 << 15;
   using NodeVector = std::vector<ExecNode*>;
 
   virtual ~ExecPlan() = default;
@@ -144,10 +146,24 @@ class ARROW_EXPORT ExecPlan : public 
std::enable_shared_from_this<ExecPlan> {
   /// \brief Return the plan's attached metadata
   std::shared_ptr<const KeyValueMetadata> metadata() const;
 
+  /// \brief Should the plan use a legacy batching strategy
+  ///
+  /// This is currently in place only to support the Scanner::ToTable
+  /// method.  This method relies on batch indices from the scanner
+  /// remaining consistent.  This is impractical in the ExecPlan which
+  /// might slice batches as needed (e.g. for a join)
+  ///
+  /// However, it still works for simple plans and this is the only way
+  /// we have at the moment for maintaining implicit order.
+  bool UseLegacyBatching() const { return use_legacy_batching_; }
+  // For internal use only, see above comment
+  void SetUseLegacyBatching(bool value) { use_legacy_batching_ = value; }
+
   std::string ToString() const;
 
  protected:
   ExecContext* exec_context_;
+  bool use_legacy_batching_ = false;
   explicit ExecPlan(ExecContext* exec_context) : exec_context_(exec_context) {}
 };
 
diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc 
b/cpp/src/arrow/compute/exec/hash_join_node.cc
index 1785df8784..44667b9f28 100644
--- a/cpp/src/arrow/compute/exec/hash_join_node.cc
+++ b/cpp/src/arrow/compute/exec/hash_join_node.cc
@@ -947,6 +947,11 @@ class HashJoinNode : public ExecNode {
 
   Status Init() override {
     RETURN_NOT_OK(ExecNode::Init());
+    if (plan_->UseLegacyBatching()) {
+      return Status::Invalid(
+          "The plan was configured to use legacy batching but contained a join 
node "
+          "which is incompatible with legacy batching");
+    }
     bool use_sync_execution = !(plan_->exec_context()->executor());
     // TODO(ARROW-15732)
     // Each side of join might have an IO thread being called from. Once this 
is fixed
diff --git a/cpp/src/arrow/compute/exec/plan_test.cc 
b/cpp/src/arrow/compute/exec/plan_test.cc
index c29f3fa9d5..e06c41c748 100644
--- a/cpp/src/arrow/compute/exec/plan_test.cc
+++ b/cpp/src/arrow/compute/exec/plan_test.cc
@@ -1458,5 +1458,28 @@ TEST(ExecPlan, RecordBatchReaderSourceSink) {
               Finishes(ResultWith(UnorderedElementsAreArray(input.batches))));
 }
 
+TEST(ExecPlan, SourceEnforcesBatchLimit) {
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+  auto random_data = MakeRandomBatches(
+      schema({field("a", int32()), field("b", boolean())}), /*num_batches=*/3,
+      /*batch_size=*/static_cast<int32_t>(std::floor(ExecPlan::kMaxBatchSize * 
3.5)));
+
+  AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+  ASSERT_OK(Declaration::Sequence(
+                {
+                    {"source", SourceNodeOptions{random_data.schema,
+                                                 
random_data.gen(/*parallel=*/true,
+                                                                 
/*slow=*/false)}},
+                    {"sink", SinkNodeOptions{&sink_gen}},
+                })
+                .AddToPlan(plan.get()));
+  ASSERT_FINISHES_OK_AND_ASSIGN(std::vector<ExecBatch> batches,
+                                StartAndCollect(plan.get(), 
std::move(sink_gen)));
+  for (const auto& batch : batches) {
+    ASSERT_LE(batch.length, ExecPlan::kMaxBatchSize);
+  }
+}
+
 }  // namespace compute
 }  // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/source_node.cc 
b/cpp/src/arrow/compute/exec/source_node.cc
index 677659c815..a640cf737e 100644
--- a/cpp/src/arrow/compute/exec/source_node.cc
+++ b/cpp/src/arrow/compute/exec/source_node.cc
@@ -106,23 +106,45 @@ struct SourceNode : ExecNode {
     }
     auto fut = Loop([this, options] {
                  std::unique_lock<std::mutex> lock(mutex_);
-                 int total_batches = batch_count_++;
                  if (stop_requested_) {
-                   return 
Future<ControlFlow<int>>::MakeFinished(Break(total_batches));
+                   return 
Future<ControlFlow<int>>::MakeFinished(Break(batch_count_));
                  }
                  lock.unlock();
 
                  return generator_().Then(
-                     [=](const util::optional<ExecBatch>& maybe_batch)
+                     [=](const util::optional<ExecBatch>& maybe_morsel)
                          -> Future<ControlFlow<int>> {
                        std::unique_lock<std::mutex> lock(mutex_);
-                       if (IsIterationEnd(maybe_batch) || stop_requested_) {
-                         return Break(total_batches);
+                       if (IsIterationEnd(maybe_morsel) || stop_requested_) {
+                         return Break(batch_count_);
                        }
                        lock.unlock();
-                       ExecBatch batch = std::move(*maybe_batch);
+                       bool use_legacy_batching = plan_->UseLegacyBatching();
+                       ExecBatch morsel = std::move(*maybe_morsel);
+                       int64_t morsel_length = 
static_cast<int64_t>(morsel.length);
+                       if (use_legacy_batching || morsel_length == 0) {
+                         // For various reasons (e.g. ARROW-13982) we pass 
empty batches
+                         // through
+                         batch_count_++;
+                       } else {
+                         int num_batches = static_cast<int>(
+                             bit_util::CeilDiv(morsel_length, 
ExecPlan::kMaxBatchSize));
+                         batch_count_ += num_batches;
+                       }
                        RETURN_NOT_OK(plan_->ScheduleTask([=]() {
-                         outputs_[0]->InputReceived(this, std::move(batch));
+                         int64_t offset = 0;
+                         do {
+                           int64_t batch_size = std::min<int64_t>(
+                               morsel_length - offset, 
ExecPlan::kMaxBatchSize);
+                           // In order for the legacy batching model to work 
we must
+                           // not slice batches from the source
+                           if (use_legacy_batching) {
+                             batch_size = morsel_length;
+                           }
+                           ExecBatch batch = morsel.Slice(offset, batch_size);
+                           offset += batch_size;
+                           outputs_[0]->InputReceived(this, std::move(batch));
+                         } while (offset < morsel.length);
                          return Status::OK();
                        }));
                        lock.lock();
@@ -135,7 +157,7 @@ struct SourceNode : ExecNode {
                      },
                      [=](const Status& error) -> ControlFlow<int> {
                        outputs_[0]->ErrorReceived(this, error);
-                       return Break(total_batches);
+                       return Break(batch_count_);
                      },
                      options);
                })
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index c3e5b2a477..0ef1d4577c 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -210,7 +210,7 @@ class AsyncScanner : public Scanner, public 
std::enable_shared_from_this<AsyncSc
   Future<> VisitBatchesAsync(std::function<Status(TaggedRecordBatch)> visitor,
                              Executor* executor);
   Result<EnumeratedRecordBatchGenerator> ScanBatchesUnorderedAsync(
-      Executor* executor, bool sequence_fragments);
+      Executor* executor, bool sequence_fragments, bool use_legacy_batching = 
false);
   Future<std::shared_ptr<Table>> ToTableAsync(Executor* executor);
 
   Result<FragmentGenerator> GetFragments() const;
@@ -351,7 +351,7 @@ Result<EnumeratedRecordBatch> ToEnumeratedRecordBatch(
 }
 
 Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
-    Executor* cpu_executor, bool sequence_fragments) {
+    Executor* cpu_executor, bool sequence_fragments, bool use_legacy_batching) 
{
   if (!scan_options_->use_threads) {
     cpu_executor = nullptr;
   }
@@ -362,6 +362,7 @@ Result<EnumeratedRecordBatchGenerator> 
AsyncScanner::ScanBatchesUnorderedAsync(
       std::make_shared<compute::ExecContext>(scan_options_->pool, 
cpu_executor);
 
   ARROW_ASSIGN_OR_RAISE(auto plan, 
compute::ExecPlan::Make(exec_context.get()));
+  plan->SetUseLegacyBatching(use_legacy_batching);
   AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen;
 
   auto exprs = scan_options_->projection.call()->arguments;
@@ -518,8 +519,9 @@ Result<TaggedRecordBatchGenerator> 
AsyncScanner::ScanBatchesAsync() {
 
 Result<TaggedRecordBatchGenerator> AsyncScanner::ScanBatchesAsync(
     Executor* cpu_executor) {
-  ARROW_ASSIGN_OR_RAISE(auto unordered, ScanBatchesUnorderedAsync(
-                                            cpu_executor, 
/*sequence_fragments=*/true));
+  ARROW_ASSIGN_OR_RAISE(
+      auto unordered, ScanBatchesUnorderedAsync(cpu_executor, 
/*sequence_fragments=*/true,
+                                                /*use_legacy_batching=*/true));
   // We need an initial value sentinel, so we use one with fragment.index < 0
   auto is_before_any = [](const EnumeratedRecordBatch& batch) {
     return batch.fragment.index < 0;
@@ -611,8 +613,10 @@ Future<> 
AsyncScanner::VisitBatchesAsync(std::function<Status(TaggedRecordBatch)
 
 Future<std::shared_ptr<Table>> AsyncScanner::ToTableAsync(Executor* 
cpu_executor) {
   auto scan_options = scan_options_;
-  ARROW_ASSIGN_OR_RAISE(auto positioned_batch_gen,
-                        ScanBatchesUnorderedAsync(cpu_executor));
+  ARROW_ASSIGN_OR_RAISE(
+      auto positioned_batch_gen,
+      ScanBatchesUnorderedAsync(cpu_executor, /*sequence_fragments=*/false,
+                                /*use_legacy_batching=*/true));
   /// Wraps the state in a shared_ptr to ensure that failing ScanTasks don't
   /// invalidate concurrently running tasks when Finish() early returns
   /// and the mutex/batches fail out of scope.
diff --git a/python/pyarrow/tests/test_dataset.py 
b/python/pyarrow/tests/test_dataset.py
index 277c6866f1..b900e694a9 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -1816,7 +1816,7 @@ def test_positional_keywords_raises(tempdir):
 @pytest.mark.parquet
 @pytest.mark.pandas
 def test_read_partition_keys_only(tempdir):
-    BATCH_SIZE = 2 ** 17
+    BATCH_SIZE = 2 ** 15
     # This is a regression test for ARROW-15318 which saw issues
     # reading only the partition keys from files with batches larger
     # than the default batch size (e.g. so we need to return two chunks)

Reply via email to