mapleFU commented on PR #38466:
URL: https://github.com/apache/arrow/pull/38466#issuecomment-1781528962
```c++
class DelayedBufferReader : public ::arrow::io::BufferReader {
public:
explicit DelayedBufferReader(const std::shared_ptr<::arrow::Buffer>&
buffer)
: ::arrow::io::BufferReader(buffer) {}
::arrow::Future<std::shared_ptr<Buffer>> ReadAsync(const
::arrow::io::IOContext& io_context, int64_t position,
int64_t nbytes) override {
read_async_count.fetch_add(1);
std::this_thread::sleep_for(std::chrono::seconds(10));
return ::arrow::io::BufferReader::ReadAsync(io_context, position,
nbytes);
}
std::atomic<int> read_async_count{0};
};
TEST(TestArrowReadWrite, ScanContentsGracefulShutdown) {
  // Creates several record batch generators whose work is queued on
  // DelayedExecutors, drops the test's FileReader references, and only then runs
  // the queued tasks. Any access to destroyed reader state can then be reported
  // by ASAN/UBSAN.
  //
  // NOTE(review): GetRecordBatchGenerator() receives `reader` as a shared_ptr.
  // If the generator (or its pending tasks) keeps that reference, resetting
  // `reader` below does not destroy the FileReader. That may be why no sanitizer
  // error fires. Confirm before relying on this test.
  //
  // NOTE(review): DelayedBufferReader sleeps 10s per ReadAsync() by default,
  // which makes this test very slow.
  constexpr int kNumReaders = 3;
  constexpr int kBatchesPerReader = 2;

  ArrowReaderProperties properties = default_arrow_reader_properties();
  properties.set_batch_size(256);
  properties.set_pre_buffer(true);
  properties.set_use_threads(true);
  // Minimal hole/range limits -- presumably so that each column chunk is fetched
  // through its own ReadAsync() call instead of a coalesced one.
  auto cache_options = ::arrow::io::CacheOptions::LazyDefaults();
  cache_options.hole_size_limit = 1;
  cache_options.range_size_limit = 1;
  properties.set_cache_options(cache_options);

  const int num_rows = 1024;
  const int row_group_size = 16;
  const int num_columns = 1;
  std::shared_ptr<Table> table;
  ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
  std::shared_ptr<Buffer> buffer;
  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, row_group_size,
                                             default_arrow_writer_properties(), &buffer));
  auto mock_input_stream = std::make_shared<DelayedBufferReader>(buffer);

  // Declared before `futures`, so the executors are destroyed after the futures.
  std::vector<std::unique_ptr<::arrow::DelayedExecutor>> delayed_executors;
  delayed_executors.reserve(kNumReaders);
  std::vector<::arrow::Future<std::shared_ptr<::arrow::RecordBatch>>> futures;
  futures.reserve(kNumReaders * kBatchesPerReader);

  for (int idx = 0; idx < kNumReaders; ++idx) {
    delayed_executors.push_back(std::make_unique<::arrow::DelayedExecutor>());
    auto& delayed_executor = *delayed_executors.back();

    std::shared_ptr<FileReader> reader;
    {
      std::unique_ptr<FileReader> unique_reader;
      FileReaderBuilder builder;
      ASSERT_OK(builder.Open(mock_input_stream));
      ASSERT_OK(builder.properties(properties)->Build(&unique_reader));
      reader = std::move(unique_reader);
    }
    {
      // The generator is scoped so it is destroyed before `reader` is reset;
      // only the futures it produced outlive this block.
      ASSERT_OK_AND_ASSIGN(auto batch_generator,
                           reader->GetRecordBatchGenerator(reader, {0, 1}, {0},
                                                           &delayed_executor, 256));
      for (int batch = 0; batch < kBatchesPerReader; ++batch) {
        futures.push_back(batch_generator());
      }
    }
    // Release the test's own reference to the reader before any queued task runs.
    reader = nullptr;
  }

  // Run every task captured by the delayed executors on a pool thread.
  auto drain_all = [&delayed_executors]() {
    for (auto& delayed_executor : delayed_executors) {
      while (!delayed_executor->captured_tasks.empty()) {
        // Swap the pending tasks out before running them. A task may queue more
        // work on this executor, and the outer loop picks that up next round.
        decltype(delayed_executor->captured_tasks) callbacks;
        callbacks.swap(delayed_executor->captured_tasks);
        for (auto& callback : callbacks) {
          std::move(callback)();
        }
      }
    }
  };
  ASSERT_OK_AND_ASSIGN(auto pool, ::arrow::internal::ThreadPool::Make(2));
  ASSERT_OK_AND_ASSIGN(auto drained, pool->Submit(std::move(drain_all)));
  drained.Wait();
  // TODO(review): the test asserts nothing about `futures`; consider checking
  // their final state once the expected shutdown behavior is settled.
}
```
I've tried a rough test like this. Even though `reader` is reset after calling
`reader->GetRecordBatchGenerator`, the later access to `this->executor_` didn't trigger
an ASAN / UBSAN error. I may need to try another approach later.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]