I've cleaned up the patch and got rid of a lot of the cruft that got added while I was debugging.
I also found one more race condition in my original solution, which I believe is now fixed. I also believe it is now safe even if different URLs map to the same cache file, which the previous patch didn't get right.
diff --git a/CMakeLists.txt b/CMakeLists.txt index 024f6a0..011ab42 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,7 @@ cmake_minimum_required(VERSION 3.1) +set(CMAKE_EXPORT_COMPILE_COMMANDS ON) + # try to set the best C++ language level set(CMAKE_CXX_STANDARD 20) # let it take the lowest version, we need some precursor of C++14x diff --git a/src/job.cc b/src/job.cc index a2025cc..c53feb1 100644 --- a/src/job.cc +++ b/src/job.cc @@ -662,6 +662,19 @@ void job::Prepare(const header &h, string_view headBuf, cmstring& callerHostname else m_sFileLoc=theUrl.sHost+theUrl.sPath; + // Here we serialize multiple clients trying to download the + // same file. Only one thread at a time per URL is allowed to + // proceed further in this function. + + lockuniq g{inProgressLock}; + + if (inProgress.contains(m_sFileLoc)) { + // Check if another job is running. If so link to that. + m_pItem = m_pParentCon.GetItemRegistry()->Create(m_sFileLoc, ESharingHow::ALWAYS_TRY_SHARING, fileitem::tSpecialPurposeAttr{}); + USRDBG("Linked to other job"); + return; + } + fileitem::tSpecialPurposeAttr attr { ! cfg::offlinemode && data_type == FILE_VOLATILE, m_bIsHeadOnly, @@ -697,8 +710,13 @@ void job::Prepare(const header &h, string_view headBuf, cmstring& callerHostname if(cfg::trackfileuse && fistate >= fileitem::FIST_DLGOTHEAD && fistate < fileitem::FIST_DLERROR) m_pItem.get()->UpdateHeadTimestamp(); - if(fistate==fileitem::FIST_COMPLETE) + if(fistate==fileitem::FIST_COMPLETE) { + // Tell everybody downloading this url that we already + // have a job to download it and register a cleanup + // when this job completes. + setInProgress(m_sFileLoc); return; // perfect, done here + } if(cfg::offlinemode) { // make sure there will be no problems later in SendData or prepare a user message // error or needs download but freshness check was disabled, so it's really not complete. 
@@ -760,6 +778,10 @@ void job::Prepare(const header &h, string_view headBuf, cmstring& callerHostname return report_overload(__LINE__); } } + // Tell everybody downloading this url that we already have a + // job to download it and register a cleanup when this job + // completes. + setInProgress(m_sFileLoc); } catch (const std::bad_alloc&) // OOM, may this ever happen here? { @@ -1190,4 +1212,16 @@ void job::AppendMetaHeaders() << "\r\nServer: Debian Apt-Cacher NG/" ACVERSION "\r\n" "\r\n"; } + +job::inProgressCleanup::~inProgressCleanup() { + lockuniq g{inProgressLock}; + LOGSTARTFUNC; + USRDBG("url=" << url); + if (url.size()) { + inProgress.erase(url); + } +} + +std::set<std::string> job::inProgress; +base_with_mutex job::inProgressLock; } diff --git a/src/job.h b/src/job.h index cb162a6..b4df3de 100644 --- a/src/job.h +++ b/src/job.h @@ -16,6 +16,26 @@ class header; class job { +private: + // Lock controlling access to inProgress + static base_with_mutex inProgressLock; + + // If there is an item in here then there is already a job downloading url + static std::set<std::string> inProgress; + + // Simple class which is destroyed when the job is destroyed. It deletes the entry from inProgress. 
+ struct inProgressCleanup { + std::string url; + inProgressCleanup() { } + ~inProgressCleanup(); + }; + + void setInProgress(const std::string& url_) { + m_ipc.url = url_; + inProgress.insert(url_); + } + + inProgressCleanup m_ipc; public: enum eJobResult : short diff --git a/CMakeLists.txt b/CMakeLists.txt index 024f6a0..011ab42 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,7 @@ cmake_minimum_required(VERSION 3.1) +set(CMAKE_EXPORT_COMPILE_COMMANDS ON) + # try to set the best C++ language level set(CMAKE_CXX_STANDARD 20) # let it take the lowest version, we need some precursor of C++14x diff --git a/src/job.cc b/src/job.cc index a2025cc..c53feb1 100644 --- a/src/job.cc +++ b/src/job.cc @@ -662,6 +662,19 @@ void job::Prepare(const header &h, string_view headBuf, cmstring& callerHostname else m_sFileLoc=theUrl.sHost+theUrl.sPath; + // Here we serialize multiple clients trying to download the + // same file. Only one thread at a time per URL is allowed to + // proceed further in this function. + + lockuniq g{inProgressLock}; + + if (inProgress.contains(m_sFileLoc)) { + // Check if another job is running. If so link to that. + m_pItem = m_pParentCon.GetItemRegistry()->Create(m_sFileLoc, ESharingHow::ALWAYS_TRY_SHARING, fileitem::tSpecialPurposeAttr{}); + USRDBG("Linked to other job"); + return; + } + fileitem::tSpecialPurposeAttr attr { ! cfg::offlinemode && data_type == FILE_VOLATILE, m_bIsHeadOnly, @@ -697,8 +710,13 @@ void job::Prepare(const header &h, string_view headBuf, cmstring& callerHostname if(cfg::trackfileuse && fistate >= fileitem::FIST_DLGOTHEAD && fistate < fileitem::FIST_DLERROR) m_pItem.get()->UpdateHeadTimestamp(); - if(fistate==fileitem::FIST_COMPLETE) + if(fistate==fileitem::FIST_COMPLETE) { + // Tell everybody downloading this url that we already + // have a job to download it and register a cleanup + // when this job completes. 
+ setInProgress(m_sFileLoc); return; // perfect, done here + } if(cfg::offlinemode) { // make sure there will be no problems later in SendData or prepare a user message // error or needs download but freshness check was disabled, so it's really not complete. @@ -760,6 +778,10 @@ void job::Prepare(const header &h, string_view headBuf, cmstring& callerHostname return report_overload(__LINE__); } } + // Tell everybody downloading this url that we already have a + // job to download it and register a cleanup when this job + // completes. + setInProgress(m_sFileLoc); } catch (const std::bad_alloc&) // OOM, may this ever happen here? { @@ -1190,4 +1212,16 @@ void job::AppendMetaHeaders() << "\r\nServer: Debian Apt-Cacher NG/" ACVERSION "\r\n" "\r\n"; } + +job::inProgressCleanup::~inProgressCleanup() { + lockuniq g{inProgressLock}; + LOGSTARTFUNC; + USRDBG("url=" << url); + if (url.size()) { + inProgress.erase(url); + } +} + +std::set<std::string> job::inProgress; +base_with_mutex job::inProgressLock; } diff --git a/src/job.h b/src/job.h index cb162a6..c79459b 100644 --- a/src/job.h +++ b/src/job.h @@ -16,6 +16,24 @@ class header; class job { +private: + // Lock controlling access to inProgress + static base_with_mutex inProgressLock; + + // If there is an item in here then there is already a job downloading url + static std::set<std::string> inProgress; + + // Simple class which is destroyed when the job is destroyed. It deletes the entry from inProgress. + struct inProgressCleanup { + std::string url; + inProgressCleanup() { } + ~inProgressCleanup(); + }; + + void setInProgress(const std::string& url_) { + m_ipc.url = url_; + inProgress.insert(url_); + } public: enum eJobResult : short @@ -48,6 +66,7 @@ public: } eActivity; TFileItemHolder m_pItem; + inProgressCleanup m_ipc; // This MUST be destroyed before m_pItem unique_fd m_filefd; bool m_bIsHttp11 = true;