ilya-biryukov updated this revision to Diff 132105.
ilya-biryukov added a comment.

- An initial version of the thread-per-file approach.

This is by no means a final version; we should definitely move things between 
files, do some renames, etc. before landing the final version.
Some things are not used anymore (e.g. ThreadPool) but are still in the patch; 
we'll need to remove them too.

The new version does not drop updates that are immediately followed by other 
updates; this seems like an oversight and should be fixed too.
Please take a look at the overall design and let me know what you think. Feel 
free to add suggestions on how we can improve things, too!


Repository:
  rCTE Clang Tools Extra

https://reviews.llvm.org/D42573

Files:
  clangd/CMakeLists.txt
  clangd/ClangdServer.h
  clangd/ClangdUnitStore.cpp
  clangd/ClangdUnitStore.h
  clangd/TUScheduler.cpp
  clangd/TUScheduler.h
  clangd/Threading.cpp
  clangd/Threading.h
  clangd/threading/Cancellation.cpp
  clangd/threading/Cancellation.h
  clangd/threading/RequestQueue.cpp
  clangd/threading/RequestQueue.h

Index: clangd/threading/RequestQueue.h
===================================================================
--- /dev/null
+++ clangd/threading/RequestQueue.h
@@ -0,0 +1,50 @@
+//===--- RequestQueue.h -----------------------------------------*- C++-*-===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===---------------------------------------------------------------------===//
+
+#ifndef LLVM_CLANG_TOOLS_EXTRA_CLANGD_THREADING_REQUESTQUEUE_H
+#define LLVM_CLANG_TOOLS_EXTRA_CLANGD_THREADING_REQUESTQUEUE_H
+
+#include "Function.h"
+#include <condition_variable>
+#include <deque>
+#include <mutex>
+
+namespace clang {
+namespace clangd {
+
+/// A thread-safe request queue managed by ThreadPool. This is an implementation
+/// detail, see QueueHandle and ThreadPool for user-facing APIs.
+class RequestQueue {
+public:
+  explicit RequestQueue(std::condition_variable &Condition);
+  /// Enqueue \p Req at the front/back and notify one waiter on Condition.
+  void addToFront(UniqueFunction<void()> Req);
+  void addToBack(UniqueFunction<void()> Req);
+  /// Dequeue the front request; returns llvm::None if the queue is empty.
+  llvm::Optional<UniqueFunction<void()>> pop();
+  /// True if no thread is processing and there is work or a pending removal.
+  bool needsProcessing() const;
+  /// Removal flag; can be set only once (asserted).
+  bool isScheduledForRemoval() const;
+  void setScheduledForRemoval();
+  /// Bracket a processing session; stopProcessing notifies one waiter.
+  void startProcessing();
+  void stopProcessing();
+
+private:
+  std::condition_variable &Condition;
+  mutable std::mutex Mutex;
+  bool IsProcessing;
+  bool IsScheduledForRemoval;
+  std::deque<UniqueFunction<void()>> Requests;
+};
+
+} // namespace clangd
+} // namespace clang
+#endif
Index: clangd/threading/RequestQueue.cpp
===================================================================
--- /dev/null
+++ clangd/threading/RequestQueue.cpp
@@ -0,0 +1,78 @@
+//===--- RequestQueue.cpp ---------------------------------------*- C++-*-===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===---------------------------------------------------------------------===//
+#include "RequestQueue.h"
+
+namespace clang {
+namespace clangd {
+
+RequestQueue::RequestQueue(std::condition_variable &Condition)
+    : Condition(Condition), IsProcessing(false), IsScheduledForRemoval(false) {}
+
+void RequestQueue::addToFront(UniqueFunction<void()> Req) {
+  { // The mutex guards only the deque and is released before notifying.
+    std::lock_guard<std::mutex> Guard(Mutex);
+    Requests.push_front(std::move(Req));
+  }
+  Condition.notify_one();
+}
+
+void RequestQueue::addToBack(UniqueFunction<void()> Req) {
+  { // The mutex guards only the deque and is released before notifying.
+    std::lock_guard<std::mutex> Guard(Mutex);
+    Requests.push_back(std::move(Req));
+  }
+  Condition.notify_one();
+}
+
+llvm::Optional<UniqueFunction<void()>> RequestQueue::pop() {
+  std::lock_guard<std::mutex> Guard(Mutex);
+  llvm::Optional<UniqueFunction<void()>> Front;
+  if (!Requests.empty()) {
+    Front = std::move(Requests.front());
+    Requests.pop_front();
+  }
+  return Front;
+}
+
+bool RequestQueue::needsProcessing() const {
+  std::lock_guard<std::mutex> Guard(Mutex);
+  // A queue that is being processed already has a thread attending to it.
+  const bool HasWork = IsScheduledForRemoval || !Requests.empty();
+  return !IsProcessing && HasWork;
+}
+
+bool RequestQueue::isScheduledForRemoval() const {
+  std::unique_lock<std::mutex> Lock(Mutex);
+  return IsScheduledForRemoval;
+}
+
+void RequestQueue::setScheduledForRemoval() {
+  std::unique_lock<std::mutex> Lock(Mutex);
+  assert(!IsScheduledForRemoval);
+  IsScheduledForRemoval = true;
+}
+
+void RequestQueue::startProcessing() {
+  std::unique_lock<std::mutex> Lock(Mutex);
+  assert(!IsProcessing);
+  assert(!Requests.empty() || IsScheduledForRemoval);
+  IsProcessing = true;
+}
+
+void RequestQueue::stopProcessing() {
+  { // Clear the flag under the mutex, then notify without holding it.
+    std::lock_guard<std::mutex> Guard(Mutex);
+    assert(IsProcessing);
+    IsProcessing = false;
+  }
+  Condition.notify_one();
+}
+
+} // namespace clangd
+} // namespace clang
Index: clangd/threading/Cancellation.h
===================================================================
--- /dev/null
+++ clangd/threading/Cancellation.h
@@ -0,0 +1,44 @@
+//===--- Cancellation.h -----------------------------------------*- C++-*-===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===---------------------------------------------------------------------===//
+
+#ifndef LLVM_CLANG_TOOLS_EXTRA_CLANGD_THREADING_CANCELLATION_H
+#define LLVM_CLANG_TOOLS_EXTRA_CLANGD_THREADING_CANCELLATION_H
+
+#include <atomic>
+#include <cassert>
+#include <memory>
+
+namespace clang {
+namespace clangd {
+
+/// A shared boolean flag indicating if the computation was cancelled.
+/// Once cancelled, cannot be returned to the previous state.
+/// FIXME: We should split this class into consumers and producers of the
+/// cancellation flags.
+class CancellationFlag {
+public:
+  CancellationFlag();
+  /// Stores true into the shared atomic; there is no way to reset it.
+  void setCancelled() {
+    assert(WasCancelled && "the object was moved");
+    WasCancelled->store(true);
+  }
+  /// Loads the current value of the shared atomic.
+  bool isCancelled() const {
+    assert(WasCancelled && "the object was moved");
+    return WasCancelled->load();
+  }
+
+private:
+  std::shared_ptr<std::atomic<bool>> WasCancelled;
+};
+} // namespace clangd
+} // namespace clang
+
+#endif
Index: clangd/threading/Cancellation.cpp
===================================================================
--- /dev/null
+++ clangd/threading/Cancellation.cpp
@@ -0,0 +1,18 @@
+//===--- Cancellation.cpp ---------------------------------------*- C++-*-===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===---------------------------------------------------------------------===//
+
+#include "Cancellation.h"
+namespace clang {
+namespace clangd {
+
+CancellationFlag::CancellationFlag()
+    : WasCancelled(std::make_shared<std::atomic<bool>>(false)) {}
+
+} // namespace clangd
+} // namespace clang
Index: clangd/Threading.h
===================================================================
--- clangd/Threading.h
+++ clangd/Threading.h
@@ -11,15 +11,53 @@
 #define LLVM_CLANG_TOOLS_EXTRA_CLANGD_THREADING_H
 
 #include "Function.h"
+#include "threading/RequestQueue.h"
 #include <condition_variable>
 #include <deque>
 #include <mutex>
 #include <thread>
 #include <vector>
+#include <map>
 
 namespace clang {
 namespace clangd {
-/// A simple fixed-size thread pool implementation.
+/// A handle to a queue managed by ThreadPool. Can be used to schedule
+/// computation on a queue by calling ThreadPool::scheduleOnQueue or signal that
+/// the queue is not used anymore by calling ThreadPool::removeQueue.
+class QueueHandle {
+  friend class ThreadPool;
+
+public:
+  QueueHandle() = default;
+
+  QueueHandle(const QueueHandle &) = delete;
+  QueueHandle &operator=(const QueueHandle &) = delete;
+
+  QueueHandle(QueueHandle &&Other) noexcept { *this = std::move(Other); }
+  /// Steals the queue from \p Other, which must be a valid handle.
+  QueueHandle &operator=(QueueHandle &&Other) noexcept {
+    assert(Other.isValid());
+    // Read the source before clearing it so that self-move keeps the queue.
+    RequestQueue *Moved = Other.Queue;
+    Other.Queue = nullptr;
+    Queue = Moved;
+    return *this;
+  }
+  /// Returns true if the handle refers to a queue.
+  bool isValid() const { return Queue != nullptr; }
+
+private:
+  explicit QueueHandle(RequestQueue &Queue) : Queue(&Queue) {}
+  RequestQueue *Queue = nullptr;
+};
+
+/// Asynchronous tasks executor backed by a fixed size thread pool.
+/// There are two ways to schedule computations in ThreadPool:
+///    - schedule will schedule computation to be executed on one of
+///      the working threads as soon as an idle thread is available.
+///    - scheduleOnQueue will schedule to a specific queue. Requests from the
+///      same queue are not processed concurrently. Requests in each queue are
+///      executed in the FIFO order.
 class ThreadPool {
 public:
   /// If \p AsyncThreadsCount is 0, requests added using addToFront and addToEnd
@@ -31,37 +69,43 @@
 
   /// Add a new request to run function \p F with args \p As to the start of the
   /// queue. The request will be run on a separate thread.
-  template <class Func, class... Args>
-  void addToFront(Func &&F, Args &&... As) {
+  template <class Func, class... Args> void schedule(Func &&F, Args &&... As) {
     if (RunSynchronously) {
       std::forward<Func>(F)(std::forward<Args>(As)...);
       return;
     }
 
     {
       std::lock_guard<std::mutex> Lock(Mutex);
-      RequestQueue.push_front(
+      Requests.addToFront(
           BindWithForward(std::forward<Func>(F), std::forward<Args>(As)...));
     }
     RequestCV.notify_one();
   }
 
-  /// Add a new request to run function \p F with args \p As to the end of the
-  /// queue. The request will be run on a separate thread.
-  template <class Func, class... Args> void addToEnd(Func &&F, Args &&... As) {
+  /// Add a new request to the end of the queue specified by \p Q. Requests on
+  /// the same queue will not be processed concurrently.
+  template <class Func, class... Args>
+  void scheduleOnQueue(const QueueHandle &Q, Func &&F, Args &&... As) {
+    assert(Q.isValid());
+
     if (RunSynchronously) {
       std::forward<Func>(F)(std::forward<Args>(As)...);
       return;
     }
 
-    {
-      std::lock_guard<std::mutex> Lock(Mutex);
-      RequestQueue.push_back(
-          BindWithForward(std::forward<Func>(F), std::forward<Args>(As)...));
-    }
-    RequestCV.notify_one();
+    Q.Queue->addToBack(
+        BindWithForward(std::forward<Func>(F), std::forward<Args>(As)...));
   }
 
+  /// Create a new queue and return a handle to it. You can schedule new
+  /// requests on the queue by passing the handle to scheduleOnQueue.
+  QueueHandle createQueue();
+  /// Remove a queue, corresponding to the passed handle \p Q. No new requests
+  /// can be added to the queue after calling this function, but the pending
+  /// requests will be executed.
+  void removeQueue(QueueHandle &&Q);
+
 private:
   bool RunSynchronously;
   mutable std::mutex Mutex;
@@ -71,11 +115,14 @@
   std::vector<std::thread> Workers;
   /// Setting Done to true will make the worker threads terminate.
   bool Done = false;
-  /// A queue of requests.
-  std::deque<UniqueFunction<void()>> RequestQueue;
+  /// Separate queues, created for the process.
+  std::map<RequestQueue *, std::unique_ptr<RequestQueue>> ExclusiveQueues;
   /// Condition variable to wake up worker threads.
   std::condition_variable RequestCV;
+  /// A queue of requests.
+  RequestQueue Requests;
 };
+
 } // namespace clangd
 } // namespace clang
 #endif
Index: clangd/Threading.cpp
===================================================================
--- clangd/Threading.cpp
+++ clangd/Threading.cpp
@@ -5,7 +5,7 @@
 namespace clang {
 namespace clangd {
 ThreadPool::ThreadPool(unsigned AsyncThreadsCount)
-    : RunSynchronously(AsyncThreadsCount == 0) {
+    : RunSynchronously(AsyncThreadsCount == 0), Requests(RequestCV) {
   if (RunSynchronously) {
     // Don't start the worker thread if we're running synchronously
     return;
@@ -17,27 +17,53 @@
       llvm::set_thread_name(llvm::formatv("scheduler/{0}", I));
       while (true) {
         UniqueFunction<void()> Request;
+        RequestQueue *ExclQueue = nullptr;
+        std::unique_ptr<RequestQueue> QueueToRemove;
 
-        // Pick request from the queue
+        // Pick request from the queue.
         {
           std::unique_lock<std::mutex> Lock(Mutex);
           // Wait for more requests.
-          RequestCV.wait(Lock,
-                         [this] { return !RequestQueue.empty() || Done; });
-          if (RequestQueue.empty()) {
+          RequestCV.wait(Lock, [&] {
+            if (auto PriorityReq = Requests.pop()) {
+              Request = std::move(*PriorityReq);
+              return true;
+            }
+            for (auto It = ExclusiveQueues.begin(), End = ExclusiveQueues.end();
+                 It != End; ++It) {
+              if (!It->first->needsProcessing())
+                continue;
+
+              ExclQueue = It->first;
+              ExclQueue->startProcessing();
+              if (ExclQueue->isScheduledForRemoval()) {
+                QueueToRemove = std::move(It->second);
+                ExclusiveQueues.erase(It);
+              }
+              return true;
+            }
+            if (Done)
+              return true;
+            return false;
+          });
+
+          if (!Request && !ExclQueue) {
             assert(Done);
             return;
           }
+          } // unlock Mutex
 
-          // We process requests starting from the front of the queue. Users of
-          // ThreadPool have a way to prioritise their requests by putting
-          // them to the either side of the queue (using either addToEnd or
-          // addToFront).
-          Request = std::move(RequestQueue.front());
-          RequestQueue.pop_front();
-        } // unlock Mutex
+        // We're processing a foreground request.
+        if (Request) {
+          Request();
+          continue;
+        }
 
-        Request();
+        // We're processing a non-empty request queue.
+        assert(ExclQueue);
+        while (auto ExclRequest = ExclQueue->pop())
+          (*ExclRequest)();
+        ExclQueue->stopProcessing();
       }
     }));
   }
@@ -57,5 +83,19 @@
   for (auto &Worker : Workers)
     Worker.join();
 }
+
+QueueHandle ThreadPool::createQueue() {
+  auto Owner = llvm::make_unique<RequestQueue>(RequestCV);
+  // The raw pointer serves as both the map key and the handle's target.
+  RequestQueue *Raw = Owner.get();
+  std::unique_lock<std::mutex> Lock(Mutex);
+  ExclusiveQueues.emplace(Raw, std::move(Owner));
+  Lock.unlock();
+  return QueueHandle(*Raw);
+}
+
+void ThreadPool::removeQueue(QueueHandle &&Q) {
+  Q.Queue->setScheduledForRemoval();
+}
 } // namespace clangd
 } // namespace clang
Index: clangd/TUScheduler.h
===================================================================
--- clangd/TUScheduler.h
+++ clangd/TUScheduler.h
@@ -11,9 +11,10 @@
 #define LLVM_CLANG_TOOLS_EXTRA_CLANGD_TUSCHEDULER_H
 
 #include "ClangdUnit.h"
-#include "ClangdUnitStore.h"
 #include "Function.h"
 #include "Threading.h"
+#include "threading/Cancellation.h"
+#include <map>
 
 namespace clang {
 namespace clangd {
@@ -32,6 +33,66 @@
   const PreambleData *Preamble;
 };
 
+/// Limits the number of threads that can acquire this semaphore.
+class CountingSemaphore {
+public:
+  CountingSemaphore(std::size_t MaxLocks);
+  /// lock() blocks until a slot is free; unlock() frees one and notifies.
+  void lock();
+  void unlock();
+
+private:
+  std::mutex Mutex;
+  std::condition_variable SlotsChanged;
+  std::size_t FreeSlots;
+};
+
+class FileASTThread {
+public:
+  FileASTThread(CountingSemaphore &Barrier, std::shared_ptr<CppFile> File,
+                bool RunSync);
+  ~FileASTThread();
+
+  std::shared_ptr<const PreambleData> getPossiblyStalePreamble() const;
+  std::size_t getUsedBytes() const;
+
+  void setDone();
+
+  void update(
+      Context Ctx, ParseInputs Inputs,
+      UniqueFunction<void(Context, llvm::Optional<std::vector<DiagWithFixIts>>)>
+          OnUpdated);
+  void enqueueRead(UniqueFunction<void(llvm::Expected<InputsAndAST>)> Action);
+
+private:
+  const bool RunSync;
+  mutable std::mutex Mutex;
+  bool Done;
+  std::condition_variable RequestsCV;
+  RequestQueue Requests;
+  std::shared_ptr<CppFile> File;
+  // Inputs, corresponding to the current state of File. Note that Requests may
+  // contain a request to update the inputs.
+  ParseInputs Inputs;
+  std::thread Worker;
+};
+
+class CleanupThread {
+public:
+  CleanupThread();
+  ~CleanupThread();
+
+  void cleanupFile(std::unique_ptr<FileASTThread> Thread);
+  void waitTillFinished(std::future<void> Task);
+
+private:
+  std::mutex Mutex;
+  std::condition_variable RequestsCV;
+  RequestQueue Requests;
+  bool Done;
+  std::thread Worker;
+};
+
 /// Handles running tasks for ClangdServer and managing the resources (e.g.,
 /// preambles and ASTs) for opened files.
 /// TUScheduler is not thread-safe, only one thread should be providing updates
@@ -82,11 +143,31 @@
       UniqueFunction<void(llvm::Expected<InputsAndPreamble>)> Action);
 
 private:
-  const ParseInputs &getInputs(PathRef File);
+  struct FileData {
+    FileData() = default;
+    FileData(ParseInputs Inputs, std::unique_ptr<FileASTThread> Worker);
+
+    ~FileData() {
+      if (!Worker)
+        return;
+      Worker->setDone();
+    }
+
+    ParseInputs Inputs;
+    std::unique_ptr<FileASTThread> Worker;
+  };
+
+  struct AuxData {
+    bool StorePreamblesInMemory;
+    std::shared_ptr<PCHContainerOperations> PCHs;
+    ASTParsedCallback ASTCallback;
+  };
 
-  llvm::StringMap<ParseInputs> CachedInputs;
-  CppFileCollection Files;
-  ThreadPool Threads;
+  const bool RunSync;
+  const AuxData Data;
+  CountingSemaphore Barrier;
+  llvm::StringMap<std::unique_ptr<FileData>> Files;
+  CleanupThread GCThread;
 };
 } // namespace clangd
 } // namespace clang
Index: clangd/TUScheduler.cpp
===================================================================
--- clangd/TUScheduler.cpp
+++ clangd/TUScheduler.cpp
@@ -14,111 +14,269 @@
   return HardwareConcurrency;
 }
 
+FileASTThread::FileASTThread(CountingSemaphore &Barrier,
+                             std::shared_ptr<CppFile> File, bool RunSync)
+    : RunSync(RunSync), Done(false), Requests(RequestsCV),
+      File(std::move(File)) {
+  if (RunSync)
+    return;
+  Worker = std::thread([&]() {
+    while (true) {
+      std::unique_lock<std::mutex> Lock(Mutex);
+      RequestsCV.wait(Lock,
+                      [&]() { return Done || Requests.needsProcessing(); });
+      // Done is guarded by Mutex, so snapshot it before releasing the lock.
+      const bool IsLastRun = Done;
+      Lock.unlock();
+      std::lock_guard<CountingSemaphore> BarrierLock(Barrier);
+      Requests.startProcessing();
+      while (auto Req = Requests.pop())
+        (*Req)();
+      Requests.stopProcessing();
+      if (IsLastRun)
+        return;
+    }
+  });
+}
+
+FileASTThread::~FileASTThread() {
+  if (RunSync)
+    return;
+
+#ifndef NDEBUG
+  std::unique_lock<std::mutex> Lock(Mutex);
+  assert(Done && "setDone must be called before running destructor");
+  Lock.unlock();
+#endif
+
+  Worker.join();
+}
+
+std::shared_ptr<const PreambleData>
+FileASTThread::getPossiblyStalePreamble() const {
+  std::lock_guard<std::mutex> Lock(Mutex);
+  return File->getPossiblyStalePreamble();
+}
+
+std::size_t FileASTThread::getUsedBytes() const {
+  std::lock_guard<std::mutex> Lock(Mutex);
+  return File->getUsedBytes();
+}
+
+void FileASTThread::setDone() {
+  std::unique_lock<std::mutex> Guard(Mutex);
+  Done = true;
+  Requests.setScheduledForRemoval();
+  Guard.unlock();
+  // Wake the worker: its wait predicate is satisfied once Done is set.
+  RequestsCV.notify_one();
+}
+
+void FileASTThread::update(
+    Context Ctx, ParseInputs Inputs,
+    UniqueFunction<void(Context, llvm::Optional<std::vector<DiagWithFixIts>>)>
+        OnUpdated) {
+  // Records Inputs, rebuilds File and hands the diagnostics to OnUpdated.
+  auto Task = [=](Context Ctx, decltype(OnUpdated) OnUpdated) {
+    this->Inputs = Inputs;
+    auto Diags = File->rebuild(Ctx, ParseInputs(Inputs));
+    OnUpdated(std::move(Ctx), std::move(Diags));
+  };
+  // In synchronous mode there is no worker thread; run on the caller.
+  if (RunSync) {
+    Task(std::move(Ctx), std::move(OnUpdated));
+    return;
+  }
+  // Otherwise queue the task; the worker thread pops and runs it.
+  Requests.addToBack(
+      BindWithForward(Task, std::move(Ctx), std::move(OnUpdated)));
+}
+
+void FileASTThread::enqueueRead(
+    UniqueFunction<void(llvm::Expected<InputsAndAST>)> Action) {
+  auto Task = [=](decltype(Action) Action) {
+    auto AST = File->getAST().get();
+    // Action gets an error if no AST is available, else Inputs and the AST.
+    AST->runUnderLock([&](ParsedAST *AST) {
+      if (!AST) {
+        Action(llvm::make_error<llvm::StringError>(
+            "invalid AST", llvm::errc::invalid_argument));
+        return;
+      }
+      Action(InputsAndAST{Inputs, *AST});
+    });
+  };
+  // In synchronous mode there is no worker thread; run on the caller.
+  if (RunSync) {
+    Task(std::move(Action));
+    return;
+  }
+  // Otherwise queue the task; the worker thread pops and runs it.
+  Requests.addToBack(BindWithForward(Task, std::move(Action)));
+}
+
+CleanupThread::CleanupThread() : Requests(RequestsCV), Done(false) {
+  Worker = std::thread([&]() {
+    while (true) {
+      std::unique_lock<std::mutex> Lock(Mutex);
+      RequestsCV.wait(Lock,
+                      [&]() { return Done || Requests.needsProcessing(); });
+      if (!Requests.needsProcessing()) {
+        assert(Done);
+        return;
+      }
+      // Run the request without holding Mutex.
+      auto Req = Requests.pop();
+      Lock.unlock();
+      (*Req)();
+    }
+  });
+}
+
+CleanupThread::~CleanupThread() {
+  { // The worker reads Done under Mutex, so set it under Mutex as well.
+    std::lock_guard<std::mutex> Guard(Mutex);
+    Done = true;
+  }
+  RequestsCV.notify_one();
+  Worker.join();
+}
+
+void CleanupThread::cleanupFile(std::unique_ptr<FileASTThread> Thread) {
+  assert(Thread);
+  // The task only takes ownership; destroying its argument does the work.
+  auto CleanupTask = [](std::unique_ptr<FileASTThread>) {
+    // Wait for FileASTThread's destructor to finish. Nothing else to do.
+  };
+  Thread->setDone();
+  Requests.addToBack(BindWithForward(CleanupTask, std::move(Thread)));
+}
+
+void CleanupThread::waitTillFinished(std::future<void> Task) {
+  auto CleanupTask = [](std::future<void> Task) { Task.wait(); };
+  Requests.addToBack(BindWithForward(CleanupTask, std::move(Task)));
+}
+
+CountingSemaphore::CountingSemaphore(std::size_t MaxLocks)
+    : FreeSlots(MaxLocks) {}
+
+void CountingSemaphore::lock() {
+  std::unique_lock<std::mutex> Lock(Mutex);
+  SlotsChanged.wait(Lock, [&]() { return FreeSlots > 0; });
+  --FreeSlots;
+}
+
+void CountingSemaphore::unlock() {
+  { // Return the slot under the mutex; notify after releasing it.
+    std::lock_guard<std::mutex> Guard(Mutex);
+    ++FreeSlots;
+  }
+  SlotsChanged.notify_one();
+}
+
 TUScheduler::TUScheduler(unsigned AsyncThreadsCount,
                          bool StorePreamblesInMemory,
                          ASTParsedCallback ASTCallback)
-
-    : Files(StorePreamblesInMemory, std::make_shared<PCHContainerOperations>(),
-            std::move(ASTCallback)),
-      Threads(AsyncThreadsCount) {}
+    : RunSync(AsyncThreadsCount == 0),
+      Data{StorePreamblesInMemory, std::make_shared<PCHContainerOperations>(),
+           std::move(ASTCallback)},
+      Barrier(AsyncThreadsCount) {}
 
 void TUScheduler::update(
     Context Ctx, PathRef File, ParseInputs Inputs,
     UniqueFunction<void(Context Ctx,
                         llvm::Optional<std::vector<DiagWithFixIts>>)>
         OnUpdated) {
-  CachedInputs[File] = Inputs;
-
-  auto Resources = Files.getOrCreateFile(File);
-  auto DeferredRebuild = Resources->deferRebuild(std::move(Inputs));
+  auto It = Files.find(File);
+  if (It == Files.end()) {
+    auto Worker = llvm::make_unique<FileASTThread>(
+        Barrier,
+        CppFile::Create(File, Data.StorePreamblesInMemory, Data.PCHs,
+                        Data.ASTCallback),
+        RunSync);
+    It = Files
+             .insert({File, std::unique_ptr<FileData>(
+                                new FileData{Inputs, std::move(Worker)})})
+             .first;
+  } else {
+    It->second->Inputs = Inputs;
+  }
 
-  Threads.addToFront(
-      [](Context Ctx, decltype(OnUpdated) OnUpdated,
-         decltype(DeferredRebuild) DeferredRebuild) {
-        auto Diags = DeferredRebuild(Ctx);
-        OnUpdated(std::move(Ctx), Diags);
-      },
-      std::move(Ctx), std::move(OnUpdated), std::move(DeferredRebuild));
+  It->second->Worker->update(std::move(Ctx), Inputs, std::move(OnUpdated));
 }
 
 void TUScheduler::remove(PathRef File,
                          UniqueFunction<void(llvm::Error)> Action) {
-  CachedInputs.erase(File);
-
-  auto Resources = Files.removeIfPresent(File);
-  if (!Resources) {
+  auto It = Files.find(File);
+  if (It == Files.end()) {
     Action(llvm::make_error<llvm::StringError>(
         "trying to remove non-added document", llvm::errc::invalid_argument));
     return;
   }
 
-  auto DeferredCancel = Resources->deferCancelRebuild();
-  Threads.addToFront(
-      [](decltype(Action) Action, decltype(DeferredCancel) DeferredCancel) {
-        DeferredCancel();
-        Action(llvm::Error::success());
-      },
-      std::move(Action), std::move(DeferredCancel));
+  std::unique_ptr<FileData> Data = std::move(It->second);
+  Files.erase(It);
+
+  GCThread.cleanupFile(std::move(Data->Worker));
 }
 
 void TUScheduler::runWithAST(
     PathRef File, UniqueFunction<void(llvm::Expected<InputsAndAST>)> Action) {
-  auto Resources = Files.getFile(File);
-  if (!Resources) {
+  auto It = Files.find(File);
+  if (It == Files.end()) {
     Action(llvm::make_error<llvm::StringError>(
         "trying to get AST for non-added document",
         llvm::errc::invalid_argument));
     return;
   }
 
-  const ParseInputs &Inputs = getInputs(File);
-  // We currently block the calling thread until AST is available and run the
-  // action on the calling thread to avoid inconsistent states coming from
-  // subsequent updates.
-  // FIXME(ibiryukov): this should be moved to the worker threads.
-  Resources->getAST().get()->runUnderLock([&](ParsedAST *AST) {
-    if (AST)
-      Action(InputsAndAST{Inputs, *AST});
-    else
-      Action(llvm::make_error<llvm::StringError>(
-          "Could not build AST for the latest file update",
-          llvm::errc::invalid_argument));
-  });
+  It->second->Worker->enqueueRead(std::move(Action));
 }
 
 void TUScheduler::runWithPreamble(
     PathRef File,
     UniqueFunction<void(llvm::Expected<InputsAndPreamble>)> Action) {
-  std::shared_ptr<CppFile> Resources = Files.getFile(File);
-  if (!Resources) {
+  auto It = Files.find(File);
+  if (It == Files.end()) {
     Action(llvm::make_error<llvm::StringError>(
         "trying to get preamble for non-added document",
         llvm::errc::invalid_argument));
     return;
   }
 
-  const ParseInputs &Inputs = getInputs(File);
+  ParseInputs InputsCopy = It->second->Inputs;
   std::shared_ptr<const PreambleData> Preamble =
-      Resources->getPossiblyStalePreamble();
-  Threads.addToFront(
-      [Resources, Preamble, Inputs](decltype(Action) Action) mutable {
-        if (!Preamble)
-          Preamble = Resources->getPossiblyStalePreamble();
+      It->second->Worker->getPossiblyStalePreamble();
 
-        Action(InputsAndPreamble{Inputs, Preamble.get()});
-      },
-      std::move(Action));
-}
+  if (RunSync) {
+    Action(InputsAndPreamble{InputsCopy, Preamble.get()});
+    return;
+  }
 
-const ParseInputs &TUScheduler::getInputs(PathRef File) {
-  auto It = CachedInputs.find(File);
-  assert(It != CachedInputs.end());
-  return It->second;
+  auto Task = [InputsCopy, Preamble, this](decltype(Action) Action) mutable {
+    std::lock_guard<CountingSemaphore> BarrierLock(Barrier);
+    // XXX: what if preamble got built by this time?
+    // if (!Preamble)
+    //   Preamble = Resources->getPossiblyStalePreamble();
+    Action(InputsAndPreamble{InputsCopy, Preamble.get()});
+  };
+
+  GCThread.waitTillFinished(
+      std::async(std::launch::async, Task, std::move(Action)));
 }
 
 std::vector<std::pair<Path, std::size_t>>
 TUScheduler::getUsedBytesPerFile() const {
-  return Files.getUsedBytesPerFile();
+  std::vector<std::pair<Path, std::size_t>> Result;
+  Result.reserve(Files.size());
+  for (auto &&PathAndFile : Files)
+    Result.push_back(
+        {PathAndFile.first(), PathAndFile.second->Worker->getUsedBytes()});
+  return Result;
 }
+
+TUScheduler::FileData::FileData(ParseInputs Inputs,
+                                std::unique_ptr<FileASTThread> Worker)
+    : Inputs(std::move(Inputs)), Worker(std::move(Worker)) {}
 } // namespace clangd
 } // namespace clang
Index: clangd/ClangdUnitStore.h
===================================================================
--- clangd/ClangdUnitStore.h
+++ /dev/null
@@ -1,73 +0,0 @@
-//===--- ClangdUnitStore.h - A container of CppFiles -------------*-C++-*-===//
-//
-//                     The LLVM Compiler Infrastructure
-//
-// This file is distributed under the University of Illinois Open Source
-// License. See LICENSE.TXT for details.
-//
-//===---------------------------------------------------------------------===//
-
-#ifndef LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDUNITSTORE_H
-#define LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDUNITSTORE_H
-
-#include "ClangdUnit.h"
-#include "GlobalCompilationDatabase.h"
-#include "Logger.h"
-#include "Path.h"
-#include "clang/Tooling/CompilationDatabase.h"
-#include <mutex>
-
-namespace clang {
-namespace clangd {
-
-class Logger;
-
-/// Thread-safe mapping from FileNames to CppFile.
-class CppFileCollection {
-public:
-  /// \p ASTCallback is called when a file is parsed synchronously. This should
-  /// not be expensive since it blocks diagnostics.
-  explicit CppFileCollection(bool StorePreamblesInMemory,
-                             std::shared_ptr<PCHContainerOperations> PCHs,
-                             ASTParsedCallback ASTCallback)
-      : ASTCallback(std::move(ASTCallback)), PCHs(std::move(PCHs)),
-        StorePreamblesInMemory(StorePreamblesInMemory) {}
-
-  std::shared_ptr<CppFile> getOrCreateFile(PathRef File) {
-    std::lock_guard<std::mutex> Lock(Mutex);
-    auto It = OpenedFiles.find(File);
-    if (It == OpenedFiles.end()) {
-      It = OpenedFiles
-               .try_emplace(File, CppFile::Create(File, StorePreamblesInMemory,
-                                                  PCHs, ASTCallback))
-               .first;
-    }
-    return It->second;
-  }
-
-  std::shared_ptr<CppFile> getFile(PathRef File) const {
-    std::lock_guard<std::mutex> Lock(Mutex);
-    auto It = OpenedFiles.find(File);
-    if (It == OpenedFiles.end())
-      return nullptr;
-    return It->second;
-  }
-
-  /// Removes a CppFile, stored for \p File, if it's inside collection and
-  /// returns it.
-  std::shared_ptr<CppFile> removeIfPresent(PathRef File);
-
-  /// Gets used memory for each of the stored files.
-  std::vector<std::pair<Path, std::size_t>> getUsedBytesPerFile() const;
-
-private:
-  mutable std::mutex Mutex;
-  llvm::StringMap<std::shared_ptr<CppFile>> OpenedFiles;
-  ASTParsedCallback ASTCallback;
-  std::shared_ptr<PCHContainerOperations> PCHs;
-  bool StorePreamblesInMemory;
-};
-} // namespace clangd
-} // namespace clang
-
-#endif
Index: clangd/ClangdUnitStore.cpp
===================================================================
--- clangd/ClangdUnitStore.cpp
+++ /dev/null
@@ -1,37 +0,0 @@
-//===--- ClangdUnitStore.cpp - A ClangdUnits container -----------*-C++-*-===//
-//
-//                     The LLVM Compiler Infrastructure
-//
-// This file is distributed under the University of Illinois Open Source
-// License. See LICENSE.TXT for details.
-//
-//===----------------------------------------------------------------------===//
-
-#include "ClangdUnitStore.h"
-#include "llvm/Support/Path.h"
-#include <algorithm>
-
-using namespace clang::clangd;
-using namespace clang;
-
-std::shared_ptr<CppFile> CppFileCollection::removeIfPresent(PathRef File) {
-  std::lock_guard<std::mutex> Lock(Mutex);
-
-  auto It = OpenedFiles.find(File);
-  if (It == OpenedFiles.end())
-    return nullptr;
-
-  std::shared_ptr<CppFile> Result = It->second;
-  OpenedFiles.erase(It);
-  return Result;
-}
-std::vector<std::pair<Path, std::size_t>>
-CppFileCollection::getUsedBytesPerFile() const {
-  std::lock_guard<std::mutex> Lock(Mutex);
-  std::vector<std::pair<Path, std::size_t>> Result;
-  Result.reserve(OpenedFiles.size());
-  for (auto &&PathAndFile : OpenedFiles)
-    Result.push_back(
-        {PathAndFile.first().str(), PathAndFile.second->getUsedBytes()});
-  return Result;
-}
Index: clangd/ClangdServer.h
===================================================================
--- clangd/ClangdServer.h
+++ clangd/ClangdServer.h
@@ -11,7 +11,6 @@
 #define LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDSERVER_H
 
 #include "ClangdUnit.h"
-#include "ClangdUnitStore.h"
 #include "CodeComplete.h"
 #include "CompileArgsCache.h"
 #include "DraftStore.h"
@@ -95,8 +94,6 @@
   getTaggedFileSystem(PathRef File) override;
 };
 
-class ClangdServer;
-
 /// Provides API to manage ASTs for a collection of C++ files and request
 /// various language features.
 /// Currently supports async diagnostics, code completion, formatting and goto
Index: clangd/CMakeLists.txt
===================================================================
--- clangd/CMakeLists.txt
+++ clangd/CMakeLists.txt
@@ -6,7 +6,6 @@
   ClangdLSPServer.cpp
   ClangdServer.cpp
   ClangdUnit.cpp
-  ClangdUnitStore.cpp
   CodeComplete.cpp
   CodeCompletionStrings.cpp
   CompileArgsCache.cpp
@@ -32,6 +31,8 @@
   index/Merge.cpp
   index/SymbolCollector.cpp
   index/SymbolYAML.cpp
+  threading/Cancellation.cpp
+  threading/RequestQueue.cpp
 
   LINK_LIBS
   clangAST
_______________________________________________
cfe-commits mailing list
cfe-commits@lists.llvm.org
http://lists.llvm.org/cgi-bin/mailman/listinfo/cfe-commits

Reply via email to