This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new ec6cf7b12 [compaction/flush] Cleanup of compaction and flush code paths
ec6cf7b12 is described below

commit ec6cf7b12047f8a30884a5c4aa6c1dbdb6aabad6
Author: Ashwani Raina <[email protected]>
AuthorDate: Tue Nov 21 20:16:20 2023 +0530

    [compaction/flush] Cleanup of compaction and flush code paths
    
    The change adds more comments and renames methods to improve code
    readability and understanding. It mainly covers two code paths:
    MemRowSet flush and RowSet compaction.
    
    Change-Id: Ic1c2f20230aa089baf34f418a59b6c97f7351217
    Reviewed-on: http://gerrit.cloudera.org:8080/20720
    Tested-by: Kudu Jenkins
    Reviewed-by: Attila Bukor <[email protected]>
---
 src/kudu/tablet/compaction-test.cc | 118 +++++++++++++++++++------------------
 src/kudu/tablet/compaction.cc      |  76 ++++++++++++++----------
 src/kudu/tablet/compaction.h       |  49 +++++++--------
 src/kudu/tablet/diskrowset.cc      |   8 +--
 src/kudu/tablet/diskrowset.h       |   6 +-
 src/kudu/tablet/memrowset.cc       |   4 +-
 src/kudu/tablet/memrowset.h        |   6 +-
 src/kudu/tablet/mock-rowsets.h     |   2 +-
 src/kudu/tablet/rowset.cc          |   2 +-
 src/kudu/tablet/rowset.h           |   6 +-
 src/kudu/tablet/tablet.cc          |  74 +++++++++++++++--------
 src/kudu/tablet/tablet.h           |  11 ++--
 12 files changed, 207 insertions(+), 155 deletions(-)

diff --git a/src/kudu/tablet/compaction-test.cc 
b/src/kudu/tablet/compaction-test.cc
index 83ea39e23..7abc0b4ca 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -302,15 +302,15 @@ class TestCompaction : public KuduRowSetTest {
 
   // Iterate over the given compaction input, stringifying and dumping each
   // yielded row to *out
-  static void IterateInput(CompactionInput* input, vector<string>* out) {
+  static void IterateInput(CompactionOrFlushInput* input, vector<string>* out) 
{
     ASSERT_OK(DebugDumpCompactionInput(input, nullptr, out));
   }
 
-  // Flush the given CompactionInput 'input' to disk with the given snapshot.
+  // Flush the given CompactionOrFlushInput 'input' to disk with the given 
snapshot.
   // If 'result_rowsets' is not NULL, reopens the resulting rowset(s) and 
appends
   // them to the vector.
   Status DoFlushAndReopen(
-      CompactionInput* input,
+      CompactionOrFlushInput* input,
       const Schema& projection,
       const MvccSnapshot& snap,
       size_t roll_threshold,
@@ -352,14 +352,14 @@ class TestCompaction : public KuduRowSetTest {
   static Status BuildCompactionInput(const MvccSnapshot& merge_snap,
                                      const vector<shared_ptr<DiskRowSet>>& 
rowsets,
                                      const Schema& projection,
-                                     unique_ptr<CompactionInput>* out) {
-    vector<shared_ptr<CompactionInput>> merge_inputs;
+                                     unique_ptr<CompactionOrFlushInput>* out) {
+    vector<shared_ptr<CompactionOrFlushInput>> merge_inputs;
     for (const auto& rs : rowsets) {
-      unique_ptr<CompactionInput> input;
-      RETURN_NOT_OK(CompactionInput::Create(*rs, &projection, merge_snap, 
nullptr, &input));
+      unique_ptr<CompactionOrFlushInput> input;
+      RETURN_NOT_OK(CompactionOrFlushInput::Create(*rs, &projection, 
merge_snap, nullptr, &input));
       merge_inputs.emplace_back(input.release());
     }
-    out->reset(CompactionInput::Merge(merge_inputs, &projection));
+    out->reset(CompactionOrFlushInput::Merge(merge_inputs, &projection));
     return Status::OK();
   }
 
@@ -371,7 +371,7 @@ class TestCompaction : public KuduRowSetTest {
                           size_t roll_threshold,
                           vector<shared_ptr<DiskRowSet>>* result_rowsets) {
     MvccSnapshot merge_snap(mvcc_);
-    unique_ptr<CompactionInput> compact_input;
+    unique_ptr<CompactionOrFlushInput> compact_input;
     RETURN_NOT_OK(BuildCompactionInput(merge_snap, rowsets, projection, 
&compact_input));
     return DoFlushAndReopen(
         compact_input.get(), projection, merge_snap, roll_threshold, 
result_rowsets);
@@ -396,7 +396,9 @@ class TestCompaction : public KuduRowSetTest {
                            vector<shared_ptr<DiskRowSet>>* result_rowsets) {
     MvccSnapshot snap(mvcc_);
     vector<shared_ptr<RowSetMetadata>> rowset_metas;
-    unique_ptr<CompactionInput> input(CompactionInput::Create(mrs, 
&projection, snap));
+    unique_ptr<CompactionOrFlushInput> 
input(CompactionOrFlushInput::Create(mrs,
+                                                                            
&projection,
+                                                                            
snap));
     return DoFlushAndReopen(input.get(), projection, snap, roll_threshold, 
result_rowsets);
   }
 
@@ -531,7 +533,7 @@ class TestCompaction : public KuduRowSetTest {
     LOG_TIMING(INFO, "compacting " +
                std::string((OVERLAP_INPUTS ? "with overlap" : "without 
overlap"))) {
       MvccSnapshot merge_snap(mvcc_);
-      unique_ptr<CompactionInput> compact_input;
+      unique_ptr<CompactionOrFlushInput> compact_input;
       ASSERT_OK(BuildCompactionInput(merge_snap, rowsets, schema_, 
&compact_input));
       // Use a low target row size to increase the number of resulting rowsets.
       RollingDiskRowSetWriter rdrsw(tablet()->metadata(), schema_,
@@ -579,7 +581,7 @@ TEST_F(TestCompaction, TestMemRowSetInput) {
   // and mutations.
   vector<string> out;
   MvccSnapshot snap(mvcc_);
-  unique_ptr<CompactionInput> input(CompactionInput::Create(*mrs, &schema_, 
snap));
+  unique_ptr<CompactionOrFlushInput> 
input(CompactionOrFlushInput::Create(*mrs, &schema_, snap));
   IterateInput(input.get(), &out);
   ASSERT_EQ(10, out.size());
   EXPECT_EQ("RowIdxInBlock: 0; Base: (string key=\"hello 0000000000\", int64 
val=0, "
@@ -642,8 +644,8 @@ TEST_F(TestCompaction, TestRowSetInput) {
 
   // Check compaction input
   vector<string> out;
-  unique_ptr<CompactionInput> input;
-  ASSERT_OK(CompactionInput::Create(*rs, &schema_, MvccSnapshot(mvcc_), 
nullptr, &input));
+  unique_ptr<CompactionOrFlushInput> input;
+  ASSERT_OK(CompactionOrFlushInput::Create(*rs, &schema_, MvccSnapshot(mvcc_), 
nullptr, &input));
   IterateInput(input.get(), &out);
   ASSERT_EQ(10, out.size());
   EXPECT_EQ("RowIdxInBlock: 0; Base: (string key=\"hello 0000000000\", int64 
val=0, "
@@ -705,12 +707,12 @@ TEST_F(TestCompaction, TestDuplicatedGhostRowsMerging) {
   shared_ptr<DiskRowSet> result;
   CompactAndReopenNoRoll(all_rss, schema_, &result);
 
-  unique_ptr<CompactionInput> input;
-  ASSERT_OK(CompactionInput::Create(*result,
-                                    &schema_,
-                                    
MvccSnapshot::CreateSnapshotIncludingAllOps(),
-                                    nullptr,
-                                    &input));
+  unique_ptr<CompactionOrFlushInput> input;
+  ASSERT_OK(CompactionOrFlushInput::Create(*result,
+                                           &schema_,
+                                           
MvccSnapshot::CreateSnapshotIncludingAllOps(),
+                                           nullptr,
+                                           &input));
   vector<string> out;
   IterateInput(input.get(), &out);
   ASSERT_EQ(out.size(), 10);
@@ -877,9 +879,9 @@ TEST_F(TestCompaction, TestDuplicatedRowsRandomCompaction) {
     AddExpectedDelete(&row->redo_head, reinsert->timestamp());
   }
 
-  vector<shared_ptr<CompactionInput>> inputs;
+  vector<shared_ptr<CompactionOrFlushInput>> inputs;
   for (const auto& row_set : row_sets) {
-    unique_ptr<CompactionInput> ci;
+    unique_ptr<CompactionOrFlushInput> ci;
     ASSERT_OK(row_set->NewCompactionInput(&schema_, all_snap, nullptr, &ci));
     inputs.emplace_back(ci.release());
   }
@@ -902,7 +904,7 @@ TEST_F(TestCompaction, TestDuplicatedRowsRandomCompaction) {
   }
 
   vector<string> out;
-  unique_ptr<CompactionInput> ci;
+  unique_ptr<CompactionOrFlushInput> ci;
   ASSERT_OK(row_sets[0]->NewCompactionInput(&schema_, all_snap, nullptr, &ci));
   IterateInput(ci.get(), &out);
 
@@ -949,19 +951,19 @@ TEST_F(TestCompaction, 
TestMRSCompactionDoesntOutputUnobservableRows) {
 
   MvccSnapshot all_snap = MvccSnapshot::CreateSnapshotIncludingAllOps();
 
-  vector<shared_ptr<CompactionInput>> to_merge;
+  vector<shared_ptr<CompactionOrFlushInput>> to_merge;
   {
-    unique_ptr<CompactionInput> rs1_input;
-    ASSERT_OK(CompactionInput::Create(*rs1, &schema_, all_snap, nullptr, 
&rs1_input));
+    unique_ptr<CompactionOrFlushInput> rs1_input;
+    ASSERT_OK(CompactionOrFlushInput::Create(*rs1, &schema_, all_snap, 
nullptr, &rs1_input));
 
-    unique_ptr<CompactionInput> rs2_input;
-    ASSERT_OK(CompactionInput::Create(*rs2, &schema_, all_snap, nullptr, 
&rs2_input));
+    unique_ptr<CompactionOrFlushInput> rs2_input;
+    ASSERT_OK(CompactionOrFlushInput::Create(*rs2, &schema_, all_snap, 
nullptr, &rs2_input));
 
     to_merge.emplace_back(rs1_input.release());
     to_merge.emplace_back(rs2_input.release());
   }
 
-  unique_ptr<CompactionInput> merged(CompactionInput::Merge(to_merge, 
&schema_));
+  unique_ptr<CompactionOrFlushInput> 
merged(CompactionOrFlushInput::Merge(to_merge, &schema_));
 
   // Make sure the unobservable version of the row that was inserted and 
deleted in the MRS
   // in the same op doesn't show up in the compaction input.
@@ -996,7 +998,7 @@ TEST_F(TestCompaction, TestOneToOne) {
 
   // Catch the updates that came in after the snapshot flush was made.
   MvccSnapshot snap2(mvcc_);
-  unique_ptr<CompactionInput> input(CompactionInput::Create(*mrs, &schema_, 
snap2));
+  unique_ptr<CompactionOrFlushInput> 
input(CompactionOrFlushInput::Create(*mrs, &schema_, snap2));
 
   // Add some more updates which come into the new rowset while the "reupdate" 
is happening.
   UpdateRows(rs.get(), 1000, 0, 3);
@@ -1008,7 +1010,7 @@ TEST_F(TestCompaction, TestOneToOne) {
 
   // If we look at the contents of the DiskRowSet now, we should see the 
"re-updated" data.
   vector<string> out;
-  ASSERT_OK(CompactionInput::Create(*rs, &schema_, MvccSnapshot(mvcc_), 
nullptr, &input));
+  ASSERT_OK(CompactionOrFlushInput::Create(*rs, &schema_, MvccSnapshot(mvcc_), 
nullptr, &input));
   IterateInput(input.get(), &out);
   ASSERT_EQ(1000, out.size());
   EXPECT_EQ("RowIdxInBlock: 0; Base: (string key=\"hello 0000000000\", int64 
val=1, "
@@ -1018,8 +1020,8 @@ TEST_F(TestCompaction, TestOneToOne) {
 
   // And compact (1 input to 1 output)
   MvccSnapshot snap3(mvcc_);
-  unique_ptr<CompactionInput> compact_input;
-  ASSERT_OK(CompactionInput::Create(*rs, &schema_, snap3, nullptr, 
&compact_input));
+  unique_ptr<CompactionOrFlushInput> compact_input;
+  ASSERT_OK(CompactionOrFlushInput::Create(*rs, &schema_, snap3, nullptr, 
&compact_input));
   ASSERT_OK(DoFlushAndReopen(compact_input.get(), schema_, snap3, 
kLargeRollThreshold, nullptr));
 }
 
@@ -1049,10 +1051,10 @@ TEST_F(TestCompaction, TestKUDU102) {
   // Catch the updates that came in after the snapshot flush was made.
   // Note that we are merging two MRS, it's a hack
   MvccSnapshot snap2(mvcc_);
-  vector<shared_ptr<CompactionInput>> merge_inputs;
-  merge_inputs.emplace_back(CompactionInput::Create(*mrs, &schema_, snap2));
-  merge_inputs.emplace_back(CompactionInput::Create(*mrs_b, &schema_, snap2));
-  unique_ptr<CompactionInput> input(CompactionInput::Merge(merge_inputs, 
&schema_));
+  vector<shared_ptr<CompactionOrFlushInput>> merge_inputs;
+  merge_inputs.emplace_back(CompactionOrFlushInput::Create(*mrs, &schema_, 
snap2));
+  merge_inputs.emplace_back(CompactionOrFlushInput::Create(*mrs_b, &schema_, 
snap2));
+  unique_ptr<CompactionOrFlushInput> 
input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
 
   string dummy_name = "";
 
@@ -1107,11 +1109,11 @@ TEST_F(TestCompaction, TestMergeMRS) {
   InsertRows(mrs_a.get(), 5, 1);
 
   MvccSnapshot snap(mvcc_);
-  vector<shared_ptr<CompactionInput>> merge_inputs {
-    shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_a, &schema_, 
snap)),
-    shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_, 
snap))
+  vector<shared_ptr<CompactionOrFlushInput>> merge_inputs {
+    shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_a, 
&schema_, snap)),
+    shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_b, 
&schema_, snap))
   };
-  unique_ptr<CompactionInput> input(CompactionInput::Merge(merge_inputs, 
&schema_));
+  unique_ptr<CompactionOrFlushInput> 
input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
   vector<shared_ptr<DiskRowSet>> result_rs;
   ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, 
&result_rs));
   ASSERT_EQ(20, CountRows(result_rs));
@@ -1126,11 +1128,11 @@ TEST_F(TestCompaction, TestMergeMRSWithInvisibleRows) {
                               mem_trackers_.tablet_tracker, &mrs_b));
   InsertRows(mrs_b.get(), 10, 0);
   MvccSnapshot snap(mvcc_);
-  vector<shared_ptr<CompactionInput>> merge_inputs {
-    shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_a, &schema_, 
snap)),
-    shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_, 
snap))
+  vector<shared_ptr<CompactionOrFlushInput>> merge_inputs {
+    shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_a, 
&schema_, snap)),
+    shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_b, 
&schema_, snap))
   };
-  unique_ptr<CompactionInput> input(CompactionInput::Merge(merge_inputs, 
&schema_));
+  unique_ptr<CompactionOrFlushInput> 
input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
   vector<shared_ptr<DiskRowSet>> result_rs;
   ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, 
&result_rs));
   ASSERT_EQ(1, result_rs.size());
@@ -1205,12 +1207,12 @@ TEST_F(TestCompaction, 
TestRandomizeDuplicatedRowsAcrossTransactions) {
     }
   }
   MvccSnapshot snap(mvcc_);
-  vector<shared_ptr<CompactionInput>> merge_inputs;
-  merge_inputs.emplace_back(CompactionInput::Create(*main_mrs, &schema_, 
snap));
+  vector<shared_ptr<CompactionOrFlushInput>> merge_inputs;
+  merge_inputs.emplace_back(CompactionOrFlushInput::Create(*main_mrs, 
&schema_, snap));
   for (auto& mrs : txn_mrss) {
-    merge_inputs.emplace_back(CompactionInput::Create(*mrs, &schema_, snap));
+    merge_inputs.emplace_back(CompactionOrFlushInput::Create(*mrs, &schema_, 
snap));
   }
-  unique_ptr<CompactionInput> input(CompactionInput::Merge(merge_inputs, 
&schema_));
+  unique_ptr<CompactionOrFlushInput> 
input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
   vector<shared_ptr<DiskRowSet>> result_rs;
   ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, 
&result_rs));
   ASSERT_EQ(1, result_rs.size());
@@ -1251,12 +1253,12 @@ TEST_F(TestCompaction, 
TestRowHistoryJumpsBetweenRowsets) {
   // Despite the overlapping time ranges across these inputs, the compaction
   // should go off without a hitch.
   MvccSnapshot snap(mvcc_);
-  vector<shared_ptr<CompactionInput>> merge_inputs {
-    shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_a, &schema_, 
snap)),
-    shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_, 
snap)),
-    shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_c, &schema_, 
snap)),
+  vector<shared_ptr<CompactionOrFlushInput>> merge_inputs {
+    shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_a, 
&schema_, snap)),
+    shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_b, 
&schema_, snap)),
+    shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_c, 
&schema_, snap)),
   };
-  unique_ptr<CompactionInput> input(CompactionInput::Merge(merge_inputs, 
&schema_));
+  unique_ptr<CompactionOrFlushInput> 
input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
   vector<shared_ptr<DiskRowSet>> result_rs;
   ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, 
&result_rs));
   ASSERT_EQ(1, result_rs.size());
@@ -1268,11 +1270,11 @@ TEST_F(TestCompaction, 
TestMergeMRSWithAllInvisibleRows) {
   shared_ptr<MemRowSet> mrs_a = CreateInvisibleMRS();
   shared_ptr<MemRowSet> mrs_b = CreateInvisibleMRS();
   MvccSnapshot snap(mvcc_);
-  vector<shared_ptr<CompactionInput>> merge_inputs {
-    shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_a, &schema_, 
snap)),
-    shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_, 
snap))
+  vector<shared_ptr<CompactionOrFlushInput>> merge_inputs {
+    shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_a, 
&schema_, snap)),
+    shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_b, 
&schema_, snap))
   };
-  unique_ptr<CompactionInput> input(CompactionInput::Merge(merge_inputs, 
&schema_));
+  unique_ptr<CompactionOrFlushInput> 
input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
   vector<shared_ptr<DiskRowSet>> result_rs;
   ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, 
&result_rs));
   ASSERT_TRUE(result_rs.empty());
diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index 2291ea0eb..d56b79b78 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -113,8 +113,8 @@ void AdvanceToLastInList(const Mutation** m) {
   }
 }
 
-// CompactionInput yielding rows and mutations from a MemRowSet.
-class MemRowSetCompactionInput : public CompactionInput {
+// CompactionOrFlushInput yielding rows and mutations from a MemRowSet.
+class MemRowSetCompactionInput : public CompactionOrFlushInput {
  public:
   MemRowSetCompactionInput(const MemRowSet& memrowset,
                            const MvccSnapshot& snap,
@@ -163,7 +163,7 @@ class MemRowSetCompactionInput : public CompactionInput {
 
       // Handle the rare case where a row was inserted and deleted in the same 
operation.
       // This row can never be observed and should not be compacted/flushed. 
This saves
-      // us some trouble later on on compactions.
+      // us some trouble later during compaction.
       //
       // See CompareDuplicatedRows().
       if (PREDICT_FALSE(input_row.redo_head != nullptr &&
@@ -228,8 +228,8 @@ class MemRowSetCompactionInput : public CompactionInput {
 
 ////////////////////////////////////////////////////////////
 
-// CompactionInput yielding rows and mutations from an on-disk DiskRowSet.
-class DiskRowSetCompactionInput : public CompactionInput {
+// CompactionOrFlushInput yielding rows and mutations from an on-disk 
DiskRowSet.
+class DiskRowSetCompactionInput : public CompactionOrFlushInput {
  public:
   DiskRowSetCompactionInput(unique_ptr<RowwiseIterator> base_iter,
                             unique_ptr<DeltaIterator> redo_delta_iter,
@@ -539,7 +539,7 @@ void TransferRedoHistory(CompactionInputRow* newer, 
CompactionInputRow* older) {
 }
 
 
-class MergeCompactionInput : public CompactionInput {
+class MergeCompactionInput : public CompactionOrFlushInput {
  private:
   // State kept for each of the inputs.
   struct MergeState {
@@ -584,7 +584,7 @@ class MergeCompactionInput : public CompactionInput {
       return schema.Compare(pending.back().row, (*other.next()).row) < 0;
     }
 
-    shared_ptr<CompactionInput> input;
+    shared_ptr<CompactionOrFlushInput> input;
     vector<CompactionInputRow> pending;
     int pending_idx;
 
@@ -592,7 +592,7 @@ class MergeCompactionInput : public CompactionInput {
   };
 
  public:
-  MergeCompactionInput(const vector<shared_ptr<CompactionInput>>& inputs,
+  MergeCompactionInput(const vector<shared_ptr<CompactionOrFlushInput>>& 
inputs,
                        const Schema* schema)
       : schema_(schema),
         num_dup_rows_(0),
@@ -1096,11 +1096,11 @@ string CompactionInputRowToString(const 
CompactionInputRow& input_row) {
 
 ////////////////////////////////////////////////////////////
 
-Status CompactionInput::Create(const DiskRowSet& rowset,
-                               const Schema* projection,
-                               const MvccSnapshot& snap,
-                               const IOContext* io_context,
-                               unique_ptr<CompactionInput>* out) {
+Status CompactionOrFlushInput::Create(const DiskRowSet& rowset,
+                                      const Schema* projection,
+                                      const MvccSnapshot& snap,
+                                      const IOContext* io_context,
+                                      unique_ptr<CompactionOrFlushInput>* out) 
{
   CHECK(projection->has_column_ids());
 
   unique_ptr<ColumnwiseIterator> 
base_cwise(rowset.base_data_->NewIterator(projection, io_context));
@@ -1130,45 +1130,47 @@ Status CompactionInput::Create(const DiskRowSet& rowset,
   return Status::OK();
 }
 
-CompactionInput* CompactionInput::Create(const MemRowSet& memrowset,
-                                         const Schema* projection,
-                                         const MvccSnapshot& snap) {
+CompactionOrFlushInput* CompactionOrFlushInput::Create(const MemRowSet& 
memrowset,
+                                                       const Schema* 
projection,
+                                                       const MvccSnapshot& 
snap) {
   CHECK(projection->has_column_ids());
   return new MemRowSetCompactionInput(memrowset, snap, projection);
 }
 
-CompactionInput* CompactionInput::Merge(const 
vector<shared_ptr<CompactionInput>>& inputs,
-                                        const Schema* schema) {
+CompactionOrFlushInput* CompactionOrFlushInput::Merge(
+    const vector<shared_ptr<CompactionOrFlushInput>>& inputs,
+    const Schema* schema) {
   CHECK(schema->has_column_ids());
   return new MergeCompactionInput(inputs, schema);
 }
 
 
-Status RowSetsInCompaction::CreateCompactionInput(const MvccSnapshot& snap,
-                                                  const Schema* schema,
-                                                  const IOContext* io_context,
-                                                  shared_ptr<CompactionInput>* 
out) const {
+Status RowSetsInCompactionOrFlush::CreateCompactionOrFlushInput(
+    const MvccSnapshot& snap,
+    const Schema* schema,
+    const IOContext* io_context,
+    shared_ptr<CompactionOrFlushInput>* out) const {
   CHECK(schema->has_column_ids());
 
-  vector<shared_ptr<CompactionInput>> inputs;
+  vector<shared_ptr<CompactionOrFlushInput>> inputs;
   for (const auto& rs : rowsets_) {
-    unique_ptr<CompactionInput> input;
+    unique_ptr<CompactionOrFlushInput> input;
     RETURN_NOT_OK_PREPEND(rs->NewCompactionInput(schema, snap, io_context, 
&input),
                           Substitute("Could not create compaction input for 
rowset $0",
                                      rs->ToString()));
-    inputs.push_back(shared_ptr<CompactionInput>(input.release()));
+    inputs.push_back(shared_ptr<CompactionOrFlushInput>(input.release()));
   }
 
   if (inputs.size() == 1) {
     *out = std::move(inputs[0]);
   } else {
-    out->reset(CompactionInput::Merge(inputs, schema));
+    out->reset(CompactionOrFlushInput::Merge(inputs, schema));
   }
 
   return Status::OK();
 }
 
-void RowSetsInCompaction::DumpToLog() const {
+void RowSetsInCompactionOrFlush::DumpToLog() const {
   VLOG(1) << "Selected " << rowsets_.size() << " rowsets to compact:";
   // Dump the selected rowsets to the log, and collect corresponding iterators.
   for (const auto& rs : rowsets_) {
@@ -1388,9 +1390,17 @@ Status ApplyMutationsAndGenerateUndos(const 
MvccSnapshot& snap,
   #undef ERROR_LOG_CONTEXT
 }
 
+// Following method processes the compaction input by reading input rows in
+// blocks and for each row inside the block:
+// - Apply all REDO mutations collected for the row at hand.
+// - Generate corresponding UNDO deltas for applied mutations.
+// - For a row with 'ghost' entries, merge their histories of mutations.
+// - Remove any ancient UNDO mutations, as those are not applicable anymore.
+// - Append UNDO and REDO deltas to DiskRowSetWriter output.
+// - Append fully or partially (resized) processed rowblock to DRS writer 
output.
 Status FlushCompactionInput(const string& tablet_id,
                             const scoped_refptr<FsErrorManager>& error_manager,
-                            CompactionInput* input,
+                            CompactionOrFlushInput* input,
                             const MvccSnapshot& snap,
                             const HistoryGcOpts& history_gc_opts,
                             RollingDiskRowSetWriter* out) {
@@ -1460,10 +1470,12 @@ Status FlushCompactionInput(const string& tablet_id,
       rowid_t index_in_current_drs;
 
       if (new_undos_head != nullptr) {
+        // Append UNDO deltas to DiskRowSetWriter output.
         out->AppendUndoDeltas(dst_row.row_index(), new_undos_head, 
&index_in_current_drs);
       }
 
       if (new_redos_head != nullptr) {
+        // Append REDO deltas to DiskRowSetWriter output.
         out->AppendRedoDeltas(dst_row.row_index(), new_redos_head, 
&index_in_current_drs);
       }
 
@@ -1497,6 +1509,7 @@ Status FlushCompactionInput(const string& tablet_id,
 
       n++;
       if (n == block.nrows()) {
+        // Append fully processed rowblock to DRS writer output.
         RETURN_NOT_OK(out->AppendBlock(block, live_row_count));
         live_row_count = 0;
         n = 0;
@@ -1505,6 +1518,7 @@ Status FlushCompactionInput(const string& tablet_id,
 
     if (n > 0) {
       block.Resize(n);
+      // Append partially (resized) processed rowblock to DRS writer output.
       RETURN_NOT_OK(out->AppendBlock(block, live_row_count));
       block.Resize(block.row_capacity());
     }
@@ -1515,7 +1529,7 @@ Status FlushCompactionInput(const string& tablet_id,
 }
 
 Status ReupdateMissedDeltas(const IOContext* io_context,
-                            CompactionInput* input,
+                            CompactionOrFlushInput* input,
                             const HistoryGcOpts& history_gc_opts,
                             const MvccSnapshot& snap_to_exclude,
                             const MvccSnapshot& snap_to_include,
@@ -1687,7 +1701,9 @@ Status ReupdateMissedDeltas(const IOContext* io_context,
 }
 
 
-Status DebugDumpCompactionInput(CompactionInput* input, int64_t* rows_left, 
vector<string>* lines) {
+Status DebugDumpCompactionInput(CompactionOrFlushInput* input,
+                                int64_t* rows_left,
+                                vector<string>* lines) {
   RETURN_NOT_OK(input->Init());
   vector<CompactionInputRow> rows;
 
diff --git a/src/kudu/tablet/compaction.h b/src/kudu/tablet/compaction.h
index 8b085958c..0404e06ca 100644
--- a/src/kudu/tablet/compaction.h
+++ b/src/kudu/tablet/compaction.h
@@ -95,7 +95,7 @@ class HistoryGcOpts {
 };
 
 // Interface for an input feeding into a compaction or flush.
-class CompactionInput {
+class CompactionOrFlushInput {
  public:
   // Create an input which reads from the given rowset, yielding base rows
   // prior to the given snapshot.
@@ -110,20 +110,21 @@ class CompactionInput {
                        const Schema* projection,
                        const MvccSnapshot& snap,
                        const fs::IOContext* io_context,
-                       std::unique_ptr<CompactionInput>* out);
+                       std::unique_ptr<CompactionOrFlushInput>* out);
 
   // Create an input which reads from the given memrowset, yielding base rows 
and updates
   // prior to the given snapshot.
-  static CompactionInput* Create(const MemRowSet& memrowset,
-                                 const Schema* projection,
-                                 const MvccSnapshot& snap);
+  static CompactionOrFlushInput* Create(const MemRowSet& memrowset,
+                                        const Schema* projection,
+                                        const MvccSnapshot& snap);
 
-  // Create an input which merges several other compaction inputs. The inputs 
are merged
+  // Create a collection of merge states containing a state per input. The 
inputs are merged
   // in key-order according to the given schema. All inputs must have matching 
schemas.
-  static CompactionInput* Merge(const 
std::vector<std::shared_ptr<CompactionInput>>& inputs,
-                                const Schema* schema);
+  static CompactionOrFlushInput* Merge(
+      const std::vector<std::shared_ptr<CompactionOrFlushInput>>& inputs,
+      const Schema* schema);
 
-  virtual ~CompactionInput() = default;
+  virtual ~CompactionOrFlushInput() = default;
 
   virtual Status Init() = 0;
   virtual Status PrepareBlock(std::vector<CompactionInputRow>* block) = 0;
@@ -143,8 +144,8 @@ class CompactionInput {
   virtual size_t memory_footprint() const = 0;
 };
 
-// The set of rowsets which are taking part in a given compaction.
-class RowSetsInCompaction {
+// The set of rowsets which are taking part in a given compaction or flush 
operation.
+class RowSetsInCompactionOrFlush {
  public:
   void AddRowSet(const std::shared_ptr<RowSet>& rowset,
                  std::unique_lock<std::mutex> lock) {
@@ -154,15 +155,15 @@ class RowSetsInCompaction {
     rowsets_.push_back(rowset);
   }
 
-  // Create the appropriate compaction input for this compaction -- either a 
merge
+  // Create the appropriate input for this compaction/flush -- either a merge
   // of all the inputs, or the single input if there was only one.
   //
-  // 'schema' is the schema for the output of the compaction, and must remain 
valid
-  // for the lifetime of the returned CompactionInput.
-  Status CreateCompactionInput(const MvccSnapshot& snap,
-                               const Schema* schema,
-                               const fs::IOContext* io_context,
-                               std::shared_ptr<CompactionInput>* out) const;
+  // 'schema' is the schema for the output of the compaction/flush, and must
+  // remain valid for the lifetime of the returned CompactionOrFlushInput.
+  Status CreateCompactionOrFlushInput(const MvccSnapshot& snap,
+                                      const Schema* schema,
+                                      const fs::IOContext* io_context,
+                                      std::shared_ptr<CompactionOrFlushInput>* 
out) const;
 
   // Dump a log message indicating the chosen rowsets.
   void DumpToLog() const;
@@ -178,7 +179,7 @@ class RowSetsInCompaction {
   std::vector<std::unique_lock<std::mutex>> locks_;
 };
 
-// One row yielded by CompactionInput::PrepareBlock.
+// One row yielded by CompactionOrFlushInput::PrepareBlock.
 // Looks like this (assuming n UNDO records and m REDO records):
 // UNDO_n <- ... <- UNDO_1 <- UNDO_head <- row -> REDO_head -> REDO_1 -> ... 
-> REDO_m
 struct CompactionInputRow {
@@ -232,11 +233,11 @@ Status ApplyMutationsAndGenerateUndos(const MvccSnapshot& 
snap,
 // Iterate through this compaction input, flushing all rows to the given 
RollingDiskRowSetWriter.
 // The 'snap' argument should match the MvccSnapshot used to create the 
compaction input.
 //
-// After return of this function, this CompactionInput object is "used up" and 
will
+// After return of this function, this CompactionOrFlushInput object is "used 
up" and will
 // no longer be useful.
 Status FlushCompactionInput(const std::string& tablet_id,
                             const scoped_refptr<fs::FsErrorManager>& 
error_manager,
-                            CompactionInput* input,
+                            CompactionOrFlushInput* input,
                             const MvccSnapshot& snap,
                             const HistoryGcOpts& history_gc_opts,
                             RollingDiskRowSetWriter* out);
@@ -249,10 +250,10 @@ Status FlushCompactionInput(const std::string& tablet_id,
 // The output rowsets passed in must be non-overlapping and in ascending key 
order:
 // typically they are the resulting rowsets from a RollingDiskRowSetWriter.
 //
-// After return of this function, this CompactionInput object is "used up" and 
will
+// After return of this function, this CompactionOrFlushInput object is "used 
up" and will
 // yield no further rows.
 Status ReupdateMissedDeltas(const fs::IOContext* io_context,
-                            CompactionInput* input,
+                            CompactionOrFlushInput* input,
                             const HistoryGcOpts& history_gc_opts,
                             const MvccSnapshot& snap_to_exclude,
                             const MvccSnapshot& snap_to_include,
@@ -262,7 +263,7 @@ Status ReupdateMissedDeltas(const fs::IOContext* io_context,
 // This consumes no more rows from the compaction input than specified by the 
'rows_left' parameter.
 // If 'rows_left' is nullptr, there is no limit on the number of rows to dump.
 // If the content of 'rows_left' is equal to or less than 0, no rows will be 
dumped.
-Status DebugDumpCompactionInput(CompactionInput* input, int64_t* rows_left,
+Status DebugDumpCompactionInput(CompactionOrFlushInput* input, int64_t* 
rows_left,
                                 std::vector<std::string>* lines);
 
 // Helper methods to print a row with full history.
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index 07e27ff00..b05b6fcd3 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -672,8 +672,8 @@ Status DiskRowSet::NewRowIterator(const RowIteratorOptions& 
opts,
 Status DiskRowSet::NewCompactionInput(const Schema* projection,
                                       const MvccSnapshot &snap,
                                       const IOContext* io_context,
-                                      unique_ptr<CompactionInput>* out) const {
-  return CompactionInput::Create(*this, projection, snap, io_context, out);
+                                      unique_ptr<CompactionOrFlushInput>* out) 
const {
+  return CompactionOrFlushInput::Create(*this, projection, snap, io_context, 
out);
 }
 
 Status DiskRowSet::MutateRow(Timestamp timestamp,
@@ -926,9 +926,9 @@ Status DiskRowSet::DeleteAncientUndoDeltas(Timestamp 
ancient_history_mark,
 }
 
 Status DiskRowSet::DebugDumpImpl(int64_t* rows_left, vector<string>* lines) {
-  // Using CompactionInput to dump our data is an easy way of seeing all the
+  // Using CompactionOrFlushInput to dump our data is an easy way of seeing 
all the
   // rows and deltas.
-  unique_ptr<CompactionInput> input;
+  unique_ptr<CompactionOrFlushInput> input;
   RETURN_NOT_OK(NewCompactionInput(rowset_metadata_->tablet_schema().get(),
                                    
MvccSnapshot::CreateSnapshotIncludingAllOps(),
                                    nullptr, &input));
diff --git a/src/kudu/tablet/diskrowset.h b/src/kudu/tablet/diskrowset.h
index 1719cbfe9..ea1250e3e 100644
--- a/src/kudu/tablet/diskrowset.h
+++ b/src/kudu/tablet/diskrowset.h
@@ -93,7 +93,7 @@ Status DumpRowSetInternal(const fs::IOContext& ctx,
 namespace tablet {
 
 class CFileSet;
-class CompactionInput;
+class CompactionOrFlushInput;
 class DeltaFileWriter;
 class DeltaStats;
 class HistoryGcOpts;
@@ -381,7 +381,7 @@ class DiskRowSet :
   Status NewCompactionInput(const Schema* projection,
                             const MvccSnapshot &snap,
                             const fs::IOContext* io_context,
-                            std::unique_ptr<CompactionInput>* out) const 
override;
+                            std::unique_ptr<CompactionOrFlushInput>* out) 
const override;
 
   // Gets the number of rows in this rowset, checking 'num_rows_' first. If not
   // yet set, consults the base data and stores the result in 'num_rows_'.
@@ -489,7 +489,7 @@ class DiskRowSet :
   FRIEND_TEST(TestRowSet, TestDMSFlush);
   FRIEND_TEST(tserver::TabletServerTest, SetEncodedKeysWhenStartingUp);
 
-  friend class CompactionInput;
+  friend class CompactionOrFlushInput;
   friend class Tablet;
   friend Status kudu::tools::DumpRowSetInternal(
       const fs::IOContext& ctx,
diff --git a/src/kudu/tablet/memrowset.cc b/src/kudu/tablet/memrowset.cc
index 2f4a030fd..404b1d236 100644
--- a/src/kudu/tablet/memrowset.cc
+++ b/src/kudu/tablet/memrowset.cc
@@ -343,8 +343,8 @@ Status MemRowSet::NewRowIterator(const RowIteratorOptions& 
opts,
 Status MemRowSet::NewCompactionInput(const Schema* projection,
                                      const MvccSnapshot& snap,
                                      const IOContext* /*io_context*/,
-                                     unique_ptr<CompactionInput>* out) const  {
-  out->reset(CompactionInput::Create(*this, projection, snap));
+                                     unique_ptr<CompactionOrFlushInput>* out) 
const  {
+  out->reset(CompactionOrFlushInput::Create(*this, projection, snap));
   return Status::OK();
 }
 
diff --git a/src/kudu/tablet/memrowset.h b/src/kudu/tablet/memrowset.h
index c58a09640..2f7254c76 100644
--- a/src/kudu/tablet/memrowset.h
+++ b/src/kudu/tablet/memrowset.h
@@ -88,7 +88,7 @@ namespace tablet {
 // NOTE: all allocations done by the MemRowSet are done inside its associated
 // thread-safe arena, and then freed in bulk when the MemRowSet is destructed.
 
-class CompactionInput;
+class CompactionOrFlushInput;
 class MemRowSet;
 class Mutation;
 class OperationResultPB;
@@ -345,7 +345,7 @@ class MemRowSet : public RowSet,
   Status NewCompactionInput(const Schema* projection,
                             const MvccSnapshot& snap,
                             const fs::IOContext* io_context,
-                            std::unique_ptr<CompactionInput>* out) const 
override;
+                            std::unique_ptr<CompactionOrFlushInput>* out) 
const override;
 
   // Return the Schema for the rows in this memrowset.
    const Schema &schema() const {
@@ -493,6 +493,8 @@ class MemRowSet : public RowSet,
   volatile uint64_t debug_insert_count_;
   volatile uint64_t debug_update_count_;
 
+  // Lock governing this rowset's inclusion in a compaction/flush. If locked,
+  // no other compactor/flusher will attempt to include this rowset.
   std::mutex compact_flush_lock_;
 
   log::MinLogIndexAnchorer anchorer_;
diff --git a/src/kudu/tablet/mock-rowsets.h b/src/kudu/tablet/mock-rowsets.h
index eedeb95f9..1e099fb79 100644
--- a/src/kudu/tablet/mock-rowsets.h
+++ b/src/kudu/tablet/mock-rowsets.h
@@ -61,7 +61,7 @@ class MockRowSet : public RowSet {
   Status NewCompactionInput(const Schema* /*projection*/,
                             const MvccSnapshot& /*snap*/,
                             const fs::IOContext* /*io_context*/,
-                            std::unique_ptr<CompactionInput>* /*out*/) const 
override {
+                            std::unique_ptr<CompactionOrFlushInput>* /*out*/) 
const override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
diff --git a/src/kudu/tablet/rowset.cc b/src/kudu/tablet/rowset.cc
index 0fd72b78c..bd8084347 100644
--- a/src/kudu/tablet/rowset.cc
+++ b/src/kudu/tablet/rowset.cc
@@ -142,7 +142,7 @@ Status DuplicatingRowSet::NewRowIterator(const 
RowIteratorOptions& opts,
 Status DuplicatingRowSet::NewCompactionInput(const Schema* /*projection*/,
                                              const MvccSnapshot& /*snap*/,
                                              const IOContext* /*io_context*/,
-                                             unique_ptr<CompactionInput>* 
/*out*/) const  {
+                                             
unique_ptr<CompactionOrFlushInput>* /*out*/) const  {
   LOG(FATAL) << "duplicating rowsets do not act as compaction input";
   return Status::OK();
 }
diff --git a/src/kudu/tablet/rowset.h b/src/kudu/tablet/rowset.h
index c12b51304..7ecad1f28 100644
--- a/src/kudu/tablet/rowset.h
+++ b/src/kudu/tablet/rowset.h
@@ -59,7 +59,7 @@ struct IOContext;
 
 namespace tablet {
 
-class CompactionInput;
+class CompactionOrFlushInput;
 class OperationResultPB;
 class RowSetKeyProbe;
 class RowSetMetadata;
@@ -168,7 +168,7 @@ class RowSet {
   virtual Status NewCompactionInput(const Schema* projection,
                                     const MvccSnapshot &snap,
                                     const fs::IOContext* io_context,
-                                    std::unique_ptr<CompactionInput>* out) 
const = 0;
+                                    std::unique_ptr<CompactionOrFlushInput>* 
out) const = 0;
 
   // Count the number of rows in this rowset.
   virtual Status CountRows(const fs::IOContext* io_context, rowid_t *count) 
const = 0;
@@ -423,7 +423,7 @@ class DuplicatingRowSet : public RowSet {
   Status NewCompactionInput(const Schema* projection,
                             const MvccSnapshot &snap,
                             const fs::IOContext* io_context,
-                            std::unique_ptr<CompactionInput>* out) const 
override;
+                            std::unique_ptr<CompactionOrFlushInput>* out) 
const override;
 
   Status CountRows(const fs::IOContext* io_context, rowid_t *count) const 
override;
 
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index d7a1e88b5..2131590bc 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -1566,7 +1566,7 @@ Status Tablet::Flush() {
 Status Tablet::FlushUnlocked() {
   TRACE_EVENT0("tablet", "Tablet::FlushUnlocked");
   RETURN_NOT_OK(CheckHasNotBeenStopped());
-  RowSetsInCompaction input;
+  RowSetsInCompactionOrFlush input;
   vector<shared_ptr<MemRowSet>> old_mrss;
   {
     // Create a new MRS with the latest schema.
@@ -1654,7 +1654,7 @@ Status Tablet::FlushUnlocked() {
   return Status::OK();
 }
 
-Status Tablet::ReplaceMemRowSetsUnlocked(RowSetsInCompaction* compaction,
+Status Tablet::ReplaceMemRowSetsUnlocked(RowSetsInCompactionOrFlush* new_mrss,
                                          vector<shared_ptr<MemRowSet>>* 
old_mrss) {
   DCHECK(old_mrss->empty());
   old_mrss->emplace_back(components_->memrowset);
@@ -1666,7 +1666,7 @@ Status 
Tablet::ReplaceMemRowSetsUnlocked(RowSetsInCompaction* compaction,
   for (auto& mrs : *old_mrss) {
     std::unique_lock<std::mutex> ms_lock(*mrs->compact_flush_lock(), 
std::try_to_lock);
     CHECK(ms_lock.owns_lock());
-    compaction->AddRowSet(mrs, std::move(ms_lock));
+    new_mrss->AddRowSet(mrs, std::move(ms_lock));
   }
 
   shared_ptr<MemRowSet> new_mrs;
@@ -1801,7 +1801,7 @@ bool Tablet::ShouldThrottleAllow(int64_t bytes) {
   return throttler_->Take(MonoTime::Now(), 1, bytes);
 }
 
-Status Tablet::PickRowSetsToCompact(RowSetsInCompaction *picked,
+Status Tablet::PickRowSetsToCompact(RowSetsInCompactionOrFlush *picked,
                                     CompactFlags flags) const {
   RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen);
   // Grab a local reference to the current RowSetTree. This is to avoid
@@ -1997,7 +1997,20 @@ Status Tablet::FlushMetadata(const RowSetVector& 
to_remove,
                                    txns_being_flushed);
 }
 
-Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
+// Computes the total on-disk size of REDO and UNDO deltas in 'input' (all rowsets must be DiskRowSets).
+size_t Tablet::GetAllDeltasSizeOnDisk(const RowSetsInCompactionOrFlush &input) 
{
+  size_t deltas_on_disk_size = 0;
+  for (const auto& rs : input.rowsets()) {
+    DiskRowSetSpace drss;
+    DiskRowSet* drs = down_cast<DiskRowSet*>(rs.get());
+    drs->GetDiskRowSetSpaceUsage(&drss);
+    deltas_on_disk_size += drss.redo_deltas_size + drss.undo_deltas_size;
+  }
+
+  return deltas_on_disk_size;
+}
+
+Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompactionOrFlush 
&input,
                                         int64_t mrs_being_flushed,
                                         const vector<TxnInfoBeingFlushed>& 
txns_being_flushed) {
   const char *op_name =
@@ -2006,39 +2019,46 @@ Status Tablet::DoMergeCompactionOrFlush(const 
RowSetsInCompaction &input,
                "tablet_id", tablet_id(),
                "op", op_name);
 
+  // For compactions only (no MRS being flushed), save the total on-disk size of all deltas in selected 
rowsets.
+  size_t deltas_on_disk_size = 0;
+  if (mrs_being_flushed == TabletMetadata::kNoMrsFlushed) {
+    deltas_on_disk_size = GetAllDeltasSizeOnDisk(input);
+  }
+
   const auto& tid = tablet_id();
   const IOContext io_context({ tid });
 
+  shared_ptr<CompactionOrFlushInput> merge;
+  const SchemaPtr schema_ptr = schema();
   MvccSnapshot flush_snap(mvcc_);
   VLOG_WITH_PREFIX(1) << Substitute("$0: entering phase 1 (flushing snapshot). 
"
                                     "Phase 1 snapshot: $1",
                                     op_name, flush_snap.ToString());
 
-  // Save the stats on the total on-disk size of all deltas in selected 
rowsets.
-  size_t deltas_on_disk_size = 0;
-  if (mrs_being_flushed == TabletMetadata::kNoMrsFlushed) {
-    for (const auto& rs : input.rowsets()) {
-      DiskRowSetSpace drss;
-      DiskRowSet* drs = down_cast<DiskRowSet*>(rs.get());
-      drs->GetDiskRowSetSpaceUsage(&drss);
-      deltas_on_disk_size += drss.redo_deltas_size + drss.undo_deltas_size;
-    }
-  }
-
+  // Fault injection hook for testing and debugging purposes only.
   if (common_hooks_) {
     RETURN_NOT_OK_PREPEND(common_hooks_->PostTakeMvccSnapshot(),
                           "PostTakeMvccSnapshot hook failed");
   }
 
-  shared_ptr<CompactionInput> merge;
-  const SchemaPtr schema_ptr = schema();
-  RETURN_NOT_OK(input.CreateCompactionInput(flush_snap, schema_ptr.get(), 
&io_context, &merge));
+  // Create input of rowsets by iterating through all rowsets and for each 
rowset:
+  //   - For compaction ops, create input that contains the initialized base-data
+  //     iterator and the relevant REDO and UNDO delta iterators used to read from 
persistent storage.
+  //   - For flush ops, create an iterator over the in-memory tree holding the data 
updates.
+  RETURN_NOT_OK(input.CreateCompactionOrFlushInput(flush_snap,
+                                                   schema_ptr.get(),
+                                                   &io_context,
+                                                   &merge));
 
+  // Initialize a DRS writer, used later for writing rows, REDO and UNDO 
deltas, delta stats, etc.
   RollingDiskRowSetWriter drsw(metadata_.get(), merge->schema(), 
DefaultBloomSizing(),
                                compaction_policy_->target_rowset_size());
   RETURN_NOT_OK_PREPEND(drsw.Open(), "Failed to open DiskRowSet for flush");
 
+  // Get the history GC options, used later for ancient history mark (AHM) checks.
   HistoryGcOpts history_gc_opts = GetHistoryGcOpts();
+
+  // Apply REDO and UNDO deltas to the rows and merge the histories of rows with 
'ghost' entries, writing the results through the DRS writer.
   RETURN_NOT_OK_PREPEND(
       FlushCompactionInput(
           tid, metadata_->fs_manager()->block_manager()->error_manager(),
@@ -2046,6 +2066,7 @@ Status Tablet::DoMergeCompactionOrFlush(const 
RowSetsInCompaction &input,
       "Flush to disk failed");
   RETURN_NOT_OK_PREPEND(drsw.Finish(), "Failed to finish DRS writer");
 
+  // Fault injection hook for testing and debugging purposes only.
   if (common_hooks_) {
     RETURN_NOT_OK_PREPEND(common_hooks_->PostWriteSnapshot(),
                           "PostWriteSnapshot hook failed");
@@ -2071,6 +2092,8 @@ Status Tablet::DoMergeCompactionOrFlush(const 
RowSetsInCompaction &input,
     metrics_->bytes_flushed->IncrementBy(drsw.written_size());
   }
 
+  // Open each rowset written in this stage (described by 'new_drs_metas') from
+  // disk and store a pointer to each one in 'new_disk_rowsets'.
   vector<shared_ptr<RowSet>> new_disk_rowsets;
   new_disk_rowsets.reserve(new_drs_metas.size());
   {
@@ -2168,6 +2191,7 @@ Status Tablet::DoMergeCompactionOrFlush(const 
RowSetsInCompaction &input,
   // swap as committed in 'non_duplicated_ops_snap'.
   non_duplicated_ops_snap.AddAppliedTimestamps(applying_during_swap);
 
+  // Fault injection hook for testing and debugging purposes only.
   if (common_hooks_) {
     RETURN_NOT_OK_PREPEND(common_hooks_->PostSwapInDuplicatingRowSet(),
                           "PostSwapInDuplicatingRowSet hook failed");
@@ -2182,9 +2206,11 @@ Status Tablet::DoMergeCompactionOrFlush(const 
RowSetsInCompaction &input,
                                     "which arrived during Phase 1. Snapshot: 
$1",
                                     op_name, 
non_duplicated_ops_snap.ToString());
   const SchemaPtr schema_ptr2 = schema();
-  RETURN_NOT_OK_PREPEND(
-      input.CreateCompactionInput(non_duplicated_ops_snap, schema_ptr2.get(), 
&io_context, &merge),
-          Substitute("Failed to create $0 inputs", op_name).c_str());
+  
RETURN_NOT_OK_PREPEND(input.CreateCompactionOrFlushInput(non_duplicated_ops_snap,
+                                                           schema_ptr2.get(),
+                                                           &io_context,
+                                                           &merge),
+                        Substitute("Failed to create $0 inputs", 
op_name).c_str());
 
   // Update the output rowsets with the deltas that came in in phase 1, before 
we swapped
   // in the DuplicatingRowSets. This will perform a flush of the updated 
DeltaTrackers
@@ -2199,6 +2225,7 @@ Status Tablet::DoMergeCompactionOrFlush(const 
RowSetsInCompaction &input,
         Substitute("Failed to re-update deltas missed during $0 phase 1",
                      op_name).c_str());
 
+  // Fault injection hook for testing and debugging purposes only.
   if (common_hooks_) {
     RETURN_NOT_OK_PREPEND(common_hooks_->PostReupdateMissedDeltas(),
                           "PostReupdateMissedDeltas hook failed");
@@ -2269,6 +2296,7 @@ Status Tablet::DoMergeCompactionOrFlush(const 
RowSetsInCompaction &input,
                                     drs_written,
                                     bytes_written);
 
+  // Fault injection hook for testing and debugging purposes only.
   if (common_hooks_) {
     RETURN_NOT_OK_PREPEND(common_hooks_->PostSwapNewRowSet(),
                           "PostSwapNewRowSet hook failed");
@@ -2316,7 +2344,7 @@ void Tablet::UpdateAverageRowsetHeight() {
 Status Tablet::Compact(CompactFlags flags) {
   RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen);
 
-  RowSetsInCompaction input;
+  RowSetsInCompactionOrFlush input;
   // Step 1. Capture the rowsets to be merged
   RETURN_NOT_OK_PREPEND(PickRowSetsToCompact(&input, flags),
                         "Failed to pick rowsets to compact");
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index 16244deab..05ef62d6f 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -94,7 +94,7 @@ class HistoryGcOpts;
 class MemRowSet;
 class ParticipantOpState;
 class RowSetTree;
-class RowSetsInCompaction;
+class RowSetsInCompactionOrFlush;
 class TxnMetadata;
 class WriteOpState;
 struct RowOp;
@@ -678,11 +678,11 @@ class Tablet {
                                     const ScanSpec* spec,
                                     std::vector<IterWithBounds>* iters) const;
 
-  Status PickRowSetsToCompact(RowSetsInCompaction *picked,
+  Status PickRowSetsToCompact(RowSetsInCompactionOrFlush *picked,
                               CompactFlags flags) const;
 
   // Performs a merge compaction or a flush.
-  Status DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
+  Status DoMergeCompactionOrFlush(const RowSetsInCompactionOrFlush &input,
                                   int64_t mrs_being_flushed,
                                   const std::vector<TxnInfoBeingFlushed>& 
txns_being_flushed);
 
@@ -702,6 +702,9 @@ class Tablet {
                        int64_t mrs_being_flushed,
                        const std::vector<TxnInfoBeingFlushed>& 
txns_being_flushed);
 
+  // Computes the total on-disk size of REDO and UNDO deltas in 'input' (DiskRowSets only).
+  size_t GetAllDeltasSizeOnDisk(const RowSetsInCompactionOrFlush& input);
+
   static void ModifyRowSetTree(const RowSetTree& old_tree,
                                const RowSetVector& rowsets_to_remove,
                                const RowSetVector& rowsets_to_add,
@@ -732,7 +735,7 @@ class Tablet {
   // before the replacement. If any MemRowSet is not empty it will be added to
   // the 'compaction' input and the MemRowSets' compaction locks will be taken
   // to prevent the inclusion in any concurrent compactions.
-  Status ReplaceMemRowSetsUnlocked(RowSetsInCompaction* compaction,
+  Status ReplaceMemRowSetsUnlocked(RowSetsInCompactionOrFlush* new_mrss,
                                    std::vector<std::shared_ptr<MemRowSet>>* 
old_mrss);
 
   // Convert the specified read client schema (without IDs) to a server schema 
(with IDs)

Reply via email to