taepper commented on issue #49214:
URL: https://github.com/apache/arrow/issues/49214#issuecomment-4781240932

   This may be a useful test: the `FunctionExecutor` abstraction you are looking
   for is already available in the C++ API, so this issue could be solved by
   exposing it to Python:
   
   ```
   // Benchmark reproducing the pyarrow `is_in` "rebuild hash table every call"
   // problem in C++, and checking whether driving the internal stateful kernel
   // directly (build the hash table once, probe many times) avoids it.
   //
   // Mirrors:
   //   value_set = pa.array(np.arange(10_000_000))
   //   blocks    = [pa.array(np.random.choice(np.arange(1000), 100)) for _ in range(20)]
   //   for block in blocks: pc.is_in(block, value_set)
   //
   // Build (against the in-tree installed Arrow at cpp/install):
   //   clang++ -std=c++17 -O2 is_in_state_benchmark.cpp \
   //     -I cpp/install/include -L cpp/install/lib \
   //     -larrow -larrow_compute \
   //     -Wl,-rpath,cpp/install/lib -o is_in_state_benchmark
   //   ./is_in_state_benchmark
   
   #include <arrow/api.h>
   #include <arrow/array/data.h>
   #include <arrow/buffer.h>
   #include <arrow/compute/api.h>
   #include <arrow/compute/exec.h>
   #include <arrow/compute/function.h>
   #include <arrow/compute/initialize.h>
   
   #include <chrono>
   #include <iostream>
   #include <memory>
   #include <random>
   #include <vector>
   
   using arrow::Array;
   using arrow::ArrayData;
   using arrow::ArraySpan;
   using arrow::Datum;
   using arrow::Int64Builder;
   using arrow::TypeHolder;
   namespace cp = arrow::compute;
   
   // Returns an int64 array containing 0, 1, ..., n - 1 in ascending order.
   static arrow::Result<std::shared_ptr<Array>> MakeRange(int64_t n) {
     Int64Builder range_builder;
     ARROW_RETURN_NOT_OK(range_builder.Reserve(n));
     int64_t next = 0;
     while (next < n) {
       range_builder.UnsafeAppend(next++);
     }
     std::shared_ptr<Array> range_array;
     ARROW_RETURN_NOT_OK(range_builder.Finish(&range_array));
     return range_array;
   }
   
   // Returns an int64 array of `n` values drawn uniformly from [0, high), using
   // (and advancing) the caller-supplied `rng`.
   //
   // Note: the algorithm behind std::uniform_int_distribution is not specified by
   // the standard, so a fixed seed reproduces the same blocks only when built
   // against the same standard library.
   static arrow::Result<std::shared_ptr<Array>> MakeRandomBlock(std::mt19937_64& rng,
                                                                int64_t n, int64_t high) {
     // uniform_int_distribution(a, b) requires a <= b; high <= 0 would violate that
     // precondition (undefined behavior), so reject it up front.
     if (high <= 0) {
       return arrow::Status::Invalid("MakeRandomBlock: high must be positive, got ", high);
     }
     std::uniform_int_distribution<int64_t> dist(0, high - 1);
     Int64Builder builder;
     ARROW_RETURN_NOT_OK(builder.Reserve(n));
     for (int64_t i = 0; i < n; ++i) builder.UnsafeAppend(dist(rng));
     std::shared_ptr<Array> out;
     ARROW_RETURN_NOT_OK(builder.Finish(&out));
     return out;
   }
   
   // Defined below main(); prevents the optimizer from discarding the timed 
work.
   void benchmark_keep_alive(const Datum& d);
   
   // Times `is_in` for kNumBlocks small int64 blocks against one large value set:
   //   A: one cp::IsIn call per block (lookup state rebuilt on every call);
   //   B: one FunctionExecutor, resolved once and reused for every block.
   // Prints the timings (including B's one-off setup cost) and returns an error if
   // the two approaches disagree on any block.
   arrow::Status runBenchmark() {
     constexpr int64_t kValueSetLen = 10'000'000;
     constexpr int kNumBlocks = 20;
     constexpr int64_t kBlockLen = 100;
     constexpr int64_t kBlockHigh = 1000;

     ARROW_RETURN_NOT_OK(cp::Initialize());

     std::cout << "Building value_set (" << kValueSetLen << " int64) and " << kNumBlocks
               << " blocks of " << kBlockLen << " ...\n";

     ARROW_ASSIGN_OR_RAISE(auto value_set, MakeRange(kValueSetLen));
     std::mt19937_64 rng(42);
     std::vector<std::shared_ptr<Array>> blocks;
     blocks.reserve(kNumBlocks);
     for (int i = 0; i < kNumBlocks; ++i) {
       ARROW_ASSIGN_OR_RAISE(auto block, MakeRandomBlock(rng, kBlockLen, kBlockHigh));
       blocks.push_back(std::move(block));
     }

     cp::SetLookupOptions options{Datum(value_set)};

     // Warm up the function registry so first-call overhead is not charged to
     // approach A. Propagate any error instead of silently discarding it.
     ARROW_RETURN_NOT_OK(cp::IsIn(Datum(blocks[0]), options).status());

     // -------------------------------------------------------------------------
     // Approach A: IsIn call for every block
     // -------------------------------------------------------------------------

     std::vector<Datum> results_a;
     results_a.reserve(blocks.size());
     double time_a = 0.0;
     {
       const auto start = std::chrono::steady_clock::now();
       for (const auto& block : blocks) {
         ARROW_ASSIGN_OR_RAISE(Datum result, cp::IsIn(Datum(block), options));
         benchmark_keep_alive(result);
         results_a.push_back(std::move(result));
       }
       const auto end = std::chrono::steady_clock::now();
       time_a = std::chrono::duration<double>(end - start).count();
     }

     // -------------------------------------------------------------------------
     // Approach B: reuse FunctionExecutor
     // -------------------------------------------------------------------------

     // Resolve the is_in kernel for an int64 input. This one-off setup is timed
     // separately so its cost is reported rather than hidden from the comparison
     // (it presumably includes initializing the value_set lookup state).
     std::vector<TypeHolder> in_types = {TypeHolder(arrow::int64())};
     const auto setup_start = std::chrono::steady_clock::now();
     ARROW_ASSIGN_OR_RAISE(auto function_executor,
                           cp::GetFunctionExecutor("is_in", in_types, &options));
     const double time_b_setup =
         std::chrono::duration<double>(std::chrono::steady_clock::now() - setup_start)
             .count();

     std::vector<Datum> results_b;
     results_b.reserve(blocks.size());
     double time_b = 0.0;
     {
       const auto start = std::chrono::steady_clock::now();
       for (const auto& block : blocks) {
         ARROW_ASSIGN_OR_RAISE(Datum out_data, function_executor->Execute({block}));
         benchmark_keep_alive(out_data);
         results_b.push_back(std::move(out_data));
       }
       const auto end = std::chrono::steady_clock::now();
       time_b = std::chrono::duration<double>(end - start).count();
     }

     // The timing comparison is only meaningful if both approaches compute the
     // same answer; verify that outside the timed regions.
     for (size_t i = 0; i < blocks.size(); ++i) {
       if (!results_a[i].Equals(results_b[i])) {
         return arrow::Status::Invalid("is_in results differ between approaches A and B "
                                       "for block ",
                                       i);
       }
     }

     std::cout << "\nResults over " << kNumBlocks << " blocks:\n";
     std::cout << "  A: " << time_a << " s\n";
     std::cout << "  B: " << time_b << " s\n";
     std::cout << "  B setup (one-off, not included in B): " << time_b_setup << " s\n";
     std::cout << "  speedup (A / B): " << (time_a / time_b) << "x\n";
     return arrow::Status::OK();
   }
   
   int main() {
     auto error = runBenchmark();
     return error == arrow::Status::OK();
   }
   
   // Defined out-of-line to keep the timed lambdas readable; prevents the 
optimizer
   // from discarding the work.
   void benchmark_keep_alive(const Datum& d) {
     asm volatile("" : : "r,m"(d.kind()) : "memory");
   }
   ```
   
   yields this result:
   
   ```
   Results over 20 blocks:
     A: 54.1688 s
     B: 3.90086 s
     speedup (A / B): 13.8864x
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to