This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch v2.5
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/v2.5 by this push:
     new 1a2a7c659 feat(slog): flush and remove all shared logs for garbage collection (#1594) (#1638)
1a2a7c659 is described below

commit 1a2a7c659e108497dbc15053d8e6a8032605dfd3
Author: Dan Wang <[email protected]>
AuthorDate: Thu Oct 19 19:42:23 2023 +0800

    feat(slog): flush and remove all shared logs for garbage collection (#1594) (#1638)
    
    https://github.com/apache/incubator-pegasus/issues/1593
    
    Since https://github.com/XiaoMi/rdsn/pull/1019, private logs have been used as
    the WAL instead of shared logs, which also means that no new mutations are ever
    appended to shared log files.
    
    However, obsolete shared logs whose mutations had already been applied to rocksdb
    have not been removed since then: at least one shared log file is never removed.
    We should change the garbage collection policy so that all obsolete shared log
    files are deleted.
    
    To facilitate the garbage collection of shared log files, we also trigger checkpoints,
    which flush rocksdb data to disk, for each replica that prevents shared log files
    from being removed. The number of replicas submitted for flushing at a time must be
    limited to avoid consuming too many I/O resources, which might otherwise affect the
    processing of read/write requests from clients.
    
    This PR cherry-picks #1594 into v2.5 to resolve issue #1593.
---
 src/replica/duplication/load_from_private_log.cpp  |  14 +-
 src/replica/duplication/load_from_private_log.h    |   3 +-
 .../duplication/test/duplication_test_base.h       |   4 +-
 src/replica/log_file.cpp                           |  18 +-
 src/replica/log_file.h                             |  19 +-
 src/replica/mutation_log.cpp                       | 501 +++++++++------------
 src/replica/mutation_log.h                         | 153 +++++--
 src/replica/mutation_log_replay.cpp                |   4 +-
 src/replica/mutation_log_utils.cpp                 |   2 +-
 src/replica/mutation_log_utils.h                   |   9 +-
 src/replica/replica_stub.cpp                       | 321 +++++++------
 src/replica/replica_stub.h                         |  39 +-
 src/replica/replication_app_base.cpp               |   1 -
 src/replica/test/mock_utils.h                      |   2 +-
 src/replica/test/mutation_log_test.cpp             | 266 ++++++++++-
 src/server/config.ini                              |   2 +-
 src/server/pegasus_server_impl.cpp                 |   3 +-
 src/server/test/config.ini                         |   2 +-
 src/utils/autoref_ptr.h                            |   2 +
 src/utils/fmt_logging.h                            |   3 +-
 20 files changed, 872 insertions(+), 496 deletions(-)

diff --git a/src/replica/duplication/load_from_private_log.cpp 
b/src/replica/duplication/load_from_private_log.cpp
index aa458b22a..d3d88a075 100644
--- a/src/replica/duplication/load_from_private_log.cpp
+++ b/src/replica/duplication/load_from_private_log.cpp
@@ -16,6 +16,7 @@
 // under the License.
 
 #include <iterator>
+#include <map>
 #include <utility>
 
 #include "common/duplication_common.h"
@@ -57,11 +58,11 @@ bool load_from_private_log::will_fail_fast() const
 // we try to list all files and select a new one to start 
(find_log_file_to_start).
 bool load_from_private_log::switch_to_next_log_file()
 {
-    auto file_map = _private_log->get_log_file_map();
-    auto next_file_it = file_map.find(_current->index() + 1);
+    const auto &file_map = _private_log->get_log_file_map();
+    const auto &next_file_it = file_map.find(_current->index() + 1);
     if (next_file_it != file_map.end()) {
         log_file_ptr file;
-        error_s es = log_utils::open_read(next_file_it->second->path(), file);
+        const auto &es = log_utils::open_read(next_file_it->second->path(), 
file);
         if (!es.is_ok()) {
             LOG_ERROR_PREFIX("{}", es);
             _current = nullptr;
@@ -123,11 +124,11 @@ void load_from_private_log::run()
 void load_from_private_log::find_log_file_to_start()
 {
     // `file_map` has already excluded the useless log files during replica 
init.
-    auto file_map = _private_log->get_log_file_map();
+    const auto &file_map = _private_log->get_log_file_map();
 
     // Reopen the files. Because the internal file handle of `file_map`
     // is cleared once WAL replay finished. They are unable to read.
-    std::map<int, log_file_ptr> new_file_map;
+    mutation_log::log_file_map_by_index new_file_map;
     for (const auto &pr : file_map) {
         log_file_ptr file;
         error_s es = log_utils::open_read(pr.second->path(), file);
@@ -141,7 +142,8 @@ void load_from_private_log::find_log_file_to_start()
     find_log_file_to_start(std::move(new_file_map));
 }
 
-void load_from_private_log::find_log_file_to_start(std::map<int, log_file_ptr> 
log_file_map)
+void load_from_private_log::find_log_file_to_start(
+    const mutation_log::log_file_map_by_index &log_file_map)
 {
     _current = nullptr;
     if (dsn_unlikely(log_file_map.empty())) {
diff --git a/src/replica/duplication/load_from_private_log.h 
b/src/replica/duplication/load_from_private_log.h
index 523002b54..56651aabf 100644
--- a/src/replica/duplication/load_from_private_log.h
+++ b/src/replica/duplication/load_from_private_log.h
@@ -21,7 +21,6 @@
 #include <stddef.h>
 #include <stdint.h>
 #include <chrono>
-#include <map>
 
 #include "common/replication_other_types.h"
 #include "mutation_batch.h"
@@ -61,7 +60,7 @@ public:
 
     /// Find the log file that contains `_start_decree`.
     void find_log_file_to_start();
-    void find_log_file_to_start(std::map<int, log_file_ptr> log_files);
+    void find_log_file_to_start(const mutation_log::log_file_map_by_index 
&log_files);
 
     void replay_log_block();
 
diff --git a/src/replica/duplication/test/duplication_test_base.h 
b/src/replica/duplication/test/duplication_test_base.h
index fb0c7ea4f..4152a9a37 100644
--- a/src/replica/duplication/test/duplication_test_base.h
+++ b/src/replica/duplication/test/duplication_test_base.h
@@ -68,9 +68,9 @@ public:
         return duplicator;
     }
 
-    std::map<int, log_file_ptr> open_log_file_map(const std::string &log_dir)
+    mutation_log::log_file_map_by_index open_log_file_map(const std::string 
&log_dir)
     {
-        std::map<int, log_file_ptr> log_file_map;
+        mutation_log::log_file_map_by_index log_file_map;
         error_s err = log_utils::open_log_file_map(log_dir, log_file_map);
         EXPECT_EQ(err, error_s::ok());
         return log_file_map;
diff --git a/src/replica/log_file.cpp b/src/replica/log_file.cpp
index 7362aa7c4..e239d4dae 100644
--- a/src/replica/log_file.cpp
+++ b/src/replica/log_file.cpp
@@ -166,15 +166,15 @@ log_file::~log_file() { close(); }
 
 log_file::log_file(
     const char *path, disk_file *handle, int index, int64_t start_offset, bool 
is_read)
-    : _is_read(is_read)
+    : _crc32(0),
+      _start_offset(start_offset),
+      _end_offset(start_offset),
+      _handle(handle),
+      _is_read(is_read),
+      _path(path),
+      _index(index),
+      _last_write_time(0)
 {
-    _start_offset = start_offset;
-    _end_offset = start_offset;
-    _handle = handle;
-    _path = path;
-    _index = index;
-    _crc32 = 0;
-    _last_write_time = 0;
     memset(&_header, 0, sizeof(_header));
 
     if (is_read) {
@@ -357,7 +357,7 @@ void log_file::reset_stream(size_t offset /*default = 0*/)
     }
 }
 
-decree log_file::previous_log_max_decree(const dsn::gpid &pid)
+decree log_file::previous_log_max_decree(const dsn::gpid &pid) const
 {
     auto it = _previous_log_max_decrees.find(pid);
     return it == _previous_log_max_decrees.end() ? 0 : it->second.max_decree;
diff --git a/src/replica/log_file.h b/src/replica/log_file.h
index fb1fe3b6d..fcaff2187 100644
--- a/src/replica/log_file.h
+++ b/src/replica/log_file.h
@@ -65,9 +65,9 @@ struct log_file_header
 // a structure to record replica's log info
 struct replica_log_info
 {
-    int64_t max_decree;
+    decree max_decree;
     int64_t valid_start_offset; // valid start offset in global space
-    replica_log_info(int64_t d, int64_t o)
+    replica_log_info(decree d, int64_t o)
     {
         max_decree = d;
         valid_start_offset = o;
@@ -184,11 +184,14 @@ public:
     // file path
     const std::string &path() const { return _path; }
     // previous decrees
-    const replica_log_info_map &previous_log_max_decrees() { return 
_previous_log_max_decrees; }
+    const replica_log_info_map &previous_log_max_decrees() const
+    {
+        return _previous_log_max_decrees;
+    }
     // previous decree for speicified gpid
-    decree previous_log_max_decree(const gpid &pid);
+    decree previous_log_max_decree(const gpid &pid) const;
     // file header
-    log_file_header &header() { return _header; }
+    const log_file_header &header() const { return _header; }
 
     // read file header from reader, return byte count consumed
     int read_file_header(binary_reader &reader);
@@ -213,7 +216,7 @@ private:
     friend class mock_log_file;
 
     uint32_t _crc32;
-    int64_t _start_offset; // start offset in the global space
+    const int64_t _start_offset; // start offset in the global space
     std::atomic<int64_t>
         _end_offset; // end offset in the global space: end_offset = 
start_offset + file_size
     class file_streamer;
@@ -221,8 +224,8 @@ private:
     std::unique_ptr<file_streamer> _stream;
     disk_file *_handle;        // file handle
     const bool _is_read;       // if opened for read or write
-    std::string _path;         // file path
-    int _index;                // file index
+    const std::string _path;   // file path
+    const int _index;          // file index
     log_file_header _header;   // file header
     uint64_t _last_write_time; // seconds from epoch time
 
diff --git a/src/replica/mutation_log.cpp b/src/replica/mutation_log.cpp
index c618c7cd4..ad745670b 100644
--- a/src/replica/mutation_log.cpp
+++ b/src/replica/mutation_log.cpp
@@ -527,8 +527,8 @@ error_code mutation_log::open(replay_callback read_callback,
                               io_failure_callback write_error_callback,
                               const std::map<gpid, decree> &replay_condition)
 {
-    CHECK(!_is_opened, "cannot open a opened mutation_log");
-    CHECK(nullptr == _current_log_file, "");
+    CHECK(!_is_opened, "cannot open an opened mutation_log");
+    CHECK_NULL(_current_log_file, "");
 
     // create dir if necessary
     if (!dsn::utils::filesystem::path_exists(_dir)) {
@@ -562,9 +562,8 @@ error_code mutation_log::open(replay_callback read_callback,
                 err == ERR_INVALID_PARAMETERS) {
                 LOG_WARNING("skip file {} during log init, err = {}", fpath, 
err);
                 continue;
-            } else {
-                return err;
             }
+            return err;
         }
 
         if (_is_private) {
@@ -592,8 +591,8 @@ error_code mutation_log::open(replay_callback read_callback,
     file_list.clear();
 
     // filter useless log
-    std::map<int, log_file_ptr>::iterator replay_begin = _log_files.begin();
-    std::map<int, log_file_ptr>::iterator replay_end = _log_files.end();
+    log_file_map_by_index::iterator replay_begin = _log_files.begin();
+    log_file_map_by_index::iterator replay_end = _log_files.end();
     if (!replay_condition.empty()) {
         if (_is_private) {
             auto find = replay_condition.find(_private_gpid);
@@ -609,7 +608,7 @@ error_code mutation_log::open(replay_callback read_callback,
         } else {
             // find the largest file which can be ignored.
             // after iterate, the 'mark_it' will point to the largest file 
which can be ignored.
-            std::map<int, log_file_ptr>::reverse_iterator mark_it;
+            log_file_map_by_index::reverse_iterator mark_it;
             std::set<gpid> kickout_replicas;
             replica_log_info_map max_decrees; // max_decrees for log file at 
mark_it.
             for (mark_it = _log_files.rbegin(); mark_it != _log_files.rend(); 
++mark_it) {
@@ -666,7 +665,7 @@ error_code mutation_log::open(replay_callback read_callback,
     }
 
     // replay with the found files
-    std::map<int, log_file_ptr> replay_logs(replay_begin, replay_end);
+    log_file_map_by_index replay_logs(replay_begin, replay_end);
     int64_t end_offset = 0;
     err = replay(
         replay_logs,
@@ -863,11 +862,8 @@ decree mutation_log::max_decree(gpid gpid) const
         CHECK_EQ(gpid, _private_gpid);
         return _private_log_info.max_decree;
     } else {
-        auto it = _shared_log_info_map.find(gpid);
-        if (it != _shared_log_info_map.end())
-            return it->second.max_decree;
-        else
-            return 0;
+        const auto &it = _shared_log_info_map.find(gpid);
+        return it != _shared_log_info_map.end() ? it->second.max_decree : 0;
     }
 }
 
@@ -923,7 +919,7 @@ int64_t mutation_log::total_size() const
 
 int64_t mutation_log::total_size_no_lock() const
 {
-    return _log_files.size() > 0 ? _global_end_offset - _global_start_offset : 
0;
+    return _log_files.empty() ? 0 : _global_end_offset - _global_start_offset;
 }
 
 error_code mutation_log::reset_from(const std::string &dir,
@@ -1095,7 +1091,7 @@ bool mutation_log::get_learn_state(gpid gpid, decree 
start, /*out*/ learn_state
         state.meta = temp_writer.get_buffer();
     }
 
-    std::map<int, log_file_ptr> files;
+    log_file_map_by_index files;
     {
         zauto_lock l(_lock);
 
@@ -1202,13 +1198,13 @@ void mutation_log::get_parent_mutations_and_logs(gpid 
pid,
         // no memory data and no disk data
         return;
     }
-    std::map<int, log_file_ptr> file_map = get_log_file_map();
+    const auto &file_map = get_log_file_map();
 
     bool skip_next = false;
     std::list<std::string> learn_files;
     decree last_max_decree = 0;
     for (auto itr = file_map.rbegin(); itr != file_map.rend(); ++itr) {
-        log_file_ptr &log = itr->second;
+        const log_file_ptr &log = itr->second;
         if (log->end_offset() <= _private_log_info.valid_start_offset)
             break;
 
@@ -1287,7 +1283,7 @@ int mutation_log::garbage_collection(gpid gpid,
 {
     CHECK(_is_private, "this method is only valid for private log");
 
-    std::map<int, log_file_ptr> files;
+    log_file_map_by_index files;
     decree max_decree = invalid_decree;
     int current_file_index = -1;
 
@@ -1295,23 +1291,24 @@ int mutation_log::garbage_collection(gpid gpid,
         zauto_lock l(_lock);
         files = _log_files;
         max_decree = _private_log_info.max_decree;
-        if (_current_log_file != nullptr)
+        if (_current_log_file != nullptr) {
             current_file_index = _current_log_file->index();
+        }
     }
 
     if (files.size() <= 1) {
         // nothing to do
         return 0;
-    } else {
-        // the last one should be the current log file
-        CHECK(current_file_index == -1 || files.rbegin()->first == 
current_file_index,
-              "invalid current_file_index, index = {}",
-              current_file_index);
     }
 
+    // the last one should be the current log file
+    CHECK(current_file_index == -1 || files.rbegin()->first == 
current_file_index,
+          "invalid current_file_index, index = {}",
+          current_file_index);
+
     // find the largest file which can be deleted.
     // after iterate, the 'mark_it' will point to the largest file which can 
be deleted.
-    std::map<int, log_file_ptr>::reverse_iterator mark_it;
+    log_file_map_by_index::reverse_iterator mark_it;
     int64_t already_reserved_size = 0;
     for (mark_it = files.rbegin(); mark_it != files.rend(); ++mark_it) {
         log_file_ptr log = mark_it->second;
@@ -1403,315 +1400,253 @@ int mutation_log::garbage_collection(gpid gpid,
     return deleted;
 }
 
-int mutation_log::garbage_collection(const replica_log_info_map &gc_condition,
-                                     int file_count_limit,
-                                     std::set<gpid> &prevent_gc_replicas)
+struct gc_summary_info
 {
-    CHECK(!_is_private, "this method is only valid for shared log");
-
-    std::map<int, log_file_ptr> files;
-    replica_log_info_map max_decrees;
-    int current_log_index = -1;
-    int64_t total_log_size = 0;
+    dsn::gpid pid;
+    int min_file_index = 0;
+    dsn::replication::decree max_decree_gap = 0;
+    dsn::replication::decree garbage_max_decree = 0;
+    dsn::replication::decree slog_max_decree = 0;
 
+    std::string to_string() const
     {
-        zauto_lock l(_lock);
-        files = _log_files;
-        max_decrees = _shared_log_info_map;
-        if (_current_log_file != nullptr)
-            current_log_index = _current_log_file->index();
-        total_log_size = total_size_no_lock();
+        return fmt::format("gc_summary_info = [pid = {}, min_file_index = {}, 
max_decree_gap = {}, "
+                           "garbage_max_decree = {}, slog_max_decree = {}]",
+                           pid,
+                           min_file_index,
+                           max_decree_gap,
+                           garbage_max_decree,
+                           slog_max_decree);
     }
 
-    if (files.size() <= 1) {
-        // nothing to do
-        LOG_INFO("gc_shared: too few files to delete, file_count_limit = {}, 
reserved_log_count "
-                 "= {}, reserved_log_size = {}, current_log_index = {}",
-                 file_count_limit,
-                 files.size(),
-                 total_log_size,
-                 current_log_index);
-        return (int)files.size();
-    } else {
-        // the last one should be the current log file
-        CHECK(-1 == current_log_index || files.rbegin()->first == 
current_log_index,
-              "invalid current_log_index, index = {}",
-              current_log_index);
+    friend std::ostream &operator<<(std::ostream &os, const gc_summary_info 
&gc_summary)
+    {
+        return os << gc_summary.to_string();
     }
+};
 
-    int reserved_log_count = files.size();
-    int64_t reserved_log_size = total_log_size;
-    int reserved_smallest_log = files.begin()->first;
-    int reserved_largest_log = current_log_index;
-
-    // find the largest file which can be deleted.
-    // after iterate, the 'mark_it' will point to the largest file which can 
be deleted.
-    std::map<int, log_file_ptr>::reverse_iterator mark_it;
-    std::set<gpid> kickout_replicas;
-    gpid stop_gc_replica;
-    int stop_gc_log_index = 0;
-    decree stop_gc_decree_gap = 0;
-    decree stop_gc_garbage_max_decree = 0;
-    decree stop_gc_log_max_decree = 0;
-    int file_count = 0;
-    for (mark_it = files.rbegin(); mark_it != files.rend(); ++mark_it) {
-        log_file_ptr log = mark_it->second;
-        CHECK_EQ(mark_it->first, log->index());
-        file_count++;
-
-        bool delete_ok = true;
-
-        // skip current file
-        if (current_log_index == log->index()) {
-            delete_ok = false;
-        }
+namespace {
 
-        if (delete_ok) {
-            std::set<gpid> prevent_gc_replicas_for_this_log;
+bool can_gc_replica_slog(const dsn::replication::replica_log_info_map 
&slog_max_decrees,
+                         const dsn::replication::log_file_ptr &file,
+                         const dsn::gpid &pid,
+                         const dsn::replication::replica_log_info 
&replica_durable_info,
+                         dsn::replication::gc_summary_info &gc_summary)
+{
+    const auto &garbage_max_decree = replica_durable_info.max_decree;
+    const auto &valid_start_offset = replica_durable_info.valid_start_offset;
+
+    const auto &it = slog_max_decrees.find(pid);
+    if (it == slog_max_decrees.end()) {
+        // There's no log found in this file for this replica, thus all 
decrees of
+        // this replica in this file could deleted.
+        //
+        // `valid_start_offset` might be reset to 0 if initialize_on_load() 
returned
+        // `ERR_INCOMPLETE_DATA`, thus it's possible that `valid_start_offset 
== 0`.
+        CHECK(valid_start_offset == 0 || file->end_offset() <= 
valid_start_offset,
+              "valid start offset must be 0 or greater than the end of this 
log file");
+        LOG_DEBUG("gc @ {}: max_decree for {} is missing vs {} as garbage max 
decree, it's "
+                  "safe to delete this and all older logs for this replica",
+                  pid,
+                  file->path(),
+                  garbage_max_decree);
+        return true;
+    } else if (file->end_offset() <= valid_start_offset) {
+        // This file has been invalid for this replica, since 
`valid_start_offset` was reset
+        // to a file with larger index than this file. Thus all decrees of 
this replica in
+        // this file could be deleted.
+        LOG_DEBUG("gc @ {}: log is invalid for {}, as valid start offset vs 
log end offset = "
+                  "{} vs {}, it is therefore safe to delete this and all older 
logs for this "
+                  "replica",
+                  pid,
+                  file->path(),
+                  valid_start_offset,
+                  file->end_offset());
+        return true;
+    } else if (it->second.max_decree <= garbage_max_decree) {
+        // All decrees are no more than the garbage max decree. Since all 
decrees less than
+        // garbage max decree would be deleted, all decrees of this replica in 
this file
+        // could be deleted.
+        LOG_DEBUG("gc @ {}: max_decree for {} is {} vs {} as garbage max 
decree, it is "
+                  "therefore safe to delete this and all older logs for this 
replica",
+                  pid,
+                  file->path(),
+                  it->second.max_decree,
+                  garbage_max_decree);
+        return true;
+    }
 
-            for (auto &kv : gc_condition) {
-                if (kickout_replicas.find(kv.first) != kickout_replicas.end()) 
{
-                    // no need to consider this replica
-                    continue;
-                }
+    // it->second.max_decree > garbage_max_decree
+    //
+    // Some decrees are more than garbage max decree, thus this file should 
not be deleted
+    // for now.
+    LOG_DEBUG("gc @ {}: max_decree for {} is {} vs {} as garbage max decree, 
it "
+              "is therefore not allowed to delete this and all older logs",
+              pid,
+              file->path(),
+              it->second.max_decree,
+              garbage_max_decree);
+
+    auto gap = it->second.max_decree - garbage_max_decree;
+    if (file->index() < gc_summary.min_file_index || gap > 
gc_summary.max_decree_gap) {
+        // Find the oldest file of this round of iteration for gc of slog 
files, with the
+        // max decree gap between the garbage max decree and the oldest slog 
file.
+        gc_summary.pid = pid;
+        gc_summary.min_file_index = file->index();
+        gc_summary.max_decree_gap = gap;
+        gc_summary.garbage_max_decree = garbage_max_decree;
+        gc_summary.slog_max_decree = it->second.max_decree;
+    }
 
-                gpid gpid = kv.first;
-                decree garbage_max_decree = kv.second.max_decree;
-                int64_t valid_start_offset = kv.second.valid_start_offset;
+    return false;
+}
 
-                bool delete_ok_for_this_replica = false;
-                bool kickout_this_replica = false;
-                auto it3 = max_decrees.find(gpid);
+} // anonymous namespace
 
-                // log not found for this replica, ok to delete
-                if (it3 == max_decrees.end()) {
-                    // valid_start_offset may be reset to 0 if 
initialize_on_load() returns
-                    // ERR_INCOMPLETE_DATA
-                    CHECK(valid_start_offset == 0 || valid_start_offset >= 
log->end_offset(),
-                          "valid start offset must be 0 or greater than the 
end of this log file");
+void mutation_log::garbage_collection(const replica_log_info_map 
&replica_durable_decrees,
+                                      std::set<gpid> &prevent_gc_replicas)
+{
+    CHECK(!_is_private, "this method is only valid for shared log");
 
-                    LOG_DEBUG(
-                        "gc @ {}: max_decree for {} is missing vs {} as 
garbage max decree, it's "
-                        "safe to delete this and all older logs for this 
replica",
-                        gpid,
-                        log->path(),
-                        garbage_max_decree);
-                    delete_ok_for_this_replica = true;
-                    kickout_this_replica = true;
-                }
+    // Fetch the snapshot of the latest states of the slog, such as the max 
decree it maintains
+    // for each partition.
+    log_file_map_by_index files;
+    replica_log_info_map slog_max_decrees;
+    int64_t total_log_size = 0;
+    {
+        zauto_lock l(_lock);
+        total_log_size = total_size_no_lock();
+        if (_log_files.empty()) {
+            CHECK_EQ(total_log_size, 0);
+            LOG_INFO("gc_shared: slog file not found");
+            return;
+        }
 
-                // log is invalid for this replica, ok to delete
-                else if (log->end_offset() <= valid_start_offset) {
-                    LOG_DEBUG(
-                        "gc @ {}: log is invalid for {}, as valid start offset 
vs log end offset = "
-                        "{} vs {}, it is therefore safe to delete this and all 
older logs for this "
-                        "replica",
-                        gpid,
-                        log->path(),
-                        valid_start_offset,
-                        log->end_offset());
-                    delete_ok_for_this_replica = true;
-                    kickout_this_replica = true;
-                }
+        CHECK_NULL(_current_log_file,
+                   "shared logs have been deprecated, thus could not be 
created");
+        files = _log_files;
+        slog_max_decrees = _shared_log_info_map;
+    }
 
-                // all decrees are no more than garbage max decree, ok to 
delete
-                else if (it3->second.max_decree <= garbage_max_decree) {
-                    LOG_DEBUG("gc @ {}: max_decree for {} is {} vs {} as 
garbage max decree, it is "
-                              "therefore safe to delete this and all older 
logs for this replica",
-                              gpid,
-                              log->path(),
-                              it3->second.max_decree,
-                              garbage_max_decree);
-                    delete_ok_for_this_replica = true;
-                    kickout_this_replica = true;
-                }
+    reserved_slog_info reserved_slog = {
+        files.size(), total_log_size, files.begin()->first, 
files.rbegin()->first};
 
-                else // it3->second.max_decree > garbage_max_decree
-                {
-                    // should not delete this file
-                    LOG_DEBUG("gc @ {}: max_decree for {} is {} vs {} as 
garbage max decree, it "
-                              "is therefore not allowed to delete this and all 
older logs",
-                              gpid,
-                              log->path(),
-                              it3->second.max_decree,
-                              garbage_max_decree);
-                    prevent_gc_replicas_for_this_log.insert(gpid);
-                    decree gap = it3->second.max_decree - garbage_max_decree;
-                    if (log->index() < stop_gc_log_index || gap > 
stop_gc_decree_gap) {
-                        // record the max gap replica for the smallest log
-                        stop_gc_replica = gpid;
-                        stop_gc_log_index = log->index();
-                        stop_gc_decree_gap = gap;
-                        stop_gc_garbage_max_decree = garbage_max_decree;
-                        stop_gc_log_max_decree = it3->second.max_decree;
-                    }
-                }
+    // Iterate over the slog files from the newest to the oldest in descending 
order(i.e.
+    // file index in descending order), to find the newest file that could be 
deleted(after
+    // iterating, `mark_it` would point to the newest file that could be 
deleted).
+    log_file_map_by_index::reverse_iterator mark_it;
+    std::set<gpid> kickout_replicas;
+    gc_summary_info gc_summary;
+    for (mark_it = files.rbegin(); mark_it != files.rend(); ++mark_it) {
+        const auto &file = mark_it->second;
+        CHECK_EQ(mark_it->first, file->index());
 
-                if (kickout_this_replica) {
-                    // files before this file is useless for this replica,
-                    // so from now on, this replica will not be considered 
anymore
-                    kickout_replicas.insert(gpid);
-                }
+        bool can_gc_all_replicas_slog = true;
 
-                if (!delete_ok_for_this_replica) {
-                    // can not delete this file, mark it, and continue to 
check other replicas
-                    delete_ok = false;
-                }
+        for (const auto &replica_durable_info : replica_durable_decrees) {
+            if (kickout_replicas.find(replica_durable_info.first) != 
kickout_replicas.end()) {
+                // There's no need to consider this replica.
+                continue;
             }
 
-            // update prevent_gc_replicas
-            if (file_count > file_count_limit && 
!prevent_gc_replicas_for_this_log.empty()) {
-                
prevent_gc_replicas.insert(prevent_gc_replicas_for_this_log.begin(),
-                                           
prevent_gc_replicas_for_this_log.end());
+            if (can_gc_replica_slog(slog_max_decrees,
+                                    file,
+                                    replica_durable_info.first,
+                                    replica_durable_info.second,
+                                    gc_summary)) {
+                // Log files before this file is useless for this replica,
+                // so from now on, this replica would not be considered any 
more.
+                kickout_replicas.insert(replica_durable_info.first);
+                continue;
             }
+
+            // For now, this file could not be deleted.
+            can_gc_all_replicas_slog = false;
+            prevent_gc_replicas.insert(replica_durable_info.first);
         }
 
-        if (delete_ok) {
-            // found the largest file which can be deleted
+        if (can_gc_all_replicas_slog) {
+            // The newest file that could be deleted has been found.
             break;
         }
 
-        // update max_decrees for the next log file
-        max_decrees = log->previous_log_max_decrees();
+        // Fetch max decrees of the next slog file.
+        slog_max_decrees = file->previous_log_max_decrees();
     }
 
     if (mark_it == files.rend()) {
-        // no file to delete
-        if (stop_gc_decree_gap > 0) {
-            LOG_INFO("gc_shared: no file can be deleted, file_count_limit = 
{}, "
-                     "reserved_log_count = {}, reserved_log_size = {}, "
-                     "reserved_smallest_log = {}, reserved_largest_log = {}, "
-                     "stop_gc_log_index = {}, stop_gc_replica_count = {}, "
-                     "stop_gc_replica = {}, stop_gc_decree_gap = {}, "
-                     "stop_gc_garbage_max_decree = {}, stop_gc_log_max_decree 
= {}",
-                     file_count_limit,
-                     reserved_log_count,
-                     reserved_log_size,
-                     reserved_smallest_log,
-                     reserved_largest_log,
-                     stop_gc_log_index,
-                     prevent_gc_replicas.size(),
-                     stop_gc_replica,
-                     stop_gc_decree_gap,
-                     stop_gc_garbage_max_decree,
-                     stop_gc_log_max_decree);
-        } else {
-            LOG_INFO("gc_shared: no file can be deleted, file_count_limit = 
{}, "
-                     "reserved_log_count = {}, reserved_log_size = {}, "
-                     "reserved_smallest_log = {}, reserved_largest_log = {}",
-                     file_count_limit,
-                     reserved_log_count,
-                     reserved_log_size,
-                     reserved_smallest_log,
-                     reserved_largest_log);
-        }
-
-        return reserved_log_count;
+        // There's no file that could be deleted.
+        LOG_INFO("gc_shared: no file can be deleted: {}, {}, 
prevent_gc_replicas = {}",
+                 reserved_slog,
+                 gc_summary,
+                 prevent_gc_replicas.size());
+        return;
     }
 
-    // ok, let's delete files in increasing order of file index
-    // to avoid making a hole in the file list
-    int largest_log_to_delete = mark_it->second->index();
-    int to_delete_log_count = 0;
-    int64_t to_delete_log_size = 0;
-    int deleted_log_count = 0;
-    int64_t deleted_log_size = 0;
-    int deleted_smallest_log = 0;
-    int deleted_largest_log = 0;
-    for (auto it = files.begin(); it != files.end() && it->second->index() <= 
largest_log_to_delete;
+    slog_deletion_info slog_deletion;
+
+    // Delete files in ascending order of file index. Otherwise, deleting 
files in descending
+    // order would lead to a hole in the file list once a file failed to be 
deleted.
+    remove_obsolete_slog_files(mark_it->second->index(), files, reserved_slog, 
slog_deletion);
+    LOG_INFO("gc_shared: deleted some files: {}, {}, {}, prevent_gc_replicas = 
{}",
+             reserved_slog,
+             slog_deletion,
+             gc_summary,
+             prevent_gc_replicas.size());
+}
+
+void mutation_log::remove_obsolete_slog_files(const int 
max_file_index_to_delete,
+                                              log_file_map_by_index &files,
+                                              reserved_slog_info 
&reserved_slog,
+                                              slog_deletion_info 
&slog_deletion)
+{
+    for (auto it = files.begin();
+         it != files.end() && it->second->index() <= max_file_index_to_delete;
          ++it) {
-        log_file_ptr log = it->second;
-        CHECK_EQ(it->first, log->index());
-        to_delete_log_count++;
-        to_delete_log_size += log->end_offset() - log->start_offset();
+        auto &file = it->second;
+        CHECK_EQ(it->first, file->index());
+        slog_deletion.to_delete_file_count++;
+        slog_deletion.to_delete_log_size += file->end_offset() - 
file->start_offset();
 
-        // close first
-        log->close();
+        // Close the log file before removing it from disk.
+        file->close();
 
-        // delete file
-        auto &fpath = log->path();
+        // Delete the log file; on failure stop here so no hole is left in the file list.
+        const auto &fpath = file->path();
         if (!dsn::utils::filesystem::remove_path(fpath)) {
             LOG_ERROR("gc_shared: fail to remove {}, stop current gc cycle 
...", fpath);
             break;
         }
 
-        // delete succeed
+        // The log file has been deleted successfully: account for it in the deletion stats.
         LOG_INFO("gc_shared: log file {} is removed", fpath);
-        deleted_log_count++;
-        deleted_log_size += log->end_offset() - log->start_offset();
-        if (deleted_smallest_log == 0)
-            deleted_smallest_log = log->index();
-        deleted_largest_log = log->index();
+        slog_deletion.deleted_file_count++;
+        slog_deletion.deleted_log_size += file->end_offset() - 
file->start_offset();
+        if (slog_deletion.deleted_min_file_index == 0) { // 0 is treated as "nothing deleted yet"
+            slog_deletion.deleted_min_file_index = file->index();
+        }
+        slog_deletion.deleted_max_file_index = file->index();
 
-        // erase from _log_files
+        // Remove the log file from _log_files under _lock and refresh the reserved-slog stats.
         {
             zauto_lock l(_lock);
             _log_files.erase(it->first);
             _global_start_offset =
                 _log_files.size() > 0 ? 
_log_files.begin()->second->start_offset() : 0;
-            reserved_log_count = _log_files.size();
-            reserved_log_size = total_size_no_lock();
-            if (reserved_log_count > 0) {
-                reserved_smallest_log = _log_files.begin()->first;
-                reserved_largest_log = _log_files.rbegin()->first;
+            reserved_slog.file_count = _log_files.size();
+            reserved_slog.log_size = total_size_no_lock();
+            if (reserved_slog.file_count > 0) {
+                reserved_slog.min_file_index = _log_files.begin()->first;
+                reserved_slog.max_file_index = _log_files.rbegin()->first;
             } else {
-                reserved_smallest_log = -1;
-                reserved_largest_log = -1;
+                reserved_slog.min_file_index = -1;
+                reserved_slog.max_file_index = -1;
             }
         }
     }
-
-    if (stop_gc_decree_gap > 0) {
-        LOG_INFO("gc_shared: deleted some files, file_count_limit = {}, "
-                 "reserved_log_count = {}, reserved_log_size = {}, "
-                 "reserved_smallest_log = {}, reserved_largest_log = {}, "
-                 "to_delete_log_count = {}, to_delete_log_size = {}, "
-                 "deleted_log_count = {}, deleted_log_size = {}, "
-                 "deleted_smallest_log = {}, deleted_largest_log = {}, "
-                 "stop_gc_log_index = {}, stop_gc_replica_count = {}, "
-                 "stop_gc_replica = {}, stop_gc_decree_gap = {}, "
-                 "stop_gc_garbage_max_decree = {}, stop_gc_log_max_decree = 
{}",
-                 file_count_limit,
-                 reserved_log_count,
-                 reserved_log_size,
-                 reserved_smallest_log,
-                 reserved_largest_log,
-                 to_delete_log_count,
-                 to_delete_log_size,
-                 deleted_log_count,
-                 deleted_log_size,
-                 deleted_smallest_log,
-                 deleted_largest_log,
-                 stop_gc_log_index,
-                 prevent_gc_replicas.size(),
-                 stop_gc_replica,
-                 stop_gc_decree_gap,
-                 stop_gc_garbage_max_decree,
-                 stop_gc_log_max_decree);
-    } else {
-        LOG_INFO("gc_shared: deleted some files, file_count_limit = {}, "
-                 "reserved_log_count = {}, reserved_log_size = {}, "
-                 "reserved_smallest_log = {}, reserved_largest_log = {}, "
-                 "to_delete_log_count = {}, to_delete_log_size = {}, "
-                 "deleted_log_count = {}, deleted_log_size = {}, "
-                 "deleted_smallest_log = {}, deleted_largest_log = {}",
-                 file_count_limit,
-                 reserved_log_count,
-                 reserved_log_size,
-                 reserved_smallest_log,
-                 reserved_largest_log,
-                 to_delete_log_count,
-                 to_delete_log_size,
-                 deleted_log_count,
-                 deleted_log_size,
-                 deleted_smallest_log,
-                 deleted_largest_log);
-    }
-
-    return reserved_log_count;
 }
 
-std::map<int, log_file_ptr> mutation_log::get_log_file_map() const
+mutation_log::log_file_map_by_index mutation_log::get_log_file_map() const
 {
     zauto_lock l(_lock);
     return _log_files;
@@ -1719,3 +1654,5 @@ std::map<int, log_file_ptr> 
mutation_log::get_log_file_map() const
 
 } // namespace replication
 } // namespace dsn
+
+USER_DEFINED_STRUCTURE_FORMATTER(dsn::replication::gc_summary_info);
diff --git a/src/replica/mutation_log.h b/src/replica/mutation_log.h
index 6636f808a..b6223e847 100644
--- a/src/replica/mutation_log.h
+++ b/src/replica/mutation_log.h
@@ -26,11 +26,13 @@
 
 #pragma once
 
+#include <fmt/core.h>
 #include <stddef.h>
 #include <stdint.h>
 #include <algorithm>
 #include <atomic>
 #include <functional>
+#include <iosfwd>
 #include <map>
 #include <memory>
 #include <set>
@@ -50,6 +52,7 @@
 #include "utils/autoref_ptr.h"
 #include "utils/error_code.h"
 #include "utils/errors.h"
+#include "utils/fmt_utils.h"
 #include "utils/zlocks.h"
 
 namespace dsn {
@@ -221,19 +224,17 @@ public:
                            int64_t reserve_max_size,
                            int64_t reserve_max_time);
 
-    // garbage collection for shared log, returns reserved file count.
-    // `prevent_gc_replicas' will store replicas which prevent log files out 
of `file_count_limit'
-    // to be deleted.
-    // remove log files if satisfy:
-    //  - for each replica "r":
-    //         r is not in file.max_decree
-    //      || file.max_decree[r] <= gc_condition[r].max_decree
-    //      || file.end_offset[r] <= gc_condition[r].valid_start_offset
-    //  - the current log file should not be removed
-    // thread safe
-    int garbage_collection(const replica_log_info_map &gc_condition,
-                           int file_count_limit,
-                           std::set<gpid> &prevent_gc_replicas);
+    // Garbage collection for shared log.
+    // `prevent_gc_replicas' will store replicas which prevent log files from 
being deleted
+    // for gc.
+    //
+    // Since slog has been deprecated, no new slog files will be created. 
Therefore, our
+    // target is to remove all of the existing slog files according to the 
progressive durable
+    // decree for each replica.
+    //
+    // Thread safe.
+    void garbage_collection(const replica_log_info_map 
&replica_durable_decrees,
+                            std::set<gpid> &prevent_gc_replicas);
 
     //
     // when this is a private log, log files are learned by remote replicas
@@ -285,8 +286,10 @@ public:
     decree max_gced_decree(gpid gpid) const;
     decree max_gced_decree_no_lock(gpid gpid) const;
 
+    using log_file_map_by_index = std::map<int, log_file_ptr>;
+
     // thread-safe
-    std::map<int, log_file_ptr> get_log_file_map() const;
+    log_file_map_by_index get_log_file_map() const;
 
     // check the consistence of valid_start_offset
     // thread safe
@@ -300,6 +303,58 @@ public:
 
     task_tracker *tracker() { return &_tracker; }
 
+    struct reserved_slog_info // Stats of the slog files still kept after a gc round.
+    {
+        size_t file_count = 0;
+        int64_t log_size = 0;
+        int min_file_index = 0; // Set to -1 by gc once no slog file is left.
+        int max_file_index = 0; // Set to -1 by gc once no slog file is left.
+
+        std::string to_string() const
+        {
+            return fmt::format("reserved_slog_info = [file_count = {}, 
log_size = {}, "
+                               "min_file_index = {}, max_file_index = {}]",
+                               file_count,
+                               log_size,
+                               min_file_index,
+                               max_file_index);
+        }
+
+        friend std::ostream &operator<<(std::ostream &os, const 
reserved_slog_info &reserved_log)
+        {
+            return os << reserved_log.to_string();
+        }
+    };
+
+    struct slog_deletion_info // Stats of the slog files removed in one gc round.
+    {
+        int to_delete_file_count = 0; // Files attempted, including one that failed to be removed.
+        int64_t to_delete_log_size = 0;
+        int deleted_file_count = 0;
+        int64_t deleted_log_size = 0;
+        int deleted_min_file_index = 0; // Stays 0 if no file was deleted.
+        int deleted_max_file_index = 0;
+
+        std::string to_string() const
+        {
+            return fmt::format("slog_deletion_info = [to_delete_file_count = 
{}, "
+                               "to_delete_log_size = {}, deleted_file_count = 
{}, "
+                               "deleted_log_size = {}, deleted_min_file_index 
= {}, "
+                               "deleted_max_file_index = {}]",
+                               to_delete_file_count,
+                               to_delete_log_size,
+                               deleted_file_count,
+                               deleted_log_size,
+                               deleted_min_file_index,
+                               deleted_max_file_index);
+        }
+
+        friend std::ostream &operator<<(std::ostream &os, const 
slog_deletion_info &log_deletion)
+        {
+            return os << log_deletion.to_string();
+        }
+    };
+
 protected:
     // thread-safe
     // 'size' is data size to write; the '_global_end_offset' will be updated 
by 'size'.
@@ -325,7 +380,7 @@ private:
                              replay_callback callback,
                              /*out*/ int64_t &end_offset);
 
-    static error_code replay(std::map<int, log_file_ptr> &log_files,
+    static error_code replay(log_file_map_by_index &log_files,
                              replay_callback callback,
                              /*out*/ int64_t &end_offset);
 
@@ -345,6 +400,13 @@ private:
     // get total size ithout lock.
     int64_t total_size_no_lock() const;
 
+    // Close and remove all slog files whose indexes are less than (i.e. 
older than) or equal to
+    // `max_file_index_to_delete`.
+    void remove_obsolete_slog_files(const int max_file_index_to_delete,
+                                    log_file_map_by_index &files,
+                                    reserved_slog_info &reserved_log,
+                                    slog_deletion_info &log_deletion);
+
 protected:
     std::string _dir;
     bool _is_private;
@@ -373,12 +435,12 @@ private:
     bool _switch_file_demand;
 
     // logs
-    int _last_file_index;                   // new log file index = 
_last_file_index + 1
-    std::map<int, log_file_ptr> _log_files; // index -> log_file_ptr
-    log_file_ptr _current_log_file;         // current log file
-    int64_t _global_start_offset;           // global start offset of all 
files.
-                                            // invalid if _log_files.size() == 
0.
-    int64_t _global_end_offset;             // global end offset currently
+    int _last_file_index;             // new log file index = _last_file_index 
+ 1
+    log_file_map_by_index _log_files; // index -> log_file_ptr
+    log_file_ptr _current_log_file;   // current log file
+    int64_t _global_start_offset;     // global start offset of all files.
+                                      // invalid if _log_files.size() == 0.
+    int64_t _global_end_offset;       // global end offset currently
 
     // replica log info
     // - log_info.max_decree: the max decree of mutations up to now
@@ -410,21 +472,21 @@ public:
     {
     }
 
-    virtual ~mutation_log_shared() override
+    ~mutation_log_shared() override // close() the log, then cancel outstanding tasks.
     {
         close();
         _tracker.cancel_outstanding_tasks();
     }
 
-    virtual ::dsn::task_ptr append(mutation_ptr &mu,
-                                   dsn::task_code callback_code,
-                                   dsn::task_tracker *tracker,
-                                   aio_handler &&callback,
-                                   int hash = 0,
-                                   int64_t *pending_size = nullptr) override;
+    ::dsn::task_ptr append(mutation_ptr &mu,
+                           dsn::task_code callback_code,
+                           dsn::task_tracker *tracker,
+                           aio_handler &&callback,
+                           int hash = 0,
+                           int64_t *pending_size = nullptr) override;
 
-    virtual void flush() override;
-    virtual void flush_once() override;
+    void flush() override;
+    void flush_once() override;
 
 private:
     // async write pending mutations into log file
@@ -467,24 +529,22 @@ public:
         _tracker.cancel_outstanding_tasks();
     }
 
-    virtual ::dsn::task_ptr append(mutation_ptr &mu,
-                                   dsn::task_code callback_code,
-                                   dsn::task_tracker *tracker,
-                                   aio_handler &&callback,
-                                   int hash = 0,
-                                   int64_t *pending_size = nullptr) override;
+    ::dsn::task_ptr append(mutation_ptr &mu,
+                           dsn::task_code callback_code,
+                           dsn::task_tracker *tracker,
+                           aio_handler &&callback,
+                           int hash = 0,
+                           int64_t *pending_size = nullptr) override;
 
-    virtual bool get_learn_state_in_memory(decree start_decree,
-                                           binary_writer &writer) const 
override;
+    bool get_learn_state_in_memory(decree start_decree, binary_writer &writer) 
const override;
 
     // get in-memory mutations, including pending and writing mutations
-    virtual void
-    get_in_memory_mutations(decree start_decree,
-                            ballot start_ballot,
-                            /*out*/ std::vector<mutation_ptr> &mutation_list) 
const override;
+    void get_in_memory_mutations(decree start_decree,
+                                 ballot start_ballot,
+                                 /*out*/ std::vector<mutation_ptr> 
&mutation_list) const override;
 
-    virtual void flush() override;
-    virtual void flush_once() override;
+    void flush() override;
+    void flush_once() override;
 
 private:
     // async write pending mutations into log file
@@ -499,7 +559,7 @@ private:
                                   std::shared_ptr<log_appender> &pending,
                                   decree max_commit);
 
-    virtual void init_states() override;
+    void init_states() override;
 
     // flush at most count times
     // if count <= 0, means flush until all data is on disk
@@ -521,3 +581,6 @@ private:
 
 } // namespace replication
 } // namespace dsn
+
+USER_DEFINED_STRUCTURE_FORMATTER(::dsn::replication::mutation_log::reserved_slog_info);
+USER_DEFINED_STRUCTURE_FORMATTER(::dsn::replication::mutation_log::slog_deletion_info);
diff --git a/src/replica/mutation_log_replay.cpp 
b/src/replica/mutation_log_replay.cpp
index 261ff4706..316c12396 100644
--- a/src/replica/mutation_log_replay.cpp
+++ b/src/replica/mutation_log_replay.cpp
@@ -133,7 +133,7 @@ namespace replication {
                                            replay_callback callback,
                                            /*out*/ int64_t &end_offset)
 {
-    std::map<int, log_file_ptr> logs;
+    log_file_map_by_index logs;
     for (auto &fpath : log_files) {
         error_code err;
         log_file_ptr log = log_file::open_read(fpath.c_str(), err);
@@ -154,7 +154,7 @@ namespace replication {
     return replay(logs, callback, end_offset);
 }
 
-/*static*/ error_code mutation_log::replay(std::map<int, log_file_ptr> &logs,
+/*static*/ error_code mutation_log::replay(log_file_map_by_index &logs,
                                            replay_callback callback,
                                            /*out*/ int64_t &end_offset)
 {
diff --git a/src/replica/mutation_log_utils.cpp 
b/src/replica/mutation_log_utils.cpp
index 508389749..46c32b4df 100644
--- a/src/replica/mutation_log_utils.cpp
+++ b/src/replica/mutation_log_utils.cpp
@@ -64,7 +64,7 @@ namespace log_utils {
 }
 
 /*extern*/
-error_s check_log_files_continuity(const std::map<int, log_file_ptr> &logs)
+error_s check_log_files_continuity(const mutation_log::log_file_map_by_index 
&logs)
 {
     if (logs.empty()) {
         return error_s::ok();
diff --git a/src/replica/mutation_log_utils.h b/src/replica/mutation_log_utils.h
index 4b6184669..9673546f8 100644
--- a/src/replica/mutation_log_utils.h
+++ b/src/replica/mutation_log_utils.h
@@ -31,6 +31,7 @@
 #include <vector>
 
 #include "replica/log_file.h"
+#include "replica/mutation_log.h"
 #include "utils/autoref_ptr.h"
 #include "utils/errors.h"
 #include "utils/string_view.h"
@@ -44,7 +45,7 @@ extern error_s open_read(string_view path, /*out*/ 
log_file_ptr &file);
 extern error_s list_all_files(const std::string &dir, /*out*/ 
std::vector<std::string> &files);
 
 inline error_s open_log_file_map(const std::vector<std::string> &log_files,
-                                 /*out*/ std::map<int, log_file_ptr> 
&log_file_map)
+                                 /*out*/ mutation_log::log_file_map_by_index 
&log_file_map)
 {
     for (const std::string &fname : log_files) {
         log_file_ptr lf;
@@ -58,7 +59,7 @@ inline error_s open_log_file_map(const 
std::vector<std::string> &log_files,
 }
 
 inline error_s open_log_file_map(const std::string &dir,
-                                 /*out*/ std::map<int, log_file_ptr> 
&log_file_map)
+                                 /*out*/ mutation_log::log_file_map_by_index 
&log_file_map)
 {
     std::vector<std::string> log_files;
     error_s es = list_all_files(dir, log_files);
@@ -68,11 +69,11 @@ inline error_s open_log_file_map(const std::string &dir,
     return open_log_file_map(log_files, log_file_map) << 
"open_log_file_map(dir)";
 }
 
-extern error_s check_log_files_continuity(const std::map<int, log_file_ptr> 
&logs);
+extern error_s check_log_files_continuity(const 
mutation_log::log_file_map_by_index &logs);
 
 inline error_s check_log_files_continuity(const std::string &dir)
 {
-    std::map<int, log_file_ptr> log_file_map;
+    mutation_log::log_file_map_by_index log_file_map;
     error_s es = open_log_file_map(dir, log_file_map);
     if (!es.is_ok()) {
         return es << "check_log_files_continuity(dir)";
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index a96b8386f..c5bd5000a 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -36,6 +36,7 @@
 #include <boost/algorithm/string/replace.hpp>
 // IWYU pragma: no_include <ext/alloc_traits.h>
 #include <fmt/core.h>
+#include <fmt/format.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
@@ -43,6 +44,7 @@
 #include <chrono>
 #include <cstdint>
 #include <deque>
+#include <limits>
 #include <mutex>
 #include <ostream>
 #include <set>
@@ -167,7 +169,13 @@ DSN_DEFINE_int32(replication,
                  32,
                  "shared log maximum segment file size (MB)");
 
-DSN_DEFINE_int32(replication, log_shared_file_count_limit, 100, "shared log 
maximum file count");
+DSN_DEFINE_uint64(
+    replication,
+    log_shared_gc_flush_replicas_limit,
+    64,
+    "The number of submitted replicas that are flushed for gc shared logs; 0 
means no limit");
+DSN_TAG_VARIABLE(log_shared_gc_flush_replicas_limit, FT_MUTABLE);
+
 DSN_DEFINE_int32(
     replication,
     mem_release_check_interval_ms,
@@ -192,6 +200,9 @@ bool replica_stub::s_not_exit_on_log_failure = false;
 replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
                            bool is_long_subscriber /* = true*/)
     : serverlet("replica_stub"),
+      _last_prevent_gc_replica_count(0),
+      _real_log_shared_gc_flush_replicas_limit(0),
+      _mock_flush_replicas_for_test(0),
       _deny_client(false),
       _verbose_client_log(false),
       _verbose_commit_log(false),
@@ -578,13 +589,7 @@ void replica_stub::initialize(const replication_options 
&opts, bool clear /* = f
     LOG_INFO("primary_address = {}", _primary_address_str);
 
     set_options(opts);
-    std::ostringstream oss;
-    for (int i = 0; i < _options.meta_servers.size(); ++i) {
-        if (i != 0)
-            oss << ",";
-        oss << _options.meta_servers[i].to_string();
-    }
-    LOG_INFO("meta_servers = {}", oss.str());
+    LOG_INFO("meta_servers = {}", fmt::join(_options.meta_servers, ", "));
 
     _deny_client = FLAGS_deny_client_on_start;
     _verbose_client_log = FLAGS_verbose_client_log_on_start;
@@ -1767,138 +1772,202 @@ void replica_stub::on_gc_replica(replica_stub_ptr 
this_, gpid id)
     }
 }
 
-void replica_stub::on_gc()
+void replica_stub::gc_slog(const replica_gc_info_map &replica_gc_map)
 {
-    uint64_t start = dsn_now_ns();
+    if (_log == nullptr) { // No shared log, so there is nothing to collect.
+        return;
+    }
 
-    struct gc_info
-    {
-        replica_ptr rep;
-        partition_status::type status;
-        mutation_log_ptr plog;
-        decree last_durable_decree;
-        int64_t init_offset_in_shared_log;
-    };
-
-    std::unordered_map<gpid, gc_info> rs;
-    {
-        zauto_read_lock l(_replicas_lock);
-        // collect info in lock to prevent the case that the replica is closed 
in replica::close()
-        for (auto &kv : _replicas) {
-            const replica_ptr &rep = kv.second;
-            gc_info &info = rs[kv.first];
-            info.rep = rep;
-            info.status = rep->status();
-            info.plog = rep->private_log();
-            info.last_durable_decree = rep->last_durable_decree();
-            info.init_offset_in_shared_log = 
rep->get_app()->init_info().init_offset_in_shared_log;
+    replica_log_info_map replica_durable_decrees;
+    for (auto &replica_gc : replica_gc_map) {
+        replica_log_info replica_log;
+        auto &rep = replica_gc.second.rep;
+        auto &plog = replica_gc.second.plog;
+        if (plog) {
+            // Flush private log to update `plog_max_commit_on_disk`, and just 
flush once
+            // to avoid flushing infinitely.
+            plog->flush_once();
+            auto plog_max_commit_on_disk = plog->max_commit_on_disk();
+
+            replica_log.max_decree =
+                std::min(replica_gc.second.last_durable_decree, 
plog_max_commit_on_disk);
+            LOG_INFO("gc_shared: gc condition for {}, status = {}, 
garbage_max_decree = {}, "
+                     "last_durable_decree= {}, plog_max_commit_on_disk = {}",
+                     rep->name(),
+                     enum_to_string(replica_gc.second.status),
+                     replica_log.max_decree,
+                     replica_gc.second.last_durable_decree,
+                     plog_max_commit_on_disk);
+        } else {
+            replica_log.max_decree = replica_gc.second.last_durable_decree;
+            LOG_INFO("gc_shared: gc condition for {}, status = {}, 
garbage_max_decree = {}, "
+                     "last_durable_decree = {}",
+                     rep->name(),
+                     enum_to_string(replica_gc.second.status),
+                     replica_log.max_decree,
+                     replica_gc.second.last_durable_decree);
         }
+        replica_log.valid_start_offset = 
replica_gc.second.init_offset_in_shared_log;
+        replica_durable_decrees[replica_gc.first] = replica_log;
     }
 
-    LOG_INFO("start to garbage collection, replica_count = {}", rs.size());
+    // Garbage collection for shared log files.
+    std::set<gpid> prevent_gc_replicas;
+    _log->garbage_collection(replica_durable_decrees, prevent_gc_replicas);
+
+    // Trigger checkpoint to flush memtables once some replicas were found 
that prevent
+    // slog files from being removed for gc.
+    flush_replicas_for_slog_gc(replica_gc_map, prevent_gc_replicas);
 
-    // gc shared prepare log
+    auto total_size = _log->total_size();
+    _counter_shared_log_size->set(total_size / (1024 * 1024)); // reported in MB
+
+    // TODO(wangdan): currently we could not yet call _log.reset() as below to 
close slog and
+    // reset it to nullptr even if it was found that slog had become empty 
(which means there
+    // were no slog files left).
+    // if (total_size == 0) {
+    //     _log.reset();
+    // }
     //
-    // Now that checkpoint is very important for gc, we must be able to 
trigger checkpoint when
-    // necessary.
-    // that is, we should be able to trigger memtable flush when necessary.
+    // The reason is that on_gc() is scheduled by a timer to run 
asynchronously
+    // during the initialization of replica_stub. It might happen before 
slog.on_partition_reset()
+    // (building slog._shared_log_info_map), which means slog would be closed 
mistakenly before
+    // it was initialized completely.
     //
-    // How to trigger memtable flush?
-    //   we add a parameter `is_emergency' in dsn_app_async_checkpoint() 
function, when set true,
-    //   the undering storage system should flush memtable as soon as 
possiable.
+    // All slog files would be removed in v2.5; thus it is safe to remove all 
slog code (which
+    // means the slog object would not even be created) in the next version 
(namely 2.6), and this
+    // problem would also be resolved.
+}
+
+void replica_stub::limit_flush_replicas_for_slog_gc(size_t 
prevent_gc_replica_count)
+{ // Tune the per-round flush limit by how many replicas got flushed since the last round.
+    const size_t log_shared_gc_flush_replicas_limit = 
FLAGS_log_shared_gc_flush_replicas_limit;
+    if (log_shared_gc_flush_replicas_limit == 0) {
+        // 0 for log_shared_gc_flush_replicas_limit means no limit.
+        _real_log_shared_gc_flush_replicas_limit = 
std::numeric_limits<size_t>::max();
+        return;
+    }
+
+    if (_last_prevent_gc_replica_count == 0) {
+        // First round with blocking replicas: start from the configured limit.
+        _real_log_shared_gc_flush_replicas_limit = 
log_shared_gc_flush_replicas_limit;
+        return;
+    }
+
+    CHECK_GE(_last_prevent_gc_replica_count, prevent_gc_replica_count);
+    const size_t flushed_replicas = _last_prevent_gc_replica_count - 
prevent_gc_replica_count;
+    if (flushed_replicas == 0) {
+        // Nothing got flushed since the last round: too busy, so throttle to at most 2 replicas.
+        _real_log_shared_gc_flush_replicas_limit =
+            std::min<size_t>(2, log_shared_gc_flush_replicas_limit);
+        return;
+    }
+
+    if (_real_log_shared_gc_flush_replicas_limit == 0 ||
+        _real_log_shared_gc_flush_replicas_limit == 
std::numeric_limits<size_t>::max()) {
+        // It previously held a special value (0 or unlimited), so reset it to 
the configured limit.
+        _real_log_shared_gc_flush_replicas_limit = 
log_shared_gc_flush_replicas_limit;
+        return;
+    }
+
+    if (flushed_replicas < _real_log_shared_gc_flush_replicas_limit) {
+        // Fewer replicas got flushed than allowed: keep the limit unchanged.
+        return;
+    }
+
+    // All allowed flushes completed: double the limit, capped by the configured value.
+    _real_log_shared_gc_flush_replicas_limit =
+        std::min(log_shared_gc_flush_replicas_limit, 
_real_log_shared_gc_flush_replicas_limit << 1);
+}
+
+void replica_stub::flush_replicas_for_slog_gc(const replica_gc_info_map 
&replica_gc_map,
+                                              const std::set<gpid> 
&prevent_gc_replicas)
+{
+    // Trigger checkpoints to flush memtables once some replicas were found 
that prevent slog files
+    // from being removed for gc.
     //
-    // When to trigger memtable flush?
-    //   1. Using `[replication].checkpoint_max_interval_hours' option, we can 
set max interval time
-    //   of two adjacent checkpoints; If the time interval is arrived, then 
emergency checkpoint
-    //   will be triggered.
-    //   2. Using `[replication].log_shared_file_count_limit' option, we can 
set max file count of
-    //   shared log; If the limit is exceeded, then emergency checkpoint will 
be triggered; Instead
-    //   of triggering all replicas to do checkpoint, we will only trigger a 
few of necessary
-    //   replicas which block garbage collection of the oldest log file.
+    // How to trigger memtable flush ?
+    //   A parameter `is_emergency' was added for 
`replica::background_async_checkpoint()` function;
+    //   once it's set true, underlying storage engine would flush memtable as 
soon as possible.
     //
-    if (_log != nullptr) {
-        replica_log_info_map gc_condition;
-        for (auto &kv : rs) {
-            replica_log_info ri;
-            replica_ptr &rep = kv.second.rep;
-            mutation_log_ptr &plog = kv.second.plog;
-            if (plog) {
-                // flush private log to update plog_max_commit_on_disk,
-                // and just flush once to avoid flushing infinitely
-                plog->flush_once();
-
-                decree plog_max_commit_on_disk = plog->max_commit_on_disk();
-                ri.max_decree = std::min(kv.second.last_durable_decree, 
plog_max_commit_on_disk);
-                LOG_INFO("gc_shared: gc condition for {}, status = {}, 
garbage_max_decree = {}, "
-                         "last_durable_decree= {}, plog_max_commit_on_disk = 
{}",
-                         rep->name(),
-                         enum_to_string(kv.second.status),
-                         ri.max_decree,
-                         kv.second.last_durable_decree,
-                         plog_max_commit_on_disk);
-            } else {
-                ri.max_decree = kv.second.last_durable_decree;
-                LOG_INFO("gc_shared: gc condition for {}, status = {}, 
garbage_max_decree = {}, "
-                         "last_durable_decree = {}",
-                         rep->name(),
-                         enum_to_string(kv.second.status),
-                         ri.max_decree,
-                         kv.second.last_durable_decree);
-            }
-            ri.valid_start_offset = kv.second.init_offset_in_shared_log;
-            gc_condition[kv.first] = ri;
+    // When is a memtable flush triggered?
+    //   1. After a fixed interval (specified by the 
`[replication].gc_interval_ms` option), check whether
+    //   any replicas are preventing slog files from being removed 
for gc; if so, all of
+    //   them would be flushed "gradually" ("gradually" means the number of 
the replicas whose
+    //   memtables are submitted to the storage engine to be flushed at a time would be 
limited).
+    //   2. The `[replication].checkpoint_max_interval_hours` option specifies the 
max interval between
+    //   two adjacent checkpoints.
+
+    if (prevent_gc_replicas.empty()) {
+        return;
+    }
+
+    limit_flush_replicas_for_slog_gc(prevent_gc_replicas.size());
+    _last_prevent_gc_replica_count = prevent_gc_replicas.size();
+
+    LOG_INFO("gc_shared: trigger emergency checkpoints to flush replicas for 
gc shared logs: "
+             "log_shared_gc_flush_replicas_limit = {}/{}, 
prevent_gc_replicas({}) = {}",
+             _real_log_shared_gc_flush_replicas_limit,
+             FLAGS_log_shared_gc_flush_replicas_limit,
+             prevent_gc_replicas.size(),
+             fmt::join(prevent_gc_replicas, ", "));
+
+    size_t i = 0;
+    for (const auto &pid : prevent_gc_replicas) {
+        const auto &replica_gc = replica_gc_map.find(pid);
+        if (replica_gc == replica_gc_map.end()) {
+            continue;
         }
 
-        std::set<gpid> prevent_gc_replicas;
-        int reserved_log_count = _log->garbage_collection(
-            gc_condition, FLAGS_log_shared_file_count_limit, 
prevent_gc_replicas);
-        if (reserved_log_count > FLAGS_log_shared_file_count_limit * 2) {
-            LOG_INFO(
-                "gc_shared: trigger emergency checkpoint by 
FLAGS_log_shared_file_count_limit, "
-                "file_count_limit = {}, reserved_log_count = {}, trigger all 
replicas to do "
-                "checkpoint",
-                FLAGS_log_shared_file_count_limit,
-                reserved_log_count);
-            for (auto &kv : rs) {
-                tasking::enqueue(
-                    LPC_PER_REPLICA_CHECKPOINT_TIMER,
-                    kv.second.rep->tracker(),
-                    std::bind(&replica_stub::trigger_checkpoint, this, 
kv.second.rep, true),
-                    kv.first.thread_hash(),
-                    std::chrono::milliseconds(rand::next_u32(0, 
FLAGS_gc_interval_ms / 2)));
-            }
-        } else if (reserved_log_count > FLAGS_log_shared_file_count_limit) {
-            std::ostringstream oss;
-            int c = 0;
-            for (auto &i : prevent_gc_replicas) {
-                if (c != 0)
-                    oss << ", ";
-                oss << i.to_string();
-                c++;
-            }
-            LOG_INFO(
-                "gc_shared: trigger emergency checkpoint by 
FLAGS_log_shared_file_count_limit, "
-                "file_count_limit = {}, reserved_log_count = {}, 
prevent_gc_replica_count = "
-                "{}, trigger them to do checkpoint: {}",
-                FLAGS_log_shared_file_count_limit,
-                reserved_log_count,
-                prevent_gc_replicas.size(),
-                oss.str());
-            for (auto &id : prevent_gc_replicas) {
-                auto find = rs.find(id);
-                if (find != rs.end()) {
-                    tasking::enqueue(
-                        LPC_PER_REPLICA_CHECKPOINT_TIMER,
-                        find->second.rep->tracker(),
-                        std::bind(&replica_stub::trigger_checkpoint, this, 
find->second.rep, true),
-                        id.thread_hash(),
-                        std::chrono::milliseconds(rand::next_u32(0, 
FLAGS_gc_interval_ms / 2)));
-                }
-            }
+        if (++i > _real_log_shared_gc_flush_replicas_limit) {
+            break;
         }
 
-        _counter_shared_log_size->set(_log->total_size() / (1024 * 1024));
+        bool mock_flush = false;
+        FAIL_POINT_INJECT_NOT_RETURN_F(
+            "mock_flush_replicas_for_slog_gc", [&mock_flush, this, 
i](dsn::string_view str) {
+                CHECK(buf2bool(str, mock_flush),
+                      "invalid mock_flush_replicas_for_slog_gc toggle, should 
be true or false: {}",
+                      str);
+                _mock_flush_replicas_for_test = i;
+            });
+        if (dsn_unlikely(mock_flush)) {
+            continue;
+        }
+
+        tasking::enqueue(
+            LPC_PER_REPLICA_CHECKPOINT_TIMER,
+            replica_gc->second.rep->tracker(),
+            std::bind(&replica_stub::trigger_checkpoint, this, 
replica_gc->second.rep, true),
+            pid.thread_hash(),
+            std::chrono::milliseconds(rand::next_u32(0, FLAGS_gc_interval_ms / 
2)));
     }
+}
+
+void replica_stub::on_gc()
+{
+    uint64_t start = dsn_now_ns();
+
+    replica_gc_info_map replica_gc_map;
+    {
+        zauto_read_lock l(_replicas_lock);
+        // A replica was removed from _replicas before it would be closed by 
replica::close().
+        // Thus it's safe to use the replica after fetching its ref pointer 
from _replicas.
+        for (const auto &rep_pair : _replicas) {
+            const replica_ptr &rep = rep_pair.second;
+
+            auto &replica_gc = replica_gc_map[rep_pair.first];
+            replica_gc.rep = rep;
+            replica_gc.status = rep->status();
+            replica_gc.plog = rep->private_log();
+            replica_gc.last_durable_decree = rep->last_durable_decree();
+            replica_gc.init_offset_in_shared_log =
+                rep->get_app()->init_info().init_offset_in_shared_log;
+        }
+    }
+
+    LOG_INFO("start to garbage collection, replica_count = {}", 
replica_gc_map.size());
+    gc_slog(replica_gc_map);
 
     // statistic learning info
     uint64_t learning_count = 0;
@@ -1914,7 +1983,7 @@ void replica_stub::on_gc()
     uint64_t splitting_max_duration_time_ms = 0;
     uint64_t splitting_max_async_learn_time_ms = 0;
     uint64_t splitting_max_copy_file_size = 0;
-    for (auto &kv : rs) {
+    for (auto &kv : replica_gc_map) {
         replica_ptr &rep = kv.second.rep;
         if (rep->status() == partition_status::PS_POTENTIAL_SECONDARY) {
             learning_count++;
diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h
index eddf524ac..e71e8f864 100644
--- a/src/replica/replica_stub.h
+++ b/src/replica/replica_stub.h
@@ -32,19 +32,20 @@
 //   replica_stub(singleton) --> replica --> replication_app_base
 //
 
+#include <gtest/gtest_prod.h>
+#include <stddef.h>
 #include <stdint.h>
 #include <atomic>
 #include <functional>
 #include <map>
 #include <memory>
+#include <set>
 #include <string>
 #include <tuple>
 #include <unordered_map>
 #include <utility>
 #include <vector>
 
-#include <gtest/gtest_prod.h>
-
 #include "block_service/block_service_manager.h"
 #include "bulk_load_types.h"
 #include "common/bulk_load_common.h"
@@ -361,6 +362,30 @@ private:
     replica_life_cycle get_replica_life_cycle(gpid id);
     void on_gc_replica(replica_stub_ptr this_, gpid id);
 
+    struct replica_gc_info
+    {
+        replica_ptr rep;
+        partition_status::type status;
+        mutation_log_ptr plog;
+        decree last_durable_decree;
+        int64_t init_offset_in_shared_log;
+    };
+    using replica_gc_info_map = std::unordered_map<gpid, replica_gc_info>;
+
+    // Try to remove obsolete files of shared log for garbage collection 
according to the provided
+    // states of all replicas. The purpose is to remove all of the files of 
shared log, since it
+    // has been deprecated and will no longer be appended to.
+    void gc_slog(const replica_gc_info_map &replica_gc_map);
+
+    // Limit the number of replicas flushed at a time for the garbage collection of shared 
log, to avoid
+    // consuming too much I/O that might affect read/write requests from clients.
+    void limit_flush_replicas_for_slog_gc(size_t prevent_gc_replica_count);
+
+    // Flush rocksdb data to sst files for replicas to facilitate garbage 
collection of more files
+    // of shared log.
+    void flush_replicas_for_slog_gc(const replica_gc_info_map &replica_gc_map,
+                                    const std::set<gpid> &prevent_gc_replicas);
+
     void response_client(gpid id,
                          bool is_read,
                          dsn::message_ex *request,
@@ -423,6 +448,7 @@ private:
     FRIEND_TEST(open_replica_test, open_replica_add_decree_and_ballot_check);
     FRIEND_TEST(replica_error_test, test_auto_trash_of_corruption);
     FRIEND_TEST(replica_test, test_clear_on_failure);
+    FRIEND_TEST(GcSlogFlushFeplicasTest, FlushReplicas);
 
     typedef std::unordered_map<gpid, ::dsn::task_ptr> opening_replicas;
     typedef std::unordered_map<gpid, std::tuple<task_ptr, replica_ptr, 
app_info, replica_info>>
@@ -436,6 +462,15 @@ private:
     closing_replicas _closing_replicas;
     closed_replicas _closed_replicas;
 
+    // The number of replicas that prevent slog files from being removed for 
gc at the last round.
+    size_t _last_prevent_gc_replica_count;
+
+    // The real limit of flushed replicas for the garbage collection of shared 
log.
+    size_t _real_log_shared_gc_flush_replicas_limit;
+
+    // The number of flushed replicas, mocked only for test.
+    size_t _mock_flush_replicas_for_test;
+
     mutation_log_ptr _log;
     ::dsn::rpc_address _primary_address;
     char _primary_address_str[64];
diff --git a/src/replica/replication_app_base.cpp 
b/src/replica/replication_app_base.cpp
index db92bdf17..4aa7a8e14 100644
--- a/src/replica/replication_app_base.cpp
+++ b/src/replica/replication_app_base.cpp
@@ -51,7 +51,6 @@
 #include "runtime/task/task_code.h"
 #include "runtime/task/task_spec.h"
 #include "runtime/task/task_tracker.h"
-#include "utils/autoref_ptr.h"
 #include "utils/binary_reader.h"
 #include "utils/binary_writer.h"
 #include "utils/blob.h"
diff --git a/src/replica/test/mock_utils.h b/src/replica/test/mock_utils.h
index 104bd2c07..48072f546 100644
--- a/src/replica/test/mock_utils.h
+++ b/src/replica/test/mock_utils.h
@@ -445,7 +445,7 @@ public:
                            dsn::task_tracker *tracker,
                            aio_handler &&callback,
                            int hash = 0,
-                           int64_t *pending_size = nullptr)
+                           int64_t *pending_size = nullptr) override
     {
         _mu_list.push_back(mu);
         return nullptr;
diff --git a/src/replica/test/mutation_log_test.cpp 
b/src/replica/test/mutation_log_test.cpp
index 6bca13336..527c1e8d9 100644
--- a/src/replica/test/mutation_log_test.cpp
+++ b/src/replica/test/mutation_log_test.cpp
@@ -26,11 +26,16 @@
 
 #include "replica/mutation_log.h"
 
+// IWYU pragma: no_include <ext/alloc_traits.h>
 // IWYU pragma: no_include <gtest/gtest-message.h>
+// IWYU pragma: no_include <gtest/gtest-param-test.h>
 // IWYU pragma: no_include <gtest/gtest-test-part.h>
 #include <gtest/gtest.h>
 #include <stdio.h>
 #include <sys/types.h>
+#include <cstdint>
+#include <iostream>
+#include <limits>
 #include <unordered_map>
 
 #include "aio/aio_task.h"
@@ -40,12 +45,17 @@
 #include "replica/log_block.h"
 #include "replica/log_file.h"
 #include "replica/mutation.h"
+#include "replica/replica_stub.h"
 #include "replica/test/mock_utils.h"
 #include "replica_test_base.h"
+#include "rrdb/rrdb.code.definition.h"
 #include "utils/binary_reader.h"
 #include "utils/binary_writer.h"
 #include "utils/blob.h"
+#include "utils/defer.h"
+#include "utils/fail_point.h"
 #include "utils/filesystem.h"
+#include "utils/flags.h"
 #include "utils/fmt_logging.h"
 #include "utils/ports.h"
 
@@ -121,7 +131,7 @@ TEST(replication, log_file)
             lf->write_file_header(temp_writer, mdecrees);
             writer->add(temp_writer.get_buffer());
             ASSERT_EQ(mdecrees, lf->previous_log_max_decrees());
-            log_file_header &h = lf->header();
+            const auto &h = lf->header();
             ASSERT_EQ(100, h.start_global_offset);
         }
 
@@ -450,6 +460,83 @@ public:
             ASSERT_GE(log_files.size(), 1);
         }
     }
+
+    mutation_ptr generate_slog_mutation(const gpid &pid, const decree d, const 
std::string &data)
+    {
+        mutation_ptr mu(new mutation());
+        mu->data.header.ballot = 1;
+        mu->data.header.decree = d;
+        mu->data.header.pid = pid;
+        mu->data.header.last_committed_decree = d - 1;
+        mu->data.header.log_offset = 0;
+        mu->data.header.timestamp = d;
+
+        mu->data.updates.push_back(mutation_update());
+        mu->data.updates.back().code = dsn::apps::RPC_RRDB_RRDB_PUT;
+        mu->data.updates.back().data = 
blob::create_from_bytes(std::string(data));
+
+        mu->client_requests.push_back(nullptr);
+
+        return mu;
+    }
+
+    void generate_slog_file(const std::vector<std::pair<gpid, size_t>> 
&replica_mutations,
+                            mutation_log_ptr &mlog,
+                            decree &d,
+                            std::unordered_map<gpid, int64_t> 
&valid_start_offsets,
+                            std::pair<gpid, int64_t> &slog_file_start_offset)
+    {
+        for (size_t i = 0; i < replica_mutations.size(); ++i) {
+            const auto &pid = replica_mutations[i].first;
+
+            for (size_t j = 0; j < replica_mutations[i].second; ++j) {
+                if (i == 0) {
+                    // Record the start offset of each slog file.
+                    slog_file_start_offset.first = pid;
+                    slog_file_start_offset.second = mlog->get_global_offset();
+                }
+
+                const auto &it = valid_start_offsets.find(pid);
+                if (it == valid_start_offsets.end()) {
+                    // Add new partition with its start offset in slog.
+                    valid_start_offsets.emplace(pid, 
mlog->get_global_offset());
+                    mlog->set_valid_start_offset_on_open(pid, 
mlog->get_global_offset());
+                }
+
+                // Append a mutation.
+                auto mu = generate_slog_mutation(pid, d++, "test data");
+                mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, mlog->tracker(), 
nullptr, 0);
+            }
+        }
+
+        // Wait until all mutations are written into this file.
+        mlog->tracker()->wait_outstanding_tasks();
+    }
+
+    void generate_slog_files(const std::vector<std::vector<std::pair<gpid, 
size_t>>> &files,
+                             mutation_log_ptr &mlog,
+                             std::unordered_map<gpid, int64_t> 
&valid_start_offsets,
+                             std::vector<std::pair<gpid, int64_t>> 
&slog_file_start_offsets)
+    {
+        valid_start_offsets.clear();
+        slog_file_start_offsets.resize(files.size());
+
+        decree d = 1;
+        for (size_t i = 0; i < files.size(); ++i) {
+            generate_slog_file(files[i], mlog, d, valid_start_offsets, 
slog_file_start_offsets[i]);
+            if (i + 1 < files.size()) {
+                // Do not create a new slog file after the last file is 
generated.
+                mlog->create_new_log_file();
+                // Wait until file header is written.
+                mlog->tracker()->wait_outstanding_tasks();
+            }
+        }
+
+        // Close and reset `_current_log_file` since slog has been deprecated 
and would not be
+        // used again.
+        mlog->_current_log_file->close();
+        mlog->_current_log_file = nullptr;
+    }
 };
 
 TEST_F(mutation_log_test, replay_single_file_1000) { 
test_replay_single_file(1000); }
@@ -606,5 +693,182 @@ TEST_F(mutation_log_test, reset_from_while_writing)
     mlog->flush();
     ASSERT_EQ(actual.size(), expected.size());
 }
+
+TEST_F(mutation_log_test, gc_slog)
+{
+    // Remove the slog dir and create a new one.
+    const std::string slog_dir("./slog_test");
+    ASSERT_TRUE(dsn::utils::filesystem::remove_path(slog_dir));
+    ASSERT_TRUE(dsn::utils::filesystem::create_directory(slog_dir));
+
+    // Create and open slog object, which would be closed at the end of the 
scope.
+    mutation_log_ptr mlog = new mutation_log_shared(slog_dir, 1, false);
+    auto cleanup = dsn::defer([mlog]() { mlog->close(); });
+    ASSERT_EQ(ERR_OK, mlog->open(nullptr, nullptr));
+
+    // Each line describes one slog file: a sequence of mutations written to the specified 
replicas, each
+    // pair giving a replica and the number of mutations appended to it.
+    //
+    // From these sequences the decrees for each partition could be concluded 
as below:
+    // {1, 1}: 9 ~ 15
+    // {1, 2}: 16 ~ 22
+    // {2, 5}: 1 ~ 8, 23 ~ 38
+    // {2, 7}: 39 ~ 46
+    // {5, 6}: 47 ~ 73
+    const std::vector<std::vector<std::pair<gpid, size_t>>> files = {
+        {{{2, 5}, 8}, {{1, 1}, 7}, {{1, 2}, 2}},
+        {{{1, 2}, 5}},
+        {{{2, 5}, 16}, {{2, 7}, 8}, {{5, 6}, 27}}};
+
+    // Each line describes the progress of durable decrees for all replicas: 
decrees are
+    // continuously being applied and becoming durable.
+    const std::vector<std::unordered_map<gpid, decree>> durable_decrees = {
+        {{{1, 1}, 10}, {{1, 2}, 17}, {{2, 5}, 6}, {{2, 7}, 39}, {{5, 6}, 47}},
+        {{{1, 1}, 15}, {{1, 2}, 18}, {{2, 5}, 7}, {{2, 7}, 40}, {{5, 6}, 57}},
+        {{{1, 1}, 15}, {{1, 2}, 20}, {{2, 5}, 8}, {{2, 7}, 42}, {{5, 6}, 61}},
+        {{{1, 1}, 15}, {{1, 2}, 22}, {{2, 5}, 23}, {{2, 7}, 44}, {{5, 6}, 65}},
+        {{{1, 1}, 15}, {{1, 2}, 22}, {{2, 5}, 27}, {{2, 7}, 46}, {{5, 6}, 66}},
+        {{{1, 1}, 15}, {{1, 2}, 22}, {{2, 5}, 32}, {{2, 7}, 46}, {{5, 6}, 67}},
+        {{{1, 1}, 15}, {{1, 2}, 22}, {{2, 5}, 38}, {{2, 7}, 46}, {{5, 6}, 72}},
+        {{{1, 1}, 15}, {{1, 2}, 22}, {{2, 5}, 38}, {{2, 7}, 46}, {{5, 6}, 73}},
+    };
+    const std::vector<size_t> remaining_slog_files = {3, 3, 2, 1, 1, 1, 1, 0};
+    const std::vector<std::set<gpid>> expected_prevent_gc_replicas = {
+        {{1, 1}, {1, 2}, {2, 5}, {2, 7}, {5, 6}},
+        {{1, 2}, {2, 5}, {2, 7}, {5, 6}},
+        {{1, 2}, {2, 5}, {2, 7}, {5, 6}},
+        {{2, 5}, {2, 7}, {5, 6}},
+        {{2, 5}, {5, 6}},
+        {{2, 5}, {5, 6}},
+        {{5, 6}},
+        {},
+    };
+
+    // Each entry describes an action: during the round given by the key (an index 
of
+    // `durable_decrees`), the replica recorded for the slog file given by the value 
has its
+    // valid start offset reset to that file's recorded offset (see `files` and 
`slog_file_start_offsets`).
+    const std::unordered_map<size_t, size_t> set_to_slog_file_start_offsets = {
+        {2, 1},
+    };
+
+    // Create slog files and write some data into them according to test cases.
+    std::unordered_map<gpid, int64_t> valid_start_offsets;
+    std::vector<std::pair<gpid, int64_t>> slog_file_start_offsets;
+    generate_slog_files(files, mlog, valid_start_offsets, 
slog_file_start_offsets);
+
+    for (size_t i = 0; i < durable_decrees.size(); ++i) {
+        std::cout << "Update No." << i << " group of durable decrees" << 
std::endl;
+
+        // Update the progress of durable_decrees for each partition.
+        replica_log_info_map replica_durable_decrees;
+        for (const auto &d : durable_decrees[i]) {
+            replica_durable_decrees.emplace(
+                d.first, replica_log_info(d.second, 
valid_start_offsets[d.first]));
+        }
+
+        // Test condition for `valid_start_offset`, see `can_gc_replica_slog`.
+        const auto &set_to_start = set_to_slog_file_start_offsets.find(i);
+        if (set_to_start != set_to_slog_file_start_offsets.end()) {
+            const auto &start_offset = 
slog_file_start_offsets[set_to_start->second];
+            replica_durable_decrees[start_offset.first].valid_start_offset = 
start_offset.second;
+        }
+
+        // Run garbage collection for a round.
+        std::set<gpid> actual_prevent_gc_replicas;
+        mlog->garbage_collection(replica_durable_decrees, 
actual_prevent_gc_replicas);
+
+        // Check if the number of remaining slog files after garbage 
collection is desired.
+        std::vector<std::string> file_list;
+        ASSERT_TRUE(dsn::utils::filesystem::get_subfiles(slog_dir, file_list, 
false));
+        ASSERT_EQ(remaining_slog_files[i], file_list.size());
+
+        // Check if the replicas that prevent garbage collection (i.e. cannot 
be removed by
+        // garbage collection) is expected.
+        ASSERT_EQ(expected_prevent_gc_replicas[i], actual_prevent_gc_replicas);
+    }
+}
+
+using gc_slog_flush_replicas_case = std::tuple<std::set<gpid>, uint64_t, 
size_t, size_t, size_t>;
+
+class GcSlogFlushFeplicasTest : public 
testing::TestWithParam<gc_slog_flush_replicas_case>
+{
+};
+
+DSN_DECLARE_uint64(log_shared_gc_flush_replicas_limit);
+
+TEST_P(GcSlogFlushFeplicasTest, FlushReplicas)
+{
+    std::set<gpid> prevent_gc_replicas;
+    size_t last_prevent_gc_replica_count;
+    uint64_t limit;
+    size_t last_limit;
+    size_t expected_flush_replicas;
+    std::tie(prevent_gc_replicas,
+             last_prevent_gc_replica_count,
+             limit,
+             last_limit,
+             expected_flush_replicas) = GetParam();
+
+    replica_stub::replica_gc_info_map replica_gc_map;
+    for (const auto &r : prevent_gc_replicas) {
+        replica_gc_map.emplace(r, replica_stub::replica_gc_info());
+    }
+
+    const auto reserved_log_shared_gc_flush_replicas_limit =
+        FLAGS_log_shared_gc_flush_replicas_limit;
+    FLAGS_log_shared_gc_flush_replicas_limit = limit;
+
+    dsn::fail::setup();
+    dsn::fail::cfg("mock_flush_replicas_for_slog_gc", "void(true)");
+
+    replica_stub stub;
+    stub._last_prevent_gc_replica_count = last_prevent_gc_replica_count;
+    stub._real_log_shared_gc_flush_replicas_limit = last_limit;
+
+    stub.flush_replicas_for_slog_gc(replica_gc_map, prevent_gc_replicas);
+    EXPECT_EQ(expected_flush_replicas, stub._mock_flush_replicas_for_test);
+
+    dsn::fail::teardown();
+
+    FLAGS_log_shared_gc_flush_replicas_limit = 
reserved_log_shared_gc_flush_replicas_limit;
+}
+
+const std::vector<gc_slog_flush_replicas_case> gc_slog_flush_replicas_tests = {
+    // Initially, there is no limit on flushed replicas.
+    {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 0, 0, 0, 6},
+    // Initially, there is no limit on flushed replicas.
+    {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 1, 0, 5, 6},
+    // Initially, limit is less than the number of replicas.
+    {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 0, 1, 0, 1},
+    // Initially, limit is less than the number of replicas.
+    {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 0, 2, 0, 2},
+    // Initially, limit is just equal to the number of replicas.
+    {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 0, 6, 0, 6},
+    // Initially, limit is more than the number of replicas.
+    {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 0, 7, 0, 6},
+    // No replica has been flushed during previous round.
+    {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 6, 6, 6, 2},
+    // No replica has been flushed during previous round.
+    {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 6, 1, 2, 1},
+    // The previous limit is 0.
+    {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 7, 5, 0, 5},
+    // The previous limit is infinite.
+    {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 7, 5, 
std::numeric_limits<size_t>::max(), 5},
+    // The number of previously flushed replicas is less than the previous 
limit.
+    {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 7, 5, 0, 5},
+    // The number of previously flushed replicas reaches the previous limit.
+    {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 8, 6, 2, 4},
+    // The number of previously flushed replicas reaches the previous limit.
+    {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 12, 6, 6, 6},
+    // The number of previously flushed replicas is more than the previous 
limit.
+    {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 9, 3, 2, 3},
+    // The number of previously flushed replicas is more than the previous 
limit.
+    {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 9, 5, 2, 4},
+};
+
+INSTANTIATE_TEST_CASE_P(MutationLogTest,
+                        GcSlogFlushFeplicasTest,
+                        testing::ValuesIn(gc_slog_flush_replicas_tests));
+
 } // namespace replication
 } // namespace dsn
diff --git a/src/server/config.ini b/src/server/config.ini
index 2d2b7d19d..543fa316f 100644
--- a/src/server/config.ini
+++ b/src/server/config.ini
@@ -278,7 +278,7 @@ stateful = true
   plog_force_flush = false
 
   log_shared_file_size_mb = 128
-  log_shared_file_count_limit = 100
+  log_shared_gc_flush_replicas_limit = 64
   log_shared_batch_buffer_kb = 0
   log_shared_force_flush = false
   log_shared_pending_size_throttling_threshold_kb = 0
diff --git a/src/server/pegasus_server_impl.cpp 
b/src/server/pegasus_server_impl.cpp
index c34895269..9ad37445e 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -2040,7 +2040,8 @@ private:
     }
 
     int64_t checkpoint_decree = 0;
-    ::dsn::error_code err = copy_checkpoint_to_dir_unsafe(tmp_dir.c_str(), 
&checkpoint_decree);
+    ::dsn::error_code err =
+        copy_checkpoint_to_dir_unsafe(tmp_dir.c_str(), &checkpoint_decree, 
flush_memtable);
     if (err != ::dsn::ERR_OK) {
         LOG_ERROR_PREFIX("copy_checkpoint_to_dir_unsafe failed with err = {}", 
err.to_string());
         return ::dsn::ERR_LOCAL_APP_FAILURE;
diff --git a/src/server/test/config.ini b/src/server/test/config.ini
index 018d5b28d..1ec547264 100644
--- a/src/server/test/config.ini
+++ b/src/server/test/config.ini
@@ -187,7 +187,7 @@ log_private_reserve_max_size_mb = 0
 log_private_reserve_max_time_seconds = 0
 
 log_shared_file_size_mb = 32
-log_shared_file_count_limit = 32
+log_shared_gc_flush_replicas_limit = 64
 log_shared_batch_buffer_kb = 0
 log_shared_force_flush = false
 
diff --git a/src/utils/autoref_ptr.h b/src/utils/autoref_ptr.h
index a08ebc6ee..a3a7dfd2b 100644
--- a/src/utils/autoref_ptr.h
+++ b/src/utils/autoref_ptr.h
@@ -159,6 +159,8 @@ public:
 
     void swap(ref_ptr<T> &r) noexcept { std::swap(_obj, r._obj); }
 
+    void reset(T *obj = nullptr) { *this = obj; }
+
     T *get() const { return _obj; }
 
     operator T *() const { return _obj; }
diff --git a/src/utils/fmt_logging.h b/src/utils/fmt_logging.h
index f0c74bd29..5b95794c1 100644
--- a/src/utils/fmt_logging.h
+++ b/src/utils/fmt_logging.h
@@ -64,7 +64,8 @@
     } while (false)
 
 #define CHECK(x, ...) CHECK_EXPRESSION(x, x, __VA_ARGS__)
-#define CHECK_NOTNULL(p, ...) CHECK(p != nullptr, __VA_ARGS__)
+#define CHECK_NOTNULL(p, ...) CHECK((p) != nullptr, __VA_ARGS__)
+#define CHECK_NULL(p, ...) CHECK((p) == nullptr, __VA_ARGS__)
 
 // Macros for writing log message prefixed by log_prefix().
 #define LOG_DEBUG_PREFIX(...) LOG_DEBUG("[{}] {}", log_prefix(), 
fmt::format(__VA_ARGS__))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to