ilya-biryukov created this revision.
ilya-biryukov added a reviewer: sammccall.
Herald added subscribers: cfe-commits, hintonda, ioeric, jkorous-apple, mgorny, 
klimek.

DO NOT SUBMIT (yet). This is a non-final version of the patch.


Repository:
  rCTE Clang Tools Extra

https://reviews.llvm.org/D42573

Files:
  clangd/CMakeLists.txt
  clangd/ClangdServer.cpp
  clangd/ClangdServer.h
  clangd/ClangdUnitStore.cpp
  clangd/ClangdUnitStore.h
  clangd/threading/Cancellation.cpp
  clangd/threading/Cancellation.h
  clangd/threading/RequestQueue.cpp
  clangd/threading/RequestQueue.h

Index: clangd/threading/RequestQueue.h
===================================================================
--- /dev/null
+++ clangd/threading/RequestQueue.h
@@ -0,0 +1,50 @@
+//===--- RequestQueue.h -----------------------------------------*- C++-*-===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===---------------------------------------------------------------------===//
+
+#ifndef LLVM_CLANG_TOOLS_EXTRA_CLANGD_THREADING_REQUESTQUEUE_H
+#define LLVM_CLANG_TOOLS_EXTRA_CLANGD_THREADING_REQUESTQUEUE_H
+
+#include "Function.h"
+#include <condition_variable>
+#include <deque>
+#include <mutex>
+
+namespace clang {
+namespace clangd {
+
+/// A thread-safe request queue managed by ThreadPool. This is an implementation
+/// detail, see QueueHandle and ThreadPool for user-facing APIs.
+class RequestQueue {
+public:
+  RequestQueue(std::condition_variable &Condition);
+  /// Enqueue \p Req at the front/back, then notify one waiter on Condition.
+  void addToFront(UniqueFunction<void()> Req);
+  void addToBack(UniqueFunction<void()> Req);
+  /// Dequeue the front request; returns llvm::None if the queue is empty.
+  llvm::Optional<UniqueFunction<void()>> pop();
+  /// True iff not being processed and (non-empty or scheduled for removal).
+  bool needsProcessing() const;
+  /// Removal flag; the setter asserts that it was not set before.
+  bool isScheduledForRemoval() const;
+  void setScheduledForRemoval();
+  /// Set/clear the processing flag; stopProcessing() also notifies Condition.
+  void startProcessing();
+  void stopProcessing();
+
+private:
+  std::condition_variable &Condition;
+  mutable std::mutex Mutex; // Held while accessing the fields below.
+  bool IsProcessing;
+  bool IsScheduledForRemoval;
+  std::deque<UniqueFunction<void()>> Requests;
+};
+
+} // namespace clangd
+} // namespace clang
+#endif
Index: clangd/threading/RequestQueue.cpp
===================================================================
--- /dev/null
+++ clangd/threading/RequestQueue.cpp
@@ -0,0 +1,78 @@
+//===--- RequestQueue.cpp ---------------------------------------*- C++-*-===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===---------------------------------------------------------------------===//
+#include "RequestQueue.h"
+
+namespace clang {
+namespace clangd {
+
+RequestQueue::RequestQueue(std::condition_variable &Condition)
+    : Condition(Condition), IsProcessing(false), IsScheduledForRemoval(false) {}
+
+void RequestQueue::addToFront(UniqueFunction<void()> Req) {
+  {
+    std::lock_guard<std::mutex> Guard(Mutex);
+    Requests.push_front(std::move(Req));
+  }
+  Condition.notify_one(); // Notify only after Mutex has been released.
+}
+
+void RequestQueue::addToBack(UniqueFunction<void()> Req) {
+  {
+    std::lock_guard<std::mutex> Guard(Mutex);
+    Requests.push_back(std::move(Req));
+  }
+  Condition.notify_one(); // Notify only after Mutex has been released.
+}
+
+llvm::Optional<UniqueFunction<void()>> RequestQueue::pop() {
+  std::lock_guard<std::mutex> Guard(Mutex);
+  if (Requests.empty())
+    return llvm::None;
+  // Move the front entry out before its slot is removed from the deque.
+  UniqueFunction<void()> Front = std::move(Requests.front());
+  Requests.pop_front();
+  return Front;
+}
+
+bool RequestQueue::needsProcessing() const {
+  std::lock_guard<std::mutex> Guard(Mutex);
+  // Work is pending if requests are queued or the queue awaits removal.
+  const bool HasWork = !Requests.empty() || IsScheduledForRemoval;
+  return !IsProcessing && HasWork;
+}
+
+bool RequestQueue::isScheduledForRemoval() const {
+  std::lock_guard<std::mutex> Guard(Mutex); // Read the flag under Mutex.
+  return IsScheduledForRemoval;
+}
+
+void RequestQueue::setScheduledForRemoval() {
+  std::unique_lock<std::mutex> Lock(Mutex);
+  assert(!IsScheduledForRemoval); // The flag may be set only once.
+  IsScheduledForRemoval = true; // NOTE(review): Condition is not notified.
+}
+
+void RequestQueue::startProcessing() {
+  std::lock_guard<std::mutex> Guard(Mutex);
+  assert(!IsProcessing); // The queue must not already be in processing.
+  assert(!Requests.empty() || IsScheduledForRemoval); // Something to do.
+  IsProcessing = true;
+}
+
+void RequestQueue::stopProcessing() {
+  {
+    std::lock_guard<std::mutex> Guard(Mutex);
+    assert(IsProcessing);
+    IsProcessing = false;
+  }
+  Condition.notify_one(); // Notify only after Mutex has been released.
+}
+
+} // namespace clangd
+} // namespace clang
Index: clangd/threading/Cancellation.h
===================================================================
--- /dev/null
+++ clangd/threading/Cancellation.h
@@ -0,0 +1,44 @@
+//===--- Cancellation.h -----------------------------------------*- C++-*-===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===---------------------------------------------------------------------===//
+
+#ifndef LLVM_CLANG_TOOLS_EXTRA_CLANGD_THREADING_CANCELLATION_H
+#define LLVM_CLANG_TOOLS_EXTRA_CLANGD_THREADING_CANCELLATION_H
+
+#include <atomic>
+#include <cassert>
+#include <memory>
+
+namespace clang {
+namespace clangd {
+
+/// A shared boolean flag indicating if the computation was cancelled.
+/// Once cancelled, cannot be returned to the previous state.
+/// FIXME: We should split this class into consumers and producers of the
+/// cancellation flags.
+class CancellationFlag {
+public:
+  CancellationFlag();
+  /// Raise the flag; copies of this object share the same state.
+  void setCancelled() {
+    assert(WasCancelled && "the object was moved");
+    *WasCancelled = true;
+  }
+  /// Report whether the shared flag has been raised.
+  bool isCancelled() const {
+    assert(WasCancelled && "the object was moved");
+    return *WasCancelled;
+  }
+
+private:
+  std::shared_ptr<std::atomic<bool>> WasCancelled; // Null once moved from.
+};
+} // namespace clangd
+} // namespace clang
+
+#endif
Index: clangd/threading/Cancellation.cpp
===================================================================
--- /dev/null
+++ clangd/threading/Cancellation.cpp
@@ -0,0 +1,18 @@
+//===--- Cancellation.cpp ---------------------------------------*- C++-*-===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===---------------------------------------------------------------------===//
+
+#include "Cancellation.h"
+namespace clang {
+namespace clangd {
+
+CancellationFlag::CancellationFlag()
+    : WasCancelled(std::make_shared<std::atomic<bool>>(false)) {}
+
+} // namespace clangd
+} // namespace clang
Index: clangd/ClangdUnitStore.h
===================================================================
--- clangd/ClangdUnitStore.h
+++ /dev/null
@@ -1,73 +0,0 @@
-//===--- ClangdUnitStore.h - A container of CppFiles -------------*-C++-*-===//
-//
-//                     The LLVM Compiler Infrastructure
-//
-// This file is distributed under the University of Illinois Open Source
-// License. See LICENSE.TXT for details.
-//
-//===---------------------------------------------------------------------===//
-
-#ifndef LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDUNITSTORE_H
-#define LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDUNITSTORE_H
-
-#include "ClangdUnit.h"
-#include "GlobalCompilationDatabase.h"
-#include "Logger.h"
-#include "Path.h"
-#include "clang/Tooling/CompilationDatabase.h"
-#include <mutex>
-
-namespace clang {
-namespace clangd {
-
-class Logger;
-
-/// Thread-safe mapping from FileNames to CppFile.
-class CppFileCollection {
-public:
-  /// \p ASTCallback is called when a file is parsed synchronously. This should
-  /// not be expensive since it blocks diagnostics.
-  explicit CppFileCollection(bool StorePreamblesInMemory,
-                             std::shared_ptr<PCHContainerOperations> PCHs,
-                             ASTParsedCallback ASTCallback)
-      : ASTCallback(std::move(ASTCallback)), PCHs(std::move(PCHs)),
-        StorePreamblesInMemory(StorePreamblesInMemory) {}
-
-  std::shared_ptr<CppFile> getOrCreateFile(PathRef File) {
-    std::lock_guard<std::mutex> Lock(Mutex);
-    auto It = OpenedFiles.find(File);
-    if (It == OpenedFiles.end()) {
-      It = OpenedFiles
-               .try_emplace(File, CppFile::Create(File, StorePreamblesInMemory,
-                                                  PCHs, ASTCallback))
-               .first;
-    }
-    return It->second;
-  }
-
-  std::shared_ptr<CppFile> getFile(PathRef File) const {
-    std::lock_guard<std::mutex> Lock(Mutex);
-    auto It = OpenedFiles.find(File);
-    if (It == OpenedFiles.end())
-      return nullptr;
-    return It->second;
-  }
-
-  /// Removes a CppFile, stored for \p File, if it's inside collection and
-  /// returns it.
-  std::shared_ptr<CppFile> removeIfPresent(PathRef File);
-
-  /// Gets used memory for each of the stored files.
-  std::vector<std::pair<Path, std::size_t>> getUsedBytesPerFile() const;
-
-private:
-  mutable std::mutex Mutex;
-  llvm::StringMap<std::shared_ptr<CppFile>> OpenedFiles;
-  ASTParsedCallback ASTCallback;
-  std::shared_ptr<PCHContainerOperations> PCHs;
-  bool StorePreamblesInMemory;
-};
-} // namespace clangd
-} // namespace clang
-
-#endif
Index: clangd/ClangdUnitStore.cpp
===================================================================
--- clangd/ClangdUnitStore.cpp
+++ /dev/null
@@ -1,37 +0,0 @@
-//===--- ClangdUnitStore.cpp - A ClangdUnits container -----------*-C++-*-===//
-//
-//                     The LLVM Compiler Infrastructure
-//
-// This file is distributed under the University of Illinois Open Source
-// License. See LICENSE.TXT for details.
-//
-//===----------------------------------------------------------------------===//
-
-#include "ClangdUnitStore.h"
-#include "llvm/Support/Path.h"
-#include <algorithm>
-
-using namespace clang::clangd;
-using namespace clang;
-
-std::shared_ptr<CppFile> CppFileCollection::removeIfPresent(PathRef File) {
-  std::lock_guard<std::mutex> Lock(Mutex);
-
-  auto It = OpenedFiles.find(File);
-  if (It == OpenedFiles.end())
-    return nullptr;
-
-  std::shared_ptr<CppFile> Result = It->second;
-  OpenedFiles.erase(It);
-  return Result;
-}
-std::vector<std::pair<Path, std::size_t>>
-CppFileCollection::getUsedBytesPerFile() const {
-  std::lock_guard<std::mutex> Lock(Mutex);
-  std::vector<std::pair<Path, std::size_t>> Result;
-  Result.reserve(OpenedFiles.size());
-  for (auto &&PathAndFile : OpenedFiles)
-    Result.push_back(
-        {PathAndFile.first().str(), PathAndFile.second->getUsedBytes()});
-  return Result;
-}
Index: clangd/ClangdServer.h
===================================================================
--- clangd/ClangdServer.h
+++ clangd/ClangdServer.h
@@ -11,14 +11,15 @@
 #define LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDSERVER_H
 
 #include "ClangdUnit.h"
-#include "ClangdUnitStore.h"
 #include "CodeComplete.h"
 #include "CompileArgsCache.h"
 #include "DraftStore.h"
 #include "Function.h"
 #include "GlobalCompilationDatabase.h"
 #include "Protocol.h"
 #include "index/FileIndex.h"
+#include "threading/Cancellation.h"
+#include "threading/RequestQueue.h"
 #include "clang/Tooling/CompilationDatabase.h"
 #include "clang/Tooling/Core/Replacement.h"
 #include "llvm/ADT/IntrusiveRefCntPtr.h"
@@ -97,14 +98,48 @@
   getTaggedFileSystem(PathRef File) override;
 };
 
-class ClangdServer;
-
 /// Returns a number of a default async threads to use for Scheduler.
 /// Returned value is always >= 1 (i.e. will not cause requests to be processed
 /// synchronously).
 unsigned getDefaultAsyncThreadsCount();
 
-/// A simple fixed-size thread pool implementation.
+/// A handle to a queue managed by ThreadPool. Can be used to schedule
+/// computation on a queue by calling ThreadPool::scheduleOnQueue or signal that
+/// the queue is not used anymore by calling ThreadPool::removeQueue.
+class QueueHandle {
+  friend class ThreadPool;
+
+public:
+  QueueHandle() = default; // Default-constructed handles are invalid.
+  /// Move-only: copying is deleted; a move transfers the queue pointer.
+  QueueHandle(const QueueHandle &) = delete;
+  QueueHandle &operator=(const QueueHandle &) = delete;
+
+  QueueHandle(QueueHandle &&Other) { *this = std::move(Other); }
+  /// \p Other must be valid (asserted); it is left invalid afterwards.
+  QueueHandle &operator=(QueueHandle &&Other) {
+    assert(Other.isValid());
+    Queue = Other.Queue;
+    Other.Queue = nullptr;
+    return *this;
+  }
+  /// True iff this handle points at a queue.
+  bool isValid() const { return Queue != nullptr; }
+
+private:
+  explicit QueueHandle(RequestQueue &Queue) : Queue(&Queue) {}
+
+private:
+  RequestQueue *Queue = nullptr; // Null means the handle is invalid.
+};
+
+/// Asynchronous tasks executor backed by a fixed size thread pool.
+/// There are two ways to schedule computations in ThreadPool:
+///    - schedule will schedule computation to be executed on one of
+///      the working threads as soon as an idle thread is available.
+///    - scheduleOnQueue will schedule to a specific queue. Requests from the
+///      same queue are not processed concurrently. Requests in each queue are
+///      executed in the FIFO order.
 class ThreadPool {
 public:
   /// If \p AsyncThreadsCount is 0, requests added using addToFront and addToEnd
@@ -116,37 +151,43 @@
 
   /// Add a new request to run function \p F with args \p As to the start of the
   /// queue. The request will be run on a separate thread.
-  template <class Func, class... Args>
-  void addToFront(Func &&F, Args &&... As) {
+  template <class Func, class... Args> void schedule(Func &&F, Args &&... As) {
     if (RunSynchronously) {
       std::forward<Func>(F)(std::forward<Args>(As)...);
       return;
     }
 
     {
       std::lock_guard<std::mutex> Lock(Mutex);
-      RequestQueue.push_front(
+      Requests.addToFront(
           BindWithForward(std::forward<Func>(F), std::forward<Args>(As)...));
     }
     RequestCV.notify_one();
   }
 
-  /// Add a new request to run function \p F with args \p As to the end of the
-  /// queue. The request will be run on a separate thread.
-  template <class Func, class... Args> void addToEnd(Func &&F, Args &&... As) {
+  /// Add a new request to the end of the queue specified by \p Q. Requests on
+  /// the same queue will not be processed concurrently.
+  template <class Func, class... Args>
+  void scheduleOnQueue(const QueueHandle &Q, Func &&F, Args &&... As) {
+    assert(Q.isValid());
+
     if (RunSynchronously) {
       std::forward<Func>(F)(std::forward<Args>(As)...);
       return;
     }
 
-    {
-      std::lock_guard<std::mutex> Lock(Mutex);
-      RequestQueue.push_back(
-          BindWithForward(std::forward<Func>(F), std::forward<Args>(As)...));
-    }
-    RequestCV.notify_one();
+    Q.Queue->addToBack(
+        BindWithForward(std::forward<Func>(F), std::forward<Args>(As)...));
   }
 
+  /// Create a new queue and return a handle to it. You can schedule new
+  /// requests on the queue by passing the handle to scheduleOnQueue.
+  QueueHandle createQueue();
+  /// Remove a queue, corresponding to the passed handle \p Q. No new requests
+  /// can be added to the queue after calling this function, but the pending
+  /// requests will be executed.
+  void removeQueue(QueueHandle &&Q);
+
 private:
   bool RunSynchronously;
   mutable std::mutex Mutex;
@@ -156,21 +197,22 @@
   std::vector<std::thread> Workers;
   /// Setting Done to true will make the worker threads terminate.
   bool Done = false;
-  /// A queue of requests.
-  std::deque<UniqueFunction<void()>> RequestQueue;
+  /// Separate queues, created for the process.
+  std::map<RequestQueue *, std::unique_ptr<RequestQueue>> ExclusiveQueues;
   /// Condition variable to wake up worker threads.
   std::condition_variable RequestCV;
+  /// A queue of requests.
+  RequestQueue Requests;
 };
 
-
 struct InputsAndAST {
   const ParseInputs &Inputs;
   ParsedAST &AST;
 };
 
 struct InputsAndPreamble {
   const ParseInputs &Inputs;
-  const PreambleData* Preamble;
+  const PreambleData *Preamble;
 };
 
 /// Handles running tasks for ClangdServer and managing the resources (e.g.,
@@ -180,6 +222,10 @@
   Scheduler(unsigned AsyncThreadsCount, bool StorePreamblesInMemory,
             ASTParsedCallback ASTCallback);
 
+  /// Returns estimated memory usage for each of the currently open files.
+  /// The order of results is unspecified.
+  std::vector<std::pair<Path, std::size_t>> getUsedBytesPerFile() const;
+
   /// Schedule an update for \p File. Adds \p File to a list of tracked files if
   /// \p File was not part of it before.
   void scheduleUpdate(
@@ -212,11 +258,26 @@
       UniqueFunction<void(llvm::Expected<InputsAndPreamble>)> Action);
 
 private:
-  const ParseInputs& getInputs(PathRef File);
-
-private:
-  llvm::StringMap<ParseInputs> CachedInputs;
-  CppFileCollection Units;
+  struct FileData { // Per-file state kept in Files.
+    FileData() = default;
+    FileData(ParseInputs Inputs, std::shared_ptr<CppFile> Resources,
+             QueueHandle Queue);
+
+    ParseInputs Inputs;                 // Inputs of the latest update.
+    std::shared_ptr<CppFile> Resources; // Created with CppFile::Create.
+    QueueHandle Queue;                  // Runs the file's updates/AST reads.
+    bool LastRequestIsUpdate = false;   // Defined even after FileData().
+    CancellationFlag LastUpdateCF;      // Flag of the latest update.
+  };
+
+  struct AuxData { // Arguments passed to CppFile::Create for each new file.
+    bool StorePreamblesInMemory;
+    std::shared_ptr<PCHContainerOperations> PCHs;
+    ASTParsedCallback ASTCallback;
+  };
+
+  llvm::StringMap<std::unique_ptr<FileData>> Files;
+  AuxData Data;
   ThreadPool Executor;
 };
 
@@ -398,7 +459,6 @@
   scheduleReparseAndDiags(Context Ctx, PathRef File, VersionedDraft Contents,
                           Tagged<IntrusiveRefCntPtr<vfs::FileSystem>> TaggedFS);
 
-
   CompileArgsCache CompileArgs;
   DiagnosticsConsumer &DiagConsumer;
   FileSystemProvider &FSProvider;
Index: clangd/ClangdServer.cpp
===================================================================
--- clangd/ClangdServer.cpp
+++ clangd/ClangdServer.cpp
@@ -97,7 +97,7 @@
 }
 
 ThreadPool::ThreadPool(unsigned AsyncThreadsCount)
-    : RunSynchronously(AsyncThreadsCount == 0) {
+    : RunSynchronously(AsyncThreadsCount == 0), Requests(RequestCV) {
   if (RunSynchronously) {
     // Don't start the worker thread if we're running synchronously
     return;
@@ -109,27 +109,52 @@
       llvm::set_thread_name(llvm::formatv("scheduler/{0}", I));
       while (true) {
         UniqueFunction<void()> Request;
+        RequestQueue *ExclQueue = nullptr;
+        std::unique_ptr<RequestQueue> QueueToRemove;
 
-        // Pick request from the queue
+        // Pick request from the queue.
         {
           std::unique_lock<std::mutex> Lock(Mutex);
           // Wait for more requests.
-          RequestCV.wait(Lock,
-                         [this] { return !RequestQueue.empty() || Done; });
+          RequestCV.wait(Lock, [&] {
+            if (Done)
+              return true;
+            if (auto PriorityReq = Requests.pop()) {
+              Request = std::move(*PriorityReq);
+              return true;
+            }
+            for (auto It = ExclusiveQueues.begin(), End = ExclusiveQueues.end();
+                 It != End; ++It) {
+              if (!It->first->needsProcessing())
+                continue;
+
+              ExclQueue = It->first;
+              ExclQueue->startProcessing();
+              if (ExclQueue->isScheduledForRemoval()) {
+                QueueToRemove = std::move(It->second);
+                ExclusiveQueues.erase(It);
+              }
+              return true;
+            }
+            return false;
+          });
+
           if (Done)
             return;
-
-          assert(!RequestQueue.empty() && "RequestQueue was empty");
-
-          // We process requests starting from the front of the queue. Users of
-          // ThreadPool have a way to prioritise their requests by putting
-          // them to the either side of the queue (using either addToEnd or
-          // addToFront).
-          Request = std::move(RequestQueue.front());
-          RequestQueue.pop_front();
+          assert(Request || ExclQueue);
         } // unlock Mutex
 
-        Request();
+        // We're processing a foreground request.
+        if (Request) {
+          Request();
+          continue;
+        }
+
+        // We're processing a non-empty request queue.
+        assert(ExclQueue);
+        while (auto ExclRequest = ExclQueue->pop())
+          (*ExclRequest)();
+        ExclQueue->stopProcessing();
       }
     }));
   }
@@ -150,106 +175,161 @@
     Worker.join();
 }
 
+QueueHandle ThreadPool::createQueue() {
+  auto Owner = llvm::make_unique<RequestQueue>(RequestCV);
+  RequestQueue &Created = *Owner; // Stays valid after Owner is moved below.
+  {
+    std::lock_guard<std::mutex> Lock(Mutex);
+    ExclusiveQueues.emplace(&Created, std::move(Owner));
+  }
+  return QueueHandle(Created);
+}
+
+void ThreadPool::removeQueue(QueueHandle &&Q) {
+  Q.Queue->setScheduledForRemoval(); // NOTE(review): no isValid() check?
+}
+
 Scheduler::Scheduler(unsigned AsyncThreadsCount, bool StorePreamblesInMemory,
                      ASTParsedCallback ASTCallback)
-    : Units(StorePreamblesInMemory, std::make_shared<PCHContainerOperations>(),
-            std::move(ASTCallback)),
+    : Data{StorePreamblesInMemory, std::make_shared<PCHContainerOperations>(),
+           std::move(ASTCallback)},
       Executor(AsyncThreadsCount) {}
 
 void Scheduler::scheduleUpdate(
     Context Ctx, PathRef File, ParseInputs Inputs,
     UniqueFunction<void(Context Ctx,
                         llvm::Optional<std::vector<DiagWithFixIts>>)>
         OnUpdated) {
-  CachedInputs[File] = Inputs;
-
-  auto Resources = Units.getOrCreateFile(File);
-  auto DeferredRebuild = Resources->deferRebuild(std::move(Inputs));
+  auto It = Files.find(File);
+  if (It == Files.end()) {
+    QueueHandle Queue = Executor.createQueue();
+    It = Files
+             .insert(
+                 {File, std::unique_ptr<FileData>(new FileData{
+                            Inputs,
+                            CppFile::Create(File, Data.StorePreamblesInMemory,
+                                            Data.PCHs, Data.ASTCallback),
+                            std::move(Queue)})})
+             .first;
+  } else {
+    It->second->Inputs = Inputs;
+  }
 
-  Executor.addToFront(
-      [](Context Ctx, decltype(OnUpdated) OnUpdated,
-         decltype(DeferredRebuild) DeferredRebuild) {
-        auto Diags = DeferredRebuild(Ctx);
-        OnUpdated(std::move(Ctx), Diags);
-      },
-      std::move(Ctx), std::move(OnUpdated), std::move(DeferredRebuild));
-}
+  CancellationFlag CF;
+  CppFile *Resources = It->second->Resources.get();
+  auto Task = [Inputs, Resources, CF](Context Ctx,
+                                      decltype(OnUpdated) OnUpdated) mutable {
+    if (CF.isCancelled()) {
+      // Our request got cancelled, report it to the caller and don't do the
+      // rebuild.
+      OnUpdated(std::move(Ctx), llvm::None);
+      return;
+    }
+    auto Diags = Resources->rebuild(Ctx, std::move(Inputs));
+    OnUpdated(std::move(Ctx), Diags);
+  };
 
-void Scheduler::scheduleRemove(PathRef File,
-                               UniqueFunction<void(llvm::Error)> Action) {
-  CachedInputs.erase(File);
+  FileData *FD = It->second.get();
+  Executor.scheduleOnQueue(FD->Queue, Task, std::move(Ctx),
+                           std::move(OnUpdated));
 
-  auto Resources = Units.removeIfPresent(File);
-  if (!Resources) {
-    Action(llvm::make_error<llvm::StringError>(
-        "trying to remove non-added document", llvm::errc::invalid_argument));
-    return;
-  }
-
-  auto DeferredCancel = Resources->deferCancelRebuild();
-  Executor.addToFront(
-      [](decltype(Action) Action, decltype(DeferredCancel) DeferredCancel) {
-        DeferredCancel();
-        Action(llvm::Error::success());
-      },
-      std::move(Action), std::move(DeferredCancel));
+  if (FD->LastRequestIsUpdate)
+    FD->LastUpdateCF.setCancelled();
+  FD->LastRequestIsUpdate = true;
+  FD->LastUpdateCF = std::move(CF);
 }
 
 void Scheduler::scheduleASTRead(
     PathRef File, UniqueFunction<void(llvm::Expected<InputsAndAST>)> Action) {
-  auto Resources = Units.getFile(File);
-  if (!Resources) {
+  auto It = Files.find(File);
+  if (It == Files.end()) {
     Action(llvm::make_error<llvm::StringError>(
         "trying to get AST for non-added document",
         llvm::errc::invalid_argument));
     return;
   }
 
-  const ParseInputs &Inputs = getInputs(File);
-  // We currently block the calling thread until AST is available and run the
-  // action on the calling thread to avoid inconsistent states coming from
-  // subsequent updates.
-  // FIXME(ibiryukov): this should be moved to the worker threads.
-  Resources->getAST().get()->runUnderLock([&](ParsedAST *AST) {
-    if (AST)
+  ParseInputs Inputs = It->second->Inputs;
+  CppFile *Resources = It->second->Resources.get();
+  auto Task = [Inputs, Resources](decltype(Action) Action) {
+    Resources->getAST().get()->runUnderLock([&](ParsedAST *AST) {
+      // Without an AST, report the error and stop; *AST is dereferenced below.
+      if (!AST)
+        return Action(llvm::make_error<llvm::StringError>(
+            "invalid AST", llvm::errc::invalid_argument));
       Action(InputsAndAST{Inputs, *AST});
-    else
-      Action(llvm::make_error<llvm::StringError>(
-          "Could not build AST for the latest file update",
-          llvm::errc::invalid_argument));
-  });
+    });
+  };
+
+  Executor.scheduleOnQueue(It->second->Queue, std::move(Task),
+                           std::move(Action));
+  It->second->LastRequestIsUpdate = false;
 }
 
 void Scheduler::schedulePreambleRead(
     PathRef File,
     UniqueFunction<void(llvm::Expected<InputsAndPreamble>)> Action) {
-  std::shared_ptr<CppFile> Resources = Units.getFile(File);
-  if (!Resources) {
+  auto It = Files.find(File);
+  if (It == Files.end()) {
     Action(llvm::make_error<llvm::StringError>(
         "trying to get preamble for non-added document",
         llvm::errc::invalid_argument));
     return;
   }
 
-  ParseInputs Inputs = getInputs(File);
+  ParseInputs InputsCopy = It->second->Inputs;
+  std::shared_ptr<CppFile> Resources = It->second->Resources;
   std::shared_ptr<const PreambleData> Preamble =
       Resources->getPossiblyStalePreamble();
-  Executor.addToFront(
-      [Resources, Preamble, Inputs](decltype(Action) Action) mutable {
-        if (!Preamble)
-          Preamble = Resources->getPossiblyStalePreamble();
+  auto Task = [InputsCopy, Resources,
+               Preamble](decltype(Action) Action) mutable {
+    if (!Preamble)
+      Preamble = Resources->getPossiblyStalePreamble();
+    Action(InputsAndPreamble{InputsCopy, Preamble.get()});
+  };
 
-        Action(InputsAndPreamble{Inputs, Preamble.get()});
-      },
-      std::move(Action));
+  Executor.schedule(std::move(Task), std::move(Action));
+}
+
+void Scheduler::scheduleRemove(PathRef File,
+                               UniqueFunction<void(llvm::Error)> Action) {
+  auto It = Files.find(File);
+  if (It == Files.end()) {
+    Action(llvm::make_error<llvm::StringError>(
+        "trying to remove non-added document", llvm::errc::invalid_argument));
+    return;
+  }
+  // Untrack the file; if its last request was an update, flag it cancelled.
+  std::unique_ptr<FileData> Data = std::move(It->second);
+  Files.erase(It);
+  if (Data->LastRequestIsUpdate)
+    Data->LastUpdateCF.setCancelled();
+  auto CleanupTask = [](std::unique_ptr<FileData> D, decltype(Action) Done) {
+    D.reset(); // Free the file's data first, then report success.
+    Done(llvm::Error::success());
+  };
+  QueueHandle Queue = std::move(Data->Queue);
+  Executor.scheduleOnQueue(Queue, CleanupTask, std::move(Data),
+                           std::move(Action));
+  Executor.removeQueue(std::move(Queue));
 }
 
-const ParseInputs &Scheduler::getInputs(PathRef File) {
-  auto It = CachedInputs.find(File);
-  assert(It != CachedInputs.end());
-  return It->second;
+std::vector<std::pair<Path, std::size_t>>
+Scheduler::getUsedBytesPerFile() const {
+  std::vector<std::pair<Path, std::size_t>> Result;
+  Result.reserve(Files.size());
+  for (auto &&PathAndFile : Files)
+    Result.push_back(
+        {PathAndFile.first(), PathAndFile.second->Resources->getUsedBytes()});
+  return Result;
 }
 
+Scheduler::FileData::FileData(ParseInputs Inputs,
+                              std::shared_ptr<CppFile> Resources,
+                              QueueHandle Queue)
+    : Inputs(std::move(Inputs)), Resources(std::move(Resources)),
+      Queue(std::move(Queue)), LastRequestIsUpdate(false) {}
+
 ClangdServer::ClangdServer(GlobalCompilationDatabase &CDB,
                            DiagnosticsConsumer &DiagConsumer,
                            FileSystemProvider &FSProvider,
@@ -706,5 +786,5 @@
 
 std::vector<std::pair<Path, std::size_t>>
 ClangdServer::getUsedBytesPerFile() const {
-  return Units.getUsedBytesPerFile();
+  return WorkScheduler.getUsedBytesPerFile();
 }
Index: clangd/CMakeLists.txt
===================================================================
--- clangd/CMakeLists.txt
+++ clangd/CMakeLists.txt
@@ -6,7 +6,6 @@
   ClangdLSPServer.cpp
   ClangdServer.cpp
   ClangdUnit.cpp
-  ClangdUnitStore.cpp
   CodeComplete.cpp
   CodeCompletionStrings.cpp
   CompileArgsCache.cpp
@@ -30,6 +29,8 @@
   index/Merge.cpp
   index/SymbolCollector.cpp
   index/SymbolYAML.cpp
+  threading/Cancellation.cpp
+  threading/RequestQueue.cpp
 
   LINK_LIBS
   clangAST
_______________________________________________
cfe-commits mailing list
cfe-commits@lists.llvm.org
http://lists.llvm.org/cgi-bin/mailman/listinfo/cfe-commits

Reply via email to