This is an automated email from the ASF dual-hosted git repository.

yibocai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new f4104a2c1c ARROW-16574: [C++] TSAN failure in arrow-ipc-read-write-test (#13245)
f4104a2c1c is described below

commit f4104a2c1c946bc9605c2b4b9016f2edd716a6c0
Author: Weston Pace <[email protected]>
AuthorDate: Wed Jun 1 15:24:15 2022 -1000

    ARROW-16574: [C++] TSAN failure in arrow-ipc-read-write-test (#13245)
    
    When reading multiple batches at the same time it was possible for concurrent
    attempts to increment the `ReadStats` counters to race with each other. The
    counters are now stored as atomics.
    
    Authored-by: Weston Pace <[email protected]>
    Signed-off-by: Yibo Cai <[email protected]>
---
 cpp/src/arrow/ipc/reader.cc | 41 +++++++++++++++++++++++++++++++----------
 1 file changed, 31 insertions(+), 10 deletions(-)

diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 0b46203795..f25f16b4e4 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -1218,7 +1218,7 @@ class RecordBatchFileReaderImpl : public 
RecordBatchFileReader {
         auto batch_with_metadata,
         ReadRecordBatchInternal(*message->metadata(), schema_, 
field_inclusion_mask_,
                                 context, reader.get()));
-    ++stats_.num_record_batches;
+    stats_.num_record_batches.fetch_add(1, std::memory_order_relaxed);
     return batch_with_metadata;
   }
 
@@ -1266,7 +1266,7 @@ class RecordBatchFileReaderImpl : public 
RecordBatchFileReader {
     RETURN_NOT_OK(UnpackSchemaMessage(footer_->schema(), options, 
&dictionary_memo_,
                                       &schema_, &out_schema_, 
&field_inclusion_mask_,
                                       &swap_endian_));
-    ++stats_.num_messages;
+    stats_.num_messages.fetch_add(1, std::memory_order_relaxed);
     return Status::OK();
   }
 
@@ -1296,7 +1296,7 @@ class RecordBatchFileReaderImpl : public 
RecordBatchFileReader {
       RETURN_NOT_OK(UnpackSchemaMessage(
           self->footer_->schema(), options, &self->dictionary_memo_, 
&self->schema_,
           &self->out_schema_, &self->field_inclusion_mask_, 
&self->swap_endian_));
-      ++self->stats_.num_messages;
+      self->stats_.num_messages.fetch_add(1, std::memory_order_relaxed);
       return Status::OK();
     });
   }
@@ -1305,7 +1305,7 @@ class RecordBatchFileReaderImpl : public 
RecordBatchFileReader {
 
   std::shared_ptr<const KeyValueMetadata> metadata() const override { return 
metadata_; }
 
-  ReadStats stats() const override { return stats_; }
+  ReadStats stats() const override { return stats_.poll(); }
 
   Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> GetRecordBatchGenerator(
       const bool coalesce, const io::IOContext& io_context,
@@ -1345,7 +1345,7 @@ class RecordBatchFileReaderImpl : public 
RecordBatchFileReader {
     for (int index : indices) {
       Future<std::shared_ptr<Message>> metadata_loaded =
           all_metadata_ready.Then([this, index]() -> 
Result<std::shared_ptr<Message>> {
-            ++stats_.num_messages;
+            stats_.num_messages.fetch_add(1, std::memory_order_relaxed);
             FileBlock block = GetRecordBatchBlock(index);
             ARROW_ASSIGN_OR_RAISE(
                 std::shared_ptr<Buffer> metadata,
@@ -1374,6 +1374,27 @@ class RecordBatchFileReaderImpl : public 
RecordBatchFileReader {
  private:
   friend class WholeIpcFileRecordBatchGenerator;
 
+  struct AtomicReadStats {
+    std::atomic<int64_t> num_messages{0};
+    std::atomic<int64_t> num_record_batches{0};
+    std::atomic<int64_t> num_dictionary_batches{0};
+    std::atomic<int64_t> num_dictionary_deltas{0};
+    std::atomic<int64_t> num_replaced_dictionaries{0};
+
+    /// \brief Capture a copy of the current counters
+    ReadStats poll() const {
+      ReadStats stats;
+      stats.num_messages = num_messages.load(std::memory_order_relaxed);
+      stats.num_record_batches = 
num_record_batches.load(std::memory_order_relaxed);
+      stats.num_dictionary_batches =
+          num_dictionary_batches.load(std::memory_order_relaxed);
+      stats.num_dictionary_deltas = 
num_dictionary_deltas.load(std::memory_order_relaxed);
+      stats.num_replaced_dictionaries =
+          num_replaced_dictionaries.load(std::memory_order_relaxed);
+      return stats;
+    }
+  };
+
   FileBlock GetRecordBatchBlock(int i) const {
     return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i));
   }
@@ -1386,7 +1407,7 @@ class RecordBatchFileReaderImpl : public 
RecordBatchFileReader {
       const FileBlock& block, const FieldsLoaderFunction& fields_loader = {}) {
     ARROW_ASSIGN_OR_RAISE(auto message,
                           arrow::ipc::ReadMessageFromBlock(block, file_, 
fields_loader));
-    ++stats_.num_messages;
+    stats_.num_messages.fetch_add(1, std::memory_order_relaxed);
     return std::move(message);
   }
 
@@ -1396,7 +1417,7 @@ class RecordBatchFileReaderImpl : public 
RecordBatchFileReader {
     for (int i = 0; i < num_dictionaries(); ++i) {
       ARROW_ASSIGN_OR_RAISE(auto message, 
ReadMessageFromBlock(GetDictionaryBlock(i)));
       RETURN_NOT_OK(ReadOneDictionary(message.get(), context));
-      ++stats_.num_dictionary_batches;
+      stats_.num_dictionary_batches.fetch_add(1, std::memory_order_relaxed);
     }
     return Status::OK();
   }
@@ -1409,7 +1430,7 @@ class RecordBatchFileReaderImpl : public 
RecordBatchFileReader {
     if (kind == DictionaryKind::Replacement) {
       return Status::Invalid("Unsupported dictionary replacement in IPC file");
     } else if (kind == DictionaryKind::Delta) {
-      ++stats_.num_dictionary_deltas;
+      stats_.num_dictionary_deltas.fetch_add(1, std::memory_order_relaxed);
     }
     return Status::OK();
   }
@@ -1609,7 +1630,7 @@ class RecordBatchFileReaderImpl : public 
RecordBatchFileReader {
 
   Future<std::shared_ptr<RecordBatch>> ReadCachedRecordBatch(
       int index, Future<std::shared_ptr<Message>> message_fut) {
-    ++stats_.num_record_batches;
+    stats_.num_record_batches.fetch_add(1, std::memory_order_relaxed);
     return dictionary_load_finished_.Then([message_fut] { return message_fut; 
})
         .Then([this, index](const std::shared_ptr<Message>& message_obj)
                   -> Future<std::shared_ptr<RecordBatch>> {
@@ -1716,7 +1737,7 @@ class RecordBatchFileReaderImpl : public 
RecordBatchFileReader {
   // Schema with deselected fields dropped
   std::shared_ptr<Schema> out_schema_;
 
-  ReadStats stats_;
+  AtomicReadStats stats_;
   std::shared_ptr<io::internal::ReadRangeCache> metadata_cache_;
   std::unordered_set<int> cached_data_blocks_;
   Future<> dictionary_load_finished_;

Reply via email to