This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 705954872dc86238556456abed0a879bb1462e51
Author: Andrew Wong <[email protected]>
AuthorDate: Thu Jan 30 22:03:07 2020 -0800

    KUDU-1625: background op to GC ancient, fully deleted rowsets
    
    This adds a background op that deletes disk rowsets that have had all of
    their rows deleted. If the most recent update to a rowset is older than
    the ancient history mark, and the rowset contains no live rows, that
    rowset will be deleted.
    
    It'd be nice if we could have the policy work for rowsets that are
    mostly deleted, but such a solution would come with difficult questions
    around write amplification and compatibility with the existing
    compaction strategies. For instance, a more complete solution would
    need to consider whether to rewrite a rowset if it had 25%, 50%, or 75%
    deleted rows: some operators wouldn't mind the write amplification to
    save space. However, picking a good heuristic (or exposing some knobs to
    turn) makes this tricky.
    
    The benefit of the approach in this patch is that no such tradeoff needs
    to be made: the "write amplification" is minimal here because no new
    data blocks are written in performing the operation -- the tablet
    metadata is rewritten to exclude the blocks, and the underlying blocks
    are deleted, which isn't IO intensive either.
    
    There's still room for improvement in this implementation: currently, a
    DMS flush writes the delta stats to disk, and we only read them back if
    we Init() the DeltaFileReader (e.g. on scan). Until that happens, a
    rowset whose DMS was just flushed is not considered for GC. I'll address
    this in a follow-up patch.
    
    Since the op GCs all viable rowsets in the tablet, a tablet should only
    schedule one deleted rowset GC op at a time. This isn't necessary for
    correctness, but avoids wasting some MM thread cycles.
    
    I ran this on a real cluster, deleting large chunks of keyspace with 4
    MM threads to confirm that space is actually freed, concurrent ops for
    the same tablet aren't scheduled, and the op runs relatively quickly (in
    the tens of ms, compared to hundreds to thousands of ms for other ops).
    
    Change-Id: I696e2a29ea52ad4e54801b495c322bc371787124
    Reviewed-on: http://gerrit.cloudera.org:8080/15145
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <[email protected]>
---
 .../integration-tests/tablet_history_gc-itest.cc   | 110 ++++++++++++++++--
 src/kudu/tablet/delta_tracker.cc                   |  21 ++++
 src/kudu/tablet/delta_tracker.h                    |   6 +
 src/kudu/tablet/deltamemstore.cc                   |   5 +-
 src/kudu/tablet/deltamemstore.h                    |  17 ++-
 src/kudu/tablet/diskrowset.cc                      |  16 +++
 src/kudu/tablet/diskrowset.h                       |   3 +
 src/kudu/tablet/memrowset.h                        |   7 ++
 src/kudu/tablet/mock-rowsets.h                     |  78 +++++++------
 src/kudu/tablet/mt-tablet-test.cc                  |  58 +++++++---
 src/kudu/tablet/rowset.h                           |  14 +++
 src/kudu/tablet/tablet-test-base.h                 |   8 ++
 src/kudu/tablet/tablet.cc                          |  89 +++++++++++++++
 src/kudu/tablet/tablet.h                           |  23 +++-
 src/kudu/tablet/tablet_bootstrap.cc                |  16 ++-
 src/kudu/tablet/tablet_history_gc-test.cc          | 125 ++++++++++++++++-----
 src/kudu/tablet/tablet_metrics.cc                  |  29 +++++
 src/kudu/tablet/tablet_metrics.h                   |   4 +
 src/kudu/tablet/tablet_mm_ops.cc                   |  48 ++++++++
 src/kudu/tablet/tablet_mm_ops.h                    |  38 ++++++-
 src/kudu/tablet/tablet_replica-test.cc             |  59 ++++++++--
 21 files changed, 664 insertions(+), 110 deletions(-)

diff --git a/src/kudu/integration-tests/tablet_history_gc-itest.cc 
b/src/kudu/integration-tests/tablet_history_gc-itest.cc
index 53a7aae..02928cc 100644
--- a/src/kudu/integration-tests/tablet_history_gc-itest.cc
+++ b/src/kudu/integration-tests/tablet_history_gc-itest.cc
@@ -71,6 +71,7 @@
 #include "kudu/util/test_util.h"
 
 using kudu::client::KuduClient;
+using kudu::client::KuduInsert;
 using kudu::client::KuduScanner;
 using kudu::client::KuduSession;
 using kudu::client::KuduTable;
@@ -87,6 +88,7 @@ using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
+DECLARE_bool(enable_flush_deltamemstores);
 DECLARE_bool(enable_maintenance_manager);
 DECLARE_bool(enable_rowset_compaction);
 DECLARE_string(time_source);
@@ -112,6 +114,20 @@ class TabletHistoryGcITest : public MiniClusterITestBase {
     auto* ntp = down_cast<clock::MockNtp*>(clock->time_service());
     ntp->SetMockClockWallTimeForTests(new_time);
   }
+
+  // Inserts 'num_rows' rows, keyed from 'start_key', into the given default workload table.
+  static Status InsertRowsToTable(KuduTable* table, KuduSession* session,
+                           int start_key, int num_rows) {
+    for (int32_t row_idx = 0; row_idx < num_rows; row_idx++) {
+      unique_ptr<KuduInsert> insert(table->NewInsert());
+      KuduPartialRow* row = insert->mutable_row();
+      RETURN_NOT_OK(row->SetInt32(0, start_key + row_idx));
+      RETURN_NOT_OK(row->SetInt32(1, 0));
+      RETURN_NOT_OK(row->SetString(2, ""));
+      RETURN_NOT_OK(session->Apply(insert.release()));
+    }
+    return session->Flush();
+  }
 };
 
 // Check that attempts to scan prior to the ancient history mark fail.
@@ -183,18 +199,8 @@ TEST_F(TabletHistoryGcITest, TestUndoDeltaBlockGc) {
   ASSERT_EQ(1, tablet_replicas.size());
   std::shared_ptr<Tablet> tablet = tablet_replicas[0]->shared_tablet();
 
-  const int32_t kNumRows = AllowSlowTests() ? 100 : 10;
-
-  // Insert a few rows.
-  for (int32_t row_key = 0; row_key < kNumRows; row_key++) {
-    unique_ptr<client::KuduInsert> insert(table->NewInsert());
-    KuduPartialRow* row = insert->mutable_row();
-    ASSERT_OK_FAST(row->SetInt32(0, row_key));
-    ASSERT_OK_FAST(row->SetInt32(1, 0));
-    ASSERT_OK_FAST(row->SetString(2, ""));
-    ASSERT_OK_FAST(session->Apply(insert.release()));
-  }
-  ASSERT_OK_FAST(session->Flush());
+  const int32_t kNumRows = 100;
+  ASSERT_OK(InsertRowsToTable(table.get(), session.get(), /*start_key*/0, 
kNumRows));
 
   // Update rows in a loop; wait until some undo deltas are generated.
   int32_t row_value = 0;
@@ -275,6 +281,76 @@ TEST_F(TabletHistoryGcITest, TestUndoDeltaBlockGc) {
   });
 }
 
+TEST_F(TabletHistoryGcITest, TestDeletedRowsetGc) {
+  // Disable merge compactions, since they may also cull deleted rowsets.
+  FLAGS_enable_rowset_compaction = false;
+  FLAGS_enable_flush_deltamemstores = false; // Don't flush DMS so we don't 
have to init deltafiles.
+  FLAGS_flush_threshold_secs = 0; // Flush as aggressively as possible.
+  FLAGS_maintenance_manager_num_threads = 4; // Encourage concurrency.
+  FLAGS_maintenance_manager_polling_interval_ms = 1; // Spin on MM for a quick 
test.
+  FLAGS_tablet_history_max_age_sec = 1000;
+  FLAGS_time_source = "mock"; // Allow moving the clock.
+  NO_FATALS(StartCluster(1)); // Single-node cluster.
+
+  // Create a tablet.
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(1);
+  workload.Setup();
+  MiniTabletServer* mts = cluster_->mini_tablet_server(0);
+  vector<scoped_refptr<TabletReplica>> tablet_replicas;
+  mts->server()->tablet_manager()->GetTabletReplicas(&tablet_replicas);
+  ASSERT_EQ(1, tablet_replicas.size());
+  std::shared_ptr<Tablet> tablet = tablet_replicas[0]->shared_tablet();
+
+  // Insert some rows.
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client_->OpenTable(TestWorkload::kDefaultTableName, &table));
+  shared_ptr<KuduSession> session = client_->NewSession();
+  const int32_t kNumRows = 100;
+  ASSERT_OK(InsertRowsToTable(table.get(), session.get(), /*start_key*/0, 
kNumRows));
+
+  // Flush what's in memory to ensure we have at least one DRS.
+  ASSERT_OK(tablet->Flush());
+  ASSERT_GT(tablet->num_rowsets(), 0);
+
+  // Now delete and flush to ensure we have some deltafiles.
+  for (int32_t row_key = 0; row_key < kNumRows; row_key++) {
+    unique_ptr<client::KuduDelete> del(table->NewDelete());
+    KuduPartialRow* row = del->mutable_row();
+    ASSERT_OK(row->SetInt32(0, row_key));
+    ASSERT_OK(session->Apply(del.release()));
+  }
+  ASSERT_OK(session->Flush());
+  uint64_t measured_size_before_gc;
+  
ASSERT_OK(Env::Default()->GetFileSizeOnDiskRecursively(cluster_->GetTabletServerFsRoot(0),
+                                                         
&measured_size_before_gc));
+  // Move forward the clock so our rowsets are all considered ancient.
+  HybridClock* c = down_cast<HybridClock*>(tablet->clock());
+  AddTimeToHybridClock(c, 
MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec));
+
+  // Eventually a deleted rowset GC op will run in the background.
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_EQ(0, tablet->num_rowsets());
+  });
+  // We should see a reduction in space.
+  // NOTE: we ASSERT_EVENTUALLY because hole punching is asynchronous and so we
+  // might not see an immediate decrease in file size.
+  ASSERT_EVENTUALLY([&] {
+    uint64_t measured_size_after_gc;
+    
ASSERT_OK(Env::Default()->GetFileSizeOnDiskRecursively(cluster_->GetTabletServerFsRoot(0),
+                                                           
&measured_size_after_gc));
+    ASSERT_LT(measured_size_after_gc, measured_size_before_gc);
+  });
+
+  // With no DRSs, we shouldn't see running ops.
+  ASSERT_EQ(0, 
tablet->metrics()->deleted_rowset_estimated_retained_bytes->value());
+  ASSERT_EQ(0, tablet->metrics()->deleted_rowset_gc_running->value());
+  ASSERT_GT(tablet->metrics()->deleted_rowset_gc_duration->TotalCount(), 0);
+  // NOTE: we could try checking this matches the before/after sizes, but
+  // there's a chance the delete raced with other delta background ops.
+  ASSERT_GT(tablet->metrics()->deleted_rowset_gc_bytes_deleted->value(), 0);
+}
+
 // Whether a MaterializedTestRow is deleted or not.
 enum IsDeleted {
   NOT_DELETED,
@@ -314,6 +390,7 @@ class RandomizedTabletHistoryGcITest : public 
TabletHistoryGcITest {
     kMergeCompaction,
     kRedoDeltaCompaction,
     kUndoDeltaBlockGc,
+    kDeletedRowsetGc,
     kMoveTimeForward,
     kStartScan,
     kNumActions, // Count of items in this enum. Keep as last entry.
@@ -850,6 +927,15 @@ TEST_F(RandomizedTabletHistoryGcITest, 
TestRandomHistoryGCWorkload) {
                 << " blocks and " << bytes_deleted << " bytes";
         break;
       }
+      case kDeletedRowsetGc: {
+        VLOG(1) << "Running deleted rowset GC";
+        int64_t bytes_to_delete = 0;
+        ASSERT_OK(tablet->GetBytesInAncientDeletedRowsets(&bytes_to_delete));
+        VLOG(1) << Substitute("Found $0 bytes in ancient, deleted rowsets",
+                              bytes_to_delete);
+        ASSERT_OK(tablet->DeleteAncientDeletedRowsets());
+        break;
+      }
       case kMoveTimeForward: {
         VLOG(1) << "Moving clock forward";
         AddTimeToHybridClock(clock_, MonoDelta::FromSeconds(200));
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index 00d47d7..7b53fc8 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -24,6 +24,7 @@
 #include <string>
 #include <utility>
 
+#include <boost/optional/optional.hpp>
 #include <boost/range/adaptor/reversed.hpp>
 #include <glog/logging.h>
 
@@ -446,6 +447,26 @@ Status DeltaTracker::CompactStores(const IOContext* 
io_context, int start_idx, i
   return Status::OK();
 }
 
+bool DeltaTracker::EstimateAllRedosAreAncient(Timestamp ancient_history_mark) {
+  shared_ptr<DeltaStore> newest_redo;
+  std::lock_guard<rw_spinlock> lock(component_lock_);
+  const boost::optional<Timestamp> dms_highest_timestamp =
+      dms_ ? dms_->highest_timestamp() : boost::none;
+  if (dms_highest_timestamp) {
+    return *dms_highest_timestamp < ancient_history_mark;
+  }
+
+  // If we don't have a DMS or our DMS hasn't been written to at all, look at
+  // the newest redo store.
+  if (!redo_delta_stores_.empty()) {
+    newest_redo = redo_delta_stores_.back();
+  }
+  // TODO(awong): keep the delta stats cached after flushing a DMS so a flush
+  // doesn't invalidate this rowset.
+  return newest_redo && newest_redo->Initted() &&
+      newest_redo->delta_stats().max_timestamp() < ancient_history_mark;
+}
+
 Status DeltaTracker::EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp 
ancient_history_mark,
                                                                  int64_t* 
bytes) {
   DCHECK_NE(Timestamp::kInvalidTimestamp, ancient_history_mark);
diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h
index 5ae97a2..b317788 100644
--- a/src/kudu/tablet/delta_tracker.h
+++ b/src/kudu/tablet/delta_tracker.h
@@ -175,6 +175,12 @@ class DeltaTracker {
   Status EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp 
ancient_history_mark,
                                                      int64_t* bytes);
 
+  // Returns whether all redos (DMS and newest redo delta file) are ancient
+  // (i.e. that the redo with the highest timestamp is older than the AHM).
+  // This is an estimate, since if the newest redo file has not yet been
+  // initted, this will return a false negative.
+  bool EstimateAllRedosAreAncient(Timestamp ancient_history_mark);
+
   // See RowSet::InitUndoDeltas().
   Status InitUndoDeltas(Timestamp ancient_history_mark,
                         MonoTime deadline,
diff --git a/src/kudu/tablet/deltamemstore.cc b/src/kudu/tablet/deltamemstore.cc
index b78a108..d6aaddb 100644
--- a/src/kudu/tablet/deltamemstore.cc
+++ b/src/kudu/tablet/deltamemstore.cc
@@ -17,9 +17,9 @@
 
 #include "kudu/tablet/deltamemstore.h"
 
+#include <algorithm>
 #include <memory>
 #include <ostream>
-#include <utility>
 
 #include <boost/optional/optional.hpp>
 #include <glog/logging.h>
@@ -74,6 +74,7 @@ DeltaMemStore::DeltaMemStore(int64_t id,
                              shared_ptr<MemTracker> parent_tracker)
   : id_(id),
     rs_id_(rs_id),
+    highest_timestamp_(Timestamp::kMin),
     allocator_(new MemoryTrackingBufferAllocator(
         HeapBufferAllocator::Get(), std::move(parent_tracker))),
     arena_(new ThreadSafeMemoryTrackingArena(kInitialArenaSize, allocator_)),
@@ -123,6 +124,8 @@ Status DeltaMemStore::Update(Timestamp timestamp,
     deleted_row_count_.Increment();
   }
 
+  std::lock_guard<simple_spinlock> l(ts_lock_);
+  highest_timestamp_ = std::max(highest_timestamp_, timestamp);
   return Status::OK();
 }
 
diff --git a/src/kudu/tablet/deltamemstore.h b/src/kudu/tablet/deltamemstore.h
index e287b68..4a42551 100644
--- a/src/kudu/tablet/deltamemstore.h
+++ b/src/kudu/tablet/deltamemstore.h
@@ -19,10 +19,14 @@
 #include <cstddef>
 #include <cstdint>
 #include <memory>
+#include <mutex>
 #include <string>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
+
 #include "kudu/common/rowid.h"
+#include "kudu/common/timestamp.h"
 #include "kudu/consensus/log_anchor_registry.h"
 #include "kudu/gutil/atomicops.h"
 #include "kudu/gutil/macros.h"
@@ -31,6 +35,7 @@
 #include "kudu/tablet/delta_stats.h"
 #include "kudu/tablet/delta_store.h"
 #include "kudu/util/atomic.h"
+#include "kudu/util/locks.h"
 #include "kudu/util/memory/arena.h"
 #include "kudu/util/status.h"
 
@@ -42,7 +47,6 @@ class MemoryTrackingBufferAllocator;
 class RowChangeList;
 class ScanSpec;
 class SelectionVector;
-class Timestamp;
 struct ColumnId;
 
 namespace consensus {
@@ -148,6 +152,14 @@ class DeltaMemStore : public DeltaStore,
   // Returns the number of deleted rows in this DMS.
   int64_t deleted_row_count() const;
 
+  // Returns the highest timestamp of any updates applied to this DMS. Returns
+  // 'none' if no updates have been applied.
+  boost::optional<Timestamp> highest_timestamp() const {
+    std::lock_guard<simple_spinlock> l(ts_lock_);
+    return highest_timestamp_ == Timestamp::kMin ?
+        boost::none : boost::make_optional(highest_timestamp_);
+  }
+
  private:
   friend class DMSIterator;
 
@@ -163,6 +175,9 @@ class DeltaMemStore : public DeltaStore,
   const int64_t id_;    // DeltaMemStore ID.
   const int64_t rs_id_; // Rowset ID.
 
+  mutable simple_spinlock ts_lock_;
+  Timestamp highest_timestamp_;
+
   std::shared_ptr<MemoryTrackingBufferAllocator> allocator_;
 
   std::shared_ptr<ThreadSafeMemoryTrackingArena> arena_;
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index e7c3d59..8a939fb 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -873,6 +873,22 @@ Status 
DiskRowSet::EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp ancient
   return 
delta_tracker_->EstimateBytesInPotentiallyAncientUndoDeltas(ancient_history_mark,
 bytes);
 }
 
+Status DiskRowSet::IsDeletedAndFullyAncient(Timestamp ancient_history_mark,
+                                            bool* deleted_and_ancient) {
+  uint64_t live_row_count = 0;
+  RETURN_NOT_OK(CountLiveRows(&live_row_count));
+  if (live_row_count > 0) {
+    *deleted_and_ancient = false;
+    return Status::OK();
+  }
+  // NOTE: this estimate might not read from disk and may thus return false
+  // despite having ancient on-disk data. That's sufficient because false
+  // negatives are OK for the purposes of GCing deleted rowsets -- we just
+  // won't delete them.
+  *deleted_and_ancient = 
delta_tracker_->EstimateAllRedosAreAncient(ancient_history_mark);
+  return Status::OK();
+}
+
 Status DiskRowSet::InitUndoDeltas(Timestamp ancient_history_mark,
                                   MonoTime deadline,
                                   const IOContext* io_context,
diff --git a/src/kudu/tablet/diskrowset.h b/src/kudu/tablet/diskrowset.h
index 045826b..5e27adf 100644
--- a/src/kudu/tablet/diskrowset.h
+++ b/src/kudu/tablet/diskrowset.h
@@ -398,6 +398,9 @@ class DiskRowSet : public RowSet {
   Status EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp 
ancient_history_mark,
                                                      int64_t* bytes) override;
 
+  Status IsDeletedAndFullyAncient(Timestamp ancient_history_mark,
+                                  bool* deleted_and_ancient) override;
+
   Status InitUndoDeltas(Timestamp ancient_history_mark,
                         MonoTime deadline,
                         const fs::IOContext* io_context,
diff --git a/src/kudu/tablet/memrowset.h b/src/kudu/tablet/memrowset.h
index ecd8e03..261f411 100644
--- a/src/kudu/tablet/memrowset.h
+++ b/src/kudu/tablet/memrowset.h
@@ -393,6 +393,13 @@ class MemRowSet : public RowSet,
     return Status::OK();
   }
 
+  Status IsDeletedAndFullyAncient(Timestamp /*ancient_history_mark*/,
+                                  bool* deleted_and_ancient) override {
+    DCHECK(deleted_and_ancient);
+    *deleted_and_ancient = false;
+    return Status::OK();
+  }
+
   Status InitUndoDeltas(Timestamp /*ancient_history_mark*/,
                         MonoTime /*deadline*/,
                         const fs::IOContext* /*io_context*/,
diff --git a/src/kudu/tablet/mock-rowsets.h b/src/kudu/tablet/mock-rowsets.h
index b86153f..b3b8930 100644
--- a/src/kudu/tablet/mock-rowsets.h
+++ b/src/kudu/tablet/mock-rowsets.h
@@ -39,7 +39,7 @@ class MockRowSet : public RowSet {
  public:
   virtual Status CheckRowPresent(const RowSetKeyProbe& /*probe*/,
                                  const fs::IOContext* /*io_context*/,
-                                 bool* /*present*/, ProbeStats* /*stats*/) 
const OVERRIDE {
+                                 bool* /*present*/, ProbeStats* /*stats*/) 
const override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
@@ -49,35 +49,35 @@ class MockRowSet : public RowSet {
                            const consensus::OpId& /*op_id_*/,
                            const fs::IOContext* /*io_context*/,
                            ProbeStats* /*stats*/,
-                           OperationResultPB* /*result*/) OVERRIDE {
+                           OperationResultPB* /*result*/) override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
   virtual Status NewRowIterator(const RowIteratorOptions& /*opts*/,
-                                std::unique_ptr<RowwiseIterator>* /*out*/) 
const OVERRIDE {
+                                std::unique_ptr<RowwiseIterator>* /*out*/) 
const override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
   virtual Status NewCompactionInput(const Schema* /*projection*/,
                                     const MvccSnapshot& /*snap*/,
                                     const fs::IOContext* /*io_context*/,
-                                    std::unique_ptr<CompactionInput>* /*out*/) 
const OVERRIDE {
+                                    std::unique_ptr<CompactionInput>* /*out*/) 
const override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
-  virtual Status CountRows(const fs::IOContext* /*io_context*/, rowid_t* 
/*count*/) const OVERRIDE {
+  virtual Status CountRows(const fs::IOContext* /*io_context*/, rowid_t* 
/*count*/) const override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
-  virtual Status CountLiveRows(uint64_t* /*count*/) const OVERRIDE {
+  virtual Status CountLiveRows(uint64_t* /*count*/) const override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
-  virtual std::string ToString() const OVERRIDE {
+  virtual std::string ToString() const override {
     LOG(FATAL) << "Unimplemented";
     return "";
   }
-  virtual Status DebugDump(std::vector<std::string>* /*lines*/) OVERRIDE {
+  virtual Status DebugDump(std::vector<std::string>* /*lines*/) override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
@@ -85,70 +85,76 @@ class MockRowSet : public RowSet {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
-  virtual uint64_t OnDiskSize() const OVERRIDE {
+  virtual uint64_t OnDiskSize() const override {
     LOG(FATAL) << "Unimplemented";
     return 0;
   }
-  virtual uint64_t OnDiskBaseDataSize() const OVERRIDE {
+  virtual uint64_t OnDiskBaseDataSize() const override {
     LOG(FATAL) << "Unimplemented";
     return 0;
   }
-  virtual uint64_t OnDiskBaseDataColumnSize(const ColumnId& col_id) const 
OVERRIDE {
+  virtual uint64_t OnDiskBaseDataColumnSize(const ColumnId& /*col_id*/) const 
override {
     LOG(FATAL) << "Unimplemented";
     return 0;
   }
-  virtual uint64_t OnDiskBaseDataSizeWithRedos() const OVERRIDE {
+  virtual uint64_t OnDiskBaseDataSizeWithRedos() const override {
     LOG(FATAL) << "Unimplemented";
     return 0;
   }
-  virtual std::mutex *compact_flush_lock() OVERRIDE {
+  virtual std::mutex *compact_flush_lock() override {
     LOG(FATAL) << "Unimplemented";
-    return NULL;
+    return nullptr;
   }
-  virtual bool has_been_compacted() const OVERRIDE {
+  virtual bool has_been_compacted() const override {
     LOG(FATAL) << "Unimplemented";
     return false;
   }
-  virtual void set_has_been_compacted() OVERRIDE {
+  virtual void set_has_been_compacted() override {
     LOG(FATAL) << "Unimplemented";
   }
-  virtual std::shared_ptr<RowSetMetadata> metadata() OVERRIDE {
-    return NULL;
+  virtual std::shared_ptr<RowSetMetadata> metadata() override {
+    return nullptr;
   }
 
-  virtual size_t DeltaMemStoreSize() const OVERRIDE {
+  virtual size_t DeltaMemStoreSize() const override {
     LOG(FATAL) << "Unimplemented";
     return 0;
   }
 
-  virtual bool DeltaMemStoreEmpty() const OVERRIDE {
+  virtual bool DeltaMemStoreEmpty() const override {
     LOG(FATAL) << "Unimplemented";
     return 0;
   }
 
-  virtual int64_t MinUnflushedLogIndex() const OVERRIDE {
+  virtual int64_t MinUnflushedLogIndex() const override {
     LOG(FATAL) << "Unimplemented";
     return -1;
   }
 
   virtual double DeltaStoresCompactionPerfImprovementScore(DeltaCompactionType 
/*type*/)
-      const OVERRIDE {
+      const override {
     LOG(FATAL) << "Unimplemented";
     return 0;
   }
 
-  virtual Status FlushDeltas(const fs::IOContext* /*io_context*/) OVERRIDE {
+  virtual Status FlushDeltas(const fs::IOContext* /*io_context*/) override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
 
-  virtual Status MinorCompactDeltaStores(const fs::IOContext* /*io_context*/) 
OVERRIDE {
+  virtual Status MinorCompactDeltaStores(const fs::IOContext* /*io_context*/) 
override {
+    LOG(FATAL) << "Unimplemented";
+    return Status::OK();
+  }
+
+  virtual Status IsDeletedAndFullyAncient(Timestamp /*ancient_history_mark*/,
+                                          bool* /*deleted_and_ancient*/) 
override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
 
   virtual Status EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp 
/*ancient_history_mark*/,
-                                                             int64_t* 
/*bytes*/) OVERRIDE {
+                                                             int64_t* 
/*bytes*/) override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
@@ -157,7 +163,7 @@ class MockRowSet : public RowSet {
                                 MonoTime /*deadline*/,
                                 const fs::IOContext* /*io_context*/,
                                 int64_t* /*delta_blocks_initialized*/,
-                                int64_t* /*bytes_in_ancient_undos*/) OVERRIDE {
+                                int64_t* /*bytes_in_ancient_undos*/) override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
@@ -165,12 +171,12 @@ class MockRowSet : public RowSet {
   virtual Status DeleteAncientUndoDeltas(Timestamp /*ancient_history_mark*/,
                                          const fs::IOContext* /*io_context*/,
                                          int64_t* /*blocks_deleted*/,
-                                         int64_t* /*bytes_deleted*/) OVERRIDE {
+                                         int64_t* /*bytes_deleted*/) override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
 
-  virtual bool IsAvailableForCompaction() OVERRIDE {
+  virtual bool IsAvailableForCompaction() override {
     return true;
   }
 };
@@ -186,29 +192,29 @@ class MockDiskRowSet : public MockRowSet {
         column_size_(column_size) {}
 
   virtual Status GetBounds(std::string* min_encoded_key,
-                           std::string* max_encoded_key) const OVERRIDE {
+                           std::string* max_encoded_key) const override {
     *min_encoded_key = first_key_;
     *max_encoded_key = last_key_;
     return Status::OK();
   }
 
-  virtual uint64_t OnDiskSize() const OVERRIDE {
+  virtual uint64_t OnDiskSize() const override {
     return size_;
   }
 
-  virtual uint64_t OnDiskBaseDataSize() const OVERRIDE {
+  virtual uint64_t OnDiskBaseDataSize() const override {
     return size_;
   }
 
-  virtual uint64_t OnDiskBaseDataColumnSize(const ColumnId& col_id) const 
OVERRIDE {
+  virtual uint64_t OnDiskBaseDataColumnSize(const ColumnId& /*col_id*/) const 
override {
     return column_size_;
   }
 
-  virtual uint64_t OnDiskBaseDataSizeWithRedos() const OVERRIDE {
+  virtual uint64_t OnDiskBaseDataSizeWithRedos() const override {
     return size_;
   }
 
-  virtual std::string ToString() const OVERRIDE {
+  virtual std::string ToString() const override {
     return strings::Substitute("mock[$0, $1]",
                                Slice(first_key_).ToDebugString(),
                                Slice(last_key_).ToDebugString());
@@ -224,8 +230,8 @@ class MockDiskRowSet : public MockRowSet {
 // Mock which acts like a MemRowSet and has no known bounds.
 class MockMemRowSet : public MockRowSet {
  public:
-  virtual Status GetBounds(std::string* min_encoded_key,
-                           std::string* max_encoded_key) const OVERRIDE {
+  virtual Status GetBounds(std::string* /*min_encoded_key*/,
+                           std::string* /*max_encoded_key*/) const override {
     return Status::NotSupported("");
   }
 
diff --git a/src/kudu/tablet/mt-tablet-test.cc 
b/src/kudu/tablet/mt-tablet-test.cc
index 2321efe..f40b579 100644
--- a/src/kudu/tablet/mt-tablet-test.cc
+++ b/src/kudu/tablet/mt-tablet-test.cc
@@ -37,10 +37,12 @@
 #include "kudu/common/rowid.h"
 #include "kudu/common/schema.h"
 #include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/local_tablet_writer.h"
 #include "kudu/tablet/rowset.h"
 #include "kudu/tablet/tablet-harness.h"
 #include "kudu/tablet/tablet-test-base.h"
+#include "kudu/tablet/tablet-test-util.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/faststring.h"
@@ -61,6 +63,7 @@ DEFINE_int32(num_slowreader_threads, 1, "Number of 'slow' 
reader threads to laun
 DEFINE_int32(num_flush_threads, 1, "Number of flusher reader threads to 
launch");
 DEFINE_int32(num_compact_threads, 1, "Number of compactor threads to launch");
 DEFINE_int32(num_undo_delta_gc_threads, 1, "Number of undo delta gc threads to 
launch");
+DEFINE_int32(num_deleted_rowset_gc_threads, 1, "Number of deleted rowset gc 
threads to launch");
 DEFINE_int32(num_updater_threads, 1, "Number of updating threads to launch");
 DEFINE_int32(num_flush_delta_threads, 1, "Number of delta flusher reader 
threads to launch");
 DEFINE_int32(num_minor_compact_deltas_threads, 1,
@@ -82,6 +85,10 @@ using std::vector;
 namespace kudu {
 namespace tablet {
 
+namespace {
+const MonoDelta kBackgroundOpInterval = MonoDelta::FromMilliseconds(100);
+} // anonymous namespace
+
 template<class SETUP>
 class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
   // Import some names from superclass, since C++ is stingy about
@@ -113,8 +120,7 @@ class MultiThreadedTabletTest : public 
TabletTestBase<SETUP> {
                                    
TabletHarness::Options::ClockType::LOGICAL_CLOCK)
     : TabletTestBase<SETUP>(clock_type),
       running_insert_count_(FLAGS_num_insert_threads),
-      
ts_collector_(::testing::UnitTest::GetInstance()->current_test_info()->test_case_name())
 {
-  }
+      
ts_collector_(::testing::UnitTest::GetInstance()->current_test_info()->test_case_name())
 {}
 
   void InsertThread(int tid) {
     CountDownOnScopeExit dec_count(&running_insert_count_);
@@ -308,13 +314,12 @@ class MultiThreadedTabletTest : public 
TabletTestBase<SETUP> {
     }
   }
 
-  void FlushDeltasThread(int tid) {
-    int wait_time = 100;
+  void FlushDeltasThread(int /*tid*/) {
     while (running_insert_count_.count() > 0) {
       CHECK_OK(tablet()->FlushBiggestDMS());
 
       // Wait, unless the inserters are all done.
-      running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(wait_time));
+      running_insert_count_.WaitFor(kBackgroundOpInterval);
     }
   }
 
@@ -327,30 +332,27 @@ class MultiThreadedTabletTest : public 
TabletTestBase<SETUP> {
   }
 
   void CompactDeltas(RowSet::DeltaCompactionType type) {
-    int wait_time = 100;
     while (running_insert_count_.count() > 0) {
       VLOG(1) << "Compacting worst deltas";
       CHECK_OK(tablet()->CompactWorstDeltas(type));
 
       // Wait, unless the inserters are all done.
-      running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(wait_time));
+      running_insert_count_.WaitFor(kBackgroundOpInterval);
     }
   }
 
-  void CompactThread(int tid) {
-    int wait_time = 100;
+  void CompactThread(int /*tid*/) {
     while (running_insert_count_.count() > 0) {
       CHECK_OK(tablet()->Compact(Tablet::COMPACT_NO_FLAGS));
 
       // Wait, unless the inserters are all done.
-      running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(wait_time));
+      running_insert_count_.WaitFor(kBackgroundOpInterval);
     }
   }
 
   void DeleteAncientUndoDeltasThread(int /*tid*/) {
-    int wait_time = 100;
     while (running_insert_count_.count() > 0) {
-      MonoDelta time_budget = MonoDelta::FromMilliseconds(wait_time);
+      MonoDelta time_budget = kBackgroundOpInterval;
       int64_t bytes_in_ancient_undos = 0;
       CHECK_OK(tablet()->InitAncientUndoDeltas(time_budget, 
&bytes_in_ancient_undos));
       VLOG(1) << "Found " << bytes_in_ancient_undos << " bytes of ancient 
delta undos";
@@ -364,10 +366,26 @@ class MultiThreadedTabletTest : public 
TabletTestBase<SETUP> {
       }
 
       // Wait, unless the inserters are all done.
-      running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(wait_time));
+      running_insert_count_.WaitFor(kBackgroundOpInterval);
     }
   }
 
+  // Thread that looks for rowsets that are ancient and fully deleted, GCing
+  // those that are.
+  void DeleteAncientDeletedRowsetsThreads(int /*tid*/) {
+    do {
+      int64_t bytes_in_ancient_deleted_rowsets = 0;
+      
CHECK_OK(tablet()->GetBytesInAncientDeletedRowsets(&bytes_in_ancient_deleted_rowsets));
+      VLOG(1) << Substitute("Found $0 bytes in ancient, fully deleted rowsets",
+                            bytes_in_ancient_deleted_rowsets);
+      if (bytes_in_ancient_deleted_rowsets > 0) {
+        CHECK_OK(tablet()->DeleteAncientDeletedRowsets());
+        LOG(INFO) << Substitute("Deleted $0 bytes found in ancient, fully 
deleted rowsets",
+                                bytes_in_ancient_deleted_rowsets);
+      }
+    } while (!running_insert_count_.WaitFor(kBackgroundOpInterval));
+  }
+
   // Thread which cycles between inserting and deleting a test row, each time
   // with a different value.
   void DeleteAndReinsertCycleThread(int tid) {
@@ -556,18 +574,28 @@ TYPED_TEST(MultiThreadedHybridClockTabletTest, 
UpdateNoMergeCompaction) {
   FLAGS_flusher_initial_frequency_ms = 1;
   FLAGS_tablet_delta_store_major_compact_min_ratio = 0.01F;
   FLAGS_tablet_delta_store_minor_compact_max = 10;
+
+  // Start up our background op threads, targeting the creation of delta files.
   this->StartThreads(FLAGS_num_flush_threads,
                      [this](int i) { this->FlushThread(i); });
   this->StartThreads(FLAGS_num_flush_delta_threads,
                      [this](int i) { this->FlushDeltasThread(i); });
   this->StartThreads(FLAGS_num_major_compact_deltas_threads,
                      [this](int i) { this->MajorCompactDeltasThread(i); });
+  this->StartThreads(FLAGS_num_undo_delta_gc_threads,
+                     [this](int i) { this->DeleteAncientUndoDeltasThread(i); 
});
+  this->StartThreads(FLAGS_num_deleted_rowset_gc_threads,
+                     [this](int i) { 
this->DeleteAncientDeletedRowsetsThreads(i); });
+  // Start our workload threads, targeting the creation of deltas that we can
+  // eventually GC.
   this->StartThreads(10,
                      [this](int i) { this->DeleteAndReinsertCycleThread(i); });
   this->StartThreads(10,
                      [this](int i) { this->StubbornlyUpdateSameRowThread(i); 
});
-  this->StartThreads(FLAGS_num_undo_delta_gc_threads,
-                     [this](int i) { this->DeleteAncientUndoDeltasThread(i); 
});
+
+  // For good measure, we'll also start a thread that scans.
+  this->StartThreads(FLAGS_num_summer_threads,
+                     [this](int i) { this->SummerThread(i); });
 
   // Run very quickly in dev builds, longer in slow builds.
   float runtime_seconds = AllowSlowTests() ? 2 : 0.1;
diff --git a/src/kudu/tablet/rowset.h b/src/kudu/tablet/rowset.h
index 524572a..85c8c99 100644
--- a/src/kudu/tablet/rowset.h
+++ b/src/kudu/tablet/rowset.h
@@ -219,6 +219,13 @@ class RowSet {
   // Compact delta stores if more than one.
   virtual Status MinorCompactDeltaStores(const fs::IOContext* io_context) = 0;
 
+  // Returns whether the rowset contains no live rows and is fully ancient (its
+  // newest update is older than 'ancient_history_mark').
+  //
+  // This may return false negatives, but should not return false positives.
+  virtual Status IsDeletedAndFullyAncient(Timestamp ancient_history_mark,
+                                          bool* deleted_and_ancient) = 0;
+
   // Estimate the number of bytes in ancient undo delta stores. This may be an
   // overestimate. The argument 'ancient_history_mark' must be valid (it may
   // not be equal to Timestamp::kInvalidTimestamp).
@@ -474,6 +481,13 @@ class DuplicatingRowSet : public RowSet {
     return Status::OK();
   }
 
+  Status IsDeletedAndFullyAncient(Timestamp /*ancient_history_mark*/,
+                                  bool* deleted_and_ancient) override {
+    DCHECK(deleted_and_ancient);
+    *deleted_and_ancient = false;
+    return Status::OK();
+  }
+
   Status InitUndoDeltas(Timestamp /*ancient_history_mark*/,
                         MonoTime /*deadline*/,
                         const fs::IOContext* /*io_context*/,
diff --git a/src/kudu/tablet/tablet-test-base.h 
b/src/kudu/tablet/tablet-test-base.h
index 763b694..90f544e 100644
--- a/src/kudu/tablet/tablet-test-base.h
+++ b/src/kudu/tablet/tablet-test-base.h
@@ -339,6 +339,14 @@ class TabletTestBase : public KuduTabletTest {
     InsertOrUpsertTestRows(RowOperationsPB::UPSERT, first_row, count, val, ts);
   }
 
+  // Deletes 'count' rows, starting with 'first_row'.
+  void DeleteTestRows(int64_t first_row, int64_t count) {
+    LocalTabletWriter writer(tablet().get(), &client_schema_);
+    for (auto i = first_row; i < first_row + count; i++) {
+      CHECK_OK(DeleteTestRow(&writer, i));
+    }
+  }
+
   void InsertOrUpsertTestRows(RowOperationsPB::Type type,
                               int64_t first_row,
                               int64_t count,
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 06661f8..36ac2ff 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -1476,6 +1476,14 @@ void Tablet::RegisterMaintenanceOps(MaintenanceManager* 
maint_mgr) {
   maint_mgr->RegisterOp(undo_delta_block_gc_op.get());
   maintenance_ops.push_back(undo_delta_block_gc_op.release());
 
+  // The deleted rowset GC operation relies on live row counting. If this
+  // tablet doesn't support such counting, do not register the op.
+  if (metadata_->supports_live_row_count()) {
+    unique_ptr<MaintenanceOp> deleted_rowset_gc_op(new 
DeletedRowsetGCOp(this));
+    maint_mgr->RegisterOp(deleted_rowset_gc_op.get());
+    maintenance_ops.push_back(deleted_rowset_gc_op.release());
+  }
+
   std::lock_guard<simple_spinlock> l(state_lock_);
   maintenance_ops_.swap(maintenance_ops);
 }
@@ -2308,6 +2316,87 @@ Status Tablet::InitAncientUndoDeltas(MonoDelta 
time_budget, int64_t* bytes_in_an
   return Status::OK();
 }
 
+Status Tablet::GetBytesInAncientDeletedRowsets(int64_t* 
bytes_in_ancient_deleted_rowsets) {
+  Timestamp ancient_history_mark;
+  if (!Tablet::GetTabletAncientHistoryMark(&ancient_history_mark)) {
+    VLOG_WITH_PREFIX(1) << "Cannot get ancient history mark. "
+                           "The clock is likely not a hybrid clock";
+    *bytes_in_ancient_deleted_rowsets = 0;
+    return Status::OK();
+  }
+
+  scoped_refptr<TabletComponents> comps;
+  GetComponents(&comps);
+  int64_t bytes = 0;
+  {
+    std::lock_guard<std::mutex> csl(compact_select_lock_);
+    for (const auto& rowset : comps->rowsets->all_rowsets()) {
+      if (!rowset->IsAvailableForCompaction()) {
+        continue;
+      }
+      bool deleted_and_ancient = false;
+      RETURN_NOT_OK(rowset->IsDeletedAndFullyAncient(ancient_history_mark, 
&deleted_and_ancient));
+      if (deleted_and_ancient) {
+        bytes += rowset->OnDiskSize();
+      }
+    }
+  }
+  metrics_->deleted_rowset_estimated_retained_bytes->set_value(bytes);
+  *bytes_in_ancient_deleted_rowsets = bytes;
+  return Status::OK();
+}
+
+Status Tablet::DeleteAncientDeletedRowsets() {
+  RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen);
+  const MonoTime start_time = MonoTime::Now();
+  Timestamp ancient_history_mark;
+  if (!Tablet::GetTabletAncientHistoryMark(&ancient_history_mark)) {
+    VLOG_WITH_PREFIX(1) << "Cannot get ancient history mark. "
+                           "The clock is likely not a hybrid clock";
+    return Status::OK();
+  }
+
+  scoped_refptr<TabletComponents> comps;
+  GetComponents(&comps);
+
+  // We'll take the rowsets' locks to ensure we don't GC the rowsets while
+  // they're being compacted.
+  RowSetVector to_delete;
+  int num_unavailable_for_delete = 0;
+  vector<std::unique_lock<std::mutex>> rowset_locks;
+  int64_t bytes_deleted = 0;
+  {
+    std::lock_guard<std::mutex> csl(compact_select_lock_);
+    for (const auto& rowset : comps->rowsets->all_rowsets()) {
+      // Check if this rowset has been locked by a compaction. If so, we
+      // shouldn't attempt to delete it.
+      if (!rowset->IsAvailableForCompaction()) {
+        num_unavailable_for_delete++;
+        continue;
+      }
+      bool deleted_and_empty = false;
+      RETURN_NOT_OK(rowset->IsDeletedAndFullyAncient(ancient_history_mark, 
&deleted_and_empty));
+      if (deleted_and_empty) {
+        // If we intend on deleting the rowset, take its lock so concurrent
+        // compactions don't try to select it for compactions.
+        std::unique_lock<std::mutex> l(*rowset->compact_flush_lock(), 
std::try_to_lock);
+        CHECK(l.owns_lock());
+        to_delete.emplace_back(rowset);
+        rowset_locks.emplace_back(std::move(l));
+        bytes_deleted += rowset->OnDiskSize();
+      }
+    }
+  }
+  if (to_delete.empty()) {
+    return Status::OK();
+  }
+  RETURN_NOT_OK(HandleEmptyCompactionOrFlush(
+      to_delete, TabletMetadata::kNoMrsFlushed));
+  metrics_->deleted_rowset_gc_bytes_deleted->IncrementBy(bytes_deleted);
+  metrics_->deleted_rowset_gc_duration->Increment((MonoTime::Now() - 
start_time).ToMilliseconds());
+  return Status::OK();
+}
+
 Status Tablet::DeleteAncientUndoDeltas(int64_t* blocks_deleted, int64_t* 
bytes_deleted) {
   RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen);
   MonoTime tablet_delete_start = MonoTime::Now();
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index 6950cc0..6177420 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -333,10 +333,31 @@ class Tablet {
   // Find and delete all undo delta blocks that have a maximum op timestamp
   // prior to the current ancient history mark. If this method returns OK, the
   // number of blocks and bytes deleted are returned in the out-parameters.
+  //
+  // Returns an error if the metadata update fails. Upon failure, no in-memory
+  // state is changed.
   Status DeleteAncientUndoDeltas(int64_t* blocks_deleted = nullptr,
                                  int64_t* bytes_deleted = nullptr);
 
-  // Count the number of deltas in the tablet. Only used for tests.
+  // Returns the number of bytes potentially used by rowsets that have no live
+  // rows and are entirely ancient.
+  //
+  // These checks may not touch on-disk block data if we can determine from the
+  // live row count that the rowsets aren't fully deleted, or from the DMS that
+  // the latest update is not considered ancient. If there is no DMS, looks at
+  // the newest redo but doesn't initialize it. As such, since we may miss out
+  // on counting rowsets we haven't initialized yet, this may be an
+  // underestimate.
+  Status GetBytesInAncientDeletedRowsets(int64_t* 
bytes_in_ancient_deleted_rowsets);
+
+  // Finds and GCs all fully deleted rowsets that have a maximum op timestamp
+  // prior to the current ancient history mark.
+  //
+  // Returns an error if the metadata update fails. Upon failure, no in-memory
+  // state is changed.
+  Status DeleteAncientDeletedRowsets();
+
+  // Counts the number of deltas in the tablet. Only used for tests.
   int64_t CountUndoDeltasForTests() const;
   int64_t CountRedoDeltasForTests() const;
 
diff --git a/src/kudu/tablet/tablet_bootstrap.cc 
b/src/kudu/tablet/tablet_bootstrap.cc
index b4d980f..32dedbe 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -1706,13 +1706,17 @@ bool FlushedStoresSnapshot::IsMemStoreActive(const 
MemStoreTargetPB& target) con
       // If we have no data about this DRS, then there are two cases:
       //
       // 1) The DRS has already been flushed, but then later got removed 
because
-      // it got compacted away. Since it was flushed, we don't need to replay 
it.
+      //    it got compacted away or culled because it was empty. In the former
+      //    case, the deltas should have been reflected in the new compaction
+      //    output, and in the latter, all the rows in the rowset have been
+      //    deleted and there's nothing to replay.
       //
-      // 2) The DRS was in the process of being written, but haven't yet 
flushed the
-      // TabletMetadata update that includes it. We only write to an 
in-progress DRS like
-      // this when we are in the 'duplicating' phase of a compaction. In that 
case,
-      // the other duplicated 'target' should still be present in the 
metadata, and we
-      // can base our decision based on that one.
+      // 2) The DRS was in the process of being written, but we haven't yet
+      //    flushed the TabletMetadata update that includes it. We only write
+      //    to an in-progress DRS like this when we are in the 'duplicating'
+      //    phase of a compaction. In that case, the other duplicated 'target'
+      //    should still be present in the metadata, and we can base our
+      //    decision based on that one.
       return false;
     }
 
diff --git a/src/kudu/tablet/tablet_history_gc-test.cc 
b/src/kudu/tablet/tablet_history_gc-test.cc
index 4969900..b5fa40b 100644
--- a/src/kudu/tablet/tablet_history_gc-test.cc
+++ b/src/kudu/tablet/tablet_history_gc-test.cc
@@ -21,6 +21,7 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include <boost/optional/optional.hpp>
@@ -97,8 +98,17 @@ class TabletHistoryGcTest : public 
TabletTestBase<IntKeyTestSetup<INT64>> {
     NO_FLUSH
   };
 
+  // Helper functions that mutate rows in batches of keys:
+  //   [0, rows_per_rowset)
+  //   [rows_per_rowset, 2*rows_per_rowset)
+  //   ...
+  //   [(num_rowsets - 1)*rows_per_rowset, num_rowsets*rows_per_rowset)
+  //
+  // ...flushing MRS or DMS (depending on the workload) in between batches.
   void InsertOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset);
   void UpdateOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset, 
int32_t val);
+  void DeleteOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset, bool 
flush_dms);
+
   void AddTimeToHybridClock(MonoDelta delta) {
     uint64_t now = HybridClock::GetPhysicalValueMicros(clock()->Now());
     uint64_t new_time = now + delta.ToMicroseconds();
@@ -107,29 +117,42 @@ class TabletHistoryGcTest : public 
TabletTestBase<IntKeyTestSetup<INT64>> {
   // Specify row regex to match on. Empty string means don't match anything.
   void VerifyDebugDumpRowsMatch(const string& pattern) const;
 
-  int64_t TotalNumRows() const { return num_rowsets_ * rows_per_rowset_; }
+  // Returns the total number of rows there should be after inserting rows.
+  int64_t TotalNumRows() const { return kNumRowsets * rows_per_rowset_; }
 
-  TestRowVerifier GenRowsEqualVerifier(int32_t expected_val) {
-    return [=](int32_t key, int32_t val) -> bool { return val == expected_val; 
};
+  // Returns a functor that returns whether all rows have 'expected_val' for
+  // their values.
+  static TestRowVerifier GenRowsEqualVerifier(int32_t expected_val) {
+    return [=](int32_t /*key*/, int32_t val) -> bool { return val == 
expected_val; };
   }
-
   const TestRowVerifier kRowsEqual0 = GenRowsEqualVerifier(0);
   const TestRowVerifier kRowsEqual1 = GenRowsEqualVerifier(1);
   const TestRowVerifier kRowsEqual2 = GenRowsEqualVerifier(2);
 
   const int kStartRow = 0;
-  int num_rowsets_ = 3;
+  const int kNumRowsets = 3;
   int64_t rows_per_rowset_ = 300;
 };
 
 void TabletHistoryGcTest::InsertOriginalRows(int64_t num_rowsets, int64_t 
rows_per_rowset) {
   for (int rowset_id = 0; rowset_id < num_rowsets; rowset_id++) {
-    InsertTestRows(rowset_id * rows_per_rowset, rows_per_rowset, 0);
+    InsertTestRows(rowset_id * rows_per_rowset, rows_per_rowset, /*val*/0);
     ASSERT_OK(tablet()->Flush());
   }
   ASSERT_EQ(num_rowsets, tablet()->num_rowsets());
 }
 
+void TabletHistoryGcTest::DeleteOriginalRows(int64_t num_rowsets,
+    int64_t rows_per_rowset, bool flush_dms) {
+  for (int rowset_id = 0; rowset_id < num_rowsets; rowset_id++) {
+    NO_FATALS(DeleteTestRows(rowset_id * rows_per_rowset, rows_per_rowset));
+    if (flush_dms) {
+      ASSERT_OK(tablet()->FlushAllDMSForTests());
+    }
+  }
+  ASSERT_EQ(num_rowsets, tablet()->num_rowsets());
+}
+
 void TabletHistoryGcTest::UpdateOriginalRows(int64_t num_rowsets, int64_t 
rows_per_rowset,
                                              int32_t val) {
   for (int rowset_id = 0; rowset_id < num_rowsets; rowset_id++) {
@@ -155,7 +178,7 @@ void TabletHistoryGcTest::VerifyDebugDumpRowsMatch(const 
string& pattern) const
 TEST_F(TabletHistoryGcTest, TestNoGenerateUndoOnMajorDeltaCompaction) {
   FLAGS_tablet_history_max_age_sec = 1; // Keep history for 1 second.
 
-  NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_));
+  NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
   NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), 
kRowsEqual0));
   Timestamp time_after_insert = clock()->Now();
 
@@ -169,7 +192,7 @@ TEST_F(TabletHistoryGcTest, 
TestNoGenerateUndoOnMajorDeltaCompaction) {
       ASSERT_OK(UpdateTestRow(&writer, row_idx, val));
     }
     // We must flush the DMS before major compaction can operate on these 
REDOs.
-    for (int i = 0; i < num_rowsets_; i++) {
+    for (int i = 0; i < kNumRowsets; i++) {
       tablet()->FlushBiggestDMS();
     }
     post_update_ts[val - 1] = clock()->Now();
@@ -189,7 +212,7 @@ TEST_F(TabletHistoryGcTest, 
TestNoGenerateUndoOnMajorDeltaCompaction) {
   NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), 
kRowsEqual2));
 
   // Run major delta compaction.
-  for (int i = 0; i < num_rowsets_; i++) {
+  for (int i = 0; i < kNumRowsets; i++) {
     ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION));
   }
 
@@ -209,10 +232,9 @@ TEST_F(TabletHistoryGcTest, 
TestNoGenerateUndoOnMajorDeltaCompaction) {
 TEST_F(TabletHistoryGcTest, TestMajorDeltaCompactionOnSubsetOfColumns) {
   FLAGS_tablet_history_max_age_sec = 100;
 
-  num_rowsets_ = 3;
   rows_per_rowset_ = 20;
 
-  NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_));
+  NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
   NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), 
kRowsEqual0));
 
   LocalTabletWriter writer(tablet().get(), &client_schema_);
@@ -223,7 +245,7 @@ TEST_F(TabletHistoryGcTest, 
TestMajorDeltaCompactionOnSubsetOfColumns) {
     ASSERT_OK_FAST(row.SetInt32(2, 2));
     ASSERT_OK_FAST(writer.Update(row));
   }
-  for (int i = 0; i < num_rowsets_; i++) {
+  for (int i = 0; i < kNumRowsets; i++) {
     tablet()->FlushBiggestDMS();
   }
 
@@ -231,7 +253,7 @@ TEST_F(TabletHistoryGcTest, 
TestMajorDeltaCompactionOnSubsetOfColumns) {
 
   vector<std::shared_ptr<RowSet>> rowsets;
   tablet()->GetRowSetsForTests(&rowsets);
-  for (int i = 0; i < num_rowsets_; i++) {
+  for (int i = 0; i < kNumRowsets; i++) {
     DiskRowSet* drs = down_cast<DiskRowSet*>(rowsets[i].get());
     vector<ColumnId> col_ids_to_compact = { schema_.column_id(2) };
     ASSERT_OK(drs->MajorCompactDeltaStoresWithColumnIds(col_ids_to_compact, 
nullptr,
@@ -306,7 +328,7 @@ TEST_F(TabletHistoryGcTest, TestUndoGCOnMergeCompaction) {
   FLAGS_tablet_history_max_age_sec = 1; // Keep history for 1 second.
 
   Timestamp time_before_insert = clock()->Now();
-  NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_));
+  NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
   NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), 
kRowsEqual0));
 
   // The earliest thing we can see is an empty tablet.
@@ -327,7 +349,7 @@ TEST_F(TabletHistoryGcTest, TestUndoGCOnMergeCompaction) {
 TEST_F(TabletHistoryGcTest, TestRowRemovalGCOnMergeCompaction) {
   FLAGS_tablet_history_max_age_sec = 100; // Keep history for 100 seconds.
 
-  NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_));
+  NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
   NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), 
kRowsEqual0));
 
   Timestamp prev_time = clock()->Now();
@@ -366,7 +388,7 @@ TEST_F(TabletHistoryGcTest, 
TestRowRemovalGCOnMergeCompaction) {
 TEST_F(TabletHistoryGcTest, TestNoUndoGCUntilAncientHistoryMark) {
   FLAGS_tablet_history_max_age_sec = 1000; // 1000 seconds before we GC 
history.
 
-  NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_));
+  NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
 
   Timestamp prev_time = clock()->Now();
   NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(2)));
@@ -383,7 +405,7 @@ TEST_F(TabletHistoryGcTest, 
TestNoUndoGCUntilAncientHistoryMark) {
   NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(), 
prev_time,
                                                    kRowsEqual0));
 
-  for (int i = 0; i < num_rowsets_; i++) {
+  for (int i = 0; i < kNumRowsets; i++) {
     ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MINOR_DELTA_COMPACTION));
     ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION));
   }
@@ -429,11 +451,10 @@ TEST_F(TabletHistoryGcTest, TestGhostRowsNotRevived) {
 // non-GCed rows in each rowset.
 TEST_F(TabletHistoryGcTest, TestGcOnAlternatingRows) {
   FLAGS_tablet_history_max_age_sec = 100;
-  num_rowsets_ = 3;
   rows_per_rowset_ = 5;
 
   LocalTabletWriter writer(tablet().get(), &client_schema_);
-  for (int rowset_id = 0; rowset_id < num_rowsets_; rowset_id++) {
+  for (int rowset_id = 0; rowset_id < kNumRowsets; rowset_id++) {
     for (int i = 0; i < rows_per_rowset_; i++) {
       int32_t row_key = rowset_id * rows_per_rowset_ + i;
       ASSERT_OK(InsertTestRow(&writer, row_key, 0));
@@ -580,22 +601,22 @@ class TabletHistoryGcNoMaintMgrTest : public 
TabletHistoryGcTest {
 TEST_F(TabletHistoryGcNoMaintMgrTest, TestUndoDeltaBlockGc) {
   FLAGS_tablet_history_max_age_sec = 1000;
 
-  NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_));
-  ASSERT_EQ(num_rowsets_, tablet()->CountUndoDeltasForTests());
+  NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
+  ASSERT_EQ(kNumRowsets, tablet()->CountUndoDeltasForTests());
 
   // Generate a bunch of redo deltas and then compact them into undo deltas.
   constexpr int kNumMutationsPerRow = 5;
   for (int i = 0; i < kNumMutationsPerRow; i++) {
     SCOPED_TRACE(i);
-    ASSERT_EQ((i + 1) * num_rowsets_, tablet()->CountUndoDeltasForTests());
+    ASSERT_EQ((i + 1) * kNumRowsets, tablet()->CountUndoDeltasForTests());
     NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(1)));
-    NO_FATALS(UpdateOriginalRows(num_rowsets_, rows_per_rowset_, i));
+    NO_FATALS(UpdateOriginalRows(kNumRowsets, rows_per_rowset_, i));
     ASSERT_OK(tablet()->MajorCompactAllDeltaStoresForTests());
-    ASSERT_EQ((i + 2) * num_rowsets_, tablet()->CountUndoDeltasForTests());
+    ASSERT_EQ((i + 2) * kNumRowsets, tablet()->CountUndoDeltasForTests());
   }
 
   ASSERT_EQ(0, tablet()->CountRedoDeltasForTests());
-  const int expected_undo_blocks = (kNumMutationsPerRow + 1) * num_rowsets_;
+  const int expected_undo_blocks = (kNumMutationsPerRow + 1) * kNumRowsets;
   ASSERT_EQ(expected_undo_blocks, tablet()->CountUndoDeltasForTests());
 
   // There will be uninitialized undos so we will estimate that there may be
@@ -639,5 +660,59 @@ TEST_F(TabletHistoryGcNoMaintMgrTest, 
TestUndoDeltaBlockGc) {
   ASSERT_EQ(1, 
tablet()->metrics()->undo_delta_block_gc_delete_duration->TotalCount());
 }
 
+TEST_F(TabletHistoryGcNoMaintMgrTest, TestGCDeletedRowsetsWithRedoFiles) {
+  FLAGS_tablet_history_max_age_sec = 1000;
+  NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
+  ASSERT_EQ(kNumRowsets, tablet()->CountUndoDeltasForTests());
+
+  NO_FATALS(DeleteOriginalRows(kNumRowsets, rows_per_rowset_, 
/*flush_dms*/true));
+  ASSERT_EQ(kNumRowsets, tablet()->CountRedoDeltasForTests());
+
+  // TODO(awong): keep the delta stats cached after flushing a DMS so we don't
+  // have to scan to read stats.
+  NO_FATALS(VerifyTestRows(0, 0));
+
+  // We shouldn't have any ancient rowsets since we haven't passed the AHM.
+  int64_t bytes = 0;
+  ASSERT_OK(tablet()->GetBytesInAncientDeletedRowsets(&bytes));
+  ASSERT_EQ(0, bytes);
+
+  // Try to delete ancient deleted rowsets. This should effectively no-op.
+  ASSERT_OK(tablet()->DeleteAncientDeletedRowsets());
+  ASSERT_EQ(kNumRowsets, tablet()->CountUndoDeltasForTests());
+  ASSERT_EQ(kNumRowsets, tablet()->CountRedoDeltasForTests());
+  ASSERT_OK(tablet()->GetBytesInAncientDeletedRowsets(&bytes));
+  ASSERT_EQ(0, bytes);
+  const auto* metrics = tablet()->metrics();
+  ASSERT_EQ(0, metrics->deleted_rowset_gc_bytes_deleted->value());
+  ASSERT_EQ(0, metrics->deleted_rowset_gc_duration->TotalCount());
+
+  // Move the clock so all rowsets are ancient. Our GC should succeed and we
+  // should be left with no rowsets.
+  
NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec
 + 1)));
+  ASSERT_OK(tablet()->GetBytesInAncientDeletedRowsets(&bytes));
+  ASSERT_LT(0, bytes);
+  ASSERT_OK(tablet()->DeleteAncientDeletedRowsets());
+  ASSERT_EQ(0, tablet()->CountUndoDeltasForTests());
+  ASSERT_EQ(0, tablet()->CountRedoDeltasForTests());
+
+  // Check that these show up.
+  ASSERT_EQ(bytes, metrics->deleted_rowset_gc_bytes_deleted->value());
+  ASSERT_EQ(1, metrics->deleted_rowset_gc_duration->TotalCount());
+}
+
+TEST_F(TabletHistoryGcNoMaintMgrTest, TestGCDeletedRowsetsWithDMS) {
+  FLAGS_tablet_history_max_age_sec = 1000;
+  NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
+  NO_FATALS(DeleteOriginalRows(kNumRowsets, rows_per_rowset_, 
/*flush_dms*/false));
+  
NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec
 + 1)));
+  int64_t bytes = 0;
+  ASSERT_OK(tablet()->GetBytesInAncientDeletedRowsets(&bytes));
+  ASSERT_LT(0, bytes);
+  ASSERT_OK(tablet()->DeleteAncientDeletedRowsets());
+  ASSERT_EQ(bytes, 
tablet()->metrics()->deleted_rowset_gc_bytes_deleted->value());
+  ASSERT_EQ(1, tablet()->metrics()->deleted_rowset_gc_duration->TotalCount());
+}
+
 } // namespace tablet
 } // namespace kudu
diff --git a/src/kudu/tablet/tablet_metrics.cc 
b/src/kudu/tablet/tablet_metrics.cc
index a135f8d..a8cccbc 100644
--- a/src/kudu/tablet/tablet_metrics.cc
+++ b/src/kudu/tablet/tablet_metrics.cc
@@ -151,6 +151,12 @@ METRIC_DEFINE_counter(tablet, 
undo_delta_block_gc_bytes_deleted,
                       "Does not include bytes garbage collected during 
compactions.",
                       kudu::MetricLevel::kDebug);
 
+METRIC_DEFINE_counter(tablet, deleted_rowset_gc_bytes_deleted,
+                      "Deleted Rowsets GC Bytes Deleted",
+                      kudu::MetricUnit::kBytes,
+                      "Number of bytes deleted by garbage-collecting deleted 
rowsets.",
+                      kudu::MetricLevel::kDebug);
+
 METRIC_DEFINE_histogram(tablet, bloom_lookups_per_op, "Bloom Lookups per 
Operation",
                         kudu::MetricUnit::kProbes,
                         "Tracks the number of bloom filter lookups performed 
by each "
@@ -249,6 +255,18 @@ METRIC_DEFINE_gauge_int64(tablet, 
undo_delta_block_estimated_retained_bytes,
   "May be an overestimate.",
   kudu::MetricLevel::kDebug);
 
+METRIC_DEFINE_gauge_uint32(tablet, deleted_rowset_gc_running,
+  "Deleted Rowset GC Running",
+  kudu::MetricUnit::kMaintenanceOperations,
+  "Number of deleted rowset GC operations currently running.",
+  kudu::MetricLevel::kDebug);
+
+METRIC_DEFINE_gauge_int64(tablet, deleted_rowset_estimated_retained_bytes,
+  "Estimated Deletable Bytes Retained in Deleted Rowsets",
+  kudu::MetricUnit::kBytes,
+  "Estimated bytes of deletable data in deleted rowsets for this tablet.",
+  kudu::MetricLevel::kDebug);
+
 METRIC_DEFINE_histogram(tablet, flush_dms_duration,
   "DeltaMemStore Flush Duration",
   kudu::MetricUnit::kMilliseconds,
@@ -305,6 +323,13 @@ METRIC_DEFINE_histogram(tablet, 
undo_delta_block_gc_perform_duration,
   kudu::MetricLevel::kInfo,
   60000LU, 1);
 
+METRIC_DEFINE_histogram(tablet, deleted_rowset_gc_duration,
+  "Deleted Rowset GC Duration",
+  kudu::MetricUnit::kMilliseconds,
+  "Time spent running the maintenance operation to GC deleted rowsets.",
+  kudu::MetricLevel::kInfo,
+  60000LU, 1);
+
 METRIC_DEFINE_counter(tablet, leader_memory_pressure_rejections,
   "Leader Memory Pressure Rejections",
   kudu::MetricUnit::kRequests,
@@ -347,6 +372,7 @@ TabletMetrics::TabletMetrics(const 
scoped_refptr<MetricEntity>& entity)
     MINIT(delta_file_lookups),
     MINIT(mrs_lookups),
     MINIT(bytes_flushed),
+    MINIT(deleted_rowset_gc_bytes_deleted),
     MINIT(undo_delta_block_gc_bytes_deleted),
     MINIT(bloom_lookups_per_op),
     MINIT(key_file_lookups_per_op),
@@ -358,6 +384,8 @@ TabletMetrics::TabletMetrics(const 
scoped_refptr<MetricEntity>& entity)
     GINIT(flush_dms_running),
     GINIT(flush_mrs_running),
     GINIT(compact_rs_running),
+    GINIT(deleted_rowset_estimated_retained_bytes),
+    GINIT(deleted_rowset_gc_running),
     GINIT(delta_minor_compact_rs_running),
     GINIT(delta_major_compact_rs_running),
     GINIT(undo_delta_block_gc_running),
@@ -365,6 +393,7 @@ TabletMetrics::TabletMetrics(const 
scoped_refptr<MetricEntity>& entity)
     MINIT(flush_dms_duration),
     MINIT(flush_mrs_duration),
     MINIT(compact_rs_duration),
+    MINIT(deleted_rowset_gc_duration),
     MINIT(delta_minor_compact_rs_duration),
     MINIT(delta_major_compact_rs_duration),
     MINIT(undo_delta_block_gc_init_duration),
diff --git a/src/kudu/tablet/tablet_metrics.h b/src/kudu/tablet/tablet_metrics.h
index 7dab1ea..1975779 100644
--- a/src/kudu/tablet/tablet_metrics.h
+++ b/src/kudu/tablet/tablet_metrics.h
@@ -70,6 +70,7 @@ struct TabletMetrics {
 
   // Operation stats.
   scoped_refptr<Counter> bytes_flushed;
+  scoped_refptr<Counter> deleted_rowset_gc_bytes_deleted;
   scoped_refptr<Counter> undo_delta_block_gc_bytes_deleted;
 
   scoped_refptr<Histogram> bloom_lookups_per_op;
@@ -84,6 +85,8 @@ struct TabletMetrics {
   scoped_refptr<AtomicGauge<uint32_t> > flush_dms_running;
   scoped_refptr<AtomicGauge<uint32_t> > flush_mrs_running;
   scoped_refptr<AtomicGauge<uint32_t> > compact_rs_running;
+  scoped_refptr<AtomicGauge<int64_t> > deleted_rowset_estimated_retained_bytes;
+  scoped_refptr<AtomicGauge<uint32_t> > deleted_rowset_gc_running;
   scoped_refptr<AtomicGauge<uint32_t> > delta_minor_compact_rs_running;
   scoped_refptr<AtomicGauge<uint32_t> > delta_major_compact_rs_running;
   scoped_refptr<AtomicGauge<uint32_t> > undo_delta_block_gc_running;
@@ -92,6 +95,7 @@ struct TabletMetrics {
   scoped_refptr<Histogram> flush_dms_duration;
   scoped_refptr<Histogram> flush_mrs_duration;
   scoped_refptr<Histogram> compact_rs_duration;
+  scoped_refptr<Histogram> deleted_rowset_gc_duration;
   scoped_refptr<Histogram> delta_minor_compact_rs_duration;
   scoped_refptr<Histogram> delta_major_compact_rs_duration;
   scoped_refptr<Histogram> undo_delta_block_gc_init_duration;
diff --git a/src/kudu/tablet/tablet_mm_ops.cc b/src/kudu/tablet/tablet_mm_ops.cc
index 849d74a..c0f3caf 100644
--- a/src/kudu/tablet/tablet_mm_ops.cc
+++ b/src/kudu/tablet/tablet_mm_ops.cc
@@ -76,6 +76,13 @@ DEFINE_bool(enable_undo_delta_block_gc, true,
 TAG_FLAG(enable_undo_delta_block_gc, runtime);
 TAG_FLAG(enable_undo_delta_block_gc, unsafe);
 
+DEFINE_bool(enable_deleted_rowset_gc, true,
+    "Whether to enable garbage collection of fully deleted rowsets. Disabling "
+    "deleted rowset garbage collection may increase disk space usage for 
workloads "
+    "that involve a high number of deletes. Only deleted rowsets that are 
entirely "
+    "considered ancient history (see --tablet_history_max_age_sec) are 
deleted.");
+TAG_FLAG(enable_deleted_rowset_gc, runtime);
+
 using std::string;
 using strings::Substitute;
 
@@ -387,5 +394,46 @@ std::string UndoDeltaBlockGCOp::LogPrefix() const {
   return tablet_->LogPrefix();
 }
 
+DeletedRowsetGCOp::DeletedRowsetGCOp(Tablet* tablet)
+    : TabletOpBase(Substitute("DeletedRowSetGCOp($0)", tablet->tablet_id()),
+                   MaintenanceOp::HIGH_IO_USAGE, tablet),
+      running_(false) {
+}
+
+void DeletedRowsetGCOp::UpdateStats(MaintenanceOpStats* stats) {
+  if (!FLAGS_enable_deleted_rowset_gc) {
+    stats->set_runnable(false);
+    return;
+  }
+  if (running_.load()) {
+    VLOG(1) << LogPrefix() << " not updating stats: already running";
+    stats->set_runnable(false);
+    return;
+  }
+  int64_t estimated_retained_bytes = 0;
+  WARN_NOT_OK(tablet_->GetBytesInAncientDeletedRowsets(&estimated_retained_bytes),
+              "Unable to count bytes in ancient, deleted rowsets");
+  stats->set_data_retained_bytes(estimated_retained_bytes);
+  stats->set_runnable(estimated_retained_bytes > 0);
+}
+
+void DeletedRowsetGCOp::Perform() {
+  WARN_NOT_OK(tablet_->DeleteAncientDeletedRowsets(),
+      Substitute("$0GC of deleted rowsets failed", LogPrefix()));
+  running_.store(false);  // Let Prepare() claim the op again.
+}
+
+scoped_refptr<Histogram> DeletedRowsetGCOp::DurationHistogram() const {
+  return tablet_->metrics()->deleted_rowset_gc_duration;
+}
+
+scoped_refptr<AtomicGauge<uint32_t>> DeletedRowsetGCOp::RunningGauge() const {
+  return tablet_->metrics()->deleted_rowset_gc_running;
+}
+
+std::string DeletedRowsetGCOp::LogPrefix() const {
+  return tablet_->LogPrefix();
+}
+
 } // namespace tablet
 } // namespace kudu
diff --git a/src/kudu/tablet/tablet_mm_ops.h b/src/kudu/tablet/tablet_mm_ops.h
index e85dcf0..114e551 100644
--- a/src/kudu/tablet/tablet_mm_ops.h
+++ b/src/kudu/tablet/tablet_mm_ops.h
@@ -14,10 +14,9 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+#pragma once
 
-#ifndef KUDU_TABLET_TABLET_MM_OPS_H_
-#define KUDU_TABLET_TABLET_MM_OPS_H_
-
+#include <atomic>
 #include <cstdint>
 #include <string>
 
@@ -159,8 +158,39 @@ class UndoDeltaBlockGCOp : public TabletOpBase {
   DISALLOW_COPY_AND_ASSIGN(UndoDeltaBlockGCOp);
 };
 
+// MaintenanceOp that garbage-collects entire rowsets that are fully deleted
+// and older than the ancient history mark (AHM).
+class DeletedRowsetGCOp : public TabletOpBase {
+ public:
+  explicit DeletedRowsetGCOp(Tablet* tablet);
+
+  // Reports the estimated bytes held by rowsets that are fully deleted and
+  // whose most recent update is older than the AHM. The op is runnable only
+  // when that estimate is positive, the GC flag is on, and it isn't running.
+  void UpdateStats(MaintenanceOpStats* stats) override;
+
+  // Atomically claims running_; returns false if the op is already running.
+  bool Prepare() override {
+    bool false_ref = false;
+    return running_.compare_exchange_strong(false_ref, true);
+  }
+
+  // Deletes ancient deleted rowsets from disk, then clears running_.
+  void Perform() override;
+
+  // Per-tablet duration histogram and running gauge for this op.
+  scoped_refptr<Histogram> DurationHistogram() const override;
+  scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const override;
+ private:
+  std::string LogPrefix() const;
+
+  // True between a successful Prepare() and the end of Perform(); ensures only
+  // a single instance of this op is scheduled per tablet at a time.
+  std::atomic<bool> running_;
+
+  DISALLOW_COPY_AND_ASSIGN(DeletedRowsetGCOp);
+};
 
 } // namespace tablet
 } // namespace kudu
 
-#endif /* KUDU_TABLET_TABLET_MM_OPS_H_ */
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index d805735..c4b88d3 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -52,6 +52,7 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/result_tracker.h"
+#include "kudu/tablet/tablet-harness.h"
 #include "kudu/tablet/tablet-test-util.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_bootstrap.h"
@@ -76,7 +77,9 @@
 #include "kudu/util/test_util.h"
 #include "kudu/util/threadpool.h"
 
+DECLARE_bool(enable_maintenance_manager);
 DECLARE_int32(flush_threshold_mb);
+DECLARE_int32(tablet_history_max_age_sec);
 
 METRIC_DECLARE_entity(tablet);
 
@@ -115,7 +118,8 @@ static Schema GetTestSchema() {
 class TabletReplicaTest : public KuduTabletTest {
  public:
   TabletReplicaTest()
-      : KuduTabletTest(GetTestSchema()),
+      : KuduTabletTest(GetTestSchema(),
+                       TabletHarness::Options::ClockType::HYBRID_CLOCK),
         insert_counter_(0),
         delete_counter_(0),
         dns_resolver_(new DnsResolver) {
@@ -341,22 +345,20 @@ class TabletReplicaTest : public KuduTabletTest {
   // Execute insert requests and roll log after each one.
   Status ExecuteInsertsAndRollLogs(int num_inserts) {
     for (int i = 0; i < num_inserts; i++) {
-      unique_ptr<WriteRequestPB> req(new WriteRequestPB());
-      RETURN_NOT_OK(GenerateSequentialInsertRequest(GetTestSchema(), req.get()));
-      RETURN_NOT_OK(ExecuteWriteAndRollLog(tablet_replica_.get(), *req));
+      WriteRequestPB req;
+      RETURN_NOT_OK(GenerateSequentialInsertRequest(GetTestSchema(), &req));
+      RETURN_NOT_OK(ExecuteWriteAndRollLog(tablet_replica_.get(), req));
     }
-
     return Status::OK();
   }
 
   // Execute delete requests and roll log after each one.
   Status ExecuteDeletesAndRollLogs(int num_deletes) {
     for (int i = 0; i < num_deletes; i++) {
-      unique_ptr<WriteRequestPB> req(new WriteRequestPB());
-      CHECK_OK(GenerateSequentialDeleteRequest(req.get()));
-      CHECK_OK(ExecuteWriteAndRollLog(tablet_replica_.get(), *req));
+      WriteRequestPB req;
+      RETURN_NOT_OK(GenerateSequentialDeleteRequest(&req));
+      RETURN_NOT_OK(ExecuteWriteAndRollLog(tablet_replica_.get(), req));
     }
-
     return Status::OK();
   }
 
@@ -797,5 +799,44 @@ TEST_F(TabletReplicaTest, TestLiveRowCountMetric) {
   ASSERT_EQ(kNumInsert - kNumDelete, live_row_count->value());
 }
 
+TEST_F(TabletReplicaTest, TestRestartAfterGCDeletedRowsets) {
+  FLAGS_enable_maintenance_manager = false;
+  FLAGS_tablet_history_max_age_sec = 1;
+  const int kNumRows = 10;
+  ConsensusBootstrapInfo info;
+  ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
+  auto* tablet = tablet_replica_->tablet();
+  // Metrics are already registered so pass a dummy lambda.
+  auto live_row_count = METRIC_live_row_count.InstantiateFunctionGauge(
+      tablet->GetMetricEntity(), [] () { return 0; });
+
+  // Insert some rows and flush so we get a DRS.
+  ASSERT_OK(ExecuteInsertsAndRollLogs(kNumRows));
+  ASSERT_OK(tablet->Flush());
+  ASSERT_OK(ExecuteDeletesAndRollLogs(kNumRows));
+  ASSERT_EQ(1, tablet->num_rowsets());
+  ASSERT_EQ(0, live_row_count->value());
+  SleepFor(MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec));
+
+  // Insert some fresh rows so we can validate that we don't GC everything.
+  ASSERT_OK(ExecuteInsertsAndRollLogs(kNumRows));
+  ASSERT_OK(tablet->Flush());
+  ASSERT_EQ(2, tablet->num_rowsets());
+  ASSERT_EQ(kNumRows, live_row_count->value());
+
+  // Now GC what we can. The first rowset should be gone.
+  ASSERT_OK(tablet->DeleteAncientDeletedRowsets());
+  ASSERT_EQ(1, tablet->num_rowsets());
+  ASSERT_EQ(kNumRows, live_row_count->value());
+
+  // Restart and ensure we can get online okay.
+  NO_FATALS(RestartReplica());
+  tablet = tablet_replica_->tablet();
+  ASSERT_EQ(1, tablet->num_rowsets());
+  live_row_count = METRIC_live_row_count.InstantiateFunctionGauge(
+      tablet->GetMetricEntity(), [] () { return 0; });
+  ASSERT_EQ(kNumRows, live_row_count->value());
+}
+
 } // namespace tablet
 } // namespace kudu

Reply via email to