This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 640a84e  KUDU-3195: flush when any DMS in the tablet is older than the time threshold
640a84e is described below

commit 640a84ecff857c3d0447c690c68e2361eb3e9c3b
Author: Andrew Wong <[email protected]>
AuthorDate: Sun Oct 11 23:08:06 2020 -0700

    KUDU-3195: flush when any DMS in the tablet is older than the time threshold
    
    Currently each tablet will wait at least 2 minutes (controlled by
    --flush_threshold_secs) between flushing DMSs, even if there are several
    DMSs that are older than 2 minutes in a given tablet. This means that
    for tablets with several dozen rowsets and updates across the entire
    tablet, it could take hours to flush all the deltas.
    
    Rather than waiting for 2 minutes since the last flush time before
    considering time-based flushing, this patch tracks the creation time of
    every DMS and flushes as long as there is a DMS that is older than 2
    minutes in the tablet.
    
    Change-Id: Id05202bf6a4685f4d79db11ef8ebb0f91f6316b4
    Reviewed-on: http://gerrit.cloudera.org:8080/16581
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <[email protected]>
---
 src/kudu/tablet/delta_tracker.cc         |  15 ++++
 src/kudu/tablet/delta_tracker.h          |   8 ++
 src/kudu/tablet/deltamemstore.cc         |   1 +
 src/kudu/tablet/deltamemstore.h          |   7 +-
 src/kudu/tablet/diskrowset.cc            |   5 ++
 src/kudu/tablet/diskrowset.h             |   2 +
 src/kudu/tablet/memrowset.h              |   4 +
 src/kudu/tablet/mock-rowsets.h           | 124 +++++++++++++++----------------
 src/kudu/tablet/rowset.h                 |  65 ++++++++--------
 src/kudu/tablet/tablet.cc                |  57 +++++++-------
 src/kudu/tablet/tablet.h                 |  17 +++--
 src/kudu/tablet/tablet_replica_mm_ops.cc |  33 ++++----
 src/kudu/tablet/tablet_replica_mm_ops.h  |  11 +--
 src/kudu/tserver/tablet_server-test.cc   |  35 +++++++++
 14 files changed, 234 insertions(+), 150 deletions(-)

diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index 2f8bbcf..a8d2aae 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -863,6 +863,21 @@ Status DeltaTracker::Flush(const IOContext* io_context, MetadataFlushType flush_
   return Status::OK();
 }
 
+bool DeltaTracker::GetDeltaMemStoreInfo(size_t* size_bytes, MonoTime* creation_time) const {
+  // Check dms_exists_ first to avoid unnecessary contention on
+  // component_lock_. We need to check again after taking the lock in case we
+  // raced with a DMS flush.
+  if (dms_exists_.Load()) {
+    shared_lock<rw_spinlock> lock(component_lock_);
+    if (dms_exists_.Load()) {
+      *size_bytes = dms_->EstimateSize();
+      *creation_time = dms_->creation_time();
+      return true;
+    }
+  }
+  return false;
+}
+
 size_t DeltaTracker::DeltaMemStoreSize() const {
   shared_lock<rw_spinlock> lock(component_lock_);
   return dms_exists_.Load() ? dms_->EstimateSize() : 0;
diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h
index dfec6a3..e021ec8 100644
--- a/src/kudu/tablet/delta_tracker.h
+++ b/src/kudu/tablet/delta_tracker.h
@@ -235,6 +235,14 @@ class DeltaTracker {
                           const fs::IOContext* io_context,
                           DeltaType type);
 
+  // Returns true if a DMS exists, returning its size in bytes and the time at
+  // which it was created. Otherwise, returns false and doesn't update the
+  // input pointers.
+  //
+  // NOTE: we lazily create the DMS, so the creation time corresponds to the
+  // age of the oldest update to the rowset.
+  bool GetDeltaMemStoreInfo(size_t* size_bytes, MonoTime* creation_time) const;
+
   // Get the delta MemStore's size in bytes, including pre-allocation.
   size_t DeltaMemStoreSize() const;
 
diff --git a/src/kudu/tablet/deltamemstore.cc b/src/kudu/tablet/deltamemstore.cc
index 3575a3c..494ad89 100644
--- a/src/kudu/tablet/deltamemstore.cc
+++ b/src/kudu/tablet/deltamemstore.cc
@@ -72,6 +72,7 @@ DeltaMemStore::DeltaMemStore(int64_t id,
                              shared_ptr<MemTracker> parent_tracker)
   : id_(id),
     rs_id_(rs_id),
+    creation_time_(MonoTime::Now()),
     highest_timestamp_(Timestamp::kMin),
     allocator_(new MemoryTrackingBufferAllocator(
         HeapBufferAllocator::Get(), std::move(parent_tracker))),
diff --git a/src/kudu/tablet/deltamemstore.h b/src/kudu/tablet/deltamemstore.h
index a7822a5..333a307 100644
--- a/src/kudu/tablet/deltamemstore.h
+++ b/src/kudu/tablet/deltamemstore.h
@@ -24,6 +24,7 @@
 #include <vector>
 
 #include <boost/optional/optional.hpp>
+#include <boost/type_traits/decay.hpp>
 
 #include "kudu/common/rowid.h"
 #include "kudu/common/timestamp.h"
@@ -38,6 +39,7 @@
 #include "kudu/util/locks.h"
 #include "kudu/util/make_shared.h"
 #include "kudu/util/memory/arena.h"
+#include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
@@ -130,7 +132,8 @@ class DeltaMemStore : public DeltaStore,
     return arena_->memory_footprint();
   }
 
-  const int64_t id() const { return id_; }
+  int64_t id() const { return id_; }
+  const MonoTime& creation_time() const { return creation_time_; }
 
   typedef btree::CBTree<DMSTreeTraits> DMSTree;
   typedef btree::CBTreeIterator<DMSTreeTraits> DMSTreeIter;
@@ -176,6 +179,8 @@ class DeltaMemStore : public DeltaStore,
   const int64_t id_;    // DeltaMemStore ID.
   const int64_t rs_id_; // Rowset ID.
 
+  const MonoTime creation_time_;
+
   mutable simple_spinlock ts_lock_;
   Timestamp highest_timestamp_;
 
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index a761c48..b5959ff 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -808,6 +808,11 @@ bool DiskRowSet::DeltaMemStoreEmpty() const {
   return delta_tracker_->DeltaMemStoreEmpty();
 }
 
+bool DiskRowSet::DeltaMemStoreInfo(size_t* size_bytes, MonoTime* creation_time) const {
+  DCHECK(open_);
+  return delta_tracker_->GetDeltaMemStoreInfo(size_bytes, creation_time);
+}
+
 int64_t DiskRowSet::MinUnflushedLogIndex() const {
   DCHECK(open_);
   return delta_tracker_->MinUnflushedLogIndex();
diff --git a/src/kudu/tablet/diskrowset.h b/src/kudu/tablet/diskrowset.h
index 709bee2..a6b076f 100644
--- a/src/kudu/tablet/diskrowset.h
+++ b/src/kudu/tablet/diskrowset.h
@@ -390,6 +390,8 @@ class DiskRowSet :
 
   size_t DeltaMemStoreSize() const override;
 
+  bool DeltaMemStoreInfo(size_t* size_bytes, MonoTime* creation_time) const override;
+
   bool DeltaMemStoreEmpty() const override;
 
   int64_t MinUnflushedLogIndex() const override;
diff --git a/src/kudu/tablet/memrowset.h b/src/kudu/tablet/memrowset.h
index 5b67666..3b3fbcb 100644
--- a/src/kudu/tablet/memrowset.h
+++ b/src/kudu/tablet/memrowset.h
@@ -377,6 +377,10 @@ class MemRowSet : public RowSet,
 
   size_t DeltaMemStoreSize() const override { return 0; }
 
+  bool DeltaMemStoreInfo(size_t* /*size_bytes*/, MonoTime* /*creation_time*/) const override {
+    return false;
+  }
+
   bool DeltaMemStoreEmpty() const override { return true; }
 
   int64_t MinUnflushedLogIndex() const override {
diff --git a/src/kudu/tablet/mock-rowsets.h b/src/kudu/tablet/mock-rowsets.h
index b3b8930..a0ae341 100644
--- a/src/kudu/tablet/mock-rowsets.h
+++ b/src/kudu/tablet/mock-rowsets.h
@@ -37,146 +37,146 @@ namespace tablet {
 // Mock implementation of RowSet which just aborts on every call.
 class MockRowSet : public RowSet {
  public:
-  virtual Status CheckRowPresent(const RowSetKeyProbe& /*probe*/,
-                                 const fs::IOContext* /*io_context*/,
-                                 bool* /*present*/, ProbeStats* /*stats*/) const override {
+  Status CheckRowPresent(const RowSetKeyProbe& /*probe*/,
+                         const fs::IOContext* /*io_context*/,
+                         bool* /*present*/, ProbeStats* /*stats*/) const override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
-  virtual Status MutateRow(Timestamp /*timestamp*/,
-                           const RowSetKeyProbe& /*probe*/,
-                           const RowChangeList& /*update*/,
-                           const consensus::OpId& /*op_id_*/,
-                           const fs::IOContext* /*io_context*/,
-                           ProbeStats* /*stats*/,
-                           OperationResultPB* /*result*/) override {
+  Status MutateRow(Timestamp /*timestamp*/,
+                   const RowSetKeyProbe& /*probe*/,
+                   const RowChangeList& /*update*/,
+                   const consensus::OpId& /*op_id_*/,
+                   const fs::IOContext* /*io_context*/,
+                   ProbeStats* /*stats*/,
+                   OperationResultPB* /*result*/) override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
-  virtual Status NewRowIterator(const RowIteratorOptions& /*opts*/,
-                                std::unique_ptr<RowwiseIterator>* /*out*/) const override {
+  Status NewRowIterator(const RowIteratorOptions& /*opts*/,
+                        std::unique_ptr<RowwiseIterator>* /*out*/) const override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
-  virtual Status NewCompactionInput(const Schema* /*projection*/,
-                                    const MvccSnapshot& /*snap*/,
-                                    const fs::IOContext* /*io_context*/,
-                                    std::unique_ptr<CompactionInput>* /*out*/) const override {
+  Status NewCompactionInput(const Schema* /*projection*/,
+                            const MvccSnapshot& /*snap*/,
+                            const fs::IOContext* /*io_context*/,
+                            std::unique_ptr<CompactionInput>* /*out*/) const override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
-  virtual Status CountRows(const fs::IOContext* /*io_context*/, rowid_t* /*count*/) const override {
+  Status CountRows(const fs::IOContext* /*io_context*/, rowid_t* /*count*/) const override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
-  virtual Status CountLiveRows(uint64_t* /*count*/) const override {
+  Status CountLiveRows(uint64_t* /*count*/) const override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
-  virtual std::string ToString() const override {
+  std::string ToString() const override {
     LOG(FATAL) << "Unimplemented";
     return "";
   }
-  virtual Status DebugDump(std::vector<std::string>* /*lines*/) override {
+  Status DebugDump(std::vector<std::string>* /*lines*/) override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
-  virtual Status Delete() {
-    LOG(FATAL) << "Unimplemented";
-    return Status::OK();
-  }
-  virtual uint64_t OnDiskSize() const override {
+  uint64_t OnDiskSize() const override {
     LOG(FATAL) << "Unimplemented";
     return 0;
   }
-  virtual uint64_t OnDiskBaseDataSize() const override {
+  uint64_t OnDiskBaseDataSize() const override {
     LOG(FATAL) << "Unimplemented";
     return 0;
   }
-  virtual uint64_t OnDiskBaseDataColumnSize(const ColumnId& /*col_id*/) const override {
+  uint64_t OnDiskBaseDataColumnSize(const ColumnId& /*col_id*/) const override {
     LOG(FATAL) << "Unimplemented";
     return 0;
   }
-  virtual uint64_t OnDiskBaseDataSizeWithRedos() const override {
+  uint64_t OnDiskBaseDataSizeWithRedos() const override {
     LOG(FATAL) << "Unimplemented";
     return 0;
   }
-  virtual std::mutex *compact_flush_lock() override {
+  std::mutex *compact_flush_lock() override {
     LOG(FATAL) << "Unimplemented";
     return nullptr;
   }
-  virtual bool has_been_compacted() const override {
+  bool has_been_compacted() const override {
     LOG(FATAL) << "Unimplemented";
     return false;
   }
-  virtual void set_has_been_compacted() override {
+  void set_has_been_compacted() override {
     LOG(FATAL) << "Unimplemented";
   }
-  virtual std::shared_ptr<RowSetMetadata> metadata() override {
+  std::shared_ptr<RowSetMetadata> metadata() override {
     return nullptr;
   }
 
-  virtual size_t DeltaMemStoreSize() const override {
+  size_t DeltaMemStoreSize() const override {
     LOG(FATAL) << "Unimplemented";
     return 0;
   }
 
-  virtual bool DeltaMemStoreEmpty() const override {
+  bool DeltaMemStoreInfo(size_t* /*size_bytes*/, MonoTime* /*creation_time*/) const override {
+    LOG(FATAL) << "Unimplemented";
+    return false;
+  }
+
+  bool DeltaMemStoreEmpty() const override {
     LOG(FATAL) << "Unimplemented";
     return 0;
   }
 
-  virtual int64_t MinUnflushedLogIndex() const override {
+  int64_t MinUnflushedLogIndex() const override {
     LOG(FATAL) << "Unimplemented";
     return -1;
   }
 
-  virtual double DeltaStoresCompactionPerfImprovementScore(DeltaCompactionType /*type*/)
-      const override {
+  double DeltaStoresCompactionPerfImprovementScore(DeltaCompactionType /*type*/) const override {
     LOG(FATAL) << "Unimplemented";
     return 0;
   }
 
-  virtual Status FlushDeltas(const fs::IOContext* /*io_context*/) override {
+  Status FlushDeltas(const fs::IOContext* /*io_context*/) override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
 
-  virtual Status MinorCompactDeltaStores(const fs::IOContext* /*io_context*/) override {
+  Status MinorCompactDeltaStores(const fs::IOContext* /*io_context*/) override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
 
-  virtual Status IsDeletedAndFullyAncient(Timestamp /*ancient_history_mark*/,
-                                          bool* /*deleted_and_ancient*/) override {
+  Status IsDeletedAndFullyAncient(Timestamp /*ancient_history_mark*/,
+                                  bool* /*deleted_and_ancient*/) override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
 
-  virtual Status EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp /*ancient_history_mark*/,
-                                                             int64_t* /*bytes*/) override {
+  Status EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp /*ancient_history_mark*/,
+                                                     int64_t* /*bytes*/) override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
 
-  virtual Status InitUndoDeltas(Timestamp /*ancient_history_mark*/,
-                                MonoTime /*deadline*/,
-                                const fs::IOContext* /*io_context*/,
-                                int64_t* /*delta_blocks_initialized*/,
-                                int64_t* /*bytes_in_ancient_undos*/) override {
+  Status InitUndoDeltas(Timestamp /*ancient_history_mark*/,
+                        MonoTime /*deadline*/,
+                        const fs::IOContext* /*io_context*/,
+                        int64_t* /*delta_blocks_initialized*/,
+                        int64_t* /*bytes_in_ancient_undos*/) override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
 
-  virtual Status DeleteAncientUndoDeltas(Timestamp /*ancient_history_mark*/,
-                                         const fs::IOContext* /*io_context*/,
-                                         int64_t* /*blocks_deleted*/,
-                                         int64_t* /*bytes_deleted*/) override {
+  Status DeleteAncientUndoDeltas(Timestamp /*ancient_history_mark*/,
+                                 const fs::IOContext* /*io_context*/,
+                                 int64_t* /*blocks_deleted*/,
+                                 int64_t* /*bytes_deleted*/) override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
 
-  virtual bool IsAvailableForCompaction() override {
+  bool IsAvailableForCompaction() override {
     return true;
   }
 };
@@ -191,30 +191,30 @@ class MockDiskRowSet : public MockRowSet {
         size_(size),
         column_size_(column_size) {}
 
-  virtual Status GetBounds(std::string* min_encoded_key,
-                           std::string* max_encoded_key) const override {
+  Status GetBounds(std::string* min_encoded_key,
+                   std::string* max_encoded_key) const override {
     *min_encoded_key = first_key_;
     *max_encoded_key = last_key_;
     return Status::OK();
   }
 
-  virtual uint64_t OnDiskSize() const override {
+  uint64_t OnDiskSize() const override {
     return size_;
   }
 
-  virtual uint64_t OnDiskBaseDataSize() const override {
+  uint64_t OnDiskBaseDataSize() const override {
     return size_;
   }
 
-  virtual uint64_t OnDiskBaseDataColumnSize(const ColumnId& /*col_id*/) const override {
+  uint64_t OnDiskBaseDataColumnSize(const ColumnId& /*col_id*/) const override {
     return column_size_;
   }
 
-  virtual uint64_t OnDiskBaseDataSizeWithRedos() const override {
+  uint64_t OnDiskBaseDataSizeWithRedos() const override {
     return size_;
   }
 
-  virtual std::string ToString() const override {
+  std::string ToString() const override {
     return strings::Substitute("mock[$0, $1]",
                                Slice(first_key_).ToDebugString(),
                                Slice(last_key_).ToDebugString());
@@ -230,8 +230,8 @@ class MockDiskRowSet : public MockRowSet {
 // Mock which acts like a MemRowSet and has no known bounds.
 class MockMemRowSet : public MockRowSet {
  public:
-  virtual Status GetBounds(std::string* /*min_encoded_key*/,
-                           std::string* /*max_encoded_key*/) const override {
+  Status GetBounds(std::string* /*min_encoded_key*/,
+                   std::string* /*max_encoded_key*/) const override {
     return Status::NotSupported("");
   }
 
diff --git a/src/kudu/tablet/rowset.h b/src/kudu/tablet/rowset.h
index 8887cb8..6969d5c 100644
--- a/src/kudu/tablet/rowset.h
+++ b/src/kudu/tablet/rowset.h
@@ -177,7 +177,7 @@ class RowSet {
 
   // Dump the full contents of this rowset, for debugging.
   // This is very verbose so only useful within unit tests.
-  virtual Status DebugDump(std::vector<std::string> *lines = NULL) = 0;
+  virtual Status DebugDump(std::vector<std::string> *lines = nullptr) = 0;
 
   // Return the size of this rowset on disk, in bytes.
   virtual uint64_t OnDiskSize() const = 0;
@@ -206,6 +206,9 @@ class RowSet {
 
   virtual bool DeltaMemStoreEmpty() const = 0;
 
+  // Get the size and creation time of the DMS, returning false if none exists.
+  virtual bool DeltaMemStoreInfo(size_t* size_bytes, MonoTime* creation_time) const = 0;
+
   // Get the minimum log index corresponding to unflushed data in this row set.
   virtual int64_t MinUnflushedLogIndex() const = 0;
 
@@ -389,75 +392,79 @@ class DuplicatingRowSet : public RowSet {
                            const consensus::OpId& op_id,
                            const fs::IOContext* io_context,
                            ProbeStats* stats,
-                           OperationResultPB* result) OVERRIDE;
+                           OperationResultPB* result) override;
 
   Status CheckRowPresent(const RowSetKeyProbe &probe, const fs::IOContext* io_context,
-                         bool *present, ProbeStats* stats) const OVERRIDE;
+                         bool *present, ProbeStats* stats) const override;
 
   virtual Status NewRowIterator(const RowIteratorOptions& opts,
-                                std::unique_ptr<RowwiseIterator>* out) const OVERRIDE;
+                                std::unique_ptr<RowwiseIterator>* out) const override;
 
   virtual Status NewCompactionInput(const Schema* projection,
                                     const MvccSnapshot &snap,
                                     const fs::IOContext* io_context,
-                                    std::unique_ptr<CompactionInput>* out) const OVERRIDE;
+                                    std::unique_ptr<CompactionInput>* out) const override;
 
-  Status CountRows(const fs::IOContext* io_context, rowid_t *count) const OVERRIDE;
+  Status CountRows(const fs::IOContext* io_context, rowid_t *count) const override;
 
-  virtual Status CountLiveRows(uint64_t* count) const OVERRIDE;
+  virtual Status CountLiveRows(uint64_t* count) const override;
 
   virtual Status GetBounds(std::string* min_encoded_key,
-                           std::string* max_encoded_key) const OVERRIDE;
+                           std::string* max_encoded_key) const override;
 
   // Return the total size on-disk of this rowset, in bytes.
-  uint64_t OnDiskSize() const OVERRIDE;
+  uint64_t OnDiskSize() const override;
 
   // Return the total size on-disk of this rowset's data (i.e. excludes metadata), in bytes.
-  uint64_t OnDiskBaseDataSize() const OVERRIDE;
+  uint64_t OnDiskBaseDataSize() const override;
 
   // Return the total size on-disk of this rowset's column data, in bytes.
-  uint64_t OnDiskBaseDataColumnSize(const ColumnId& col_id) const OVERRIDE;
+  uint64_t OnDiskBaseDataColumnSize(const ColumnId& col_id) const override;
 
   // Return the size, in bytes, of this rowset's data, not including UNDOs.
-  uint64_t OnDiskBaseDataSizeWithRedos() const OVERRIDE;
+  uint64_t OnDiskBaseDataSizeWithRedos() const override;
 
-  std::string ToString() const OVERRIDE;
+  std::string ToString() const override;
 
-  virtual Status DebugDump(std::vector<std::string> *lines = NULL) OVERRIDE;
+  virtual Status DebugDump(std::vector<std::string> *lines = nullptr) override;
 
-  std::shared_ptr<RowSetMetadata> metadata() OVERRIDE;
+  std::shared_ptr<RowSetMetadata> metadata() override;
 
   // A flush-in-progress rowset should never be selected for compaction.
-  std::mutex *compact_flush_lock() OVERRIDE {
+  std::mutex *compact_flush_lock() override {
     LOG(FATAL) << "Cannot be compacted";
-    return NULL;
+    return nullptr;
   }
 
-  virtual bool IsAvailableForCompaction() OVERRIDE {
+  virtual bool IsAvailableForCompaction() override {
     return false;
   }
 
-  virtual bool has_been_compacted() const OVERRIDE {
+  virtual bool has_been_compacted() const override {
     return false;
   }
 
-  virtual void set_has_been_compacted() OVERRIDE {
+  virtual void set_has_been_compacted() override {
     LOG(FATAL) << "Cannot be compacted";
   }
 
   ~DuplicatingRowSet();
 
-  size_t DeltaMemStoreSize() const OVERRIDE { return 0; }
+  size_t DeltaMemStoreSize() const override { return 0; }
+
+  bool DeltaMemStoreInfo(size_t* /*size_bytes*/, MonoTime* /*creation_time*/) const override {
+    return false;
+  }
 
-  bool DeltaMemStoreEmpty() const OVERRIDE { return true; }
+  bool DeltaMemStoreEmpty() const override { return true; }
 
-  double DeltaStoresCompactionPerfImprovementScore(DeltaCompactionType type) const OVERRIDE {
+  double DeltaStoresCompactionPerfImprovementScore(DeltaCompactionType /*type*/) const override {
     return 0;
   }
 
-  int64_t MinUnflushedLogIndex() const OVERRIDE { return -1; }
+  int64_t MinUnflushedLogIndex() const override { return -1; }
 
-  Status FlushDeltas(const fs::IOContext* /*io_context*/) OVERRIDE {
+  Status FlushDeltas(const fs::IOContext* /*io_context*/) override {
     // It's important that DuplicatingRowSet does not FlushDeltas. This prevents
     // a bug where we might end up with out-of-order deltas. See the long
     // comment in Tablet::Flush(...)
@@ -465,7 +472,7 @@ class DuplicatingRowSet : public RowSet {
   }
 
   Status EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp /*ancient_history_mark*/,
-                                                     int64_t* bytes) OVERRIDE {
+                                                     int64_t* bytes) override {
     DCHECK(bytes);
     *bytes = 0;
     return Status::OK();
@@ -482,7 +489,7 @@ class DuplicatingRowSet : public RowSet {
                         MonoTime /*deadline*/,
                         const fs::IOContext* /*io_context*/,
                         int64_t* delta_blocks_initialized,
-                        int64_t* bytes_in_ancient_undos) OVERRIDE {
+                        int64_t* bytes_in_ancient_undos) override {
     if (delta_blocks_initialized) *delta_blocks_initialized = 0;
     if (bytes_in_ancient_undos) *bytes_in_ancient_undos = 0;
     return Status::OK();
@@ -490,14 +497,14 @@ class DuplicatingRowSet : public RowSet {
 
   Status DeleteAncientUndoDeltas(Timestamp /*ancient_history_mark*/,
                                  const fs::IOContext* /*io_context*/,
-                                 int64_t* blocks_deleted, int64_t* bytes_deleted) OVERRIDE {
+                                 int64_t* blocks_deleted, int64_t* bytes_deleted) override {
     if (blocks_deleted) *blocks_deleted = 0;
     if (bytes_deleted) *bytes_deleted = 0;
     return Status::OK();
   }
 
   Status MinorCompactDeltaStores(
-      const fs::IOContext* /*io_context*/) OVERRIDE { return Status::OK(); }
+      const fs::IOContext* /*io_context*/) override { return Status::OK(); }
 
  private:
   friend class Tablet;
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 0bbda9b..3198782 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -1740,7 +1740,7 @@ Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
                                     "duplicate updates in new rowsets)",
                                     op_name);
   shared_ptr<DuplicatingRowSet> inprogress_rowset(
-    new DuplicatingRowSet(input.rowsets(), new_disk_rowsets));
+      new DuplicatingRowSet(input.rowsets(), new_disk_rowsets));
 
   // The next step is to swap in the DuplicatingRowSet, and at the same time,
   // determine an MVCC snapshot which includes all of the ops that saw a
@@ -2196,20 +2196,6 @@ bool Tablet::DeltaMemRowSetEmpty() const {
   return true;
 }
 
-void Tablet::GetInfoForBestDMSToFlush(const ReplaySizeMap& replay_size_map,
-                                      int64_t* mem_size, int64_t* replay_size) const {
-  shared_ptr<RowSet> rowset = FindBestDMSToFlush(replay_size_map);
-
-  if (rowset) {
-    *replay_size = GetReplaySizeForIndex(rowset->MinUnflushedLogIndex(),
-                                         replay_size_map);
-    *mem_size = rowset->DeltaMemStoreSize();
-  } else {
-    *replay_size = 0;
-    *mem_size = 0;
-  }
-}
-
 Status Tablet::FlushBestDMS(const ReplaySizeMap &replay_size_map) const {
   RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen);
   shared_ptr<RowSet> rowset = FindBestDMSToFlush(replay_size_map);
@@ -2220,34 +2206,53 @@ Status Tablet::FlushBestDMS(const ReplaySizeMap &replay_size_map) const {
   return Status::OK();
 }
 
-shared_ptr<RowSet> Tablet::FindBestDMSToFlush(const ReplaySizeMap& replay_size_map) const {
+shared_ptr<RowSet> Tablet::FindBestDMSToFlush(const ReplaySizeMap& replay_size_map,
+                                              int64_t* mem_size, int64_t* replay_size,
+                                              MonoTime* earliest_dms_time) const {
   scoped_refptr<TabletComponents> comps;
   GetComponents(&comps);
-  int64_t mem_size = 0;
+  int64_t max_dms_size = 0;
   double max_score = 0;
   double mem_weight = 0;
+  int64_t dms_replay_size = 0;
+  MonoTime earliest_creation_time = MonoTime::Max();
   // If system is under memory pressure, we use the percentage of the hard limit consumed
   // as mem_weight, so the tighter memory, the higher weight. Otherwise just left the
   // mem_weight to 0.
   process_memory::UnderMemoryPressure(&mem_weight);
 
   shared_ptr<RowSet> best_dms;
-  for (const shared_ptr<RowSet> &rowset : comps->rowsets->all_rowsets()) {
-    if (rowset->DeltaMemStoreEmpty()) {
+  for (const shared_ptr<RowSet>& rowset : comps->rowsets->all_rowsets()) {
+    size_t dms_size_bytes;
+    MonoTime creation_time;
+    if (!rowset->DeltaMemStoreInfo(&dms_size_bytes, &creation_time)) {
       continue;
     }
-    int64_t size = GetReplaySizeForIndex(rowset->MinUnflushedLogIndex(),
-                                         replay_size_map);
-    int64_t mem = rowset->DeltaMemStoreSize();
-    double score = mem * mem_weight + size * (100 - mem_weight);
-
+    earliest_creation_time = std::min(earliest_creation_time, creation_time);
+    int64_t replay_size_bytes = GetReplaySizeForIndex(rowset->MinUnflushedLogIndex(),
+                                                      replay_size_map);
+    // To facilitate memory-based flushing when under memory pressure, we
+    // define a score that's part memory and part WAL retention bytes.
+    double score = dms_size_bytes * mem_weight + replay_size_bytes * (100 - mem_weight);
     if ((score > max_score) ||
-        (score > max_score - 1 && mem > mem_size)) {
+        // If the score is close to the max, as a tie-breaker, just look at the
+        // DMS size.
+        (score > max_score - 1 && dms_size_bytes > max_dms_size)) {
       max_score = score;
-      mem_size = mem;
+      max_dms_size = dms_size_bytes;
+      dms_replay_size = replay_size_bytes;
       best_dms = rowset;
     }
   }
+  if (earliest_dms_time) {
+    *earliest_dms_time = earliest_creation_time;
+  }
+  if (mem_size) {
+    *mem_size = max_dms_size;
+  }
+  if (replay_size) {
+    *replay_size = dms_replay_size;
+  }
   return best_dms;
 }
 
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index 227ceea..728dcea 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -292,11 +292,6 @@ class Tablet {
   // Same as MemRowSetEmpty(), but for the DMS.
   bool DeltaMemRowSetEmpty() const;
 
-  // Fills in the in-memory size and replay size in bytes for the DMS with the
-  // highest retention.
-  void GetInfoForBestDMSToFlush(const ReplaySizeMap& replay_size_map,
-                                int64_t* mem_size, int64_t* replay_size) const;
-
   // Flushes the DMS with the highest retention.
   Status FlushBestDMS(const ReplaySizeMap &replay_size_map) const;
 
@@ -500,6 +495,15 @@ class Tablet {
   // This method is not thread safe and should only be called from a single thread at once.
   double CollectAndUpdateWorkloadStats(MaintenanceOp::PerfImprovementOpType type);
 
+  // Returns the best DMS to flush, based on its memory size and retained
+  // bytes. Also returns the earliest creation time of a DMS seen. Note that
+  // 'mem_size' and 'replay_size' correspond to the same DMS but
+  // 'earliest_dms_time' may not.
+  std::shared_ptr<RowSet> FindBestDMSToFlush(const ReplaySizeMap& replay_size_map,
+                                             int64_t* mem_size = nullptr,
+                                             int64_t* replay_size = nullptr,
+                                             MonoTime* earliest_dms_time = nullptr) const;
+
  private:
   friend class kudu::AlterTableTest;
   friend class Iterator;
@@ -688,9 +692,6 @@ class Tablet {
 
   Status CheckRowInTablet(const ConstContiguousRow& row) const;
 
-  // Helper method to find the rowset that has the DMS with the highest retention.
-  std::shared_ptr<RowSet> FindBestDMSToFlush(const ReplaySizeMap& replay_size_map) const;
-
   // Helper method to find how many bytes need to be replayed to restore in-memory
   // state from this index.
   static int64_t GetReplaySizeForIndex(int64_t min_log_index,
diff --git a/src/kudu/tablet/tablet_replica_mm_ops.cc b/src/kudu/tablet/tablet_replica_mm_ops.cc
index ea53453..d832851 100644
--- a/src/kudu/tablet/tablet_replica_mm_ops.cc
+++ b/src/kudu/tablet/tablet_replica_mm_ops.cc
@@ -36,6 +36,7 @@
 #include "kudu/util/logging.h"
 #include "kudu/util/maintenance_manager.h"
 #include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
 
@@ -120,11 +121,13 @@ void FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(MaintenanceOpStats
     DCHECK_GE(extra_mb, 0);
     stats->set_perf_improvement(std::max(1.0, extra_mb));
   } else if (elapsed_ms > FLAGS_flush_threshold_secs * 1000) {
-    // Even if we aren't over the threshold, consider flushing if we haven't flushed
-    // in a long time. But, don't give it a large perf_improvement score. We should
-    // only do this if we really don't have much else to do, and if we've already waited a bit.
-    // The following will give an improvement that's between 0.0 and 1.0, gradually growing as
-    // 'elapsed_ms' approaches 'upper_bound_ms' or 'anchored_mb' approaches 'threshold_mb'.
+    // Even if we aren't over the threshold, consider flushing if we have
+    // mem-stores that are older with respect to the time threshold. But, don't
+    // give it a large perf_improvement score. We should only do this if we
+    // really don't have much else to do, and if we've already waited a bit.
+    // The following will give an improvement that's between 0.0 and 1.0,
+    // gradually growing as 'elapsed_ms' approaches 'upper_bound_ms' or
+    // 'anchored_mb' approaches 'threshold_mb'.
     double perf = std::max(elapsed_ms / upper_bound_ms, anchored_mb / threshold_mb);
     stats->set_perf_improvement(std::min(1.0, perf));
   }
@@ -238,16 +241,17 @@ void FlushDeltaMemStoresOp::UpdateStats(MaintenanceOpStats* stats) {
     return;
   }
 
-  std::lock_guard<simple_spinlock> l(lock_);
-  int64_t dms_size;
-  int64_t retention_size;
   map<int64_t, int64_t> max_idx_to_replay_size;
   if (tablet_replica_->tablet()->DeltaMemRowSetEmpty() ||
       !tablet_replica_->GetReplaySizeMap(&max_idx_to_replay_size).ok()) {
     return;
   }
-  tablet_replica_->tablet()->GetInfoForBestDMSToFlush(max_idx_to_replay_size,
-                                                   &dms_size, &retention_size);
+  int64_t dms_size;
+  int64_t retention_size;
+  MonoTime earliest_dms_time = MonoTime::Max();
+  tablet_replica_->tablet()->FindBestDMSToFlush(max_idx_to_replay_size,
+                                                &dms_size, &retention_size,
+                                                &earliest_dms_time);
 
   stats->set_ram_anchored(dms_size);
   stats->set_runnable(true);
@@ -259,9 +263,12 @@ void FlushDeltaMemStoresOp::UpdateStats(MaintenanceOpStats* stats) {
     stats->set_workload_score(workload_score);
   }
 
+  const auto now = MonoTime::Now();
+  const auto time_since_earliest_update_ms = now > earliest_dms_time ?
+      (now - earliest_dms_time).ToMilliseconds() : 0;
   FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(
       stats,
-      time_since_flush_.elapsed().wall_millis());
+      time_since_earliest_update_ms);
 }
 
 void FlushDeltaMemStoresOp::Perform() {
@@ -279,10 +286,6 @@ void FlushDeltaMemStoresOp::Perform() {
                                     << s.ToString();
     return;
   }
-  {
-    std::lock_guard<simple_spinlock> l(lock_);
-    time_since_flush_.start();
-  }
 }
 
 scoped_refptr<Histogram> FlushDeltaMemStoresOp::DurationHistogram() const {
diff --git a/src/kudu/tablet/tablet_replica_mm_ops.h b/src/kudu/tablet/tablet_replica_mm_ops.h
index fd1dd5b..3ffa7b2 100644
--- a/src/kudu/tablet/tablet_replica_mm_ops.h
+++ b/src/kudu/tablet/tablet_replica_mm_ops.h
@@ -39,7 +39,7 @@ class FlushOpPerfImprovementPolicy {
   ~FlushOpPerfImprovementPolicy() {}
 
   // Sets the performance improvement based on the anchored ram if it's over the threshold,
-  // else it will set it based on how long it has been since the last flush.
+  // else it will set it based on how long the mem-store has been non-empty.
   static void SetPerfImprovementForFlush(MaintenanceOpStats* stats, double elapsed_ms);
 
  private:
@@ -91,9 +91,7 @@ class FlushDeltaMemStoresOp : public TabletReplicaOpBase {
     : TabletReplicaOpBase(StringPrintf("FlushDeltaMemStoresOp(%s)",
                                        tablet_replica->tablet()->tablet_id().c_str()),
                           MaintenanceOp::HIGH_IO_USAGE,
-                          tablet_replica) {
-    time_since_flush_.start();
-  }
+                          tablet_replica) {}
 
   void UpdateStats(MaintenanceOpStats* stats) override;
 
@@ -106,11 +104,6 @@ class FlushDeltaMemStoresOp : public TabletReplicaOpBase {
   scoped_refptr<Histogram> DurationHistogram() const override;
 
   scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const override;
-
- private:
-  // Lock protecting time_since_flush_
-  mutable simple_spinlock lock_;
-  Stopwatch time_since_flush_;
 };
 
 // Maintenance task that runs log GC. Reports log retention that represents the amount of data
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 74ccd03..eb1cd06 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -4332,6 +4332,41 @@ TEST_F(TabletServerTest, TestScannerCheckMatchingUser) {
   }
 }
 
+// Regression test for KUDU-3195 that ensures that as long as there are DMSs
+// older than the flush threshold, we will schedule DMS flushes.
+TEST_F(TabletServerTest, TestTimeBasedFlushDMS) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  // We're going to generate a bunch of DMSs, and for that, we need multiple
+  // rowsets, so disable merge compactions.
+  FLAGS_enable_rowset_compaction = false;
+
+  constexpr const int kNumRowsets = 100;
+  constexpr const int kRowsPerRowset = 1;
+  for (int i = 0; i < kNumRowsets; i++) {
+    NO_FATALS(InsertTestRowsDirect(i * kRowsPerRowset, kRowsPerRowset));
+    ASSERT_OK(tablet_replica_->tablet()->Flush());
+  }
+  for (int i = 0; i < kNumRowsets * kRowsPerRowset; i++) {
+    NO_FATALS(UpdateTestRowRemote(i, 0));
+  }
+  ASSERT_FALSE(tablet_replica_->tablet()->DeltaMemRowSetEmpty());
+
+  // We want to see how quickly we can flush DMSs, so speed up time-based
+  // flushing.
+  FLAGS_enable_maintenance_manager = true;
+  FLAGS_maintenance_manager_polling_interval_ms = 1;
+  FLAGS_flush_threshold_secs = 1;
+  NO_FATALS(ShutdownAndRebuildTablet());
+  // We should be able to wait for the flush threshold and very quickly see all
+  // DMSs get flushed. We'll wait 5x the flush threshold, which should be ample
+  // time given how little we're writing.
+  // NOTE: There are 100 DMSs to flush -- we shouldn't be doing anything silly
+  // like waiting a full flush threshold between flushes, which would take 100
+  // seconds!
+  SleepFor(MonoDelta::FromSeconds(5 * FLAGS_flush_threshold_secs));
+  ASSERT_TRUE(tablet_replica_->tablet()->DeltaMemRowSetEmpty());
+}
+
 TEST_F(TabletServerTest, TestStarvePerfImprovementOpsInColdTablet) {
   SKIP_IF_SLOW_NOT_ALLOWED();
   FLAGS_enable_maintenance_manager = true;

Reply via email to