Repository: kudu
Updated Branches:
  refs/heads/master 40ba6c143 -> 1f2617674


tablet: don't store row count in delta tracker

DeltaTracker's constructor previously required a row-count that can
only be obtained by first reading from disk. While useful as a sanity
check while tracking updates, it's not important for this count to be
known at instantiation-time.

This patch is helpful in reducing the amount of data required from disk
at startup, as one of the reasons we need to fully open some CFiles is
to get this row count. This patch itself doesn't defer any reads; it
just defers the requirement of the row count.

This patch has no functional changes.

Change-Id: I084e0944f388c22e838b017663a812b0ba77245d
Reviewed-on: http://gerrit.cloudera.org:8080/9216
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <t...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1f261767
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1f261767
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1f261767

Branch: refs/heads/master
Commit: 1f26176748911ccc7104e932440f44c74bd3d006
Parents: 40ba6c1
Author: Andrew Wong <aw...@cloudera.com>
Authored: Mon Feb 5 11:14:02 2018 -0800
Committer: Todd Lipcon <t...@apache.org>
Committed: Fri Feb 23 00:22:00 2018 +0000

----------------------------------------------------------------------
 src/kudu/tablet/compaction.cc    | 23 ++++++++++++++---------
 src/kudu/tablet/delta_tracker.cc |  9 ++-------
 src/kudu/tablet/delta_tracker.h  | 14 +-------------
 src/kudu/tablet/diskrowset.cc    | 31 +++++++++++++++++++++++++------
 src/kudu/tablet/diskrowset.h     |  9 +++++++--
 5 files changed, 49 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/1f261767/src/kudu/tablet/compaction.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index 62b6302..981a638 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -1188,10 +1188,10 @@ Status ReupdateMissedDeltas(const string &tablet_name,
   VLOG(1) << "Reupdating missed deltas between snapshot " <<
     snap_to_exclude.ToString() << " and " << snap_to_include.ToString();
 
-  // Collect the delta trackers that we'll push the updates into.
-  deque<DeltaTracker *> delta_trackers;
+  // Collect the disk rowsets that we'll push the updates into.
+  deque<DiskRowSet *> diskrowsets;
   for (const shared_ptr<RowSet> &rs : output_rowsets) {
-    delta_trackers.push_back(down_cast<DiskRowSet *>(rs.get())->delta_tracker());
+    diskrowsets.push_back(down_cast<DiskRowSet *>(rs.get()));
   }
 
   // The set of updated delta trackers.
@@ -1285,25 +1285,30 @@ Status ReupdateMissedDeltas(const string &tablet_name,
         DVLOG(3) << "Flushing missed delta for row " << output_row_offset
                  << " @" << mut->timestamp() << ": " << mut->changelist().ToString(*schema);
 
-        DeltaTracker *cur_tracker = delta_trackers.front();
+        rowid_t num_rows;
+        DiskRowSet* cur_drs = diskrowsets.front();
+        RETURN_NOT_OK(cur_drs->CountRows(&num_rows));
 
         // The index on the input side isn't necessarily the index on the output side:
         // we may have output several small DiskRowSets, so we need to find the index
         // relative to the current one.
         int64_t idx_in_delta_tracker = output_row_offset - delta_tracker_base_row;
-        while (idx_in_delta_tracker >= cur_tracker->num_rows()) {
+        while (idx_in_delta_tracker >= num_rows) {
           // If the current index is higher than the total number of rows in the current
           // DeltaTracker, that means we're now processing the next one in the list.
           // Pop the current front tracker, and make the indexes relative to the next
           // in the list.
-          delta_tracker_base_row += cur_tracker->num_rows();
-          idx_in_delta_tracker -= cur_tracker->num_rows();
+          delta_tracker_base_row += num_rows;
+          idx_in_delta_tracker -= num_rows;
           DCHECK_GE(idx_in_delta_tracker, 0);
-          delta_trackers.pop_front();
-          cur_tracker = delta_trackers.front();
+          diskrowsets.pop_front();
+          cur_drs = diskrowsets.front();
+          RETURN_NOT_OK(cur_drs->CountRows(&num_rows));
         }
 
+        DeltaTracker* cur_tracker = cur_drs->delta_tracker();
         gscoped_ptr<OperationResultPB> result(new OperationResultPB);
+        DCHECK_LT(idx_in_delta_tracker, num_rows);
         Status s = cur_tracker->Update(mut->timestamp(),
                                        idx_in_delta_tracker,
                                        mut->changelist(),

http://git-wip-us.apache.org/repos/asf/kudu/blob/1f261767/src/kudu/tablet/delta_tracker.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index c867bab..96d581e 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -74,12 +74,11 @@ using std::vector;
 using strings::Substitute;
 
 Status DeltaTracker::Open(const shared_ptr<RowSetMetadata>& rowset_metadata,
-                          rowid_t num_rows,
                           LogAnchorRegistry* log_anchor_registry,
                           const TabletMemTrackers& mem_trackers,
                           gscoped_ptr<DeltaTracker>* delta_tracker) {
   gscoped_ptr<DeltaTracker> local_dt(
-      new DeltaTracker(rowset_metadata, num_rows, log_anchor_registry,
+      new DeltaTracker(rowset_metadata, log_anchor_registry,
                        mem_trackers));
   RETURN_NOT_OK(local_dt->DoOpen());
 
@@ -88,11 +87,9 @@ Status DeltaTracker::Open(const shared_ptr<RowSetMetadata>& rowset_metadata,
 }
 
 DeltaTracker::DeltaTracker(shared_ptr<RowSetMetadata> rowset_metadata,
-                           rowid_t num_rows,
                            LogAnchorRegistry* log_anchor_registry,
                            TabletMemTrackers mem_trackers)
     : rowset_metadata_(std::move(rowset_metadata)),
-      num_rows_(num_rows),
       open_(false),
       read_only_(false),
       log_anchor_registry_(log_anchor_registry),
@@ -606,9 +603,8 @@ Status DeltaTracker::Update(Timestamp timestamp,
                             const RowChangeList &update,
                             const consensus::OpId& op_id,
                             OperationResultPB* result) {
-  // TODO: can probably lock this more fine-grained.
+  // TODO(todd): can probably lock this more fine-grained.
   shared_lock<rw_spinlock> lock(component_lock_);
-  DCHECK_LT(row_idx, num_rows_);
 
   Status s = dms_->Update(timestamp, row_idx, update, op_id);
   if (s.ok()) {
@@ -625,7 +621,6 @@ Status DeltaTracker::CheckRowDeleted(rowid_t row_idx, bool *deleted,
                                      ProbeStats* stats) const {
   shared_lock<rw_spinlock> lock(component_lock_);
 
-  DCHECK_LT(row_idx, num_rows_);
 
   *deleted = false;
   // Check if the row has a deletion in DeltaMemStore.

http://git-wip-us.apache.org/repos/asf/kudu/blob/1f261767/src/kudu/tablet/delta_tracker.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h
index f7388c7..4732c38 100644
--- a/src/kudu/tablet/delta_tracker.h
+++ b/src/kudu/tablet/delta_tracker.h
@@ -83,7 +83,6 @@ class DeltaTracker {
   };
 
   static Status Open(const std::shared_ptr<RowSetMetadata>& rowset_metadata,
-                     rowid_t num_rows,
                      log::LogAnchorRegistry* log_anchor_registry,
                      const TabletMemTrackers& mem_trackers,
                      gscoped_ptr<DeltaTracker>* delta_tracker);
@@ -223,12 +222,6 @@ class DeltaTracker {
                           const SharedDeltaStoreVector& new_stores,
                           DeltaType type);
 
-  // Return the number of rows encompassed by this DeltaTracker. Note that
-  // this is _not_ the number of updated rows, but rather the number of rows
-  // in the associated CFileSet base data. All updates must have a rowid
-  // strictly less than num_rows().
-  int64_t num_rows() const { return num_rows_; }
-
   // Get the delta MemStore's size in bytes, including pre-allocation.
   size_t DeltaMemStoreSize() const;
 
@@ -276,7 +269,7 @@ class DeltaTracker {
   FRIEND_TEST(TestMajorDeltaCompaction, TestCompact);
 
   DeltaTracker(std::shared_ptr<RowSetMetadata> rowset_metadata,
-               rowid_t num_rows, log::LogAnchorRegistry* log_anchor_registry,
+               log::LogAnchorRegistry* log_anchor_registry,
                TabletMemTrackers mem_trackers);
 
   Status DoOpen();
@@ -320,11 +313,6 @@ class DeltaTracker {
 
   std::shared_ptr<RowSetMetadata> rowset_metadata_;
 
-  // The number of rows in the DiskRowSet that this tracker is associated with.
-  // This is just used for assertions to make sure that we don't update a row
-  // which doesn't exist.
-  rowid_t num_rows_;
-
   bool open_;
 
   // Certain errors (e.g. failed delta tracker flushes) may leave the delta

http://git-wip-us.apache.org/repos/asf/kudu/blob/1f261767/src/kudu/tablet/diskrowset.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index 37aede9..f0ce31b 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -519,6 +519,7 @@ DiskRowSet::DiskRowSet(shared_ptr<RowSetMetadata> rowset_metadata,
       open_(false),
       log_anchor_registry_(log_anchor_registry),
       mem_trackers_(std::move(mem_trackers)),
+      num_rows_(-1),
       has_been_compacted_(false) {}
 
 Status DiskRowSet::Open() {
@@ -527,9 +528,7 @@ Status DiskRowSet::Open() {
                                mem_trackers_.tablet_tracker,
                                &base_data_));
 
-  rowid_t num_rows;
-  RETURN_NOT_OK(base_data_->CountRows(&num_rows));
-  RETURN_NOT_OK(DeltaTracker::Open(rowset_metadata_, num_rows,
+  RETURN_NOT_OK(DeltaTracker::Open(rowset_metadata_,
                                    log_anchor_registry_,
                                    mem_trackers_,
                                    &delta_tracker_));
@@ -670,6 +669,10 @@ Status DiskRowSet::MutateRow(Timestamp timestamp,
                              ProbeStats* stats,
                              OperationResultPB* result) {
   DCHECK(open_);
+#ifndef NDEBUG
+  rowid_t num_rows;
+  RETURN_NOT_OK(CountRows(&num_rows));
+#endif
   shared_lock<rw_spinlock> l(component_lock_);
 
   boost::optional<rowid_t> row_idx;
@@ -677,6 +680,9 @@ Status DiskRowSet::MutateRow(Timestamp timestamp,
   if (PREDICT_FALSE(row_idx == boost::none)) {
     return Status::NotFound("row not found");
   }
+#ifndef NDEBUG
+  CHECK_LT(*row_idx, num_rows);
+#endif
 
   // It's possible that the row key exists in this DiskRowSet, but it has
   // in fact been Deleted already. Check with the delta tracker to be sure.
@@ -695,6 +701,10 @@ Status DiskRowSet::CheckRowPresent(const RowSetKeyProbe &probe,
                                    bool* present,
                                    ProbeStats* stats) const {
   DCHECK(open_);
+#ifndef NDEBUG
+  rowid_t num_rows;
+  RETURN_NOT_OK(CountRows(&num_rows));
+#endif
   shared_lock<rw_spinlock> l(component_lock_);
 
   rowid_t row_idx;
@@ -703,6 +713,9 @@ Status DiskRowSet::CheckRowPresent(const RowSetKeyProbe &probe,
     // If it wasn't in the base data, then it's definitely not in the rowset.
     return Status::OK();
   }
+#ifndef NDEBUG
+  CHECK_LT(row_idx, num_rows);
+#endif
 
   // Otherwise it might be in the base data but deleted.
   bool deleted = false;
@@ -713,9 +726,15 @@ Status DiskRowSet::CheckRowPresent(const RowSetKeyProbe &probe,
 
 Status DiskRowSet::CountRows(rowid_t *count) const {
   DCHECK(open_);
-  shared_lock<rw_spinlock> l(component_lock_);
-
-  return base_data_->CountRows(count);
+  rowid_t num_rows = num_rows_.load();
+  if (PREDICT_TRUE(num_rows != -1)) {
+    *count = num_rows;
+  } else {
+    shared_lock<rw_spinlock> l(component_lock_);
+    RETURN_NOT_OK(base_data_->CountRows(count));
+    num_rows_.store(*count);
+  }
+  return Status::OK();
 }
 
 Status DiskRowSet::GetBounds(std::string* min_encoded_key,

http://git-wip-us.apache.org/repos/asf/kudu/blob/1f261767/src/kudu/tablet/diskrowset.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/diskrowset.h b/src/kudu/tablet/diskrowset.h
index f5739d5..ba1b325 100644
--- a/src/kudu/tablet/diskrowset.h
+++ b/src/kudu/tablet/diskrowset.h
@@ -361,8 +361,9 @@ class DiskRowSet : public RowSet {
                                     const MvccSnapshot &snap,
                                     gscoped_ptr<CompactionInput>* out) const override;
 
-  // Count the number of rows in this rowset.
-  Status CountRows(rowid_t *count) const override;
+  // Gets the number of rows in this rowset, checking 'num_rows_' first. If not
+  // yet set, consults the base data and stores the result in 'num_rows_'.
+  Status CountRows(rowid_t *count) const final override;
 
   // See RowSet::GetBounds(...)
   virtual Status GetBounds(std::string* min_encoded_key,
@@ -470,6 +471,10 @@ class DiskRowSet : public RowSet {
   std::shared_ptr<CFileSet> base_data_;
   gscoped_ptr<DeltaTracker> delta_tracker_;
 
+  // Number of rows in the rowset. This may be unset (-1) if the rows in the
+  // underlying cfile set have not been counted yet.
+  mutable std::atomic<int64_t> num_rows_;
+
   // Lock governing this rowset's inclusion in a compact/flush. If locked,
   // no other compactor will attempt to include this rowset.
   std::mutex compact_flush_lock_;

Reply via email to