Tom-Newton commented on code in PR #44897:
URL: https://github.com/apache/arrow/pull/44897#discussion_r1869614409


##########
cpp/src/arrow/util/thread_pool.cc:
##########
@@ -449,8 +468,8 @@ static void WorkerLoop(std::shared_ptr<ThreadPool::State> 
state,
 
       DCHECK_GE(state->tasks_queued_or_running_, 0);
       {
-        Task task = std::move(state->pending_tasks_.front());
-        state->pending_tasks_.pop_front();
+        Task task = 
std::move(const_cast<Task&>(state->pending_tasks_.top().task));

Review Comment:
   Hmm... it looks like `c` is protected so it doesn't seem to like it 
   ```
   /home/tomnewton/arrow/cpp/src/arrow/util/thread_pool.cc:471:53: error: 
‘std::vector<arrow::internal::{anonymous}::QueuedTask, 
std::allocator<arrow::internal::{anonymous}::QueuedTask> > 
std::priority_queue<arrow::internal::{anonymous}::QueuedTask>::c’ is protected 
within this context
   ```



##########
cpp/src/arrow/filesystem/test_util.cc:
##########
@@ -578,6 +578,67 @@ void GenericFileSystemTest::TestCopyFile(FileSystem* fs) {
   AssertAllFiles(fs, {"AB/abc", "EF/ghi", "def"});
 }
 
+void GenericFileSystemTest::TestCopyFiles(FileSystem* fs) {
+#if defined(ADDRESS_SANITIZER) || defined(ARROW_VALGRIND)
+  if (have_false_positive_memory_leak_with_async_close()) {
+    GTEST_SKIP() << "Filesystem have false positive memory leak with 
generator";
+  }
+#endif
+  auto originalThreads = io::GetIOThreadPoolCapacity();
+  // Needs to be smaller than the number of files we test with to catch 
GH-15233
+  ASSERT_OK(io::SetIOThreadPoolCapacity(2));
+  // Ensure the thread pool capacity is set back to the original value after 
the test
+  auto resetThreadPool = [originalThreads](void*) {
+    ASSERT_OK(io::SetIOThreadPoolCapacity(originalThreads));
+  };
+  std::unique_ptr<void, decltype(resetThreadPool)> 
resetThreadPoolGuard(nullptr,
+                                                                        
resetThreadPool);
+
+  auto mock_fs = std::make_shared<arrow::fs::internal::MockFileSystem>(
+      std::chrono::system_clock::now());
+  std::shared_ptr<FileSystem> shared_ptr_fs(fs, [](FileSystem*) {});

Review Comment:
   Yeah, that's just because I don't know what I'm doing with C++ :sweat-smile:



##########
cpp/src/arrow/filesystem/test_util.cc:
##########
@@ -578,6 +578,67 @@ void GenericFileSystemTest::TestCopyFile(FileSystem* fs) {
   AssertAllFiles(fs, {"AB/abc", "EF/ghi", "def"});
 }
 
+void GenericFileSystemTest::TestCopyFiles(FileSystem* fs) {
+#if defined(ADDRESS_SANITIZER) || defined(ARROW_VALGRIND)
+  if (have_false_positive_memory_leak_with_async_close()) {
+    GTEST_SKIP() << "Filesystem have false positive memory leak with 
generator";
+  }
+#endif
+  auto originalThreads = io::GetIOThreadPoolCapacity();

Review Comment:
   > Also it seems that we should actually call 
fs->io_context().executor()->capacity()?
   
   
   :+1: I've also changed the SetCapacity to be done in a similar way. 
Hopefully that makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to