Repository: kudu
Updated Branches:
refs/heads/master f3ec97bd1 -> 8329bdc88
[sys_catalog] make visitor's code more generic
Minor refactoring on sys_catalog.{cc,h}: the visitor-related code
is made more generic to avoid code duplication.
There are no functional changes in this patch.
Change-Id: I7aa1e81ad7e61af8376ce4b52cd4020f5ce69a63
Reviewed-on: http://gerrit.cloudera.org:8080/5901
Reviewed-by: Adar Dembo <[email protected]>
Tested-by: Kudu Jenkins
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/27e05ee8
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/27e05ee8
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/27e05ee8
Branch: refs/heads/master
Commit: 27e05ee84cb5f3abae0d36d1a2425f5d7701d1f8
Parents: f3ec97b
Author: Alexey Serbin <[email protected]>
Authored: Fri Feb 3 16:04:13 2017 -0800
Committer: Todd Lipcon <[email protected]>
Committed: Sat Feb 4 04:39:57 2017 +0000
----------------------------------------------------------------------
src/kudu/master/sys_catalog.cc | 138 +++++++++++++++++-------------------
src/kudu/master/sys_catalog.h | 13 ++--
2 files changed, 74 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/27e05ee8/src/kudu/master/sys_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 3f2e093..59c4bc3 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -18,11 +18,14 @@
#include "kudu/master/sys_catalog.h"
#include <algorithm>
-#include <gflags/gflags.h>
-#include <glog/logging.h>
+#include <functional>
#include <iterator>
#include <memory>
#include <set>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
#include "kudu/common/partial_row.h"
#include "kudu/common/partition.h"
@@ -70,8 +73,10 @@ using kudu::tablet::TabletPeer;
using kudu::tablet::TabletStatusListener;
using kudu::tserver::WriteRequestPB;
using kudu::tserver::WriteResponsePB;
+using std::function;
using std::shared_ptr;
using std::unique_ptr;
+using std::vector;
using strings::Substitute;
namespace kudu {
@@ -352,7 +357,7 @@ Status SysCatalogTable::SyncWrite(const WriteRequestPB
*req, WriteResponsePB *re
CountDownLatch latch(1);
gscoped_ptr<tablet::TransactionCompletionCallback> txn_callback(
- new LatchTransactionCompletionCallback<WriteResponsePB>(&latch, resp));
+ new LatchTransactionCompletionCallback<WriteResponsePB>(&latch, resp));
unique_ptr<tablet::WriteTransactionState> tx_state(
new tablet::WriteTransactionState(tablet_peer_.get(),
req,
@@ -467,16 +472,44 @@ void SysCatalogTable::ReqDeleteTable(WriteRequestPB* req,
const TableInfo* table
Status SysCatalogTable::VisitTables(TableVisitor* visitor) {
TRACE_EVENT0("master", "SysCatalogTable::VisitTables");
+ auto processor = [&](
+ const string& entry_id,
+ const SysTablesEntryPB& entry_data) {
+ return visitor->VisitTable(entry_id, entry_data);
+ };
+ return ProcessRows<SysTablesEntryPB, TABLES_ENTRY>(processor);
+}
- const int8_t tables_entry = TABLES_ENTRY;
+template<typename T>
+Status SysCatalogTable::GetEntryFromRow(
+ const RowBlockRow& row, string* entry_id, T* entry_data) const {
+ const Slice* id = schema_.ExtractColumnFromRow<STRING>(
+ row, schema_.find_column(kSysCatalogTableColId));
+ const Slice* data = schema_.ExtractColumnFromRow<STRING>(
+ row, schema_.find_column(kSysCatalogTableColMetadata));
+ const string& str_id = id->ToString();
+ RETURN_NOT_OK_PREPEND(
+ pb_util::ParseFromArray(entry_data, data->data(), data->size()),
+ "unable to parse metadata field for row " + str_id);
+ *entry_id = str_id;
+
+ return Status::OK();
+}
+
+// Scan for entries of the specified type and run the specified function
+// with each entry found.
+template<typename T, SysCatalogTable::CatalogEntryType entry_type>
+Status SysCatalogTable::ProcessRows(
+ function<Status(const string&, const T&)> processor) const {
const int type_col_idx = schema_.find_column(kSysCatalogTableColType);
CHECK(type_col_idx != Schema::kColumnNotFound)
- << "Cannot find sys catalog table column " << kSysCatalogTableColType <<
" in schema: "
- << schema_.ToString();
+ << "cannot find sys catalog table column " << kSysCatalogTableColType
+ << " in schema: " << schema_.ToString();
- auto pred_tables = ColumnPredicate::Equality(schema_.column(type_col_idx),
&tables_entry);
+ const int8_t et = entry_type;
+ auto pred = ColumnPredicate::Equality(schema_.column(type_col_idx), &et);
ScanSpec spec;
- spec.AddPredicate(pred_tables);
+ spec.AddPredicate(pred);
gscoped_ptr<RowwiseIterator> iter;
RETURN_NOT_OK(tablet_peer_->tablet()->NewRowIterator(schema_, &iter));
@@ -486,30 +519,19 @@ Status SysCatalogTable::VisitTables(TableVisitor*
visitor) {
RowBlock block(iter->schema(), 512, &arena);
while (iter->HasNext()) {
RETURN_NOT_OK(iter->NextBlock(&block));
- for (size_t i = 0; i < block.nrows(); i++) {
- if (!block.selection_vector()->IsRowSelected(i)) continue;
-
- RETURN_NOT_OK(VisitTableFromRow(block.row(i), visitor));
+ for (size_t i = 0; i < block.nrows(); ++i) {
+ if (!block.selection_vector()->IsRowSelected(i)) {
+ continue;
+ }
+ string entry_id;
+ T entry_data;
+ RETURN_NOT_OK(GetEntryFromRow(block.row(i), &entry_id, &entry_data));
+ RETURN_NOT_OK(processor(entry_id, entry_data));
}
}
return Status::OK();
}
-Status SysCatalogTable::VisitTableFromRow(const RowBlockRow& row,
- TableVisitor* visitor) {
- const Slice* table_id =
- schema_.ExtractColumnFromRow<STRING>(row,
schema_.find_column(kSysCatalogTableColId));
- const Slice* data =
- schema_.ExtractColumnFromRow<STRING>(row,
schema_.find_column(kSysCatalogTableColMetadata));
-
- SysTablesEntryPB metadata;
- RETURN_NOT_OK_PREPEND(pb_util::ParseFromArray(&metadata, data->data(),
data->size()),
- "Unable to parse metadata field for table " +
table_id->ToString());
-
- RETURN_NOT_OK(visitor->VisitTable(table_id->ToString(), metadata));
- return Status::OK();
-}
-
// ==================================================================
// Tablet related methods
// ==================================================================
@@ -553,55 +575,25 @@ void SysCatalogTable::ReqDeleteTablets(WriteRequestPB*
req,
}
}
-Status SysCatalogTable::VisitTabletFromRow(const RowBlockRow& row,
TabletVisitor *visitor) {
- const Slice *tablet_id =
- schema_.ExtractColumnFromRow<STRING>(row,
schema_.find_column(kSysCatalogTableColId));
- const Slice *data =
- schema_.ExtractColumnFromRow<STRING>(row,
schema_.find_column(kSysCatalogTableColMetadata));
-
- SysTabletsEntryPB metadata;
- RETURN_NOT_OK_PREPEND(pb_util::ParseFromArray(&metadata, data->data(),
data->size()),
- "Unable to parse metadata field for tablet " +
tablet_id->ToString());
-
- // Upgrade from the deprecated start/end-key fields to the 'partition' field.
- if (!metadata.has_partition()) {
- metadata.mutable_partition()->set_partition_key_start(
- metadata.deprecated_start_key());
- metadata.mutable_partition()->set_partition_key_end(
- metadata.deprecated_end_key());
- metadata.clear_deprecated_start_key();
- metadata.clear_deprecated_end_key();
- }
-
- RETURN_NOT_OK(visitor->VisitTablet(metadata.table_id(),
tablet_id->ToString(), metadata));
- return Status::OK();
-}
-
Status SysCatalogTable::VisitTablets(TabletVisitor* visitor) {
TRACE_EVENT0("master", "SysCatalogTable::VisitTablets");
- const int8_t tablets_entry = TABLETS_ENTRY;
- const int type_col_idx = schema_.find_column(kSysCatalogTableColType);
- CHECK(type_col_idx != Schema::kColumnNotFound);
-
- auto pred_tablets = ColumnPredicate::Equality(schema_.column(type_col_idx),
&tablets_entry);
- ScanSpec spec;
- spec.AddPredicate(pred_tablets);
-
- gscoped_ptr<RowwiseIterator> iter;
- RETURN_NOT_OK(tablet_peer_->tablet()->NewRowIterator(schema_, &iter));
- RETURN_NOT_OK(iter->Init(&spec));
-
- Arena arena(32 * 1024, 256 * 1024);
- RowBlock block(iter->schema(), 512, &arena);
- while (iter->HasNext()) {
- RETURN_NOT_OK(iter->NextBlock(&block));
- for (size_t i = 0; i < block.nrows(); i++) {
- if (!block.selection_vector()->IsRowSelected(i)) continue;
-
- RETURN_NOT_OK(VisitTabletFromRow(block.row(i), visitor));
+ auto processor = [&](
+ const string& entry_id,
+ const SysTabletsEntryPB& entry_data) {
+ if (entry_data.has_partition()) {
+ return visitor->VisitTablet(entry_data.table_id(), entry_id, entry_data);
}
- }
- return Status::OK();
+ // Upgrade from the deprecated start/end-key fields to the 'partition'
field.
+ SysTabletsEntryPB metadata = entry_data;
+ metadata.mutable_partition()->set_partition_key_start(
+ entry_data.deprecated_start_key());
+ metadata.clear_deprecated_start_key();
+ metadata.mutable_partition()->set_partition_key_end(
+ entry_data.deprecated_end_key());
+ metadata.clear_deprecated_end_key();
+ return visitor->VisitTablet(metadata.table_id(), entry_id, metadata);
+ };
+ return ProcessRows<SysTabletsEntryPB, TABLETS_ENTRY>(processor);
}
void SysCatalogTable::InitLocalRaftPeerPB() {
http://git-wip-us.apache.org/repos/asf/kudu/blob/27e05ee8/src/kudu/master/sys_catalog.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.h b/src/kudu/master/sys_catalog.h
index e4702ed..1916636 100644
--- a/src/kudu/master/sys_catalog.h
+++ b/src/kudu/master/sys_catalog.h
@@ -17,6 +17,7 @@
#ifndef KUDU_MASTER_SYS_CATALOG_H_
#define KUDU_MASTER_SYS_CATALOG_H_
+#include <functional>
#include <string>
#include <vector>
@@ -36,10 +37,11 @@ class WriteResponsePB;
}
namespace master {
+
class Master;
-struct MasterOptions;
class TableInfo;
class TabletInfo;
+struct MasterOptions;
// The SysCatalogTable has two separate visitors because the tables
// data must be loaded into memory before the tablets data.
@@ -160,8 +162,12 @@ class SysCatalogTable {
// tablets.
Status WaitUntilRunning();
- // Table related private methods.
- Status VisitTableFromRow(const RowBlockRow& row, TableVisitor* visitor);
+ template<typename T>
+ Status GetEntryFromRow(const RowBlockRow& row,
+ std::string* entry_id, T* entry_data) const;
+
+ template<typename T, CatalogEntryType entry_type>
+ Status ProcessRows(std::function<Status(const std::string&, const T&)>)
const;
// Tablet related private methods.
@@ -169,7 +175,6 @@ class SysCatalogTable {
Status AddTabletsToPB(const std::vector<TabletInfo*>& tablets,
RowOperationsPB::Type op_type,
RowOperationsPB* ops) const;
- Status VisitTabletFromRow(const RowBlockRow& row, TabletVisitor* visitor);
// Initializes the RaftPeerPB for the local peer.
// Crashes due to an invariant check if the rpc server is not running.