vibhatha commented on code in PR #15288:
URL: https://github.com/apache/arrow/pull/15288#discussion_r1070201623


##########
cpp/src/arrow/compute/exec/plan_test.cc:
##########
@@ -322,26 +306,16 @@ void TestSourceSink(
     std::string source_factory_name,
     std::function<Result<std::vector<ElementType>>(const BatchesWithSchema&)>
         to_elements) {
-  ASSERT_OK_AND_ASSIGN(auto executor, arrow::internal::ThreadPool::Make(1));
-  ExecContext exec_context(default_memory_pool(), executor.get());
-  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_context));
-  AsyncGenerator<std::optional<ExecBatch>> sink_gen;
-
   auto exp_batches = MakeBasicBatches();
   ASSERT_OK_AND_ASSIGN(auto elements, to_elements(exp_batches));
   auto element_it_maker = [&elements]() {
     return MakeVectorIterator<ElementType>(elements);
   };
-
-  ASSERT_OK(Declaration::Sequence({
-                                      {source_factory_name,
-                                       OptionsType{exp_batches.schema, 
element_it_maker}},
-                                      {"sink", SinkNodeOptions{&sink_gen}},
-                                  })
-                .AddToPlan(plan.get()));
-
-  ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
-              
Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches))));
+  Declaration plan(source_factory_name,
+                   OptionsType{exp_batches.schema, element_it_maker});
+  ASSERT_OK_AND_ASSIGN(auto result,
+                       DeclarationToExecBatches(std::move(plan), 
/*use_threads=*/false));

Review Comment:
   One thing I observed is that the following happens, at least once per 20 
repetitions on a single bash script test. Total of 100 repetitions (5 bash 
runs), caused 5 failures. So I would say it is consistently failing.
   
   ```bash
   arrow-compute-plan-test: 
/home/asus/github/fork/arrow/cpp/src/arrow/util/async_generator.h:1736: 
arrow::BackgroundGenerator<T>::Cleanup::~Cleanup() [with T = 
std::optional<arrow::compute::ExecBatch>]: Assertion 
`state->worker_thread_id.load() != ::arrow::internal::GetThreadId()' failed.
   Aborted (core dumped)
   ```
   
   I recommend setting `use_threads=false` for the moment, but investigate why 
in a separate issue. But do you have an insight on this?
   
   PS: Reading the `CleanUp` destructor, seems like it is expected. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to