This is an automated email from the ASF dual-hosted git repository.
yibocai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new f4104a2c1c ARROW-16574: [C++] TSAN failure in arrow-ipc-read-write-test (#13245)
f4104a2c1c is described below
commit f4104a2c1c946bc9605c2b4b9016f2edd716a6c0
Author: Weston Pace <[email protected]>
AuthorDate: Wed Jun 1 15:24:15 2022 -1000
ARROW-16574: [C++] TSAN failure in arrow-ipc-read-write-test (#13245)
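When reading multiple batches at the same time, concurrent attempts to
increment the `ReadStats` counters could race with one another, which TSAN
reported as a failure in arrow-ipc-read-write-test. The counters are now
stored as atomics, incremented with relaxed atomic operations, and copied
into a plain `ReadStats` snapshot when `stats()` is called.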
Authored-by: Weston Pace <[email protected]>
Signed-off-by: Yibo Cai <[email protected]>
---
cpp/src/arrow/ipc/reader.cc | 41 +++++++++++++++++++++++++++++++----------
1 file changed, 31 insertions(+), 10 deletions(-)
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 0b46203795..f25f16b4e4 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -1218,7 +1218,7 @@ class RecordBatchFileReaderImpl : public
RecordBatchFileReader {
auto batch_with_metadata,
ReadRecordBatchInternal(*message->metadata(), schema_,
field_inclusion_mask_,
context, reader.get()));
- ++stats_.num_record_batches;
+ stats_.num_record_batches.fetch_add(1, std::memory_order_relaxed);
return batch_with_metadata;
}
@@ -1266,7 +1266,7 @@ class RecordBatchFileReaderImpl : public
RecordBatchFileReader {
RETURN_NOT_OK(UnpackSchemaMessage(footer_->schema(), options,
&dictionary_memo_,
&schema_, &out_schema_,
&field_inclusion_mask_,
&swap_endian_));
- ++stats_.num_messages;
+ stats_.num_messages.fetch_add(1, std::memory_order_relaxed);
return Status::OK();
}
@@ -1296,7 +1296,7 @@ class RecordBatchFileReaderImpl : public
RecordBatchFileReader {
RETURN_NOT_OK(UnpackSchemaMessage(
self->footer_->schema(), options, &self->dictionary_memo_,
&self->schema_,
&self->out_schema_, &self->field_inclusion_mask_,
&self->swap_endian_));
- ++self->stats_.num_messages;
+ self->stats_.num_messages.fetch_add(1, std::memory_order_relaxed);
return Status::OK();
});
}
@@ -1305,7 +1305,7 @@ class RecordBatchFileReaderImpl : public
RecordBatchFileReader {
std::shared_ptr<const KeyValueMetadata> metadata() const override { return
metadata_; }
- ReadStats stats() const override { return stats_; }
+ ReadStats stats() const override { return stats_.poll(); }
Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> GetRecordBatchGenerator(
const bool coalesce, const io::IOContext& io_context,
@@ -1345,7 +1345,7 @@ class RecordBatchFileReaderImpl : public
RecordBatchFileReader {
for (int index : indices) {
Future<std::shared_ptr<Message>> metadata_loaded =
all_metadata_ready.Then([this, index]() ->
Result<std::shared_ptr<Message>> {
- ++stats_.num_messages;
+ stats_.num_messages.fetch_add(1, std::memory_order_relaxed);
FileBlock block = GetRecordBatchBlock(index);
ARROW_ASSIGN_OR_RAISE(
std::shared_ptr<Buffer> metadata,
@@ -1374,6 +1374,27 @@ class RecordBatchFileReaderImpl : public
RecordBatchFileReader {
private:
friend class WholeIpcFileRecordBatchGenerator;
+ struct AtomicReadStats {
+ std::atomic<int64_t> num_messages{0};
+ std::atomic<int64_t> num_record_batches{0};
+ std::atomic<int64_t> num_dictionary_batches{0};
+ std::atomic<int64_t> num_dictionary_deltas{0};
+ std::atomic<int64_t> num_replaced_dictionaries{0};
+
+ /// \brief Capture a copy of the current counters
+ ReadStats poll() const {
+ ReadStats stats;
+ stats.num_messages = num_messages.load(std::memory_order_relaxed);
+ stats.num_record_batches =
num_record_batches.load(std::memory_order_relaxed);
+ stats.num_dictionary_batches =
+ num_dictionary_batches.load(std::memory_order_relaxed);
+ stats.num_dictionary_deltas =
num_dictionary_deltas.load(std::memory_order_relaxed);
+ stats.num_replaced_dictionaries =
+ num_replaced_dictionaries.load(std::memory_order_relaxed);
+ return stats;
+ }
+ };
+
FileBlock GetRecordBatchBlock(int i) const {
return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i));
}
@@ -1386,7 +1407,7 @@ class RecordBatchFileReaderImpl : public
RecordBatchFileReader {
const FileBlock& block, const FieldsLoaderFunction& fields_loader = {}) {
ARROW_ASSIGN_OR_RAISE(auto message,
arrow::ipc::ReadMessageFromBlock(block, file_,
fields_loader));
- ++stats_.num_messages;
+ stats_.num_messages.fetch_add(1, std::memory_order_relaxed);
return std::move(message);
}
@@ -1396,7 +1417,7 @@ class RecordBatchFileReaderImpl : public
RecordBatchFileReader {
for (int i = 0; i < num_dictionaries(); ++i) {
ARROW_ASSIGN_OR_RAISE(auto message,
ReadMessageFromBlock(GetDictionaryBlock(i)));
RETURN_NOT_OK(ReadOneDictionary(message.get(), context));
- ++stats_.num_dictionary_batches;
+ stats_.num_dictionary_batches.fetch_add(1, std::memory_order_relaxed);
}
return Status::OK();
}
@@ -1409,7 +1430,7 @@ class RecordBatchFileReaderImpl : public
RecordBatchFileReader {
if (kind == DictionaryKind::Replacement) {
return Status::Invalid("Unsupported dictionary replacement in IPC file");
} else if (kind == DictionaryKind::Delta) {
- ++stats_.num_dictionary_deltas;
+ stats_.num_dictionary_deltas.fetch_add(1, std::memory_order_relaxed);
}
return Status::OK();
}
@@ -1609,7 +1630,7 @@ class RecordBatchFileReaderImpl : public
RecordBatchFileReader {
Future<std::shared_ptr<RecordBatch>> ReadCachedRecordBatch(
int index, Future<std::shared_ptr<Message>> message_fut) {
- ++stats_.num_record_batches;
+ stats_.num_record_batches.fetch_add(1, std::memory_order_relaxed);
return dictionary_load_finished_.Then([message_fut] { return message_fut;
})
.Then([this, index](const std::shared_ptr<Message>& message_obj)
-> Future<std::shared_ptr<RecordBatch>> {
@@ -1716,7 +1737,7 @@ class RecordBatchFileReaderImpl : public
RecordBatchFileReader {
// Schema with deselected fields dropped
std::shared_ptr<Schema> out_schema_;
- ReadStats stats_;
+ AtomicReadStats stats_;
std::shared_ptr<io::internal::ReadRangeCache> metadata_cache_;
std::unordered_set<int> cached_data_blocks_;
Future<> dictionary_load_finished_;