This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 640a84e KUDU-3195: flush when any DMS in the tablet is older than the time threshold
640a84e is described below
commit 640a84ecff857c3d0447c690c68e2361eb3e9c3b
Author: Andrew Wong <[email protected]>
AuthorDate: Sun Oct 11 23:08:06 2020 -0700
KUDU-3195: flush when any DMS in the tablet is older than the time threshold
Currently each tablet waits at least 2 minutes (the default value of
--flush_threshold_secs) between DMS flushes, even if several of its DMSs
are already older than 2 minutes. As a result, for tablets with several
dozen rowsets and updates spread across the entire tablet, it could take
hours to flush all the deltas.
Rather than waiting for the threshold (2 minutes by default) to elapse
since the last flush before considering time-based flushing, this patch
tracks the creation time of every DMS and schedules a flush as long as any
DMS in the tablet is older than the threshold.
Change-Id: Id05202bf6a4685f4d79db11ef8ebb0f91f6316b4
Reviewed-on: http://gerrit.cloudera.org:8080/16581
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <[email protected]>
---
src/kudu/tablet/delta_tracker.cc | 15 ++++
src/kudu/tablet/delta_tracker.h | 8 ++
src/kudu/tablet/deltamemstore.cc | 1 +
src/kudu/tablet/deltamemstore.h | 7 +-
src/kudu/tablet/diskrowset.cc | 5 ++
src/kudu/tablet/diskrowset.h | 2 +
src/kudu/tablet/memrowset.h | 4 +
src/kudu/tablet/mock-rowsets.h | 124 +++++++++++++++----------------
src/kudu/tablet/rowset.h | 65 ++++++++--------
src/kudu/tablet/tablet.cc | 57 +++++++-------
src/kudu/tablet/tablet.h | 17 +++--
src/kudu/tablet/tablet_replica_mm_ops.cc | 33 ++++----
src/kudu/tablet/tablet_replica_mm_ops.h | 11 +--
src/kudu/tserver/tablet_server-test.cc | 35 +++++++++
14 files changed, 234 insertions(+), 150 deletions(-)
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index 2f8bbcf..a8d2aae 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -863,6 +863,21 @@ Status DeltaTracker::Flush(const IOContext* io_context,
MetadataFlushType flush_
return Status::OK();
}
+bool DeltaTracker::GetDeltaMemStoreInfo(size_t* size_bytes, MonoTime*
creation_time) const {
+ // Check dms_exists_ first to avoid unnecessary contention on
+ // component_lock_. We need to check again after taking the lock in case we
+ // raced with a DMS flush.
+ if (dms_exists_.Load()) {
+ shared_lock<rw_spinlock> lock(component_lock_);
+ if (dms_exists_.Load()) {
+ *size_bytes = dms_->EstimateSize();
+ *creation_time = dms_->creation_time();
+ return true;
+ }
+ }
+ return false;
+}
+
size_t DeltaTracker::DeltaMemStoreSize() const {
shared_lock<rw_spinlock> lock(component_lock_);
return dms_exists_.Load() ? dms_->EstimateSize() : 0;
diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h
index dfec6a3..e021ec8 100644
--- a/src/kudu/tablet/delta_tracker.h
+++ b/src/kudu/tablet/delta_tracker.h
@@ -235,6 +235,14 @@ class DeltaTracker {
const fs::IOContext* io_context,
DeltaType type);
+ // Returns true if a DMS exists, returning its size in bytes and the time at
+ // which it was created. Otherwise, returns false and doesn't update the
+ // input pointers.
+ //
+ // NOTE: we lazily create the DMS, so the creation time corresponds to the
+ // age of the oldest update to the rowset.
+ bool GetDeltaMemStoreInfo(size_t* size_bytes, MonoTime* creation_time) const;
+
// Get the delta MemStore's size in bytes, including pre-allocation.
size_t DeltaMemStoreSize() const;
diff --git a/src/kudu/tablet/deltamemstore.cc b/src/kudu/tablet/deltamemstore.cc
index 3575a3c..494ad89 100644
--- a/src/kudu/tablet/deltamemstore.cc
+++ b/src/kudu/tablet/deltamemstore.cc
@@ -72,6 +72,7 @@ DeltaMemStore::DeltaMemStore(int64_t id,
shared_ptr<MemTracker> parent_tracker)
: id_(id),
rs_id_(rs_id),
+ creation_time_(MonoTime::Now()),
highest_timestamp_(Timestamp::kMin),
allocator_(new MemoryTrackingBufferAllocator(
HeapBufferAllocator::Get(), std::move(parent_tracker))),
diff --git a/src/kudu/tablet/deltamemstore.h b/src/kudu/tablet/deltamemstore.h
index a7822a5..333a307 100644
--- a/src/kudu/tablet/deltamemstore.h
+++ b/src/kudu/tablet/deltamemstore.h
@@ -24,6 +24,7 @@
#include <vector>
#include <boost/optional/optional.hpp>
+#include <boost/type_traits/decay.hpp>
#include "kudu/common/rowid.h"
#include "kudu/common/timestamp.h"
@@ -38,6 +39,7 @@
#include "kudu/util/locks.h"
#include "kudu/util/make_shared.h"
#include "kudu/util/memory/arena.h"
+#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
namespace kudu {
@@ -130,7 +132,8 @@ class DeltaMemStore : public DeltaStore,
return arena_->memory_footprint();
}
- const int64_t id() const { return id_; }
+ int64_t id() const { return id_; }
+ const MonoTime& creation_time() const { return creation_time_; }
typedef btree::CBTree<DMSTreeTraits> DMSTree;
typedef btree::CBTreeIterator<DMSTreeTraits> DMSTreeIter;
@@ -176,6 +179,8 @@ class DeltaMemStore : public DeltaStore,
const int64_t id_; // DeltaMemStore ID.
const int64_t rs_id_; // Rowset ID.
+ const MonoTime creation_time_;
+
mutable simple_spinlock ts_lock_;
Timestamp highest_timestamp_;
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index a761c48..b5959ff 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -808,6 +808,11 @@ bool DiskRowSet::DeltaMemStoreEmpty() const {
return delta_tracker_->DeltaMemStoreEmpty();
}
+bool DiskRowSet::DeltaMemStoreInfo(size_t* size_bytes, MonoTime*
creation_time) const {
+ DCHECK(open_);
+ return delta_tracker_->GetDeltaMemStoreInfo(size_bytes, creation_time);
+}
+
int64_t DiskRowSet::MinUnflushedLogIndex() const {
DCHECK(open_);
return delta_tracker_->MinUnflushedLogIndex();
diff --git a/src/kudu/tablet/diskrowset.h b/src/kudu/tablet/diskrowset.h
index 709bee2..a6b076f 100644
--- a/src/kudu/tablet/diskrowset.h
+++ b/src/kudu/tablet/diskrowset.h
@@ -390,6 +390,8 @@ class DiskRowSet :
size_t DeltaMemStoreSize() const override;
+ bool DeltaMemStoreInfo(size_t* size_bytes, MonoTime* creation_time) const
override;
+
bool DeltaMemStoreEmpty() const override;
int64_t MinUnflushedLogIndex() const override;
diff --git a/src/kudu/tablet/memrowset.h b/src/kudu/tablet/memrowset.h
index 5b67666..3b3fbcb 100644
--- a/src/kudu/tablet/memrowset.h
+++ b/src/kudu/tablet/memrowset.h
@@ -377,6 +377,10 @@ class MemRowSet : public RowSet,
size_t DeltaMemStoreSize() const override { return 0; }
+ bool DeltaMemStoreInfo(size_t* /*size_bytes*/, MonoTime* /*creation_time*/)
const override {
+ return false;
+ }
+
bool DeltaMemStoreEmpty() const override { return true; }
int64_t MinUnflushedLogIndex() const override {
diff --git a/src/kudu/tablet/mock-rowsets.h b/src/kudu/tablet/mock-rowsets.h
index b3b8930..a0ae341 100644
--- a/src/kudu/tablet/mock-rowsets.h
+++ b/src/kudu/tablet/mock-rowsets.h
@@ -37,146 +37,146 @@ namespace tablet {
// Mock implementation of RowSet which just aborts on every call.
class MockRowSet : public RowSet {
public:
- virtual Status CheckRowPresent(const RowSetKeyProbe& /*probe*/,
- const fs::IOContext* /*io_context*/,
- bool* /*present*/, ProbeStats* /*stats*/)
const override {
+ Status CheckRowPresent(const RowSetKeyProbe& /*probe*/,
+ const fs::IOContext* /*io_context*/,
+ bool* /*present*/, ProbeStats* /*stats*/) const
override {
LOG(FATAL) << "Unimplemented";
return Status::OK();
}
- virtual Status MutateRow(Timestamp /*timestamp*/,
- const RowSetKeyProbe& /*probe*/,
- const RowChangeList& /*update*/,
- const consensus::OpId& /*op_id_*/,
- const fs::IOContext* /*io_context*/,
- ProbeStats* /*stats*/,
- OperationResultPB* /*result*/) override {
+ Status MutateRow(Timestamp /*timestamp*/,
+ const RowSetKeyProbe& /*probe*/,
+ const RowChangeList& /*update*/,
+ const consensus::OpId& /*op_id_*/,
+ const fs::IOContext* /*io_context*/,
+ ProbeStats* /*stats*/,
+ OperationResultPB* /*result*/) override {
LOG(FATAL) << "Unimplemented";
return Status::OK();
}
- virtual Status NewRowIterator(const RowIteratorOptions& /*opts*/,
- std::unique_ptr<RowwiseIterator>* /*out*/)
const override {
+ Status NewRowIterator(const RowIteratorOptions& /*opts*/,
+ std::unique_ptr<RowwiseIterator>* /*out*/) const
override {
LOG(FATAL) << "Unimplemented";
return Status::OK();
}
- virtual Status NewCompactionInput(const Schema* /*projection*/,
- const MvccSnapshot& /*snap*/,
- const fs::IOContext* /*io_context*/,
- std::unique_ptr<CompactionInput>* /*out*/)
const override {
+ Status NewCompactionInput(const Schema* /*projection*/,
+ const MvccSnapshot& /*snap*/,
+ const fs::IOContext* /*io_context*/,
+ std::unique_ptr<CompactionInput>* /*out*/) const
override {
LOG(FATAL) << "Unimplemented";
return Status::OK();
}
- virtual Status CountRows(const fs::IOContext* /*io_context*/, rowid_t*
/*count*/) const override {
+ Status CountRows(const fs::IOContext* /*io_context*/, rowid_t* /*count*/)
const override {
LOG(FATAL) << "Unimplemented";
return Status::OK();
}
- virtual Status CountLiveRows(uint64_t* /*count*/) const override {
+ Status CountLiveRows(uint64_t* /*count*/) const override {
LOG(FATAL) << "Unimplemented";
return Status::OK();
}
- virtual std::string ToString() const override {
+ std::string ToString() const override {
LOG(FATAL) << "Unimplemented";
return "";
}
- virtual Status DebugDump(std::vector<std::string>* /*lines*/) override {
+ Status DebugDump(std::vector<std::string>* /*lines*/) override {
LOG(FATAL) << "Unimplemented";
return Status::OK();
}
- virtual Status Delete() {
- LOG(FATAL) << "Unimplemented";
- return Status::OK();
- }
- virtual uint64_t OnDiskSize() const override {
+ uint64_t OnDiskSize() const override {
LOG(FATAL) << "Unimplemented";
return 0;
}
- virtual uint64_t OnDiskBaseDataSize() const override {
+ uint64_t OnDiskBaseDataSize() const override {
LOG(FATAL) << "Unimplemented";
return 0;
}
- virtual uint64_t OnDiskBaseDataColumnSize(const ColumnId& /*col_id*/) const
override {
+ uint64_t OnDiskBaseDataColumnSize(const ColumnId& /*col_id*/) const override
{
LOG(FATAL) << "Unimplemented";
return 0;
}
- virtual uint64_t OnDiskBaseDataSizeWithRedos() const override {
+ uint64_t OnDiskBaseDataSizeWithRedos() const override {
LOG(FATAL) << "Unimplemented";
return 0;
}
- virtual std::mutex *compact_flush_lock() override {
+ std::mutex *compact_flush_lock() override {
LOG(FATAL) << "Unimplemented";
return nullptr;
}
- virtual bool has_been_compacted() const override {
+ bool has_been_compacted() const override {
LOG(FATAL) << "Unimplemented";
return false;
}
- virtual void set_has_been_compacted() override {
+ void set_has_been_compacted() override {
LOG(FATAL) << "Unimplemented";
}
- virtual std::shared_ptr<RowSetMetadata> metadata() override {
+ std::shared_ptr<RowSetMetadata> metadata() override {
return nullptr;
}
- virtual size_t DeltaMemStoreSize() const override {
+ size_t DeltaMemStoreSize() const override {
LOG(FATAL) << "Unimplemented";
return 0;
}
- virtual bool DeltaMemStoreEmpty() const override {
+ bool DeltaMemStoreInfo(size_t* /*size_bytes*/, MonoTime* /*creation_time*/)
const override {
+ LOG(FATAL) << "Unimplemented";
+ return false;
+ }
+
+ bool DeltaMemStoreEmpty() const override {
LOG(FATAL) << "Unimplemented";
return 0;
}
- virtual int64_t MinUnflushedLogIndex() const override {
+ int64_t MinUnflushedLogIndex() const override {
LOG(FATAL) << "Unimplemented";
return -1;
}
- virtual double DeltaStoresCompactionPerfImprovementScore(DeltaCompactionType
/*type*/)
- const override {
+ double DeltaStoresCompactionPerfImprovementScore(DeltaCompactionType
/*type*/) const override {
LOG(FATAL) << "Unimplemented";
return 0;
}
- virtual Status FlushDeltas(const fs::IOContext* /*io_context*/) override {
+ Status FlushDeltas(const fs::IOContext* /*io_context*/) override {
LOG(FATAL) << "Unimplemented";
return Status::OK();
}
- virtual Status MinorCompactDeltaStores(const fs::IOContext* /*io_context*/)
override {
+ Status MinorCompactDeltaStores(const fs::IOContext* /*io_context*/) override
{
LOG(FATAL) << "Unimplemented";
return Status::OK();
}
- virtual Status IsDeletedAndFullyAncient(Timestamp /*ancient_history_mark*/,
- bool* /*deleted_and_ancient*/)
override {
+ Status IsDeletedAndFullyAncient(Timestamp /*ancient_history_mark*/,
+ bool* /*deleted_and_ancient*/) override {
LOG(FATAL) << "Unimplemented";
return Status::OK();
}
- virtual Status EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp
/*ancient_history_mark*/,
- int64_t*
/*bytes*/) override {
+ Status EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp
/*ancient_history_mark*/,
+ int64_t* /*bytes*/)
override {
LOG(FATAL) << "Unimplemented";
return Status::OK();
}
- virtual Status InitUndoDeltas(Timestamp /*ancient_history_mark*/,
- MonoTime /*deadline*/,
- const fs::IOContext* /*io_context*/,
- int64_t* /*delta_blocks_initialized*/,
- int64_t* /*bytes_in_ancient_undos*/) override {
+ Status InitUndoDeltas(Timestamp /*ancient_history_mark*/,
+ MonoTime /*deadline*/,
+ const fs::IOContext* /*io_context*/,
+ int64_t* /*delta_blocks_initialized*/,
+ int64_t* /*bytes_in_ancient_undos*/) override {
LOG(FATAL) << "Unimplemented";
return Status::OK();
}
- virtual Status DeleteAncientUndoDeltas(Timestamp /*ancient_history_mark*/,
- const fs::IOContext* /*io_context*/,
- int64_t* /*blocks_deleted*/,
- int64_t* /*bytes_deleted*/) override {
+ Status DeleteAncientUndoDeltas(Timestamp /*ancient_history_mark*/,
+ const fs::IOContext* /*io_context*/,
+ int64_t* /*blocks_deleted*/,
+ int64_t* /*bytes_deleted*/) override {
LOG(FATAL) << "Unimplemented";
return Status::OK();
}
- virtual bool IsAvailableForCompaction() override {
+ bool IsAvailableForCompaction() override {
return true;
}
};
@@ -191,30 +191,30 @@ class MockDiskRowSet : public MockRowSet {
size_(size),
column_size_(column_size) {}
- virtual Status GetBounds(std::string* min_encoded_key,
- std::string* max_encoded_key) const override {
+ Status GetBounds(std::string* min_encoded_key,
+ std::string* max_encoded_key) const override {
*min_encoded_key = first_key_;
*max_encoded_key = last_key_;
return Status::OK();
}
- virtual uint64_t OnDiskSize() const override {
+ uint64_t OnDiskSize() const override {
return size_;
}
- virtual uint64_t OnDiskBaseDataSize() const override {
+ uint64_t OnDiskBaseDataSize() const override {
return size_;
}
- virtual uint64_t OnDiskBaseDataColumnSize(const ColumnId& /*col_id*/) const
override {
+ uint64_t OnDiskBaseDataColumnSize(const ColumnId& /*col_id*/) const override
{
return column_size_;
}
- virtual uint64_t OnDiskBaseDataSizeWithRedos() const override {
+ uint64_t OnDiskBaseDataSizeWithRedos() const override {
return size_;
}
- virtual std::string ToString() const override {
+ std::string ToString() const override {
return strings::Substitute("mock[$0, $1]",
Slice(first_key_).ToDebugString(),
Slice(last_key_).ToDebugString());
@@ -230,8 +230,8 @@ class MockDiskRowSet : public MockRowSet {
// Mock which acts like a MemRowSet and has no known bounds.
class MockMemRowSet : public MockRowSet {
public:
- virtual Status GetBounds(std::string* /*min_encoded_key*/,
- std::string* /*max_encoded_key*/) const override {
+ Status GetBounds(std::string* /*min_encoded_key*/,
+ std::string* /*max_encoded_key*/) const override {
return Status::NotSupported("");
}
diff --git a/src/kudu/tablet/rowset.h b/src/kudu/tablet/rowset.h
index 8887cb8..6969d5c 100644
--- a/src/kudu/tablet/rowset.h
+++ b/src/kudu/tablet/rowset.h
@@ -177,7 +177,7 @@ class RowSet {
// Dump the full contents of this rowset, for debugging.
// This is very verbose so only useful within unit tests.
- virtual Status DebugDump(std::vector<std::string> *lines = NULL) = 0;
+ virtual Status DebugDump(std::vector<std::string> *lines = nullptr) = 0;
// Return the size of this rowset on disk, in bytes.
virtual uint64_t OnDiskSize() const = 0;
@@ -206,6 +206,9 @@ class RowSet {
virtual bool DeltaMemStoreEmpty() const = 0;
+ // Get the size and creation time of the DMS, returning false if none exists.
+ virtual bool DeltaMemStoreInfo(size_t* size_bytes, MonoTime* creation_time)
const = 0;
+
// Get the minimum log index corresponding to unflushed data in this row set.
virtual int64_t MinUnflushedLogIndex() const = 0;
@@ -389,75 +392,79 @@ class DuplicatingRowSet : public RowSet {
const consensus::OpId& op_id,
const fs::IOContext* io_context,
ProbeStats* stats,
- OperationResultPB* result) OVERRIDE;
+ OperationResultPB* result) override;
Status CheckRowPresent(const RowSetKeyProbe &probe, const fs::IOContext*
io_context,
- bool *present, ProbeStats* stats) const OVERRIDE;
+ bool *present, ProbeStats* stats) const override;
virtual Status NewRowIterator(const RowIteratorOptions& opts,
- std::unique_ptr<RowwiseIterator>* out) const
OVERRIDE;
+ std::unique_ptr<RowwiseIterator>* out) const
override;
virtual Status NewCompactionInput(const Schema* projection,
const MvccSnapshot &snap,
const fs::IOContext* io_context,
- std::unique_ptr<CompactionInput>* out)
const OVERRIDE;
+ std::unique_ptr<CompactionInput>* out)
const override;
- Status CountRows(const fs::IOContext* io_context, rowid_t *count) const
OVERRIDE;
+ Status CountRows(const fs::IOContext* io_context, rowid_t *count) const
override;
- virtual Status CountLiveRows(uint64_t* count) const OVERRIDE;
+ virtual Status CountLiveRows(uint64_t* count) const override;
virtual Status GetBounds(std::string* min_encoded_key,
- std::string* max_encoded_key) const OVERRIDE;
+ std::string* max_encoded_key) const override;
// Return the total size on-disk of this rowset, in bytes.
- uint64_t OnDiskSize() const OVERRIDE;
+ uint64_t OnDiskSize() const override;
// Return the total size on-disk of this rowset's data (i.e. excludes
metadata), in bytes.
- uint64_t OnDiskBaseDataSize() const OVERRIDE;
+ uint64_t OnDiskBaseDataSize() const override;
// Return the total size on-disk of this rowset's column data, in bytes.
- uint64_t OnDiskBaseDataColumnSize(const ColumnId& col_id) const OVERRIDE;
+ uint64_t OnDiskBaseDataColumnSize(const ColumnId& col_id) const override;
// Return the size, in bytes, of this rowset's data, not including UNDOs.
- uint64_t OnDiskBaseDataSizeWithRedos() const OVERRIDE;
+ uint64_t OnDiskBaseDataSizeWithRedos() const override;
- std::string ToString() const OVERRIDE;
+ std::string ToString() const override;
- virtual Status DebugDump(std::vector<std::string> *lines = NULL) OVERRIDE;
+ virtual Status DebugDump(std::vector<std::string> *lines = nullptr) override;
- std::shared_ptr<RowSetMetadata> metadata() OVERRIDE;
+ std::shared_ptr<RowSetMetadata> metadata() override;
// A flush-in-progress rowset should never be selected for compaction.
- std::mutex *compact_flush_lock() OVERRIDE {
+ std::mutex *compact_flush_lock() override {
LOG(FATAL) << "Cannot be compacted";
- return NULL;
+ return nullptr;
}
- virtual bool IsAvailableForCompaction() OVERRIDE {
+ virtual bool IsAvailableForCompaction() override {
return false;
}
- virtual bool has_been_compacted() const OVERRIDE {
+ virtual bool has_been_compacted() const override {
return false;
}
- virtual void set_has_been_compacted() OVERRIDE {
+ virtual void set_has_been_compacted() override {
LOG(FATAL) << "Cannot be compacted";
}
~DuplicatingRowSet();
- size_t DeltaMemStoreSize() const OVERRIDE { return 0; }
+ size_t DeltaMemStoreSize() const override { return 0; }
+
+ bool DeltaMemStoreInfo(size_t* /*size_bytes*/, MonoTime* /*creation_time*/)
const override {
+ return false;
+ }
- bool DeltaMemStoreEmpty() const OVERRIDE { return true; }
+ bool DeltaMemStoreEmpty() const override { return true; }
- double DeltaStoresCompactionPerfImprovementScore(DeltaCompactionType type)
const OVERRIDE {
+ double DeltaStoresCompactionPerfImprovementScore(DeltaCompactionType
/*type*/) const override {
return 0;
}
- int64_t MinUnflushedLogIndex() const OVERRIDE { return -1; }
+ int64_t MinUnflushedLogIndex() const override { return -1; }
- Status FlushDeltas(const fs::IOContext* /*io_context*/) OVERRIDE {
+ Status FlushDeltas(const fs::IOContext* /*io_context*/) override {
// It's important that DuplicatingRowSet does not FlushDeltas. This
prevents
// a bug where we might end up with out-of-order deltas. See the long
// comment in Tablet::Flush(...)
@@ -465,7 +472,7 @@ class DuplicatingRowSet : public RowSet {
}
Status EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp
/*ancient_history_mark*/,
- int64_t* bytes) OVERRIDE {
+ int64_t* bytes) override {
DCHECK(bytes);
*bytes = 0;
return Status::OK();
@@ -482,7 +489,7 @@ class DuplicatingRowSet : public RowSet {
MonoTime /*deadline*/,
const fs::IOContext* /*io_context*/,
int64_t* delta_blocks_initialized,
- int64_t* bytes_in_ancient_undos) OVERRIDE {
+ int64_t* bytes_in_ancient_undos) override {
if (delta_blocks_initialized) *delta_blocks_initialized = 0;
if (bytes_in_ancient_undos) *bytes_in_ancient_undos = 0;
return Status::OK();
@@ -490,14 +497,14 @@ class DuplicatingRowSet : public RowSet {
Status DeleteAncientUndoDeltas(Timestamp /*ancient_history_mark*/,
const fs::IOContext* /*io_context*/,
- int64_t* blocks_deleted, int64_t*
bytes_deleted) OVERRIDE {
+ int64_t* blocks_deleted, int64_t*
bytes_deleted) override {
if (blocks_deleted) *blocks_deleted = 0;
if (bytes_deleted) *bytes_deleted = 0;
return Status::OK();
}
Status MinorCompactDeltaStores(
- const fs::IOContext* /*io_context*/) OVERRIDE { return Status::OK(); }
+ const fs::IOContext* /*io_context*/) override { return Status::OK(); }
private:
friend class Tablet;
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 0bbda9b..3198782 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -1740,7 +1740,7 @@ Status Tablet::DoMergeCompactionOrFlush(const
RowSetsInCompaction &input,
"duplicate updates in new rowsets)",
op_name);
shared_ptr<DuplicatingRowSet> inprogress_rowset(
- new DuplicatingRowSet(input.rowsets(), new_disk_rowsets));
+ new DuplicatingRowSet(input.rowsets(), new_disk_rowsets));
// The next step is to swap in the DuplicatingRowSet, and at the same time,
// determine an MVCC snapshot which includes all of the ops that saw a
@@ -2196,20 +2196,6 @@ bool Tablet::DeltaMemRowSetEmpty() const {
return true;
}
-void Tablet::GetInfoForBestDMSToFlush(const ReplaySizeMap& replay_size_map,
- int64_t* mem_size, int64_t* replay_size)
const {
- shared_ptr<RowSet> rowset = FindBestDMSToFlush(replay_size_map);
-
- if (rowset) {
- *replay_size = GetReplaySizeForIndex(rowset->MinUnflushedLogIndex(),
- replay_size_map);
- *mem_size = rowset->DeltaMemStoreSize();
- } else {
- *replay_size = 0;
- *mem_size = 0;
- }
-}
-
Status Tablet::FlushBestDMS(const ReplaySizeMap &replay_size_map) const {
RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen);
shared_ptr<RowSet> rowset = FindBestDMSToFlush(replay_size_map);
@@ -2220,34 +2206,53 @@ Status Tablet::FlushBestDMS(const ReplaySizeMap
&replay_size_map) const {
return Status::OK();
}
-shared_ptr<RowSet> Tablet::FindBestDMSToFlush(const ReplaySizeMap&
replay_size_map) const {
+shared_ptr<RowSet> Tablet::FindBestDMSToFlush(const ReplaySizeMap&
replay_size_map,
+ int64_t* mem_size, int64_t*
replay_size,
+ MonoTime* earliest_dms_time)
const {
scoped_refptr<TabletComponents> comps;
GetComponents(&comps);
- int64_t mem_size = 0;
+ int64_t max_dms_size = 0;
double max_score = 0;
double mem_weight = 0;
+ int64_t dms_replay_size = 0;
+ MonoTime earliest_creation_time = MonoTime::Max();
// If system is under memory pressure, we use the percentage of the hard
limit consumed
// as mem_weight, so the tighter memory, the higher weight. Otherwise just
left the
// mem_weight to 0.
process_memory::UnderMemoryPressure(&mem_weight);
shared_ptr<RowSet> best_dms;
- for (const shared_ptr<RowSet> &rowset : comps->rowsets->all_rowsets()) {
- if (rowset->DeltaMemStoreEmpty()) {
+ for (const shared_ptr<RowSet>& rowset : comps->rowsets->all_rowsets()) {
+ size_t dms_size_bytes;
+ MonoTime creation_time;
+ if (!rowset->DeltaMemStoreInfo(&dms_size_bytes, &creation_time)) {
continue;
}
- int64_t size = GetReplaySizeForIndex(rowset->MinUnflushedLogIndex(),
- replay_size_map);
- int64_t mem = rowset->DeltaMemStoreSize();
- double score = mem * mem_weight + size * (100 - mem_weight);
-
+ earliest_creation_time = std::min(earliest_creation_time, creation_time);
+ int64_t replay_size_bytes =
GetReplaySizeForIndex(rowset->MinUnflushedLogIndex(),
+ replay_size_map);
+ // To facilitate memory-based flushing when under memory pressure, we
+ // define a score that's part memory and part WAL retention bytes.
+ double score = dms_size_bytes * mem_weight + replay_size_bytes * (100 -
mem_weight);
if ((score > max_score) ||
- (score > max_score - 1 && mem > mem_size)) {
+ // If the score is close to the max, as a tie-breaker, just look at the
+ // DMS size.
+ (score > max_score - 1 && dms_size_bytes > max_dms_size)) {
max_score = score;
- mem_size = mem;
+ max_dms_size = dms_size_bytes;
+ dms_replay_size = replay_size_bytes;
best_dms = rowset;
}
}
+ if (earliest_dms_time) {
+ *earliest_dms_time = earliest_creation_time;
+ }
+ if (mem_size) {
+ *mem_size = max_dms_size;
+ }
+ if (replay_size) {
+ *replay_size = dms_replay_size;
+ }
return best_dms;
}
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index 227ceea..728dcea 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -292,11 +292,6 @@ class Tablet {
// Same as MemRowSetEmpty(), but for the DMS.
bool DeltaMemRowSetEmpty() const;
- // Fills in the in-memory size and replay size in bytes for the DMS with the
- // highest retention.
- void GetInfoForBestDMSToFlush(const ReplaySizeMap& replay_size_map,
- int64_t* mem_size, int64_t* replay_size) const;
-
// Flushes the DMS with the highest retention.
Status FlushBestDMS(const ReplaySizeMap &replay_size_map) const;
@@ -500,6 +495,15 @@ class Tablet {
// This method is not thread safe and should only be called from a single
thread at once.
double CollectAndUpdateWorkloadStats(MaintenanceOp::PerfImprovementOpType
type);
+ // Returns the best DMS to flush, based on its memory size and retained
+ // bytes. Also returns the earliest creation time of a DMS seen. Note that
+ // 'mem_size' and 'replay_size' correspond to the same DMS but
+ // 'earliest_dms_time' may not.
+ std::shared_ptr<RowSet> FindBestDMSToFlush(const ReplaySizeMap&
replay_size_map,
+ int64_t* mem_size = nullptr,
+ int64_t* replay_size = nullptr,
+ MonoTime* earliest_dms_time =
nullptr) const;
+
private:
friend class kudu::AlterTableTest;
friend class Iterator;
@@ -688,9 +692,6 @@ class Tablet {
Status CheckRowInTablet(const ConstContiguousRow& row) const;
- // Helper method to find the rowset that has the DMS with the highest
retention.
- std::shared_ptr<RowSet> FindBestDMSToFlush(const ReplaySizeMap&
replay_size_map) const;
-
// Helper method to find how many bytes need to be replayed to restore
in-memory
// state from this index.
static int64_t GetReplaySizeForIndex(int64_t min_log_index,
diff --git a/src/kudu/tablet/tablet_replica_mm_ops.cc
b/src/kudu/tablet/tablet_replica_mm_ops.cc
index ea53453..d832851 100644
--- a/src/kudu/tablet/tablet_replica_mm_ops.cc
+++ b/src/kudu/tablet/tablet_replica_mm_ops.cc
@@ -36,6 +36,7 @@
#include "kudu/util/logging.h"
#include "kudu/util/maintenance_manager.h"
#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
@@ -120,11 +121,13 @@ void
FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(MaintenanceOpStats
DCHECK_GE(extra_mb, 0);
stats->set_perf_improvement(std::max(1.0, extra_mb));
} else if (elapsed_ms > FLAGS_flush_threshold_secs * 1000) {
- // Even if we aren't over the threshold, consider flushing if we haven't
flushed
- // in a long time. But, don't give it a large perf_improvement score. We
should
- // only do this if we really don't have much else to do, and if we've
already waited a bit.
- // The following will give an improvement that's between 0.0 and 1.0,
gradually growing as
- // 'elapsed_ms' approaches 'upper_bound_ms' or 'anchored_mb' approaches
'threshold_mb'.
+ // Even if we aren't over the threshold, consider flushing if we have
+ // mem-stores that are older with respect to the time threshold. But, don't
+ // give it a large perf_improvement score. We should only do this if we
+ // really don't have much else to do, and if we've already waited a bit.
+ // The following will give an improvement that's between 0.0 and 1.0,
+ // gradually growing as 'elapsed_ms' approaches 'upper_bound_ms' or
+ // 'anchored_mb' approaches 'threshold_mb'.
double perf = std::max(elapsed_ms / upper_bound_ms, anchored_mb /
threshold_mb);
stats->set_perf_improvement(std::min(1.0, perf));
}
@@ -238,16 +241,17 @@ void
FlushDeltaMemStoresOp::UpdateStats(MaintenanceOpStats* stats) {
return;
}
- std::lock_guard<simple_spinlock> l(lock_);
- int64_t dms_size;
- int64_t retention_size;
map<int64_t, int64_t> max_idx_to_replay_size;
if (tablet_replica_->tablet()->DeltaMemRowSetEmpty() ||
!tablet_replica_->GetReplaySizeMap(&max_idx_to_replay_size).ok()) {
return;
}
- tablet_replica_->tablet()->GetInfoForBestDMSToFlush(max_idx_to_replay_size,
- &dms_size, &retention_size);
+ int64_t dms_size;
+ int64_t retention_size;
+ MonoTime earliest_dms_time = MonoTime::Max();
+ tablet_replica_->tablet()->FindBestDMSToFlush(max_idx_to_replay_size,
+ &dms_size, &retention_size,
+ &earliest_dms_time);
stats->set_ram_anchored(dms_size);
stats->set_runnable(true);
@@ -259,9 +263,12 @@ void
FlushDeltaMemStoresOp::UpdateStats(MaintenanceOpStats* stats) {
stats->set_workload_score(workload_score);
}
+ const auto now = MonoTime::Now();
+ const auto time_since_earliest_update_ms = now > earliest_dms_time ?
+ (now - earliest_dms_time).ToMilliseconds() : 0;
FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(
stats,
- time_since_flush_.elapsed().wall_millis());
+ time_since_earliest_update_ms);
}
void FlushDeltaMemStoresOp::Perform() {
@@ -279,10 +286,6 @@ void FlushDeltaMemStoresOp::Perform() {
<< s.ToString();
return;
}
- {
- std::lock_guard<simple_spinlock> l(lock_);
- time_since_flush_.start();
- }
}
scoped_refptr<Histogram> FlushDeltaMemStoresOp::DurationHistogram() const {
diff --git a/src/kudu/tablet/tablet_replica_mm_ops.h
b/src/kudu/tablet/tablet_replica_mm_ops.h
index fd1dd5b..3ffa7b2 100644
--- a/src/kudu/tablet/tablet_replica_mm_ops.h
+++ b/src/kudu/tablet/tablet_replica_mm_ops.h
@@ -39,7 +39,7 @@ class FlushOpPerfImprovementPolicy {
~FlushOpPerfImprovementPolicy() {}
// Sets the performance improvement based on the anchored ram if it's over
the threshold,
- // else it will set it based on how long it has been since the last flush.
+ // else it will set it based on how long the mem-store has been non-empty.
static void SetPerfImprovementForFlush(MaintenanceOpStats* stats, double
elapsed_ms);
private:
@@ -91,9 +91,7 @@ class FlushDeltaMemStoresOp : public TabletReplicaOpBase {
: TabletReplicaOpBase(StringPrintf("FlushDeltaMemStoresOp(%s)",
tablet_replica->tablet()->tablet_id().c_str()),
MaintenanceOp::HIGH_IO_USAGE,
- tablet_replica) {
- time_since_flush_.start();
- }
+ tablet_replica) {}
void UpdateStats(MaintenanceOpStats* stats) override;
@@ -106,11 +104,6 @@ class FlushDeltaMemStoresOp : public TabletReplicaOpBase {
scoped_refptr<Histogram> DurationHistogram() const override;
scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const override;
-
- private:
- // Lock protecting time_since_flush_
- mutable simple_spinlock lock_;
- Stopwatch time_since_flush_;
};
// Maintenance task that runs log GC. Reports log retention that represents
the amount of data
diff --git a/src/kudu/tserver/tablet_server-test.cc
b/src/kudu/tserver/tablet_server-test.cc
index 74ccd03..eb1cd06 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -4332,6 +4332,41 @@ TEST_F(TabletServerTest, TestScannerCheckMatchingUser) {
}
}
+// Regression test for KUDU-3195 that ensures that as long as there are DMSs
+// older than the flush threshold, we will schedule DMS flushes.
+TEST_F(TabletServerTest, TestTimeBasedFlushDMS) {
+ SKIP_IF_SLOW_NOT_ALLOWED();
+ // We're going to generate a bunch of DMSs, and for that, we need multiple
+ // rowsets, so disable merge compactions.
+ FLAGS_enable_rowset_compaction = false;
+
+ constexpr const int kNumRowsets = 100;
+ constexpr const int kRowsPerRowset = 1;
+ for (int i = 0; i < kNumRowsets; i++) {
+ NO_FATALS(InsertTestRowsDirect(i * kRowsPerRowset, kRowsPerRowset));
+ ASSERT_OK(tablet_replica_->tablet()->Flush());
+ }
+ for (int i = 0; i < kNumRowsets * kRowsPerRowset; i++) {
+ NO_FATALS(UpdateTestRowRemote(i, 0));
+ }
+ ASSERT_FALSE(tablet_replica_->tablet()->DeltaMemRowSetEmpty());
+
+ // We want to see how quickly we can flush DMSs, so speed up time-based
+ // flushing.
+ FLAGS_enable_maintenance_manager = true;
+ FLAGS_maintenance_manager_polling_interval_ms = 1;
+ FLAGS_flush_threshold_secs = 1;
+ NO_FATALS(ShutdownAndRebuildTablet());
+ // We should be able to wait for the flush threshold and very quickly see all
+ // DMSs get flushed. We'll wait 5x the flush threshold, which should be ample
+ // time given how little we're writing.
+ // NOTE: There are 100 DMSs to flush -- we shouldn't be doing anything silly
+ // like waiting a full flush threshold between flushes, which would take 100
+ // seconds!
+ SleepFor(MonoDelta::FromSeconds(5 * FLAGS_flush_threshold_secs));
+ ASSERT_TRUE(tablet_replica_->tablet()->DeltaMemRowSetEmpty());
+}
+
TEST_F(TabletServerTest, TestStarvePerfImprovementOpsInColdTablet) {
SKIP_IF_SLOW_NOT_ALLOWED();
FLAGS_enable_maintenance_manager = true;