This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch branch-1.11.x
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/branch-1.11.x by this push:
     new b52bf50  [metrics] Hide metric live_row_count when tablet not support
b52bf50 is described below

commit b52bf5047ba58e5c108e997516cf28b14806934e
Author: Yingchun Lai <[email protected]>
AuthorDate: Fri Oct 18 14:08:51 2019 +0800

    [metrics] Hide metric live_row_count when tablet not support
    
    When upgrade an tserver from version older than 1.11, and add some
    partitions for an existing table, then the table will hold some tablets
    that support live row counting together with some ones that not support.
    Tablet which not support live row counting has value of -1, in this
    case, metrics merge on tserver and metrics aggregate on master have
    problems.
    This patch add feature to validate metric when happend to see an invalid
    metric when MergeFrom, and disable 'live_row_count' metric of a table
    when any tablet of a table not support live rows counting on master.
    
    Change-Id: I2a54704d8cbd64a521e65aa3e95bf1a68f7757b7
    Reviewed-on: http://gerrit.cloudera.org:8080/14507
    Reviewed-by: Adar Dembo <[email protected]>
    Tested-by: Kudu Jenkins
    (cherry picked from commit 67fbb7c0367e12629da8a14a21362c0e256e1f89)
    Reviewed-on: http://gerrit.cloudera.org:8080/14534
    Reviewed-by: Alexey Serbin <[email protected]>
---
 .../integration-tests/ts_tablet_manager-itest.cc   |   2 +
 src/kudu/master/catalog_manager.cc                 |  56 +++++++---
 src/kudu/master/catalog_manager.h                  |   9 +-
 src/kudu/master/master_path_handlers.cc            |   6 +-
 src/kudu/master/table_metrics.cc                   |  29 ++++-
 src/kudu/master/table_metrics.h                    |  17 ++-
 src/kudu/tablet/compaction-test.cc                 |   4 +-
 src/kudu/tablet/diskrowset.cc                      |   4 +-
 src/kudu/tablet/diskrowset.h                       |   2 +-
 src/kudu/tablet/memrowset-test.cc                  |   2 +-
 src/kudu/tablet/memrowset.h                        |   5 +-
 src/kudu/tablet/metadata.proto                     |   4 +-
 src/kudu/tablet/mock-rowsets.h                     |   2 +-
 src/kudu/tablet/mt-tablet-test.cc                  |   2 +-
 src/kudu/tablet/rowset.cc                          |   4 +-
 src/kudu/tablet/rowset.h                           |   4 +-
 src/kudu/tablet/tablet-test.cc                     |   4 +-
 src/kudu/tablet/tablet.cc                          |   6 +-
 src/kudu/tablet/tablet.h                           |   2 +-
 src/kudu/tablet/tablet_replica-test.cc             |   4 +-
 src/kudu/tablet/tablet_replica.cc                  |  41 ++++---
 src/kudu/tablet/tablet_replica.h                   |  10 +-
 src/kudu/tserver/ts_tablet_manager.cc              |   2 +-
 src/kudu/tserver/tserver_path_handlers.cc          |   5 +-
 src/kudu/util/metrics.cc                           |  31 ++++--
 src/kudu/util/metrics.h                            | 124 ++++++++++++++++-----
 26 files changed, 270 insertions(+), 111 deletions(-)

diff --git a/src/kudu/integration-tests/ts_tablet_manager-itest.cc 
b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
index 36300bc..2e40920 100644
--- a/src/kudu/integration-tests/ts_tablet_manager-itest.cc
+++ b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
@@ -755,6 +755,7 @@ TEST_F(TsTabletManagerITest, TestTableStats) {
     NO_FATALS(GetLeaderMasterAndRun(live_row_count, [&] (
       TableInfo* table_info, int64_t live_row_count) {
         ASSERT_EVENTUALLY([&] () {
+          ASSERT_TRUE(table_info->GetMetrics()->TableSupportsLiveRowCount());
           ASSERT_EQ(live_row_count, 
table_info->GetMetrics()->live_row_count->value());
         });
       }));
@@ -838,6 +839,7 @@ TEST_F(TsTabletManagerITest, TestTableStats) {
         ASSERT_STR_NOT_CONTAINS(metric_attrs_str, kTableName);
         ASSERT_STR_CONTAINS(metric_attrs_str, kNewTableName);
         ASSERT_EQ(table->id(), table_info->metric_entity_->id());
+        ASSERT_TRUE(table_info->GetMetrics()->TableSupportsLiveRowCount());
         ASSERT_EQ(live_row_count, 
table_info->GetMetrics()->live_row_count->value());
       }));
   }
diff --git a/src/kudu/master/catalog_manager.cc 
b/src/kudu/master/catalog_manager.cc
index 411906c..a93afc3 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -2934,14 +2934,16 @@ Status CatalogManager::GetTableStatistics(const 
GetTableStatisticsRequestPB* req
   RETURN_NOT_OK(FindLockAndAuthorizeTable(*req, resp, LockMode::READ, 
authz_func, user,
                                           &table, &l));
 
-  int64_t on_disk_size = table->GetMetrics()->on_disk_size->value();
-  int64_t live_row_count = table->GetMetrics()->live_row_count->value();
-  if (FLAGS_mock_table_metrics_for_testing) {
-    on_disk_size = FLAGS_on_disk_size_for_testing;
-    live_row_count = FLAGS_live_row_count_for_testing;
-  }
-  resp->set_on_disk_size(on_disk_size);
-  resp->set_live_row_count(live_row_count);
+  if (PREDICT_FALSE(FLAGS_mock_table_metrics_for_testing)) {
+    resp->set_on_disk_size(FLAGS_on_disk_size_for_testing);
+    resp->set_live_row_count(FLAGS_live_row_count_for_testing);
+  } else {
+    resp->set_on_disk_size(table->GetMetrics()->on_disk_size->value());
+    if (table->GetMetrics()->TableSupportsLiveRowCount()) {
+      resp->set_live_row_count(table->GetMetrics()->live_row_count->value());
+    }
+  }
+
   return Status::OK();
 }
 
@@ -4243,11 +4245,11 @@ Status CatalogManager::ProcessTabletReport(
 
     // 10. Process the report's tablet statistics.
     if (report.has_stats() && report.has_consensus_state()) {
-      DCHECK(ts_desc->permanent_uuid() == 
report.consensus_state().leader_uuid());
+      DCHECK_EQ(ts_desc->permanent_uuid(), 
report.consensus_state().leader_uuid());
 
       // Now the tserver only reports the LEADER replicas its own.
       // First, update table's stats.
-      tablet->table()->UpdateMetrics(tablet->GetStats(), report.stats());
+      tablet->table()->UpdateMetrics(tablet_id, tablet->GetStats(), 
report.stats());
       // Then, update tablet's stats.
       tablet->UpdateStats(report.stats());
     }
@@ -5417,8 +5419,8 @@ void TableInfo::AddRemoveTablets(const 
vector<scoped_refptr<TabletInfo>>& tablet
     const auto& lower_bound = 
tablet->metadata().state().pb.partition().partition_key_start();
     CHECK(EraseKeyReturnValuePtr(&tablet_map_, lower_bound) != nullptr);
     DecrementSchemaVersionCountUnlocked(tablet->reported_schema_version());
-    // Update the table metrics for the deleted tablets.
-    UpdateMetrics(tablet->GetStats(), ReportedTabletStatsPB());
+    // Remove the table metrics for the deleted tablets.
+    RemoveMetrics(tablet->id(), tablet->GetStats());
   }
   for (const auto& tablet : tablets_to_add) {
     TabletInfo* old = nullptr;
@@ -5569,15 +5571,35 @@ void TableInfo::UnregisterMetrics() {
   }
 }
 
-void TableInfo::UpdateMetrics(const tablet::ReportedTabletStatsPB& old_stats,
+void TableInfo::UpdateMetrics(const string& tablet_id,
+                              const tablet::ReportedTabletStatsPB& old_stats,
                               const tablet::ReportedTabletStatsPB& new_stats) {
   if (metrics_) {
-    metrics_->on_disk_size->IncrementBy(new_stats.on_disk_size() - 
old_stats.on_disk_size());
-    if (new_stats.live_row_count() >= 0) {
+    metrics_->on_disk_size->IncrementBy(
+        static_cast<int64_t>(new_stats.on_disk_size())
+        - static_cast<int64_t>(old_stats.on_disk_size()));
+    if (new_stats.has_live_row_count()) {
+      DCHECK(!metrics_->ContainsTabletNoLiveRowCount(tablet_id));
       metrics_->live_row_count->IncrementBy(
-          new_stats.live_row_count() - old_stats.live_row_count());
+          static_cast<int64_t>(new_stats.live_row_count())
+          - static_cast<int64_t>(old_stats.live_row_count()));
+    } else if (!old_stats.has_on_disk_size()) {
+      // The new reported stat doesn't support 'live_row_count' and
+      // this is the first report stat of the tablet.
+      metrics_->AddTabletNoLiveRowCount(tablet_id);
+    }
+  }
+}
+
+void TableInfo::RemoveMetrics(const string& tablet_id,
+                              const tablet::ReportedTabletStatsPB& old_stats) {
+  if (metrics_) {
+    
metrics_->on_disk_size->IncrementBy(-static_cast<int64_t>(old_stats.on_disk_size()));
+    if (old_stats.has_live_row_count()) {
+      DCHECK(!metrics_->ContainsTabletNoLiveRowCount(tablet_id));
+      
metrics_->live_row_count->IncrementBy(-static_cast<int64_t>(old_stats.live_row_count()));
     } else {
-      metrics_->live_row_count->set_value(-1);
+      metrics_->DeleteTabletNoLiveRowCount(tablet_id);
     }
   }
 }
diff --git a/src/kudu/master/catalog_manager.h 
b/src/kudu/master/catalog_manager.h
index 2dd529f..242d99b 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -322,10 +322,15 @@ class TableInfo : public RefCountedThreadSafe<TableInfo> {
   // Unregister metrics for the table.
   void UnregisterMetrics();
 
-  // Update the metrics.
-  void UpdateMetrics(const tablet::ReportedTabletStatsPB& old_stats,
+  // Update stats belonging to 'tablet_id' in the table's metrics.
+  void UpdateMetrics(const std::string& tablet_id,
+                     const tablet::ReportedTabletStatsPB& old_stats,
                      const tablet::ReportedTabletStatsPB& new_stats);
 
+  // Remove stats belonging to 'tablet_id' from table metrics.
+  void RemoveMetrics(const std::string& tablet_id,
+                     const tablet::ReportedTabletStatsPB& old_stats);
+
   // Update the attributes of the metrics.
   void UpdateMetricsAttrs(const std::string& new_table_name);
 
diff --git a/src/kudu/master/master_path_handlers.cc 
b/src/kudu/master/master_path_handlers.cc
index f261312..aa9aeb5 100644
--- a/src/kudu/master/master_path_handlers.cc
+++ b/src/kudu/master/master_path_handlers.cc
@@ -42,7 +42,6 @@
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/metadata.pb.h"
 #include "kudu/consensus/quorum_util.h"
-#include "kudu/gutil/integral_types.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stringprintf.h"
@@ -494,9 +493,8 @@ void MasterPathHandlers::HandleTablePage(const 
Webserver::WebRequest& req,
     // But the value of disk size will never be negative.
     (*output)["table_disk_size"] =
         HumanReadableNumBytes::ToString(table_metrics->on_disk_size->value());
-    int64 live_row_count = table_metrics->live_row_count->value();
-    if (live_row_count >= 0) {
-      (*output)["table_live_row_count"] = live_row_count;
+    if (table_metrics->TableSupportsLiveRowCount()) {
+      (*output)["table_live_row_count"] = 
table_metrics->live_row_count->value();
     } else {
       (*output)["table_live_row_count"] = "N/A";
     }
diff --git a/src/kudu/master/table_metrics.cc b/src/kudu/master/table_metrics.cc
index b259034..30aa7a4 100644
--- a/src/kudu/master/table_metrics.cc
+++ b/src/kudu/master/table_metrics.cc
@@ -16,19 +16,22 @@
 // under the License.
 #include "kudu/master/table_metrics.h"
 
+#include <mutex>
+
+#include "kudu/gutil/map-util.h"
 
 namespace kudu {
 namespace master {
 
 // Table-specific stats.
-METRIC_DEFINE_gauge_int64(table, on_disk_size, "Table Size On Disk",
+METRIC_DEFINE_gauge_uint64(table, on_disk_size, "Table Size On Disk",
     kudu::MetricUnit::kBytes,
     "Pre-replication aggregated disk space used by all tablets in this table, "
     "including metadata.");
-METRIC_DEFINE_gauge_int64(table, live_row_count, "Table Live Row count",
+METRIC_DEFINE_gauge_uint64(table, live_row_count, "Table Live Row count",
     kudu::MetricUnit::kRows,
     "Pre-replication aggregated number of live rows in this table. "
-    "When the table doesn't support live row counting, -1 will be returned.");
+    "Only accurate if all tablets in the table support live row counting.");
 
 #define GINIT(x) x(METRIC_##x.Instantiate(entity, 0))
 
@@ -38,5 +41,25 @@ TableMetrics::TableMetrics(const 
scoped_refptr<MetricEntity>& entity)
 }
 #undef GINIT
 
+void TableMetrics::AddTabletNoLiveRowCount(const std::string& tablet_id) {
+  std::lock_guard<simple_spinlock> l(lock_);
+  InsertIfNotPresent(&tablet_ids_no_live_row_count_, tablet_id);
+}
+
+void TableMetrics::DeleteTabletNoLiveRowCount(const std::string& tablet_id) {
+  std::lock_guard<simple_spinlock> l(lock_);
+  tablet_ids_no_live_row_count_.erase(tablet_id);
+}
+
+bool TableMetrics::ContainsTabletNoLiveRowCount(const std::string& tablet_id) 
const {
+  std::lock_guard<simple_spinlock> l(lock_);
+  return ContainsKey(tablet_ids_no_live_row_count_, tablet_id);
+}
+
+bool TableMetrics::TableSupportsLiveRowCount() const {
+  std::lock_guard<simple_spinlock> l(lock_);
+  return tablet_ids_no_live_row_count_.empty();
+}
+
 } // namespace master
 } // namespace kudu
diff --git a/src/kudu/master/table_metrics.h b/src/kudu/master/table_metrics.h
index 26591b0..f90f52e 100644
--- a/src/kudu/master/table_metrics.h
+++ b/src/kudu/master/table_metrics.h
@@ -18,8 +18,11 @@
 #define KUDU_MASTER_TABLE_METRICS_H
 
 #include <cstdint>
+#include <string>
+#include <unordered_set>
 
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/util/locks.h"
 #include "kudu/util/metrics.h"
 
 namespace kudu {
@@ -42,8 +45,18 @@ namespace master {
 struct TableMetrics {
   explicit TableMetrics(const scoped_refptr<MetricEntity>& entity);
 
-  scoped_refptr<AtomicGauge<int64_t>> on_disk_size;
-  scoped_refptr<AtomicGauge<int64_t>> live_row_count;
+  scoped_refptr<AtomicGauge<uint64_t>> on_disk_size;
+  scoped_refptr<AtomicGauge<uint64_t>> live_row_count;
+
+  void AddTabletNoLiveRowCount(const std::string& tablet_id);
+  void DeleteTabletNoLiveRowCount(const std::string& tablet_id);
+  bool ContainsTabletNoLiveRowCount(const std::string& tablet_id) const;
+  bool TableSupportsLiveRowCount() const;
+
+ private:
+  mutable simple_spinlock lock_;
+  // IDs of tablets which do not support reporting live row count.
+  std::unordered_set<std::string> tablet_ids_no_live_row_count_;
 };
 
 } // namespace master
diff --git a/src/kudu/tablet/compaction-test.cc 
b/src/kudu/tablet/compaction-test.cc
index c45a61f..4f81f95 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -1195,7 +1195,7 @@ TEST_F(TestCompaction, TestCountLiveRowsOfMemRowSetFlush) 
{
   NO_FATALS(UpdateRows(mrs.get(), 80, 0, 1));
   NO_FATALS(DeleteRows(mrs.get(), 50));
   NO_FATALS(InsertRows(mrs.get(), 10, 0));
-  int64_t count = 0;
+  uint64_t count = 0;
   ASSERT_OK(mrs->CountLiveRows(&count));
   ASSERT_EQ(100 - 50 + 10, count);
 
@@ -1250,7 +1250,7 @@ TEST_F(TestCompaction, 
TestCountLiveRowsOfDiskRowSetsCompact) {
   std::random_shuffle(all_rss.begin(), all_rss.end());
   NO_FATALS(CompactAndReopenNoRoll(all_rss, schema_, &result));
 
-  int64_t count = 0;
+  uint64_t count = 0;
   ASSERT_OK(result->CountLiveRows(&count));
   ASSERT_EQ((100 - 50 + 10) * 3, count);
 }
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index f4dd37d..b6a9dc3 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -756,9 +756,9 @@ Status DiskRowSet::CountRows(const IOContext* io_context, 
rowid_t *count) const
   return Status::OK();
 }
 
-Status DiskRowSet::CountLiveRows(int64_t* count) const {
+Status DiskRowSet::CountLiveRows(uint64_t* count) const {
+  DCHECK_GE(rowset_metadata_->live_row_count(), 
delta_tracker_->CountDeletedRows());
   *count = rowset_metadata_->live_row_count() - 
delta_tracker_->CountDeletedRows();
-  DCHECK_GE(*count, 0);
   return Status::OK();
 }
 
diff --git a/src/kudu/tablet/diskrowset.h b/src/kudu/tablet/diskrowset.h
index 391d82d..52689a4 100644
--- a/src/kudu/tablet/diskrowset.h
+++ b/src/kudu/tablet/diskrowset.h
@@ -370,7 +370,7 @@ class DiskRowSet : public RowSet {
   Status CountRows(const fs::IOContext* io_context, rowid_t *count) const 
final override;
 
   // Count the number of live rows in this DRS.
-  virtual Status CountLiveRows(int64_t* count) const override;
+  virtual Status CountLiveRows(uint64_t* count) const override;
 
   // See RowSet::GetBounds(...)
   virtual Status GetBounds(std::string* min_encoded_key,
diff --git a/src/kudu/tablet/memrowset-test.cc 
b/src/kudu/tablet/memrowset-test.cc
index 0e3b363..29d8733 100644
--- a/src/kudu/tablet/memrowset-test.cc
+++ b/src/kudu/tablet/memrowset-test.cc
@@ -823,7 +823,7 @@ TEST_F(TestMemRowSet, TestCountLiveRows) {
                               MemTracker::GetRootTracker(), &mrs));
 
   const auto CheckLiveRowsCount = [&](int64_t expect) {
-    int64_t count = 0;
+    uint64_t count = 0;
     ASSERT_OK(mrs->CountLiveRows(&count));
     ASSERT_EQ(expect, count);
   };
diff --git a/src/kudu/tablet/memrowset.h b/src/kudu/tablet/memrowset.h
index 89a02ac..dff340c 100644
--- a/src/kudu/tablet/memrowset.h
+++ b/src/kudu/tablet/memrowset.h
@@ -255,9 +255,8 @@ class MemRowSet : public RowSet,
     return Status::OK();
   }
 
-  virtual Status CountLiveRows(int64_t* count) const override {
+  virtual Status CountLiveRows(uint64_t* count) const override {
     *count = live_row_count_.Load();
-    DCHECK_GE(*count, 0);
     return Status::OK();
   }
 
@@ -459,7 +458,7 @@ class MemRowSet : public RowSet,
   std::atomic<bool> has_been_compacted_;
 
   // Number of live rows in this MRS.
-  AtomicInt<int64_t> live_row_count_;
+  AtomicInt<uint64_t> live_row_count_;
 
   DISALLOW_COPY_AND_ASSIGN(MemRowSet);
 };
diff --git a/src/kudu/tablet/metadata.proto b/src/kudu/tablet/metadata.proto
index bff3a2e..e3fa281 100644
--- a/src/kudu/tablet/metadata.proto
+++ b/src/kudu/tablet/metadata.proto
@@ -196,6 +196,6 @@ enum TabletStatePB {
 
 // Statistics for a tablet replica.
 message ReportedTabletStatsPB {
-  required int64 on_disk_size = 1;
-  required int64 live_row_count = 2;
+  optional uint64 on_disk_size = 1;
+  optional uint64 live_row_count = 2;
 }
diff --git a/src/kudu/tablet/mock-rowsets.h b/src/kudu/tablet/mock-rowsets.h
index c22d44d..719b748 100644
--- a/src/kudu/tablet/mock-rowsets.h
+++ b/src/kudu/tablet/mock-rowsets.h
@@ -70,7 +70,7 @@ class MockRowSet : public RowSet {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
-  virtual Status CountLiveRows(int64_t* /*count*/) const OVERRIDE {
+  virtual Status CountLiveRows(uint64_t* /*count*/) const OVERRIDE {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
diff --git a/src/kudu/tablet/mt-tablet-test.cc 
b/src/kudu/tablet/mt-tablet-test.cc
index dab3913..ac8a5bb 100644
--- a/src/kudu/tablet/mt-tablet-test.cc
+++ b/src/kudu/tablet/mt-tablet-test.cc
@@ -415,7 +415,7 @@ class MultiThreadedTabletTest : public 
TabletTestBase<SETUP> {
     while (running_insert_count_.count() > 0) {
       num_rowsets_ts->SetValue(tablet()->num_rowsets());
       memrowset_size_ts->SetValue(tablet()->MemRowSetSize() / 1024.0);
-      int64_t num_live_rows;
+      uint64_t num_live_rows;
       ignore_result(tablet()->CountLiveRows(&num_live_rows));
       num_live_rows_ts->SetValue(num_live_rows);
 
diff --git a/src/kudu/tablet/rowset.cc b/src/kudu/tablet/rowset.cc
index 62c1131..3566fb1 100644
--- a/src/kudu/tablet/rowset.cc
+++ b/src/kudu/tablet/rowset.cc
@@ -230,9 +230,9 @@ Status DuplicatingRowSet::CountRows(const IOContext* 
io_context, rowid_t *count)
   return Status::OK();
 }
 
-Status DuplicatingRowSet::CountLiveRows(int64_t* count) const {
+Status DuplicatingRowSet::CountLiveRows(uint64_t* count) const {
   for (const shared_ptr<RowSet>& rs : old_rowsets_) {
-    int64_t tmp = 0;
+    uint64_t tmp = 0;
     RETURN_NOT_OK(rs->CountLiveRows(&tmp));
     *count += tmp;
   }
diff --git a/src/kudu/tablet/rowset.h b/src/kudu/tablet/rowset.h
index 2cfccaf..e08b515 100644
--- a/src/kudu/tablet/rowset.h
+++ b/src/kudu/tablet/rowset.h
@@ -164,7 +164,7 @@ class RowSet {
   virtual Status CountRows(const fs::IOContext* io_context, rowid_t *count) 
const = 0;
 
   // Count the number of live rows in this rowset.
-  virtual Status CountLiveRows(int64_t* count) const = 0;
+  virtual Status CountLiveRows(uint64_t* count) const = 0;
 
   // Return the bounds for this RowSet. 'min_encoded_key' and 'max_encoded_key'
   // are set to the first and last encoded keys for this RowSet.
@@ -409,7 +409,7 @@ class DuplicatingRowSet : public RowSet {
 
   Status CountRows(const fs::IOContext* io_context, rowid_t *count) const 
OVERRIDE;
 
-  virtual Status CountLiveRows(int64_t* count) const OVERRIDE;
+  virtual Status CountLiveRows(uint64_t* count) const OVERRIDE;
 
   virtual Status GetBounds(std::string* min_encoded_key,
                            std::string* max_encoded_key) const OVERRIDE;
diff --git a/src/kudu/tablet/tablet-test.cc b/src/kudu/tablet/tablet-test.cc
index a42d268..6877358 100644
--- a/src/kudu/tablet/tablet-test.cc
+++ b/src/kudu/tablet/tablet-test.cc
@@ -100,7 +100,7 @@ public:
   }
 
   void CheckLiveRowsCount(int64_t expect) {
-    int64_t count = 0;
+    uint64_t count = 0;
     ASSERT_OK(this->tablet()->CountLiveRows(&count));
     ASSERT_EQ(expect, count);
   }
@@ -915,7 +915,7 @@ TYPED_TEST(TestTablet, TestCountLiveRowsAfterShutdown) {
   NO_FATALS(this->tablet()->Shutdown());
 
   // Call the CountLiveRows().
-  int64_t count = 0;
+  uint64_t count = 0;
   ASSERT_TRUE(tablet->CountLiveRows(&count).IsRuntimeError());
 }
 
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 5c3db23..8d0d560 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -1881,7 +1881,7 @@ Status Tablet::CountRows(uint64_t *count) const {
   return Status::OK();
 }
 
-Status Tablet::CountLiveRows(int64_t* count) const {
+Status Tablet::CountLiveRows(uint64_t* count) const {
   if (!metadata_->supports_live_row_count()) {
     return Status::NotSupported("This tablet doesn't support live row 
counting");
   }
@@ -1892,8 +1892,8 @@ Status Tablet::CountLiveRows(int64_t* count) const {
     return Status::RuntimeError("The tablet has been shut down");
   }
 
-  int64_t ret = 0;
-  int64_t tmp = 0;
+  uint64_t ret = 0;
+  uint64_t tmp = 0;
   RETURN_NOT_OK(comps->memrowset->CountLiveRows(&ret));
   for (const shared_ptr<RowSet>& rowset : comps->rowsets->all_rowsets()) {
     RETURN_NOT_OK(rowset->CountLiveRows(&tmp));
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index d7bfb8d..bbf7edb 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -348,7 +348,7 @@ class Tablet {
   Status CountRows(uint64_t *count) const;
 
   // Count the number of live rows in this tablet.
-  Status CountLiveRows(int64_t* count) const;
+  Status CountLiveRows(uint64_t* count) const;
 
   // Verbosely dump this entire tablet to the logs. This is only
   // really useful when debugging unit tests failures where the tablet
diff --git a/src/kudu/tablet/tablet_replica-test.cc 
b/src/kudu/tablet/tablet_replica-test.cc
index e41239e..d1c6bcc 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -84,7 +84,7 @@ DECLARE_int32(flush_threshold_mb);
 
 METRIC_DECLARE_entity(tablet);
 
-METRIC_DECLARE_gauge_int64(live_row_count);
+METRIC_DECLARE_gauge_uint64(live_row_count);
 
 using kudu::consensus::CommitMsg;
 using kudu::consensus::ConsensusBootstrapInfo;
@@ -780,7 +780,7 @@ TEST_F(TabletReplicaTest, TestLiveRowCountMetric) {
   ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
 
   auto live_row_count = METRIC_live_row_count.InstantiateFunctionGauge(
-      tablet_replica_->tablet()->GetMetricEntity(), Callback<int64_t(void)>());
+      tablet_replica_->tablet()->GetMetricEntity(), 
Callback<uint64_t(void)>());
   ASSERT_EQ(0, live_row_count->value());
 
   // Insert some rows.
diff --git a/src/kudu/tablet/tablet_replica.cc 
b/src/kudu/tablet/tablet_replica.cc
index 3decd0e..163585b 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -89,11 +89,9 @@ METRIC_DEFINE_gauge_size(tablet, on_disk_size, "Tablet Size 
On Disk",
 METRIC_DEFINE_gauge_string(tablet, state, "Tablet State",
                            kudu::MetricUnit::kState,
                            "State of this tablet.");
-METRIC_DEFINE_gauge_int64(tablet, live_row_count, "Tablet Live Row Count",
-                          kudu::MetricUnit::kRows,
-                          "Number of live rows in this tablet, excludes 
deleted rows. "
-                          "When the tablet doesn't support live row counting, 
-1 will "
-                          "be returned.");
+METRIC_DEFINE_gauge_uint64(tablet, live_row_count, "Tablet Live Row Count",
+                           kudu::MetricUnit::kRows,
+                           "Number of live rows in this tablet, excludes 
deleted rows.");
 
 namespace kudu {
 namespace tablet {
@@ -209,9 +207,14 @@ Status TabletReplica::Start(const ConsensusBootstrapInfo& 
bootstrap_info,
         METRIC_state.InstantiateFunctionGauge(
             tablet_->GetMetricEntity(), Bind(&TabletReplica::StateName, 
Unretained(this)))
             ->AutoDetach(&metric_detacher_);
-        METRIC_live_row_count.InstantiateFunctionGauge(
-            tablet_->GetMetricEntity(), Bind(&TabletReplica::CountLiveRows, 
Unretained(this)))
-            ->AutoDetach(&metric_detacher_);
+        if (tablet_->metadata()->supports_live_row_count()) {
+          METRIC_live_row_count.InstantiateFunctionGauge(
+              tablet_->GetMetricEntity(),
+              Bind(&TabletReplica::CountLiveRowsNoFail, Unretained(this)))
+              ->AutoDetach(&metric_detacher_);
+        } else {
+          METRIC_live_row_count.InstantiateInvalid(tablet_->GetMetricEntity(), 
0);
+        }
       }
       txn_tracker_.StartMemoryTracking(tablet_->mem_tracker());
 
@@ -805,18 +808,24 @@ size_t TabletReplica::OnDiskSize() const {
   return ret;
 }
 
-int64_t TabletReplica::CountLiveRows() const {
-  int64_t ret = -1;
+Status TabletReplica::CountLiveRows(uint64_t* live_row_count) const {
   shared_ptr<Tablet> tablet;
   {
     std::lock_guard<simple_spinlock> l(lock_);
     tablet = tablet_;
   }
 
-  if (tablet) {
-    ignore_result(tablet->CountLiveRows(&ret));
+  if (!tablet) {
+    return Status::IllegalState("The tablet is shutdown.");
   }
-  return ret;
+
+  return tablet->CountLiveRows(live_row_count);
+}
+
+uint64_t TabletReplica::CountLiveRowsNoFail() const {
+  uint64_t live_row_count = 0;
+  ignore_result(CountLiveRows(&live_row_count));
+  return live_row_count;
 }
 
 void TabletReplica::UpdateTabletStats(vector<string>* dirty_tablets) {
@@ -827,7 +836,11 @@ void TabletReplica::UpdateTabletStats(vector<string>* 
dirty_tablets) {
 
   ReportedTabletStatsPB pb;
   pb.set_on_disk_size(OnDiskSize());
-  pb.set_live_row_count(CountLiveRows());
+  uint64_t live_row_count;
+  Status s = CountLiveRows(&live_row_count);
+  if (s.ok()) {
+    pb.set_live_row_count(live_row_count);
+  }
 
   // We cannot hold 'lock_' while calling RaftConsensus::role() because
   // it may invoke TabletReplica::StartFollowerTransaction() and lead to
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index 2d603fd..89338df 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -300,9 +300,13 @@ class TabletReplica : public 
RefCountedThreadSafe<TabletReplica>,
   // Return the total on-disk size of this tablet replica, in bytes.
   size_t OnDiskSize() const;
 
-  // Return the number of live rows of this tablet replica.
-  // -1 will be returned if the tablet doesn't support live row counting.
-  int64_t CountLiveRows() const;
+  // Counts the number of live rows in this tablet replica.
+  //
+  // Returns a bad Status on failure.
+  Status CountLiveRows(uint64_t* live_row_count) const;
+
+  // Like CountLiveRows but returns 0 on failure.
+  uint64_t CountLiveRowsNoFail() const;
 
   // Update the tablet stats.
   // When the replica's stats change and it's the LEADER, it is added to
diff --git a/src/kudu/tserver/ts_tablet_manager.cc 
b/src/kudu/tserver/ts_tablet_manager.cc
index 78b6e02..1e71d3c 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -1333,7 +1333,7 @@ void TSTabletManager::CreateReportedTabletPB(const 
scoped_refptr<TabletReplica>&
     // If we're the leader, report stats.
     if (cstate.leader_uuid() == fs_manager_->uuid()) {
       ReportedTabletStatsPB stats_pb = replica->GetTabletStats();
-      if (stats_pb.IsInitialized()) {
+      if (stats_pb.has_on_disk_size() || stats_pb.has_live_row_count()) {
         *reported_tablet->mutable_stats() = std::move(stats_pb);
       }
     }
diff --git a/src/kudu/tserver/tserver_path_handlers.cc 
b/src/kudu/tserver/tserver_path_handlers.cc
index d0e7951..414a8e5 100644
--- a/src/kudu/tserver/tserver_path_handlers.cc
+++ b/src/kudu/tserver/tserver_path_handlers.cc
@@ -417,8 +417,9 @@ void TabletServerPathHandlers::HandleTabletPage(const 
Webserver::WebRequest& req
   output->Set("partition",
               
tmeta->partition_schema().PartitionDebugString(tmeta->partition(), schema));
   output->Set("on_disk_size", 
HumanReadableNumBytes::ToString(replica->OnDiskSize()));
-  int64_t live_row_count = replica->CountLiveRows();
-  if (live_row_count >= 0) {
+  uint64_t live_row_count;
+  Status s = replica->CountLiveRows(&live_row_count);
+  if (s.ok()) {
     output->Set("tablet_live_row_count", live_row_count);
   } else {
     output->Set("tablet_live_row_count", "N/A");
diff --git a/src/kudu/util/metrics.cc b/src/kudu/util/metrics.cc
index fc6b538..3dd98da 100644
--- a/src/kudu/util/metrics.cc
+++ b/src/kudu/util/metrics.cc
@@ -674,11 +674,13 @@ StringGauge::StringGauge(const GaugePrototype<string>* 
proto,
 
 scoped_refptr<Metric> StringGauge::snapshot() const {
   std::lock_guard<simple_spinlock> l(lock_);
-  scoped_refptr<Metric> m
-    = new StringGauge(down_cast<const GaugePrototype<string>*>(prototype_),
-                      value_,
-                      unique_values_);
-  return m;
+  auto p = new StringGauge(down_cast<const 
GaugePrototype<string>*>(prototype_),
+                           value_,
+                           unique_values_);
+  p->m_epoch_.store(m_epoch_);
+  p->invalid_for_merge_ = invalid_for_merge_;
+  p->retire_time_ = retire_time_;
+  return scoped_refptr<Metric>(p);
 }
 
 string StringGauge::value() const {
@@ -712,8 +714,12 @@ void StringGauge::MergeFrom(const scoped_refptr<Metric>& 
other) {
   if (PREDICT_FALSE(this == other.get())) {
     return;
   }
-  UpdateModificationEpoch();
 
+  if (InvalidateIfNeededInMerge(other)) {
+    return;
+  }
+
+  UpdateModificationEpoch();
   scoped_refptr<StringGauge> other_ptr = down_cast<StringGauge*>(other.get());
   auto other_values = other_ptr->unique_values();
 
@@ -732,9 +738,11 @@ void StringGauge::WriteValue(JsonWriter* writer) const {
 
 scoped_refptr<Metric> MeanGauge::snapshot() const {
   std::lock_guard<simple_spinlock> l(lock_);
-  scoped_refptr<Metric> m
-    = new MeanGauge(down_cast<const GaugePrototype<double>*>(prototype_));
-  return m;
+  auto p = new MeanGauge(down_cast<const GaugePrototype<double>*>(prototype_));
+  p->m_epoch_.store(m_epoch_);
+  p->invalid_for_merge_ = invalid_for_merge_;
+  p->retire_time_ = retire_time_;
+  return scoped_refptr<Metric>(p);
 }
 
 double MeanGauge::value() const {
@@ -764,6 +772,11 @@ void MeanGauge::MergeFrom(const scoped_refptr<Metric>& 
other) {
     return;
   }
 
+  if (InvalidateIfNeededInMerge(other)) {
+    return;
+  }
+
+  UpdateModificationEpoch();
   scoped_refptr<MeanGauge> other_ptr = down_cast<MeanGauge*>(other.get());
   std::lock_guard<simple_spinlock> l(lock_);
   total_sum_ += other_ptr->total_sum();
diff --git a/src/kudu/util/metrics.h b/src/kudu/util/metrics.h
index eccca0a..4608770 100644
--- a/src/kudu/util/metrics.h
+++ b/src/kudu/util/metrics.h
@@ -731,7 +731,7 @@ class Metric : public RefCountedThreadSafe<Metric> {
   virtual bool IsUntouched() const = 0;
 
   // Return true if this metric has changed in or after the given metrics 
epoch.
-  bool ModifiedInOrAfterEpoch(int64_t epoch) {
+  bool ModifiedInOrAfterEpoch(int64_t epoch) const {
     return m_epoch_ >= epoch;
   }
 
@@ -756,8 +756,6 @@ class Metric : public RefCountedThreadSafe<Metric> {
   explicit Metric(const MetricPrototype* prototype);
   virtual ~Metric();
 
-  const MetricPrototype* const prototype_;
-
   void UpdateModificationEpoch() {
     // If we have some upper bound, we need to invalidate it. We use a 
'test-and-set'
     // here to avoid contending on writes to this cacheline.
@@ -772,6 +770,28 @@ class Metric : public RefCountedThreadSafe<Metric> {
     m_epoch_ = -1;
   }
 
+  // Causes this metric to be skipped during a merge..
+  void InvalidateForMerge() {
+    invalid_for_merge_ = true;
+  }
+
+  // Returns whether the merge of 'other' into this metric should be 
prohibited. If true,
+  // also ensures that this metric is invalidated.
+  bool InvalidateIfNeededInMerge(const scoped_refptr<Metric>& other) {
+    if (invalid_for_merge_) {
+      DCHECK_EQ(m_epoch_, -1);
+      return true;
+    }
+    if (other->invalid_for_merge_) {
+      InvalidateEpoch();
+      invalid_for_merge_ = true;
+      return true;
+    }
+    return false;
+  }
+
+  const MetricPrototype* const prototype_;
+
   // The last metrics epoch in which this metric was modified.
   // We use epochs instead of timestamps since we can ensure that epochs
   // only change rarely. Thus this member is read-mostly and doesn't cause
@@ -779,6 +799,14 @@ class Metric : public RefCountedThreadSafe<Metric> {
   // the system clock, which is more expensive compared to reading 'g_epoch_'.
   std::atomic<int64_t> m_epoch_;
 
+  // Whether this metric is invalid for merge.
+  bool invalid_for_merge_ = false;
+
+  // The time at which we should retire this metric if it is still 
un-referenced outside
+  // of the metrics subsystem. If this metric is not due for retirement, this 
member is
+  // uninitialized.
+  MonoTime retire_time_;
+
  private:
   void UpdateModificationEpochSlowPath();
 
@@ -788,11 +816,6 @@ class Metric : public RefCountedThreadSafe<Metric> {
   friend class GaugePrototype;
   FRIEND_TEST(MetricsTest, TestDumpOnlyChanged);
 
-  // The time at which we should retire this metric if it is still 
un-referenced outside
-  // of the metrics subsystem. If this metric is not due for retirement, this 
member is
-  // uninitialized.
-  MonoTime retire_time_;
-
   // See 'current_epoch()'.
   static std::atomic<int64_t> g_epoch_;
 
@@ -920,6 +943,16 @@ class GaugePrototype : public MetricPrototype {
     return gauge;
   }
 
+  // Instantiate a "manual" gauge and hide it, and it will
+  // invalidate the result when merge with other metric.
+  scoped_refptr<AtomicGauge<T> > InstantiateInvalid(
+      const scoped_refptr<MetricEntity>& entity,
+      const T& initial_value) const {
+    auto gauge = InstantiateHidden(entity, initial_value);
+    gauge->InvalidateForMerge();
+    return gauge;
+  }
+
   virtual MetricType::Type type() const OVERRIDE {
     if (args_.flags_ & EXPOSE_AS_COUNTER) {
       return MetricType::kCounter;
@@ -1011,9 +1044,11 @@ class AtomicGauge : public Gauge {
       value_(initial_value) {
   }
   scoped_refptr<Metric> snapshot() const override {
-    scoped_refptr<Metric> m = new AtomicGauge(down_cast<const 
GaugePrototype<T>*>(prototype_),
-                                              value());
-    return m;
+    auto p = new AtomicGauge(down_cast<const GaugePrototype<T>*>(prototype_), 
value());
+    p->m_epoch_.store(m_epoch_);
+    p->invalid_for_merge_ = invalid_for_merge_;
+    p->retire_time_ = retire_time_;
+    return scoped_refptr<Metric>(p);
   }
   T value() const {
     return static_cast<T>(value_.Load(kMemOrderRelease));
@@ -1039,9 +1074,15 @@ class AtomicGauge : public Gauge {
     return false;
   }
   void MergeFrom(const scoped_refptr<Metric>& other) override {
-    if (PREDICT_TRUE(this != other.get())) {
-      IncrementBy(down_cast<AtomicGauge<T>*>(other.get())->value());
+    if (PREDICT_FALSE(this == other.get())) {
+      return;
+    }
+
+    if (InvalidateIfNeededInMerge(other)) {
+      return;
     }
+
+    IncrementBy(down_cast<AtomicGauge<T>*>(other.get())->value());
   }
  protected:
   virtual void WriteValue(JsonWriter* writer) const OVERRIDE {
@@ -1114,13 +1155,16 @@ template <typename T>
 class FunctionGauge : public Gauge {
  public:
   scoped_refptr<Metric> snapshot() const override {
-    scoped_refptr<Metric> m = new FunctionGauge(down_cast<const 
GaugePrototype<T>*>(prototype_),
-                                                Callback<T()>(function_));
+    auto p = new FunctionGauge(down_cast<const GaugePrototype<T>*>(prototype_),
+                               Callback<T()>(function_));
     // The bounded function is associated with another MetricEntity instance, 
here we don't know
     // when it release, it's not safe to keep the function as a member, so 
it's needed to
     // call DetachToCurrentValue() to make it safe.
-    down_cast<FunctionGauge<T>*>(m.get())->DetachToCurrentValue();
-    return m;
+    p->DetachToCurrentValue();
+    p->m_epoch_.store(m_epoch_);
+    p->invalid_for_merge_ = invalid_for_merge_;
+    p->retire_time_ = retire_time_;
+    return scoped_refptr<Metric>(p);
   }
 
   T value() const {
@@ -1174,9 +1218,13 @@ class FunctionGauge : public Gauge {
 
   // value() will be constant after MergeFrom()
   void MergeFrom(const scoped_refptr<Metric>& other) override {
-    if (PREDICT_TRUE(this != other.get())) {
-      DetachToConstant(value() + 
down_cast<FunctionGauge<T>*>(other.get())->value());
+    if (PREDICT_FALSE(this == other.get())) {
+      return;
     }
+
+    // It's not needed to check whether a FunctionGauge is 
InvalidateIfNeededInMerge
+    // or not, because it's always 'touched' after constructing.
+    DetachToConstant(value() + 
down_cast<FunctionGauge<T>*>(other.get())->value());
   }
 
  private:
@@ -1221,9 +1269,12 @@ class CounterPrototype : public MetricPrototype {
 class Counter : public Metric {
  public:
   scoped_refptr<Metric> snapshot() const override {
-    scoped_refptr<Metric> m = new Counter(down_cast<const 
CounterPrototype*>(prototype_));
-    down_cast<Counter*>(m.get())->IncrementBy(value());
-    return m;
+    auto p = new Counter(down_cast<const CounterPrototype*>(prototype_));
+    p->IncrementBy(value());
+    p->m_epoch_.store(m_epoch_);
+    p->invalid_for_merge_ = invalid_for_merge_;
+    p->retire_time_ = retire_time_;
+    return scoped_refptr<Metric>(p);
   }
   int64_t value() const;
   void Increment();
@@ -1236,9 +1287,15 @@ class Counter : public Metric {
   }
 
   void MergeFrom(const scoped_refptr<Metric>& other) override {
-    if (PREDICT_TRUE(this != other.get())) {
-      IncrementBy(down_cast<Counter*>(other.get())->value());
+    if (PREDICT_FALSE(this == other.get())) {
+      return;
     }
+
+    if (InvalidateIfNeededInMerge(other)) {
+      return;
+    }
+
+    IncrementBy(down_cast<Counter *>(other.get())->value());
   }
 
  private:
@@ -1272,9 +1329,11 @@ class HistogramPrototype : public MetricPrototype {
 class Histogram : public Metric {
  public:
   scoped_refptr<Metric> snapshot() const override {
-    scoped_refptr<Metric> m = new Histogram(down_cast<const 
HistogramPrototype*>(prototype_),
-                                              *histogram_);
-    return m;
+    auto p = new Histogram(down_cast<const HistogramPrototype*>(prototype_), 
*histogram_);
+    p->m_epoch_.store(m_epoch_);
+    p->invalid_for_merge_ = invalid_for_merge_;
+    p->retire_time_ = retire_time_;
+    return scoped_refptr<Metric>(p);
   }
 
   // Increment the histogram for the given value.
@@ -1310,9 +1369,16 @@ class Histogram : public Metric {
   }
 
   void MergeFrom(const scoped_refptr<Metric>& other) override {
-    if (PREDICT_TRUE(this != other.get())) {
-      
histogram_->MergeFrom(*(down_cast<Histogram*>(other.get())->histogram()));
+    if (PREDICT_FALSE(this == other.get())) {
+      return;
     }
+
+    if (InvalidateIfNeededInMerge(other)) {
+      return;
+    }
+
+    UpdateModificationEpoch();
+    histogram_->MergeFrom(*(down_cast<Histogram*>(other.get())->histogram()));
   }
 
  private:

Reply via email to