This is an automated email from the ASF dual-hosted git repository. adar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit e233e7c127db1f593be1b0f477267c060f7dd6f3 Author: triplesheep <[email protected]> AuthorDate: Tue Aug 20 15:05:12 2019 +0800 KUDU-2921: Exposing the table statistics to spark relation. Exposing current table statistics to spark via rpc from client to master. Change-Id: I7742a76708f989b0ccc8ba417f3390013e260175 Reviewed-on: http://gerrit.cloudera.org:8080/14107 Reviewed-by: Adar Dembo <[email protected]> Tested-by: Adar Dembo <[email protected]> --- .../org/apache/kudu/client/AsyncKuduClient.java | 16 +++++ .../kudu/client/GetTableStatisticsRequest.java | 76 ++++++++++++++++++++++ .../kudu/client/GetTableStatisticsResponse.java | 59 +++++++++++++++++ .../java/org/apache/kudu/client/KuduClient.java | 11 ++++ .../java/org/apache/kudu/client/KuduTable.java | 8 +++ .../apache/kudu/client/KuduTableStatistics.java | 58 +++++++++++++++++ .../java/org/apache/kudu/client/TestKuduTable.java | 38 +++++++++++ .../org/apache/kudu/spark/kudu/DefaultSource.scala | 26 ++++++++ .../apache/kudu/spark/kudu/DefaultSourceTest.scala | 73 +++++++++++++++++++++ src/kudu/master/authz_provider.h | 8 +++ src/kudu/master/catalog_manager.cc | 42 ++++++++++++ src/kudu/master/catalog_manager.h | 6 ++ src/kudu/master/default_authz_provider.h | 5 ++ src/kudu/master/master-test.cc | 28 ++++++++ src/kudu/master/master.proto | 18 +++++ src/kudu/master/master_service.cc | 13 ++++ src/kudu/master/master_service.h | 6 ++ src/kudu/master/sentry_authz_provider-test.cc | 13 ++++ src/kudu/master/sentry_authz_provider.cc | 9 +++ src/kudu/master/sentry_authz_provider.h | 3 + 20 files changed, 516 insertions(+) diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java index 4482356..ba3824a 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java @@ -808,6 +808,22 @@ public class 
AsyncKuduClient implements AutoCloseable { } /** + * Get table's statistics from master. + * @param name the table's name + * @return an deferred KuduTableStatistics + */ + public Deferred<KuduTableStatistics> getTableStatistics(String name) { + GetTableStatisticsRequest rpc = new GetTableStatisticsRequest(this.masterTable, + name, + timer, + defaultAdminOperationTimeoutMs); + + return sendRpcToTablet(rpc).addCallback(resp -> { + return new KuduTableStatistics(resp.getOnDiskSize(), resp.getLiveRowCount()); + }); + } + + /** * Test if a table exists. * @param name a non-null table name * @return true if the table exists, else false diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableStatisticsRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableStatisticsRequest.java new file mode 100644 index 0000000..e541f39 --- /dev/null +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableStatisticsRequest.java @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.kudu.client; + +import com.google.protobuf.Message; +import org.apache.yetus.audience.InterfaceAudience; +import org.jboss.netty.util.Timer; + +import org.apache.kudu.master.Master; +import org.apache.kudu.util.Pair; + +@InterfaceAudience.Private +class GetTableStatisticsRequest extends KuduRpc<GetTableStatisticsResponse> { + + static final String GET_TABLE_STATISTICS = "GetTableStatistics"; + + private final String name; + + GetTableStatisticsRequest(KuduTable table, + String name, + Timer timer, + long timeoutMillis) { + super(table, timer, timeoutMillis); + this.name = name; + } + + @Override + Message createRequestPB() { + final Master.GetTableStatisticsRequestPB.Builder builder = + Master.GetTableStatisticsRequestPB.newBuilder(); + Master.TableIdentifierPB tableID = + Master.TableIdentifierPB.newBuilder().setTableName(name).build(); + builder.setTable(tableID); + return builder.build(); + } + + @Override + String serviceName() { + return MASTER_SERVICE_NAME; + } + + @Override + String method() { + return GET_TABLE_STATISTICS; + } + + @Override + Pair<GetTableStatisticsResponse, Object> deserialize(CallResponse callResponse, + String tsUUID) throws KuduException { + final Master.GetTableStatisticsResponsePB.Builder respBuilder = + Master.GetTableStatisticsResponsePB.newBuilder(); + readProtobuf(callResponse.getPBMessage(), respBuilder); + GetTableStatisticsResponse response = new GetTableStatisticsResponse( + timeoutTracker.getElapsedMillis(), + tsUUID, + respBuilder.getOnDiskSize(), + respBuilder.getLiveRowCount()); + return new Pair<GetTableStatisticsResponse, Object>( + response, respBuilder.hasError() ? 
respBuilder.getError() : null); + } +} diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableStatisticsResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableStatisticsResponse.java new file mode 100644 index 0000000..92f46f5 --- /dev/null +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableStatisticsResponse.java @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.kudu.client; + +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +class GetTableStatisticsResponse extends KuduRpcResponse { + + private final long onDiskSize; + private final long liveRowCount; + + + /** + * @param elapsedMillis Time in milliseconds since RPC creation to now + * @param tsUUID the UUID of the tablet server that sent the response + * @param onDiskSize the table's on disk size + * @param liveRowCount the table's live row count + */ + GetTableStatisticsResponse(long elapsedMillis, + String tsUUID, + long onDiskSize, + long liveRowCount) { + super(elapsedMillis, tsUUID); + this.onDiskSize = onDiskSize; + this.liveRowCount = liveRowCount; + } + + /** + * Get the table's on disk size, this statistic is pre-replication. 
+ * @return Table's on disk size + */ + public long getOnDiskSize() { + return onDiskSize; + } + + /** + * Get the table's live row count, this statistic is pre-replication. + * @return Table's live row count + */ + public long getLiveRowCount() { + return liveRowCount; + } +} diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java index acb2b69..4e3a705 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java @@ -219,6 +219,17 @@ public class KuduClient implements AutoCloseable { } /** + * Get table's statistics from master. + * @param name the table's name + * @return the statistics of table + * @throws KuduException if anything went wrong + */ + public KuduTableStatistics getTableStatistics(String name) throws KuduException { + Deferred<KuduTableStatistics> d = asyncClient.getTableStatistics(name); + return joinAndHandleException(d); + } + + /** * Test if a table exists. * @param name a non-null table name * @return true if the table exists, else false diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java index 5acb72d..29aa4a7 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java @@ -281,4 +281,12 @@ public class KuduTable { } return rangePartitions; } + + /** + * Get this table's statistics. 
+ * @return this table's statistics + */ + public KuduTableStatistics getTableStatistics() throws KuduException { + return client.syncClient().getTableStatistics(name); + } } diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTableStatistics.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTableStatistics.java new file mode 100644 index 0000000..3ebbae9 --- /dev/null +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTableStatistics.java @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.kudu.client; + +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +/** + * Represent statistics belongs to a specific kudu table. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class KuduTableStatistics { + + private final long onDiskSize; + private final long liveRowCount; + + /** + * @param onDiskSize the table's on disk size + * @param liveRowCount the table's live row count + */ + KuduTableStatistics(long onDiskSize, + long liveRowCount) { + this.onDiskSize = onDiskSize; + this.liveRowCount = liveRowCount; + } + + /** + * Get the table's on disk size, this statistic is pre-replication. 
+ * @return Table's on disk size + */ + public long getOnDiskSize() { + return onDiskSize; + } + + /** + * Get the table's live row count, this statistic is pre-replication. + * @return Table's live row count + */ + public long getLiveRowCount() { + return liveRowCount; + } +} diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java index 064b3cc..43758f4 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java @@ -779,4 +779,42 @@ public class TestKuduTable { assertEquals(9, dimensionMap.get("labelA").intValue()); assertEquals(3, dimensionMap.get("labelB").intValue()); } + + @Test(timeout = 100000) + @KuduTestHarness.TabletServerConfig(flags = { + "--update_tablet_stats_interval_ms=200", + "--heartbeat_interval_ms=100", + }) + public void testGetTableStatistics() throws Exception { + // Create a table. + CreateTableOptions builder = getBasicCreateTableOptions(); + KuduTable table = client.createTable(tableName, BASIC_SCHEMA, builder); + + // Insert some rows and test the statistics. + KuduTableStatistics prevStatistics = new KuduTableStatistics(-1, -1); + KuduTableStatistics currentStatistics = new KuduTableStatistics(-1, -1); + KuduSession session = client.newSession(); + int num = 100; + for (int i = 0; i < num; ++i) { + // Get current table statistics. + currentStatistics = table.getTableStatistics(); + assertTrue(currentStatistics.getOnDiskSize() >= prevStatistics.getOnDiskSize()); + assertTrue(currentStatistics.getLiveRowCount() >= prevStatistics.getLiveRowCount()); + assertTrue(currentStatistics.getLiveRowCount() <= i + 1); + prevStatistics = currentStatistics; + // Insert row. 
+ Insert insert = createBasicSchemaInsert(table, i); + session.apply(insert); + List<String> rows = scanTableToStrings(table); + assertEquals("wrong number of rows", i + 1, rows.size()); + } + + // Final accuracy test. + // Wait for master to aggregate table statistics. + Thread.sleep(200 * 6); + currentStatistics = table.getTableStatistics(); + assertTrue(currentStatistics.getOnDiskSize() >= prevStatistics.getOnDiskSize()); + assertTrue(currentStatistics.getLiveRowCount() >= prevStatistics.getLiveRowCount()); + assertTrue(currentStatistics.getLiveRowCount() == num); + } } diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala index fcbafc2..74da812 100644 --- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala +++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala @@ -30,6 +30,8 @@ import org.apache.spark.sql.SQLContext import org.apache.spark.sql.SaveMode import org.apache.yetus.audience.InterfaceAudience import org.apache.yetus.audience.InterfaceStability +import org.slf4j.Logger +import org.slf4j.LoggerFactory import org.apache.kudu.client.KuduPredicate.ComparisonOp import org.apache.kudu.client._ import org.apache.kudu.spark.kudu.KuduReadOptions._ @@ -254,12 +256,36 @@ class KuduRelation( val readOptions: KuduReadOptions = new KuduReadOptions, val writeOptions: KuduWriteOptions = new KuduWriteOptions)(val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan with InsertableRelation { + val log: Logger = LoggerFactory.getLogger(getClass) private val context: KuduContext = new KuduContext(masterAddrs, sqlContext.sparkContext) private val table: KuduTable = context.syncClient.openTable(tableName) + private val estimatedSize: Long = { + try { + table.getTableStatistics().getOnDiskSize + } catch { + case e: Exception => + log.warn("Error while getting table statistic from master, 
maybe the current" + + " master doesn't support the rpc, please check the version.", e) + super.sizeInBytes + } + } + + /** + * Estimated size of this relation in bytes, this information is used by spark to + * decide whether it is safe to broadcast a relation such as in join selection. It + * is always better to overestimate this size than underestimate, because underestimation + * may lead to expensive execution plan such as broadcasting a very large table which + * will cause great network bandwidth consumption. + * TODO(KUDU-2933): Consider projection and predicates in size estimation. + * + * @return size of this relation in bytes + */ + override def sizeInBytes: Long = estimatedSize + override def unhandledFilters(filters: Array[Filter]): Array[Filter] = filters.filterNot(KuduRelation.supportsFilter) diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala index 4c7e04d..7dc96d2 100644 --- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala @@ -34,8 +34,10 @@ import org.apache.kudu.Schema import org.apache.kudu.Type import org.apache.kudu.test.RandomUtils import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobTaskCounter +import org.apache.kudu.test.KuduTestHarness.MasterServerConfig import org.apache.kudu.test.KuduTestHarness.TabletServerConfig import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec import org.junit.Before import org.junit.Test @@ -1053,4 +1055,75 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { } assert(actualNumTasks > 2) } + + @Test + @MasterServerConfig( + flags = Array( + "--mock_table_metrics_for_testing=true", + "--on_disk_size_for_testing=1024", + "--live_row_count_for_testing=100" + )) + def 
testGetTableStatistics(): Unit = { + val dataFrame = sqlContext.read.options(kuduOptions).format("kudu").load + val kuduRelation = kuduRelationFromDataFrame(dataFrame) + assert(kuduRelation.sizeInBytes == 1024) + } + + @Test + @MasterServerConfig( + flags = Array( + "--mock_table_metrics_for_testing=true", + "--on_disk_size_for_testing=1024", + "--live_row_count_for_testing=100" + )) + def testJoinWithTableStatistics(): Unit = { + val df = sqlContext.read.options(kuduOptions).format("kudu").load + + // 1. Create two tables. + val table1 = "table1" + kuduContext.createTable( + table1, + df.schema, + Seq("key"), + new CreateTableOptions() + .setRangePartitionColumns(List("key").asJava) + .setNumReplicas(1)) + var options1: Map[String, String] = + Map("kudu.table" -> table1, "kudu.master" -> harness.getMasterAddressesAsString) + df.write.options(options1).mode("append").format("kudu").save + val df1 = sqlContext.read.options(options1).format("kudu").load + df1.createOrReplaceTempView(table1) + + val table2 = "table2" + kuduContext.createTable( + table2, + df.schema, + Seq("key"), + new CreateTableOptions() + .setRangePartitionColumns(List("key").asJava) + .setNumReplicas(1)) + var options2: Map[String, String] = + Map("kudu.table" -> table2, "kudu.master" -> harness.getMasterAddressesAsString) + df.write.options(options2).mode("append").format("kudu").save + val df2 = sqlContext.read.options(options2).format("kudu").load + df2.createOrReplaceTempView(table2) + + // 2. Get the table statistics of each table and verify. + val relation1 = kuduRelationFromDataFrame(df1) + val relation2 = kuduRelationFromDataFrame(df2) + assert(relation1.sizeInBytes == relation2.sizeInBytes) + assert(relation1.sizeInBytes == 1024) + + // 3. Test join with table size should be able to broadcast. 
+ val sqlStr = s"SELECT * FROM $table1 JOIN $table2 ON $table1.key = $table2.key" + var physical = sqlContext.sql(sqlStr).queryExecution.sparkPlan + var operators = physical.collect { + case j: BroadcastHashJoinExec => j + } + assert(operators.size == 1) + + // Verify result. + var results = sqlContext.sql(sqlStr).collectAsList() + assert(results.size() == rowCount) + } } diff --git a/src/kudu/master/authz_provider.h b/src/kudu/master/authz_provider.h index 51ba635..48023e5 100644 --- a/src/kudu/master/authz_provider.h +++ b/src/kudu/master/authz_provider.h @@ -95,6 +95,14 @@ class AuthzProvider { std::unordered_set<std::string>* table_names, bool* checked_table_names) WARN_UNUSED_RESULT = 0; + // Checks if statistics of the table is authorized for the + // given user. + // + // If the operation is not authorized, returns Status::NotAuthorized(). + // Otherwise, may return other Status error codes depend on actual errors. + virtual Status AuthorizeGetTableStatistics(const std::string& table_name, + const std::string& user) WARN_UNUSED_RESULT = 0; + // Populates the privilege fields of 'pb' with the table-specific privileges // for the given user, using 'schema_pb' for metadata (e.g. column IDs). 
This // does not populate the table ID field of 'pb' -- only the privilege fields; diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc index 22792c2..d9cd3ab 100644 --- a/src/kudu/master/catalog_manager.cc +++ b/src/kudu/master/catalog_manager.cc @@ -268,6 +268,21 @@ DEFINE_int32(catalog_manager_inject_latency_list_authz_ms, 0, TAG_FLAG(catalog_manager_inject_latency_list_authz_ms, hidden); TAG_FLAG(catalog_manager_inject_latency_list_authz_ms, unsafe); +DEFINE_bool(mock_table_metrics_for_testing, false, + "Whether to enable mock table metrics for testing."); +TAG_FLAG(mock_table_metrics_for_testing, hidden); +TAG_FLAG(mock_table_metrics_for_testing, runtime); + +DEFINE_int64(on_disk_size_for_testing, 0, + "Mock the on disk size of metrics for testing."); +TAG_FLAG(on_disk_size_for_testing, hidden); +TAG_FLAG(on_disk_size_for_testing, runtime); + +DEFINE_int64(live_row_count_for_testing, 0, + "Mock the live row count of metrics for testing."); +TAG_FLAG(live_row_count_for_testing, hidden); +TAG_FLAG(live_row_count_for_testing, runtime); + DECLARE_bool(raft_prepare_replacement_before_eviction); DECLARE_bool(raft_attempt_to_replace_replica_without_majority); DECLARE_int64(tsk_rotation_seconds); @@ -2894,6 +2909,32 @@ Status CatalogManager::ListTables(const ListTablesRequestPB* req, return Status::OK(); } +Status CatalogManager::GetTableStatistics(const GetTableStatisticsRequestPB* req, + GetTableStatisticsResponsePB* resp, + optional<const string&> user) { + leader_lock_.AssertAcquiredForReading(); + RETURN_NOT_OK(CheckOnline()); + + scoped_refptr<TableInfo> table; + TableMetadataLock l; + auto authz_func = [&] (const string& username, const string& table_name) { + return SetupError(authz_provider_->AuthorizeGetTableStatistics(table_name, username), + resp, MasterErrorPB::NOT_AUTHORIZED); + }; + RETURN_NOT_OK(FindLockAndAuthorizeTable(*req, resp, LockMode::READ, authz_func, user, + &table, &l)); + + int64_t on_disk_size = 
table->GetMetrics()->on_disk_size->value(); + int64_t live_row_count = table->GetMetrics()->live_row_count->value(); + if (FLAGS_mock_table_metrics_for_testing) { + on_disk_size = FLAGS_on_disk_size_for_testing; + live_row_count = FLAGS_live_row_count_for_testing; + } + resp->set_on_disk_size(on_disk_size); + resp->set_live_row_count(live_row_count); + return Status::OK(); +} + Status CatalogManager::GetTableInfo(const string& table_id, scoped_refptr<TableInfo> *table) { leader_lock_.AssertAcquiredForReading(); RETURN_NOT_OK(CheckOnline()); @@ -5236,6 +5277,7 @@ INITTED_AND_LEADER_OR_RESPOND(IsCreateTableDoneResponsePB); INITTED_AND_LEADER_OR_RESPOND(ListTablesResponsePB); INITTED_AND_LEADER_OR_RESPOND(GetTableLocationsResponsePB); INITTED_AND_LEADER_OR_RESPOND(GetTableSchemaResponsePB); +INITTED_AND_LEADER_OR_RESPOND(GetTableStatisticsResponsePB); INITTED_AND_LEADER_OR_RESPOND(GetTabletLocationsResponsePB); INITTED_AND_LEADER_OR_RESPOND(ReplaceTabletResponsePB); diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h index 3e89815..e7fc11c 100644 --- a/src/kudu/master/catalog_manager.h +++ b/src/kudu/master/catalog_manager.h @@ -633,6 +633,12 @@ class CatalogManager : public tserver::TabletReplicaLookupIf { ListTablesResponsePB* resp, boost::optional<const std::string&> user); + // Get table statistics. If 'user' is provided, checks if the user is + // authorized to get such statistics. + Status GetTableStatistics(const GetTableStatisticsRequestPB* req, + GetTableStatisticsResponsePB* resp, + boost::optional<const std::string&> user); + // Lookup the tablets contained in the partition range of the request. If 'user' // is provided, checks that the user is authorized to get such information. 
// diff --git a/src/kudu/master/default_authz_provider.h b/src/kudu/master/default_authz_provider.h index 9db2f5a..55b3311 100644 --- a/src/kudu/master/default_authz_provider.h +++ b/src/kudu/master/default_authz_provider.h @@ -73,6 +73,11 @@ class DefaultAuthzProvider : public AuthzProvider { return Status::OK(); } + Status AuthorizeGetTableStatistics(const std::string& /*table_name*/, + const std::string& /*user*/) override WARN_UNUSED_RESULT { + return Status::OK(); + } + Status FillTablePrivilegePB(const std::string& /*table_name*/, const std::string& /*user*/, const SchemaPB& /*schema_pb*/, diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc index 7cbbcb5..bc35210 100644 --- a/src/kudu/master/master-test.cc +++ b/src/kudu/master/master-test.cc @@ -105,10 +105,13 @@ using strings::Substitute; DECLARE_bool(catalog_manager_check_ts_count_for_create_table); DECLARE_bool(master_support_authz_tokens); +DECLARE_bool(mock_table_metrics_for_testing); DECLARE_bool(raft_prepare_replacement_before_eviction); DECLARE_double(sys_catalog_fail_during_write); DECLARE_int32(diagnostics_log_stack_traces_interval_ms); DECLARE_int32(master_inject_latency_on_tablet_lookups_ms); +DECLARE_int64(live_row_count_for_testing); +DECLARE_int64(on_disk_size_for_testing); DECLARE_string(location_mapping_cmd); namespace kudu { @@ -1772,6 +1775,31 @@ TEST_F(MasterTest, TestTableIdentifierWithIdAndName) { } } +TEST_F(MasterTest, TestGetTableStatistics) { + const char *kTableName = "testtable"; + const Schema kTableSchema({ ColumnSchema("key", INT32) }, 1); + ASSERT_OK(CreateTable(kTableName, kTableSchema)); + + // Get table statistics with right name. 
+ GetTableStatisticsRequestPB req; + GetTableStatisticsResponsePB resp; + RpcController controller; + req.mutable_table()->set_table_name(kTableName); + ASSERT_OK(proxy_->GetTableStatistics(req, &resp, &controller)); + ASSERT_FALSE(resp.has_error()) << resp.error().DebugString(); + ASSERT_EQ(0, resp.on_disk_size()); + ASSERT_EQ(0, resp.live_row_count()); + + FLAGS_mock_table_metrics_for_testing = true; + FLAGS_on_disk_size_for_testing = 1024; + FLAGS_live_row_count_for_testing = 100; + controller.Reset(); + ASSERT_OK(proxy_->GetTableStatistics(req, &resp, &controller)); + ASSERT_FALSE(resp.has_error()) << resp.error().DebugString(); + ASSERT_EQ(FLAGS_on_disk_size_for_testing, resp.on_disk_size()); + ASSERT_EQ(FLAGS_live_row_count_for_testing, resp.live_row_count()); +} + class AuthzTokenMasterTest : public MasterTest, public ::testing::WithParamInterface<bool> {}; diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto index fc5fef7..73a656d 100644 --- a/src/kudu/master/master.proto +++ b/src/kudu/master/master.proto @@ -535,6 +535,19 @@ message ListTablesResponsePB { repeated TableInfo tables = 2; } +message GetTableStatisticsRequestPB { + required TableIdentifierPB table = 1; +} + +message GetTableStatisticsResponsePB { + // The error, if an error occurred with this request. + optional MasterErrorPB error = 1; + + // The table statistics from table metrics. 
+ optional uint64 on_disk_size = 2; + optional uint64 live_row_count = 3; +} + message GetTableLocationsRequestPB { required TableIdentifierPB table = 1; @@ -919,6 +932,11 @@ service MasterService { rpc ListTables(ListTablesRequestPB) returns (ListTablesResponsePB) { option (kudu.rpc.authz_method) = "AuthorizeClient"; } + + rpc GetTableStatistics(GetTableStatisticsRequestPB) returns (GetTableStatisticsResponsePB) { + option (kudu.rpc.authz_method) = "AuthorizeClient"; + } + rpc GetTableLocations(GetTableLocationsRequestPB) returns (GetTableLocationsResponsePB) { option (kudu.rpc.authz_method) = "AuthorizeClient"; } diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc index 9b9af6d..20fa876 100644 --- a/src/kudu/master/master_service.cc +++ b/src/kudu/master/master_service.cc @@ -411,6 +411,19 @@ void MasterServiceImpl::ListTables(const ListTablesRequestPB* req, rpc->RespondSuccess(); } +void MasterServiceImpl::GetTableStatistics(const GetTableStatisticsRequestPB* req, + GetTableStatisticsResponsePB* resp, + rpc::RpcContext* rpc) { + CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager()); + if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) { + return; + } + Status s = server_->catalog_manager()->GetTableStatistics( + req, resp, make_optional<const string&>(rpc->remote_user().username())); + CheckRespErrorOrSetUnknown(s, resp); + rpc->RespondSuccess(); +} + void MasterServiceImpl::GetTableLocations(const GetTableLocationsRequestPB* req, GetTableLocationsResponsePB* resp, rpc::RpcContext* rpc) { diff --git a/src/kudu/master/master_service.h b/src/kudu/master/master_service.h index 5d6846e..83dee04 100644 --- a/src/kudu/master/master_service.h +++ b/src/kudu/master/master_service.h @@ -50,6 +50,8 @@ class GetTableLocationsRequestPB; class GetTableLocationsResponsePB; class GetTableSchemaRequestPB; class GetTableSchemaResponsePB; +class GetTableStatisticsRequestPB; +class GetTableStatisticsResponsePB; class 
GetTabletLocationsRequestPB; class GetTabletLocationsResponsePB; class IsAlterTableDoneRequestPB; @@ -132,6 +134,10 @@ class MasterServiceImpl : public MasterServiceIf { ListTablesResponsePB* resp, rpc::RpcContext* rpc) override; + void GetTableStatistics(const GetTableStatisticsRequestPB* req, + GetTableStatisticsResponsePB* resp, + rpc::RpcContext* rpc) override; + void GetTableLocations(const GetTableLocationsRequestPB* req, GetTableLocationsResponsePB* resp, rpc::RpcContext* rpc) override; diff --git a/src/kudu/master/sentry_authz_provider-test.cc b/src/kudu/master/sentry_authz_provider-test.cc index a3a694c..7efe19d 100644 --- a/src/kudu/master/sentry_authz_provider-test.cc +++ b/src/kudu/master/sentry_authz_provider-test.cc @@ -812,6 +812,19 @@ TEST_F(SentryAuthzProviderTest, TestAuthorizeAlterTable) { kTestUser)); } +TEST_F(SentryAuthzProviderTest, TestAuthorizeGetTableStatistics) { + // Don't authorize getting statistics of a table for a user without required + // privileges. + ASSERT_OK(CreateRoleAndAddToGroups()); + Status s = sentry_authz_provider_->AuthorizeGetTableStatistics("db.table", kTestUser); + ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString(); + + // Authorize get table statistics on a user with proper privileges. + TSentryPrivilege privilege = GetDatabasePrivilege("db", "SELECT"); + ASSERT_OK(AlterRoleGrantPrivilege(privilege)); + ASSERT_OK(sentry_authz_provider_->AuthorizeGetTableStatistics("db.table", kTestUser)); +} + TEST_F(SentryAuthzProviderTest, TestAuthorizeGetTableMetadata) { // Don't authorize getting metadata on a table for a user without required // privileges. 
diff --git a/src/kudu/master/sentry_authz_provider.cc b/src/kudu/master/sentry_authz_provider.cc index 355bdae..b9705c0 100644 --- a/src/kudu/master/sentry_authz_provider.cc +++ b/src/kudu/master/sentry_authz_provider.cc @@ -253,6 +253,15 @@ Status SentryAuthzProvider::AuthorizeListTables(const string& user, return Status::OK(); } +Status SentryAuthzProvider::AuthorizeGetTableStatistics(const std::string& table_name, + const std::string& user) { + // Statistics contain data (e.g. number of rows) that requires the 'SELECT ON TABLE' + // privilege. + return Authorize(SentryAuthorizableScope::Scope::TABLE, + SentryAction::Action::SELECT, + table_name, user); +} + Status SentryAuthzProvider::FillTablePrivilegePB(const string& table_name, const string& user, const SchemaPB& schema_pb, diff --git a/src/kudu/master/sentry_authz_provider.h b/src/kudu/master/sentry_authz_provider.h index 7f1496f..27c0fa1 100644 --- a/src/kudu/master/sentry_authz_provider.h +++ b/src/kudu/master/sentry_authz_provider.h @@ -96,6 +96,9 @@ class SentryAuthzProvider : public AuthzProvider { std::unordered_set<std::string>* table_names, bool* checked_table_names) override WARN_UNUSED_RESULT; + Status AuthorizeGetTableStatistics(const std::string& table_name, + const std::string& user) override WARN_UNUSED_RESULT; + Status FillTablePrivilegePB(const std::string& table_name, const std::string& user, const SchemaPB& schema_pb,
