pitrou commented on code in PR #45372:
URL: https://github.com/apache/arrow/pull/45372#discussion_r1941468163


##########
cpp/src/arrow/record_batch.cc:
##########
@@ -102,8 +103,10 @@ class SimpleRecordBatch : public RecordBatch {
   std::shared_ptr<Array> column(int i) const override {
     std::shared_ptr<Array> result = std::atomic_load(&boxed_columns_[i]);
     if (!result) {
-      result = MakeArray(columns_[i]);
-      std::atomic_store(&boxed_columns_[i], result);
+      auto new_array = MakeArray(columns_[i]);
+      if (std::atomic_compare_exchange_strong(&boxed_columns_[i], &result, new_array)) {

Review Comment:
   ```suggestion
         // Be careful not to overwrite existing entry if another thread has been calling
         // `column(i)` at the same time, since the `boxed_columns_` contents are exposed
         // by `columns()` (see GH-45371).
         if (std::atomic_compare_exchange_strong(&boxed_columns_[i], &result, new_array)) {
   ```



##########
cpp/src/arrow/record_batch_test.cc:
##########
@@ -393,6 +399,27 @@ TEST_F(TestRecordBatch, RemoveColumnEmpty) {
   AssertBatchesEqual(*added, *batch1);
 }
 
+TEST_F(TestRecordBatch, ColumnsThreadSafety) {
+  const int length = 10;
+
+  random::RandomArrayGenerator gen(42);
+  std::shared_ptr<ArrayData> array_data = gen.ArrayOf(utf8(), length)->data();
+  auto schema = ::arrow::schema({field("f1", utf8())});
+  auto record_batch = RecordBatch::Make(schema, length, {array_data});
+  std::atomic_bool start_flag{false};
+  std::thread t([record_batch, &start_flag]() {
+    start_flag.store(true);
+    auto columns = record_batch->columns();
+    ASSERT_EQ(columns.size(), 1);

Review Comment:
   > This test will either produce a TSAN warning or not, but there is no assertion that will fail consistently under the data race.
   
   Actually, one could check that the returned `columns` store identical `Array*` pointers?
   
   Something like:
   ```c++
     constexpr int kNumThreads = 10;
   
     random::RandomArrayGenerator gen(42);
     std::shared_ptr<ArrayData> array_data = gen.ArrayOf(utf8(), 1)->data();
     auto schema = ::arrow::schema({field("f1", utf8())});
     auto record_batch = RecordBatch::Make(schema, length, {array_data});
   
     std::mutex mutex;
     std::vector<std::shared_ptr<Array>> all_columns;
     for (auto& thread : threads) {
       thread = std::thread([&]() {
         const auto& columns = record_batch->columns();
         all_columns.push_back(columns[0].get());
       });
     }
     for (auto& thread : threads) {
       thread.join();
     }
     for (const auto& col : all_columns) {
       ASSERT_EQ(col.get(), all_columns[0].get());
     }
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to