Repository: kudu
Updated Branches:
  refs/heads/master 96951f68f -> fe4d962c0


KUDU-2191 (7/n): HmsCatalog

This commit adds some important higher-level HMS machinery in the
HmsCatalog class.  This class is responsible for translating Kudu
catalog information to the Hive format, and for handling thread safety,
retries, Hive HA, and the configuration flags of the Hive integration.

A follow-up commit will integrate the HmsCatalog into the
CatalogManager, at which point many parts of the HMS integration will
become functional. Until that point this is still non-functional code.

Change-Id: I6eb2a4c400f5aaee095e4e4ad572981565a1c040
Reviewed-on: http://gerrit.cloudera.org:8080/9862
Reviewed-by: Adar Dembo <a...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/fe4d962c
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/fe4d962c
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/fe4d962c

Branch: refs/heads/master
Commit: fe4d962c09012e779dd3746c2514df4f0ef70a9a
Parents: 96951f6
Author: Dan Burkert <danburk...@apache.org>
Authored: Wed Mar 28 16:41:19 2018 -0700
Committer: Dan Burkert <danburk...@apache.org>
Committed: Mon Apr 2 22:59:36 2018 +0000

----------------------------------------------------------------------
 src/kudu/hms/CMakeLists.txt      |   6 +-
 src/kudu/hms/hms_catalog-test.cc | 405 ++++++++++++++++++++++++++
 src/kudu/hms/hms_catalog.cc      | 526 ++++++++++++++++++++++++++++++++++
 src/kudu/hms/hms_catalog.h       | 144 ++++++++++
 src/kudu/hms/mini_hms.cc         |   4 +
 src/kudu/hms/mini_hms.h          |   4 +
 src/kudu/master/CMakeLists.txt   |   1 +
 src/kudu/master/master.cc        |  23 ++
 8 files changed, 1111 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/fe4d962c/src/kudu/hms/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/hms/CMakeLists.txt b/src/kudu/hms/CMakeLists.txt
index 572552f..fbc4c6e 100644
--- a/src/kudu/hms/CMakeLists.txt
+++ b/src/kudu/hms/CMakeLists.txt
@@ -24,6 +24,7 @@ target_link_libraries(hms_thrift thrift)
 add_dependencies(hms_thrift ${HMS_THRIFT_TGTS})
 
 set(HMS_SRCS
+  hms_catalog.cc
   hms_client.cc
   sasl_client_transport.cc)
 set(HMS_DEPS
@@ -31,6 +32,7 @@ set(HMS_DEPS
   glog
   hms_thrift
   krpc
+  kudu_common
   kudu_util)
 
 add_library(kudu_hms ${HMS_SRCS})
@@ -78,7 +80,7 @@ if (NOT NO_TESTS)
    mini_kdc
    ${KUDU_MIN_TEST_LIBS})

-  # This test has to run serially since otherwise starting the HMS can take a very
-  # long time.
+  # These tests must run serially, otherwise starting the HMS can take a very long time.
+  ADD_KUDU_TEST(hms_catalog-test RUN_SERIAL true NUM_SHARDS 4)
   ADD_KUDU_TEST(hms_client-test RUN_SERIAL true NUM_SHARDS 4)
 endif()

http://git-wip-us.apache.org/repos/asf/kudu/blob/fe4d962c/src/kudu/hms/hms_catalog-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/hms/hms_catalog-test.cc b/src/kudu/hms/hms_catalog-test.cc
new file mode 100644
index 0000000..1510af0
--- /dev/null
+++ b/src/kudu/hms/hms_catalog-test.cc
@@ -0,0 +1,405 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/hms/hms_catalog.h"
+
+#include <map> // IWYU pragma: keep
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <gtest/gtest.h>
+
+#include "kudu/common/common.pb.h"
+#include "kudu/common/schema.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/hms/hive_metastore_constants.h"
+#include "kudu/hms/hive_metastore_types.h"
+#include "kudu/hms/hms_client.h"
+#include "kudu/hms/mini_hms.h" // IWYU pragma: keep
+#include "kudu/rpc/sasl_common.h"
+#include "kudu/security/test/mini_kdc.h" // IWYU pragma: keep
+#include "kudu/util/net/net_util.h" // IWYU pragma: keep
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+DECLARE_string(hive_metastore_uris);
+DECLARE_bool(hive_metastore_sasl_enabled);
+
+using kudu::rpc::SaslProtection;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace hms {
+
+// Checks that ParseTableName splits a 'database.table' name on its single '.'
+// separator, and rejects names with no separator or with more than one.
+TEST(HmsCatalogStaticTest, TestParseTableName) {
+  string db;
+  string tbl;
+
+  EXPECT_OK(HmsCatalog::ParseTableName("foo.bar", &db, &tbl));
+  EXPECT_EQ("foo", db);
+  EXPECT_EQ("bar", tbl);
+
+  EXPECT_OK(HmsCatalog::ParseTableName("99bottles.my_awesome/table/22", &db, &tbl));
+  EXPECT_EQ("99bottles", db);
+  EXPECT_EQ("my_awesome/table/22", tbl);
+
+  EXPECT_OK(HmsCatalog::ParseTableName("_leading_underscore.trailing_underscore_", &db, &tbl));
+  EXPECT_EQ("_leading_underscore", db);
+  EXPECT_EQ("trailing_underscore_", tbl);
+
+  EXPECT_OK(HmsCatalog::ParseTableName("unicode ☃ tables?.maybe/one_day", &db, &tbl));
+  EXPECT_EQ("unicode ☃ tables?", db);
+  EXPECT_EQ("maybe/one_day", tbl);
+
+  EXPECT_OK(HmsCatalog::ParseTableName(".", &db, &tbl));
+  EXPECT_EQ("", db);
+  EXPECT_EQ("", tbl);
+
+  // Embedded NUL bytes must not be treated as string terminators.
+  EXPECT_OK(HmsCatalog::ParseTableName(string("\0.\0", 3), &db, &tbl));
+  EXPECT_EQ(string("\0", 1), db);
+  EXPECT_EQ(string("\0", 1), tbl);
+
+  EXPECT_TRUE(HmsCatalog::ParseTableName("no-table", &db, &tbl).IsInvalidArgument());
+  EXPECT_TRUE(HmsCatalog::ParseTableName("lots.of.tables", &db, &tbl).IsInvalidArgument());
+}
+
+// Checks that ParseUris accepts comma-separated thrift:// URIs (ignoring empty
+// entries and defaulting a missing port), and rejects entries without a scheme.
+TEST(HmsCatalogStaticTest, TestParseUris) {
+  vector<HostPort> hostports;
+
+  EXPECT_OK(HmsCatalog::ParseUris("", &hostports));
+  EXPECT_TRUE(hostports.empty());
+
+  EXPECT_OK(HmsCatalog::ParseUris(",,,", &hostports));
+  EXPECT_TRUE(hostports.empty());
+
+  EXPECT_OK(HmsCatalog::ParseUris("thrift://foo-bar-baz:1234", &hostports));
+  EXPECT_EQ(hostports, vector<HostPort>({ HostPort("foo-bar-baz", 1234) }));
+
+  EXPECT_OK(HmsCatalog::ParseUris("thrift://hms-1:1234,thrift://hms-2", &hostports));
+  EXPECT_EQ(hostports, vector<HostPort>(
+            { HostPort("hms-1", 1234), HostPort("hms-2", HmsClient::kDefaultHmsPort) }));
+
+  Status s = HmsCatalog::ParseUris("://illegal-scheme:12", &hostports);
+  EXPECT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "missing scheme");
+
+  s = HmsCatalog::ParseUris("missing-scheme", &hostports);
+  EXPECT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "missing scheme");
+
+  // A single malformed entry fails the whole list.
+  s = HmsCatalog::ParseUris("thrift://foo,missing-scheme:1234,thrift://baz", &hostports);
+  EXPECT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "missing scheme");
+}
+
+// Base class for HmsCatalog tests. Starts a MiniHms (Kerberized when
+// EnableKerberos() returns true), an HmsClient used to inspect HMS state
+// directly, and the HmsCatalog under test.
+class HmsCatalogTest : public ::testing::Test {
+ public:
+
+  const char* const kMasterAddrs = "master-addrs";
+  const char* const kTableId = "abc123";
+
+  // Subclasses override this to run the fixture against a Kerberized HMS.
+  virtual bool EnableKerberos() {
+    return false;
+  }
+
+  void SetUp() override {
+    bool enable_kerberos = EnableKerberos();
+
+    HmsClientOptions hms_client_opts;
+
+    hms_.reset(new hms::MiniHms());
+    if (enable_kerberos) {
+      kdc_.reset(new MiniKdc(MiniKdcOptions()));
+      ASSERT_OK(kdc_->Start());
+
+      // Create a service principal for the HMS, and configure it to use it.
+      string spn = "hive/127.0.0.1";
+      string ktpath;
+      ASSERT_OK(kdc_->CreateServiceKeytab(spn, &ktpath));
+      hms_->EnableKerberos(kdc_->GetEnvVars()["KRB5_CONFIG"],
+                           spn,
+                           ktpath,
+                           SaslProtection::kPrivacy);
+
+      // Create a principal for the HmsCatalog, and configure it to use it.
+      ASSERT_OK(rpc::SaslInit());
+      ASSERT_OK(kdc_->CreateUserPrincipal("alice"));
+      ASSERT_OK(kdc_->Kinit("alice"));
+      ASSERT_OK(kdc_->SetKrb5Environment());
+      hms_client_opts.enable_kerberos = true;
+
+      // Configure the HmsCatalog flags.
+      FLAGS_hive_metastore_sasl_enabled = true;
+    }
+
+    ASSERT_OK(hms_->Start());
+
+    hms_client_.reset(new HmsClient(hms_->address(), hms_client_opts));
+    ASSERT_OK(hms_client_->Start());
+
+    FLAGS_hive_metastore_uris = hms_->uris();
+    hms_catalog_.reset(new HmsCatalog(kMasterAddrs));
+    ASSERT_OK(hms_catalog_->Start());
+  }
+
+  void TearDown() override {
+    // Stop the client before the HMS it is connected to, in the same order as
+    // StopHms(). The client is absent if SetUp() failed before creating it.
+    if (hms_client_) {
+      ASSERT_OK(hms_client_->Stop());
+    }
+    ASSERT_OK(hms_->Stop());
+  }
+
+  Status StopHms() {
+    RETURN_NOT_OK(hms_client_->Stop());
+    RETURN_NOT_OK(hms_->Stop());
+    return Status::OK();
+  }
+
+  Status StartHms() {
+    RETURN_NOT_OK(hms_->Start());
+    RETURN_NOT_OK(hms_client_->Start());
+    return Status::OK();
+  }
+
+  Schema AllTypesSchema() {
+    SchemaBuilder b;
+    b.AddKeyColumn("key", DataType::INT32);
+    b.AddColumn("int8_val", DataType::INT8);
+    b.AddColumn("int16_val", DataType::INT16);
+    b.AddColumn("int32_val", DataType::INT32);
+    b.AddColumn("int64_val", DataType::INT64);
+    b.AddColumn("timestamp_val", DataType::UNIXTIME_MICROS);
+    b.AddColumn("string_val", DataType::STRING);
+    b.AddColumn("bool_val", DataType::BOOL);
+    b.AddColumn("float_val", DataType::FLOAT);
+    b.AddColumn("double_val", DataType::DOUBLE);
+    b.AddColumn("binary_val", DataType::BINARY);
+    b.AddColumn("decimal32_val", DataType::DECIMAL32);
+    b.AddColumn("decimal64_val", DataType::DECIMAL64);
+    b.AddColumn("decimal128_val", DataType::DECIMAL128);
+    return b.Build();
+  }
+
+  // Asserts that the HMS has a Kudu-backed entry for the table with the given
+  // ID, master addresses, storage handler, and column names.
+  void CheckTable(const string& database_name,
+                  const string& table_name,
+                  const string& table_id,
+                  const Schema& schema) {
+    hive::Table table;
+    ASSERT_OK(hms_client_->GetTable(database_name, table_name, &table));
+
+    EXPECT_EQ(table.parameters[HmsClient::kKuduTableIdKey], table_id);
+    EXPECT_EQ(table.parameters[HmsClient::kKuduMasterAddrsKey], kMasterAddrs);
+    EXPECT_EQ(table.parameters[hive::g_hive_metastore_constants.META_TABLE_STORAGE],
+              HmsClient::kKuduStorageHandler);
+
+    // Check the column count first so the loop below never indexes past the
+    // end of the HMS column list.
+    ASSERT_EQ(schema.num_columns(), table.sd.cols.size());
+    for (size_t column_idx = 0; column_idx < schema.num_columns(); column_idx++) {
+      EXPECT_EQ(table.sd.cols[column_idx].name, schema.columns()[column_idx].name());
+    }
+  }
+
+  void CheckTableDoesNotExist(const string& database_name, const string& table_name) {
+    hive::Table table;
+    Status s = hms_client_->GetTable(database_name, table_name, &table);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  }
+
+ protected:
+  unique_ptr<MiniKdc> kdc_;
+  unique_ptr<MiniHms> hms_;
+  unique_ptr<HmsClient> hms_client_;
+  unique_ptr<HmsCatalog> hms_catalog_;
+};
+
+// Subclass of HmsCatalogTest that allows running individual test cases with
+// Kerberos enabled and disabled. Most of the test cases are run only with
+// Kerberos disabled, but to get coverage against a Kerberized HMS we run select
+// cases in both modes. The boolean test parameter selects Kerberos.
+class HmsCatalogTestParameterized : public HmsCatalogTest,
+                                    public ::testing::WithParamInterface<bool> {
+  bool EnableKerberos() override {
+    return GetParam();
+  }
+};
+INSTANTIATE_TEST_CASE_P(HmsCatalogTests, HmsCatalogTestParameterized,
+                        ::testing::Values(false, true));
+
+// Test creating, altering, and dropping a table with the HMS Catalog.
+TEST_P(HmsCatalogTestParameterized, TestTableLifecycle) {
+  const string kTableId = "table-id";
+  const string kHmsDatabase = "default";
+  const string kHmsTableName = "table_name";
+  const string kTableName = Substitute("$0.$1", kHmsDatabase, kHmsTableName);
+  const string kHmsAlteredTableName = "altered_table_name";
+  const string kAlteredTableName = Substitute("$0.$1", kHmsDatabase, kHmsAlteredTableName);
+
+  Schema schema = AllTypesSchema();
+
+  // Create the table.
+  ASSERT_OK(hms_catalog_->CreateTable(kTableId, kTableName, schema));
+  NO_FATALS(CheckTable(kHmsDatabase, kHmsTableName, kTableId, schema));
+
+  // Alter the table: rename it and add a column.
+  SchemaBuilder b(schema);
+  b.AddColumn("new_column", DataType::INT32);
+  Schema altered_schema = b.Build();
+  ASSERT_OK(hms_catalog_->AlterTable(kTableId, kTableName, kAlteredTableName, altered_schema));
+  NO_FATALS(CheckTableDoesNotExist(kHmsDatabase, kHmsTableName));
+  NO_FATALS(CheckTable(kHmsDatabase, kHmsAlteredTableName, kTableId, altered_schema));
+
+  // Drop the table.
+  ASSERT_OK(hms_catalog_->DropTable(kTableId, kAlteredTableName));
+  NO_FATALS(CheckTableDoesNotExist(kHmsDatabase, kHmsTableName));
+  NO_FATALS(CheckTableDoesNotExist(kHmsDatabase, kHmsAlteredTableName));
+}
+
+// Checks that 'legacy' Kudu tables can be altered and dropped by the
+// HmsCatalog. Altering a legacy table to be HMS compliant should result in a
+// valid HMS table entry being created. Dropping a legacy table should do
+// nothing, but return success.
+TEST_F(HmsCatalogTest, TestLegacyTables) {
+  const string kTableId = "table-id";
+  const string kHmsDatabase = "default";
+
+  Schema schema = AllTypesSchema();
+
+  // Alter a table containing a non Hive-compatible character, and ensure an
+  // entry is created with the new (valid) name.
+  NO_FATALS(CheckTableDoesNotExist(kHmsDatabase, "a"));
+  ASSERT_OK(hms_catalog_->AlterTable(kTableId, "default.☃", "default.a", schema));
+  NO_FATALS(CheckTable(kHmsDatabase, "a", kTableId, schema));
+
+  // Alter a table without a database and ensure an entry is created with the new (valid) name.
+  NO_FATALS(CheckTableDoesNotExist(kHmsDatabase, "b"));
+  ASSERT_OK(hms_catalog_->AlterTable(kTableId, "no_database", "default.b", schema));
+  NO_FATALS(CheckTable(kHmsDatabase, "b", kTableId, schema));
+
+  // Drop a table containing a Hive incompatible character, and ensure it doesn't fail.
+  ASSERT_OK(hms_catalog_->DropTable(kTableId, "foo.☃"));
+
+  // Drop a table without a database, and ensure it doesn't fail.
+  ASSERT_OK(hms_catalog_->DropTable(kTableId, "no_database"));
+}
+
+// Checks that Kudu tables will not replace or modify existing HMS entries that
+// belong to external tables from other systems.
+TEST_F(HmsCatalogTest, TestExternalTable) {
+  const string kTableId = "table-id";
+  const string kHmsDatabase = "default";
+
+  const string kHmsExternalTable = "external_table";
+  const string kExternalTableName = Substitute("$0.$1", kHmsDatabase, kHmsExternalTable);
+
+  const string kHmsKuduTable = "kudu_table";
+  const string kKuduTableName = Substitute("$0.$1", kHmsDatabase, kHmsKuduTable);
+
+  // Create the external table.
+  hive::Table external_table;
+  external_table.dbName = kHmsDatabase;
+  external_table.tableName = kHmsExternalTable;
+  external_table.tableType = HmsClient::kManagedTable;
+  ASSERT_OK(hms_client_->CreateTable(external_table));
+  // Retrieve the full HMS table object so it can be compared later (the HMS
+  // fills in some fields during creation).
+  ASSERT_OK(hms_client_->GetTable(kHmsDatabase, kHmsExternalTable, &external_table));
+
+  // Asserts that the external table's HMS entry is unchanged.
+  auto CheckExternalTable = [&] {
+    hive::Table current_table;
+    ASSERT_OK(hms_client_->GetTable(kHmsDatabase, kHmsExternalTable, &current_table));
+    ASSERT_EQ(current_table, external_table);
+  };
+
+  // Create the Kudu table.
+  Schema schema = AllTypesSchema();
+  ASSERT_OK(hms_catalog_->CreateTable(kTableId, kKuduTableName, schema));
+  NO_FATALS(CheckTable(kHmsDatabase, kHmsKuduTable, kTableId, schema));
+
+  // Try and create a Kudu table with the same name as the external table.
+  Status s = hms_catalog_->CreateTable(kTableId, kExternalTableName, schema);
+  EXPECT_TRUE(s.IsAlreadyPresent()) << s.ToString();
+  NO_FATALS(CheckExternalTable());
+
+  // Try and rename the Kudu table to the external table name.
+  s = hms_catalog_->AlterTable(kTableId, kKuduTableName, kExternalTableName, schema);
+  EXPECT_TRUE(s.IsIllegalState()) << s.ToString();
+  NO_FATALS(CheckExternalTable());
+  NO_FATALS(CheckTable(kHmsDatabase, kHmsKuduTable, kTableId, schema));
+
+  // Try and rename a Kudu table from the external table name to a new name.
+  // This depends on the Kudu table not actually existing in the HMS catalog.
+  const string kHmsRenamedTable = "renamed_table";
+  const string kRenamedTableName = Substitute("$0.$1", kHmsDatabase, kHmsRenamedTable);
+  ASSERT_OK(hms_catalog_->AlterTable(kTableId, kExternalTableName, kRenamedTableName, schema));
+  NO_FATALS(CheckExternalTable());
+  // The 'renamed' table is really just created with the new name.
+  NO_FATALS(CheckTable(kHmsDatabase, kHmsRenamedTable, kTableId, schema));
+
+  // Try and alter a Kudu table with the same name as the external table.
+  // This depends on the Kudu table not actually existing in the HMS catalog.
+  s = hms_catalog_->AlterTable(kTableId, kExternalTableName, kExternalTableName, schema);
+  EXPECT_TRUE(s.IsIllegalState()) << s.ToString();
+  NO_FATALS(CheckExternalTable());
+
+  // Try and drop the external table as if it were a Kudu table.  This should
+  // return an OK status, but not actually modify the external table.
+  ASSERT_OK(hms_catalog_->DropTable(kTableId, kExternalTableName));
+  NO_FATALS(CheckExternalTable());
+
+  // Drop a Kudu table with no corresponding HMS entry.
+  NO_FATALS(CheckTableDoesNotExist(kHmsDatabase, "bogus_table_name"));
+  ASSERT_OK(hms_catalog_->DropTable(kTableId, "default.bogus_table_name"));
+  NO_FATALS(CheckTableDoesNotExist(kHmsDatabase, "bogus_table_name"));
+}
+
+// Checks that the HmsCatalog handles reconnecting to the metastore after a connection failure.
+TEST_F(HmsCatalogTest, TestReconnect) {
+  // TODO(dan): Figure out a way to test failover among HA HMS instances. The
+  // MiniHms does not support HA, since it relies on a single-process Derby database.
+
+  const string kTableId = "table-id";
+  const string kHmsDatabase = "default";
+  Schema schema = AllTypesSchema();
+  ASSERT_OK(hms_catalog_->CreateTable(kTableId, "default.a", schema));
+  NO_FATALS(CheckTable(kHmsDatabase, "a", kTableId, schema));
+
+  // Shutdown the HMS and try a few operations.
+  ASSERT_OK(StopHms());
+
+  Status s = hms_catalog_->CreateTable(kTableId, "default.b", schema);
+  EXPECT_TRUE(s.IsNetworkError()) << s.ToString();
+
+  s = hms_catalog_->AlterTable(kTableId, "default.a", "default.c", schema);
+  EXPECT_TRUE(s.IsNetworkError()) << s.ToString();
+
+  // Start the HMS back up and ensure that the same operations succeed.
+  ASSERT_OK(StartHms());
+  EXPECT_OK(hms_catalog_->CreateTable(kTableId, "default.d", schema));
+  NO_FATALS(CheckTable(kHmsDatabase, "a", kTableId, schema));
+  NO_FATALS(CheckTable(kHmsDatabase, "d", kTableId, schema));
+  // The create that failed while the HMS was down must not have left an entry.
+  NO_FATALS(CheckTableDoesNotExist(kHmsDatabase, "b"));
+
+  EXPECT_OK(hms_catalog_->AlterTable(kTableId, "default.a", "default.c", schema));
+  NO_FATALS(CheckTable(kHmsDatabase, "c", kTableId, schema));
+  NO_FATALS(CheckTableDoesNotExist(kHmsDatabase, "a"));
+}
+
+} // namespace hms
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/fe4d962c/src/kudu/hms/hms_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/hms/hms_catalog.cc b/src/kudu/hms/hms_catalog.cc
new file mode 100644
index 0000000..3cfbd90
--- /dev/null
+++ b/src/kudu/hms/hms_catalog.cc
@@ -0,0 +1,526 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/hms/hms_catalog.h"
+
+#include <algorithm> // IWYU pragma: keep
+#include <functional> // IWYU pragma: keep
+#include <iostream>
+#include <map> // IWYU pragma: keep
+#include <string>
+#include <type_traits> // IWYU pragma: keep
+#include <utility> // IWYU pragma: keep
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/common/common.pb.h"
+#include "kudu/common/schema.h"
+#include "kudu/common/types.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/strings/split.h" // IWYU pragma: keep
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/hms/hive_metastore_constants.h"
+#include "kudu/hms/hive_metastore_types.h"
+#include "kudu/hms/hms_client.h"
+#include "kudu/util/async_util.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/net/net_util.h" // IWYU pragma: keep
+#include "kudu/util/slice.h"
+#include "kudu/util/threadpool.h" // IWYU pragma: keep
+
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+DEFINE_string(hive_metastore_uris, "",
+              "Address of the Hive Metastore instance(s). The provided port must be for the HMS "
+              "Thrift service. If a port isn't provided, defaults to 9083. If the HMS is deployed "
+              "in an HA configuration, multiple comma-separated addresses can be supplied. If not "
+              "set, the Kudu master will not send Kudu table catalog updates to Hive. The "
+              "configured value must match the Hive hive.metastore.uris configuration.");
+DEFINE_validator(hive_metastore_uris, &kudu::hms::HmsCatalog::ValidateUris);
+TAG_FLAG(hive_metastore_uris, experimental);
+
+// Note: the hive_metastore_sasl_enabled and keytab_file combination is validated in master.cc.
+DEFINE_bool(hive_metastore_sasl_enabled, false,
+            "Configures whether Thrift connections to the Hive Metastore use SASL "
+            "(Kerberos) security. Must match the value of the "
+            "hive.metastore.sasl.enabled option in the Hive Metastore configuration. "
+            "When enabled, the --keytab_file flag must be provided.");
+TAG_FLAG(hive_metastore_sasl_enabled, experimental);
+
+DEFINE_int32(hive_metastore_retry_count, 1,
+             "The number of times that HMS operations will retry after "
+             "encountering retriable failures, such as network errors.");
+TAG_FLAG(hive_metastore_retry_count, advanced);
+TAG_FLAG(hive_metastore_retry_count, experimental);
+TAG_FLAG(hive_metastore_retry_count, runtime);
+
+DEFINE_int32(hive_metastore_send_timeout, 60,
+             "Configures the socket send timeout, in seconds, for Thrift "
+             "connections to the Hive Metastore.");
+TAG_FLAG(hive_metastore_send_timeout, advanced);
+TAG_FLAG(hive_metastore_send_timeout, experimental);
+TAG_FLAG(hive_metastore_send_timeout, runtime);
+
+DEFINE_int32(hive_metastore_recv_timeout, 60,
+             "Configures the socket receive timeout, in seconds, for Thrift "
+             "connections to the Hive Metastore.");
+TAG_FLAG(hive_metastore_recv_timeout, advanced);
+TAG_FLAG(hive_metastore_recv_timeout, experimental);
+TAG_FLAG(hive_metastore_recv_timeout, runtime);
+
+DEFINE_int32(hive_metastore_conn_timeout, 60,
+             "Configures the socket connect timeout, in seconds, for Thrift "
+             "connections to the Hive Metastore.");
+TAG_FLAG(hive_metastore_conn_timeout, advanced);
+TAG_FLAG(hive_metastore_conn_timeout, experimental);
+TAG_FLAG(hive_metastore_conn_timeout, runtime);
+
+namespace kudu {
+namespace hms {
+
+// Creates an HmsCatalog that records 'master_addresses' as the Kudu master
+// addresses of the HMS entries it writes. The HMS client is constructed here
+// but not started; Start() must be called before any catalog operation, since
+// operations run on the thread pool that Start() creates.
+//
+// NOTE(review): hms_client_ is built with a placeholder HostPort("", 0);
+// presumably the real address is chosen from hms_addresses_ when the client is
+// (re)connected. The initializer also reads hms_client_options_, which is only
+// safe if that member is declared before hms_client_ in hms_catalog.h — confirm.
+HmsCatalog::HmsCatalog(string master_addresses)
+    : master_addresses_(std::move(master_addresses)),
+      hms_client_(HostPort("", 0), hms_client_options_),
+      reconnect_after_(MonoTime::Now()),
+      reconnect_failure_(Status::OK()),
+      consecutive_reconnect_failures_(0),
+      reconnect_idx_(0) {
+}
+
+// Shuts down the catalog's thread pool, if Start() created one.
+HmsCatalog::~HmsCatalog() {
+  Stop();
+}
+
+// Parses --hive_metastore_uris into hms_addresses_ and creates the
+// single-threaded pool on which all HMS operations run. Returns IllegalState if
+// the catalog has already been started, or the URI parse error if the flag is
+// malformed.
+Status HmsCatalog::Start() {
+  if (threadpool_) {
+    return Status::IllegalState("HMS Catalog is already started");
+  }
+
+  RETURN_NOT_OK(ParseUris(FLAGS_hive_metastore_uris, &hms_addresses_));
+
+  // The thread pool must be capped at one thread to ensure serialized access to
+  // the fields of HmsCatalog.
+  RETURN_NOT_OK(ThreadPoolBuilder("hms-catalog")
+      .set_min_threads(1)
+      .set_max_threads(1)
+      .Build(&threadpool_));
+
+  return Status::OK();
+}
+
+// Shuts down the HMS thread pool. This is a no-op when Start() never created the pool.
+void HmsCatalog::Stop() {
+  if (!threadpool_) {
+    return;
+  }
+  threadpool_->Shutdown();
+}
+
+// Creates an HMS entry for the Kudu table 'name' with table ID 'id' and the
+// columns of 'schema'. The entry is built on the calling thread; only the
+// CreateTable RPC itself is executed on the HMS thread pool.
+Status HmsCatalog::CreateTable(const string& id,
+                               const string& name,
+                               const Schema& schema) {
+  hive::Table entry;
+  RETURN_NOT_OK(PopulateTable(id, name, schema, &entry));
+
+  auto create_entry = [&entry] (HmsClient* client) {
+    return client->CreateTable(entry);
+  };
+  return Execute(create_entry);
+}
+
+// Drops the HMS entry for the Kudu table 'name' with table ID 'id'. Each of
+// the following is treated as success without modifying the HMS:
+//  - a legacy name that is not a valid 'database.table' pair
+//  - a missing HMS entry
+//  - an entry rejected because its Kudu table ID does not match
+Status HmsCatalog::DropTable(const string& id, const string& name) {
+  string hms_database;
+  string hms_table;
+
+  if (!ParseTableName(name, &hms_database, &hms_table).ok()) {
+    // This is a legacy table; it can't be in the HMS, so return success.
+    VLOG(1) << "Ignoring missing HMS table entry while dropping table "
+            << name << " (" << id << ")";
+    return Status::OK();
+  }
+
+  // Pass the Kudu table ID along with the drop so that an entry belonging to a
+  // different table is rejected by the HMS (see the error matched below).
+  hive::EnvironmentContext env_ctx;
+  env_ctx.__set_properties({ make_pair(HmsClient::kKuduTableIdKey, id) });
+
+  return Execute([&] (HmsClient* client) {
+    Status s = client->DropTableWithContext(hms_database, hms_table, env_ctx);
+    if (s.IsNotFound()) {
+      VLOG(1) << "Ignoring missing HMS table entry while dropping table "
+              << name << " (" << id << ")";
+      return Status::OK();
+    }
+    if (s.IsRemoteError() &&
+        s.message().ToString().find("Kudu table ID does not match the non-Kudu HMS entry")
+        != string::npos) {
+      // TODO(dan): the string match above is extraordinarily hacky, is there a
+      // better way to match this failure scenario?  Unfortunately errors
+      // occuring in the Kudu Metastore Plugin only manifest as a
+      // 'MetaException', so they can't have a more specific error code.
+      VLOG(1) << "Ignoring HMS table entry which does not belong to the Kudu table being dropped "
+              << name << " (" << id << ")";
+      return Status::OK();
+    }
+
+    return s;
+  });
+}
+
+// Updates the HMS entry of the Kudu table with ID 'id', currently named 'name',
+// so that it reflects 'new_name' and 'schema'. If the current name cannot be in
+// the HMS, has no entry, or (for a rename) has an entry belonging to another
+// table, a fresh entry is created under 'new_name' instead. Returns
+// IllegalState when a non-rename alter targets an entry belonging to another
+// table.
+Status HmsCatalog::AlterTable(const string& id,
+                              const string& name,
+                              const string& new_name,
+                              const Schema& schema) {
+  return Execute([&] (HmsClient* client) {
+    // The HMS does not have a way to alter individual fields of a table
+    // entry, so we must request the existing table entry from the HMS, update
+    // the fields, and write it back. Otherwise we'd overwrite metadata fields
+    // that other tools put into place, such as table statistics. We do
+    // overwrite all Kudu-specific entries such as the Kudu master addresses
+    // and the full set of columns. This ensures entries are fully 'repaired'
+    // during an alter operation.
+    //
+    // This can go wrong in a myriad of ways, including:
+    //
+    // - The original table name isn't a valid Hive database/table pair
+    // - The new table name isn't a valid Hive database/table pair
+    // - The original table entry does not exist in the HMS
+    // - The original table entry doesn't match the Kudu table being altered
+    //
+    // Where possible, we try to repair the HMS state to whatever it should be
+    // in these situations, unless we detect that such a repair would alter an
+    // unrelated table entry.
+
+    // A small function which creates the table, instead of altering it. We
+    // can't call HmsCatalog::CreateTable in this context because it would
+    // deadlock the single-thread executor.
+    auto create_table = [&] {
+      hive::Table table;
+      RETURN_NOT_OK(PopulateTable(id, new_name, schema, &table));
+      return client->CreateTable(table);
+    };
+
+    string hms_database;
+    string hms_table;
+    Status s = ParseTableName(name, &hms_database, &hms_table);
+    if (!s.ok()) {
+      // Parsing the original table name has failed, so it can not be present in
+      // the HMS. Instead of altering the table, create it in the HMS as a new table.
+      VLOG(1) << "Failed to parse the name of the table being altered as an "
+                 "HMS database/table pair, will attempt to create a new HMS table entry: "
+              << s.ToString();
+      return create_table();
+    }
+
+    hive::Table orig_table;
+    s = client->GetTable(hms_database, hms_table, &orig_table);
+    if (s.IsNotFound()) {
+      // The table doesn't already exist in the HMS, so create it.
+      VLOG(1) << "The table being altered does not have an existing HMS entry, "
+                 "will attempt to create a new HMS table entry.";
+      return create_table();
+    }
+
+    // All other errors are fatal.
+    RETURN_NOT_OK(s);
+
+    // Check that the HMS entry belongs to the table being altered.
+    if (orig_table.parameters[hive::g_hive_metastore_constants.META_TABLE_STORAGE] !=
+        HmsClient::kKuduStorageHandler ||
+        orig_table.parameters[HmsClient::kKuduTableIdKey] != id) {
+      // The original table isn't a Kudu table, or isn't the same Kudu table.
+      if (name != new_name) {
+        // If this is a rename table operation, then just attempt to create
+        // the table under the new name.
+        VLOG(1) << "The HMS entry for the table being altered belongs to another table, "
+                   "will attempt to create a new HMS table entry.";
+        return create_table();
+      }
+      // Otherwise we fail, since we should not alter an entry belonging to
+      // a different table.
+      return Status::IllegalState("the HMS entry does not belong to the Kudu table");
+    }
+
+    // Create a copy of the table object, and set the Kudu fields in it. If
+    // the original table object and the new table object match exactly then
+    // we don't need to alter the table in the HMS.
+    hive::Table table(orig_table);
+    RETURN_NOT_OK(PopulateTable(id, new_name, schema, &table));
+
+    if (orig_table == table) {
+      // A rename always changes the entry's name, so an unchanged entry
+      // implies the name is unchanged too.
+      CHECK_EQ(name, new_name);
+      VLOG(1) << "Short-circuiting alter table for " << name << " (" << id << ")";
+      return Status::OK();
+    }
+
+    return client->AlterTable(hms_database, hms_table, table);
+  });
+}
+
+template<typename Task>
+Status HmsCatalog::Execute(Task task) {
+  Synchronizer synchronizer;
+  auto callback = synchronizer.AsStdStatusCallback();
+
+  // TODO(todd): wrapping this in a TRACE_EVENT scope and a LOG_IF_SLOW and such
+  // would be helpful. Perhaps a TRACE message and/or a TRACE_COUNTER_INCREMENT
+  // too to keep track of how much time is spent in calls to HMS for a given
+  // CreateTable call. That will also require propagating the current Trace
+  // object into the 'Rpc' object. Note that the HmsClient class already has
+  // LOG_IF_SLOW calls internally.
+
+  RETURN_NOT_OK(threadpool_->SubmitFunc([=] {
+    // The main run routine of the threadpool thread. Runs the task with
+    // exclusive access to the HMS client. If the task fails, it will be
+    // retried, unless the failure type is non-retriable or the maximum number
+    // of retries has been exceeded. Also handles re-connecting the HMS client
+    // after a fatal error.
+    //
+    // Since every task submitted to the (single thread) pool runs this, it's
+    // essentially a single iteration of a run loop which handles HMS client
+    // reconnection and task processing.
+    //
+    // Notes on error handling:
+    //
+    // There are three separate error scenarios below:
+    //
+    // * Error while (re)connecting the HMS client - This is considered a
+    // 'non-recoverable' error. The current task is immediately failed. In order
+    // to avoid hot-looping and hammering the HMS with reconnect attempts on
+    // every queued task, we set a backoff period. Any tasks which subsequently
+    // run during this backoff period are also immediately failed.
+    //
+    // * Task results in a fatal error - a fatal error is any error caused by a
+    // network or IO fault (not an application level failure). The HMS client
+    // will attempt to reconnect, and the task will be retried (up to a limit).
+    //
+    // * Task results in a non-fatal error - a non-fatal error is an application
+    // level error, and causes the task to be failed immediately (no retries).
+
+    // A negative retry count is treated as zero so that the task always gets
+    // at least one attempt. Otherwise the loop below would never run, and the
+    // default (OK) status would be reported without the task ever executing.
+    const int max_retries =
+        FLAGS_hive_metastore_retry_count > 0 ? FLAGS_hive_metastore_retry_count : 0;
+
+    Status status;
+    for (int attempt = 0; attempt <= max_retries; attempt++) {
+      if (!hms_client_.IsConnected()) {
+        if (reconnect_after_ > MonoTime::Now()) {
+          // Not yet ready to attempt reconnection; fail the task immediately.
+          return callback(reconnect_failure_);
+        }
+
+        // Attempt to reconnect.
+        Status reconnect_status = Reconnect();
+        if (!reconnect_status.ok()) {
+          // Reconnect failed; retry with exponential backoff capped at 10s and
+          // fail the task. We don't bother with jitter here because only the
+          // leader master should be attempting this in any given period per
+          // cluster.
+          //
+          // The shift exponent is clamped: 100ms << 7 already exceeds the 10s
+          // cap, while an unclamped shift overflows 'int' (undefined behavior)
+          // after 25 consecutive failures. That would yield a negative backoff
+          // and let every queued task trigger a reconnect.
+          consecutive_reconnect_failures_++;
+          const int backoff_exponent = std::min(consecutive_reconnect_failures_, 7);
+          reconnect_after_ = MonoTime::Now() +
+              std::min(MonoDelta::FromMilliseconds(100 << backoff_exponent),
+                       MonoDelta::FromSeconds(10));
+          reconnect_failure_ = std::move(reconnect_status);
+          return callback(reconnect_failure_);
+        }
+
+        consecutive_reconnect_failures_ = 0;
+      }
+
+      // Execute the task.
+      status = task(&hms_client_);
+
+      // If the task succeeds, or it's a non-retriable error, return the result.
+      if (status.ok() || !IsFatalError(status)) {
+        return callback(status);
+      }
+
+      // A fatal error occurred. Tear down the connection, and try again. We
+      // don't log loudly here because odds are the reconnection will fail if
+      // it's a true fault, at which point we do log loudly.
+      VLOG(1) << "Call to HMS failed: " << status.ToString();
+      WARN_NOT_OK(hms_client_.Stop(), "Failed to stop Hive Metastore client");
+    }
+
+    // We've exhausted the allowed retries.
+    LOG(WARNING) << "Call to HMS failed after " << max_retries
+                 << " retries: " << status.ToString();
+    return callback(status);
+  }));
+
+  return synchronizer.Wait();
+}
+
+Status HmsCatalog::Reconnect() {
+  HmsClientOptions options;
+  options.send_timeout = MonoDelta::FromSeconds(FLAGS_hive_metastore_send_timeout);
+  options.recv_timeout = MonoDelta::FromSeconds(FLAGS_hive_metastore_recv_timeout);
+  options.conn_timeout = MonoDelta::FromSeconds(FLAGS_hive_metastore_conn_timeout);
+  options.enable_kerberos = FLAGS_hive_metastore_sasl_enabled;
+
+  // Try reconnecting to each HMS in sequence, returning the first one which
+  // succeeds. In order to avoid getting 'stuck' on a partially failed HMS, we
+  // remember which we connected to previously and try it last.
+  //
+  // 's' holds the most recent connection error, and is what gets returned if
+  // every HMS fails. It starts out as an error so that an empty address list
+  // can never be reported as a successful connection.
+  Status s = Status::IllegalState("no Hive Metastore addresses are configured");
+  for (size_t i = 0; i < hms_addresses_.size(); i++) {
+    const auto& address = hms_addresses_[reconnect_idx_];
+    reconnect_idx_ = (reconnect_idx_ + 1) % hms_addresses_.size();
+
+    hms_client_ = HmsClient(address, options);
+    // Assign to the outer 's' (instead of declaring a new, shadowing Status)
+    // so that a failure here is what gets returned below.
+    s = hms_client_.Start();
+    if (s.ok()) {
+      VLOG(1) << "Connected to Hive Metastore " << address.ToString();
+      return Status::OK();
+    }
+
+    WARN_NOT_OK(s, Substitute("Failed to connect to Hive Metastore ($0)", address.ToString()));
+  }
+
+  WARN_NOT_OK(hms_client_.Stop(), "Failed to stop Hive Metastore client");
+  return s;
+}
+
+bool HmsCatalog::IsFatalError(const Status& status) {
+  // Only a fixed set of application-level error kinds is classified as
+  // non-fatal; every other kind is treated as fatal. Leaning towards 'fatal'
+  // is cheap (at worst, an unnecessary reconnect), whereas missing a real
+  // fatal error could make a later RPC fail too, because there is no way to
+  // check the connection's health before sending an RPC.
+  const bool is_application_error = status.IsAlreadyPresent() ||
+                                    status.IsNotFound() ||
+                                    status.IsInvalidArgument() ||
+                                    status.IsIllegalState() ||
+                                    status.IsRemoteError();
+  return !is_application_error;
+}
+
+namespace {
+
+// Returns the Hive type name corresponding to a Kudu column's type.
+// Hive type names follow org.apache.hadoop.hive.serde.serdeConstants.
+// Aborts via LOG(FATAL) for any type without a Hive mapping.
+string column_to_field_type(const ColumnSchema& column) {
+  const auto kudu_type = column.type_info()->type();
+  switch (kudu_type) {
+    case BOOL:
+      return "boolean";
+    case INT8:
+      return "tinyint";
+    case INT16:
+      return "smallint";
+    case INT32:
+      return "int";
+    case INT64:
+      return "bigint";
+    case DECIMAL32:
+    case DECIMAL64:
+    case DECIMAL128: {
+      // Every decimal width maps to Hive's parameterized decimal type.
+      const auto& attributes = column.type_attributes();
+      return Substitute("decimal($0,$1)", attributes.precision, attributes.scale);
+    }
+    case FLOAT:
+      return "float";
+    case DOUBLE:
+      return "double";
+    case STRING:
+      return "string";
+    case BINARY:
+      return "binary";
+    case UNIXTIME_MICROS:
+      return "timestamp";
+    default:
+      LOG(FATAL) << "unhandled column type: " << column.TypeToString();
+  }
+}
+
+// Builds the Hive field schema (column name and Hive type) for a Kudu column.
+hive::FieldSchema column_to_field(const ColumnSchema& column) {
+  hive::FieldSchema hive_field;
+  hive_field.name = column.name();
+  hive_field.type = column_to_field_type(column);
+  return hive_field;
+}
+
+} // anonymous namespace
+
+Status HmsCatalog::PopulateTable(const string& id,
+                                 const string& name,
+                                 const Schema& schema,
+                                 hive::Table* table) {
+  // The HMS database and table names come from the Kudu table name. This runs
+  // first, so nothing else in 'table' is modified if the name is malformed.
+  RETURN_NOT_OK(ParseTableName(name, &table->dbName, &table->tableName));
+  table->tableType = HmsClient::kManagedTable;
+
+  // Kudu-specific parameters are set key by key so that any other parameters
+  // already present on the table are left untouched.
+  auto& params = table->parameters;
+  params[HmsClient::kKuduTableIdKey] = id;
+  params[HmsClient::kKuduMasterAddrsKey] = master_addresses_;
+  params[hive::g_hive_metastore_constants.META_TABLE_STORAGE] =
+      HmsClient::kKuduStorageHandler;
+
+  // The column list, by contrast, is replaced wholesale so that it mirrors the
+  // Kudu schema exactly.
+  const auto& kudu_columns = schema.columns();
+  vector<hive::FieldSchema> hive_columns;
+  hive_columns.reserve(kudu_columns.size());
+  for (const auto& kudu_column : kudu_columns) {
+    hive_columns.push_back(column_to_field(kudu_column));
+  }
+  table->sd.cols = std::move(hive_columns);
+
+  return Status::OK();
+}
+
+Status HmsCatalog::ParseTableName(const string& table,
+                                  string* hms_database,
+                                  string* hms_table) {
+  // Hive's identifier rules depend on its configuration (and probably its
+  // version), so parsing here is deliberately minimal. The only requirement
+  // enforced is that exactly one period separates the database name from the
+  // table name; all further validation is left to the HMS.
+  //
+  // See org.apache.hadoop.hive.metastore.MetaStoreUtils.validateName.
+  vector<string> parts = strings::Split(table, ".");
+  const bool is_database_table_pair = parts.size() == 2;
+  if (!is_database_table_pair) {
+    return Status::InvalidArgument(
+        "when the Hive Metastore integration is enabled, Kudu table names must be a "
+        "period ('.') separated database and table name pair",
+        table);
+  }
+
+  // Outputs are only written on success.
+  *hms_database = std::move(parts.front());
+  *hms_table = std::move(parts.back());
+  return Status::OK();
+}
+
+Status HmsCatalog::ParseUris(const string& metastore_uris, vector<HostPort>* hostports) {
+  // 'metastore_uris' is a comma-separated list of '<scheme>://<host>[:<port>]'
+  // entries; empty entries are skipped. On error, 'hostports' may hold the
+  // entries parsed before the failing one.
+  hostports->clear();
+
+  vector<string> uris = strings::Split(metastore_uris, ",", strings::SkipEmpty());
+  const string kSchemeSeparator = "://";
+
+  for (const string& uri : uris) {
+    // The search starts at index 1 so that the scheme must be non-empty.
+    const string::size_type separator_pos = uri.find(kSchemeSeparator, 1);
+    if (separator_pos == string::npos) {
+      return Status::InvalidArgument("invalid Hive Metastore URI: missing scheme", uri);
+    }
+    const string host_and_port = uri.substr(separator_pos + kSchemeSeparator.size());
+
+    HostPort hp;
+    RETURN_NOT_OK(hp.ParseString(host_and_port, HmsClient::kDefaultHmsPort));
+    // Note: the Java HMS client canonicalizes the hostname to a FQDN at this
+    // point. We skip that because the krb5 library should handle it for us
+    // (when rdns = true), whereas the Java GSSAPI implementation apparently
+    // never canonicalizes.
+    //
+    // See org.apache.hadoop.hive.metastore.HiveMetastoreClient.resolveUris.
+    hostports->emplace_back(std::move(hp));
+  }
+
+  return Status::OK();
+}
+
+// Validates the hive_metastore_uris gflag: returns true if the value parses
+// as a list of Hive Metastore URIs; otherwise logs the error and returns false.
+bool HmsCatalog::ValidateUris(const char* flag_name, const string& metastore_uris) {
+  vector<HostPort> parsed;
+  const Status parse_status = ParseUris(metastore_uris, &parsed);
+  if (parse_status.ok()) {
+    return true;
+  }
+  LOG(ERROR) << "invalid flag " << flag_name << ": " << parse_status.ToString();
+  return false;
+}
+
+} // namespace hms
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/fe4d962c/src/kudu/hms/hms_catalog.h
----------------------------------------------------------------------
diff --git a/src/kudu/hms/hms_catalog.h b/src/kudu/hms/hms_catalog.h
new file mode 100644
index 0000000..fbd2839
--- /dev/null
+++ b/src/kudu/hms/hms_catalog.h
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+#include <vector> // IWYU pragma: keep
+
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/gscoped_ptr.h" // IWYU pragma: keep
+#include "kudu/gutil/port.h"
+#include "kudu/hms/hms_client.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h" // IWYU pragma: keep
+#include "kudu/util/status.h"
+
+namespace hive {
+class Table;
+} // namespace hive
+
+namespace kudu {
+
+class Schema;
+class ThreadPool; // IWYU pragma: keep
+
+namespace hms {
+
+// A high-level API above the HMS which handles converting to and from
+// Kudu-specific types, retries, reconnections, HA, error handling, and
+// concurrent requests.
+//
+// This class is thread-safe after Start() is called.
+class HmsCatalog {
+ public:
+
+  explicit HmsCatalog(std::string master_addresses);
+  ~HmsCatalog();
+
+  // Starts the HmsCatalog instance.
+  Status Start();
+
+  // Stops the HmsCatalog instance.
+  void Stop();
+
+  // Creates a new table entry in the HMS.
+  //
+  // Fails if the HMS is unreachable, or if a table with the same name is
+  // already present.
+  Status CreateTable(const std::string& id,
+                     const std::string& name,
+                     const Schema& schema) WARN_UNUSED_RESULT;
+
+  // Drops a table entry from the HMS, if it exists.
+  //
+  // This method will fail if the HMS is unreachable, or if the table entry in
+  // the HMS doesn't match the specified Kudu table ID.
+  Status DropTable(const std::string& id,
+                   const std::string& name) WARN_UNUSED_RESULT;
+
+  // Alters a table entry in the HMS, if it exists. If the table entry does not
+  // exist it will be created instead.
+  //
+  // This method will fail if the HMS is unreachable, or if the table entry in
+  // the HMS doesn't match the specified Kudu table ID.
+  Status AlterTable(const std::string& id,
+                    const std::string& name,
+                    const std::string& new_name,
+                    const Schema& schema) WARN_UNUSED_RESULT;
+
+  // Validates the hive_metastore_uris gflag. Returns false (after logging the
+  // parse error) if the value cannot be parsed as Hive Metastore URIs.
+  static bool ValidateUris(const char* flag_name, const std::string& metastore_uris);
+
+  // Validates the Hive Metastore SASL gflags.
+  static bool ValidateSasl();
+
+ private:
+
+  FRIEND_TEST(HmsCatalogStaticTest, TestParseTableName);
+  FRIEND_TEST(HmsCatalogStaticTest, TestParseUris);
+
+  // Synchronously executes a task with exclusive access to the HMS client.
+  // The task runs on 'threadpool_' and is retried after fatal errors (up to
+  // --hive_metastore_retry_count times), reconnecting the client as needed.
+  template<typename Task>
+  Status Execute(Task task) WARN_UNUSED_RESULT;
+
+  // Reconnects hms_client_ to an HMS, or returns an error if all HMS instances
+  // are unavailable.
+  Status Reconnect() WARN_UNUSED_RESULT;
+
+  // Returns true if the RPC status is 'fatal', i.e. the Thrift connection on
+  // which it occurred should be shut down. Depends only on 'status'.
+  static bool IsFatalError(const Status& status);
+
+  // Sets the Kudu-specific fields in the table without overwriting unrelated
+  // fields.
+  Status PopulateTable(const std::string& id,
+                       const std::string& name,
+                       const Schema& schema,
+                       hive::Table* table) WARN_UNUSED_RESULT;
+
+  // Parses a Kudu table name into a Hive database and table name.
+  // Returns an error if the Kudu table name is not correctly formatted.
+  static Status ParseTableName(const std::string& table,
+                               std::string* hms_database,
+                               std::string* hms_table) WARN_UNUSED_RESULT;
+
+  // Parses a comma-separated Hive Metastore URI string into a sequence of
+  // HostPorts. Entries without an explicit port use HmsClient::kDefaultHmsPort.
+  static Status ParseUris(const std::string& metastore_uris,
+                          std::vector<HostPort>* hostports) WARN_UNUSED_RESULT;
+
+  // Kudu master addresses.
+  const std::string master_addresses_;
+
+  // Initialized during Start().
+  std::vector<HostPort> hms_addresses_;
+  gscoped_ptr<ThreadPool> threadpool_;
+
+  // Fields only used by the threadpool thread:
+
+  // Options to use when creating the HMS client.
+  // NOTE(review): Reconnect() currently builds its options directly from the
+  // flags; confirm whether this field is still needed.
+  hms::HmsClientOptions hms_client_options_;
+  // The HMS client.
+  hms::HmsClient hms_client_;
+
+  // Fields which track consecutive reconnection attempts and backoff. No
+  // reconnection is attempted before 'reconnect_after_'; until then, tasks are
+  // failed with 'reconnect_failure_', the status of the last failed reconnect.
+  MonoTime reconnect_after_;
+  Status reconnect_failure_;
+  // Number of reconnect failures in a row; drives the exponential backoff.
+  int consecutive_reconnect_failures_;
+  // Index into 'hms_addresses_' of the next HMS to try when reconnecting.
+  int reconnect_idx_;
+};
+
+} // namespace hms
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/fe4d962c/src/kudu/hms/mini_hms.cc
----------------------------------------------------------------------
diff --git a/src/kudu/hms/mini_hms.cc b/src/kudu/hms/mini_hms.cc
index f571b6c..fa09c08 100644
--- a/src/kudu/hms/mini_hms.cc
+++ b/src/kudu/hms/mini_hms.cc
@@ -167,6 +167,10 @@ Status MiniHms::Resume() {
   return Status::OK();
 }
 
+string MiniHms::uris() const {
+  // A single thrift URI: the mini HMS is addressed via the loopback address,
+  // so only the port varies between instances.
+  string metastore_uri = Substitute("thrift://127.0.0.1:$0", port_);
+  return metastore_uri;
+}
+
 Status MiniHms::CreateHiveSite(const string& tmp_dir) const {
 
   // - datanucleus.schema.autoCreateAll

http://git-wip-us.apache.org/repos/asf/kudu/blob/fe4d962c/src/kudu/hms/mini_hms.h
----------------------------------------------------------------------
diff --git a/src/kudu/hms/mini_hms.h b/src/kudu/hms/mini_hms.h
index 52f480b..9b76780 100644
--- a/src/kudu/hms/mini_hms.h
+++ b/src/kudu/hms/mini_hms.h
@@ -73,6 +73,10 @@ class MiniHms {
     return HostPort("127.0.0.1", port_);
   }
 
+  /// Returns the Metastore URIs, in the format that the Hive
+  /// hive.metastore.uris configuration expects: a 'thrift://' URI for this
+  /// mini HMS's loopback address and port.
+  std::string uris() const;
+
  private:
 
   // Creates a hive-site.xml for the mini HMS.

http://git-wip-us.apache.org/repos/asf/kudu/blob/fe4d962c/src/kudu/master/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/master/CMakeLists.txt b/src/kudu/master/CMakeLists.txt
index 5d5fe2c..5e1c528 100644
--- a/src/kudu/master/CMakeLists.txt
+++ b/src/kudu/master/CMakeLists.txt
@@ -51,6 +51,7 @@ target_link_libraries(master
   krpc
   kserver
   kudu_common
+  kudu_hms
   kudu_util
   master_proto
   rpc_header_proto

http://git-wip-us.apache.org/repos/asf/kudu/blob/fe4d962c/src/kudu/master/master.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master.cc b/src/kudu/master/master.cc
index ce16a46..41bb27e 100644
--- a/src/kudu/master/master.cc
+++ b/src/kudu/master/master.cc
@@ -25,6 +25,7 @@
 #include <vector>
 
 #include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 
 #include "kudu/cfile/block_cache.h"
@@ -54,6 +55,7 @@
 #include "kudu/tserver/tablet_copy_service.h"
 #include "kudu/tserver/tablet_service.h"
 #include "kudu/util/flag_tags.h"
+#include "kudu/util/flag_validators.h"
 #include "kudu/util/maintenance_manager.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
@@ -78,6 +80,9 @@ DEFINE_int64(authn_token_validity_seconds, 60 * 60 * 24 * 7,
              "validity period expires.");
 TAG_FLAG(authn_token_validity_seconds, experimental);
 
+DECLARE_bool(hive_metastore_sasl_enabled);
+DECLARE_string(keytab_file);
+
 using std::min;
 using std::shared_ptr;
 using std::string;
@@ -93,6 +98,24 @@ using strings::Substitute;
 namespace kudu {
 namespace master {
 
+namespace {
+
+// Ensures a keytab is configured whenever the Hive Metastore connection uses
+// SASL: --hive_metastore_sasl_enabled requires a non-empty --keytab_file.
+//
+// This validator lives in master.cc because the HMS module (which defines
+// --hive_metastore_sasl_enabled) and the server module (which defines
+// --keytab_file) don't link to each other; the master module is the first
+// one that links to both.
+bool ValidateHiveMetastoreSaslEnabled() {
+  const bool has_keytab = !FLAGS_keytab_file.empty();
+  if (!FLAGS_hive_metastore_sasl_enabled || has_keytab) {
+    return true;
+  }
+  LOG(ERROR) << "When the Hive Metastore has SASL enabled "
+                "(--hive_metastore_sasl_enabled), Kudu must be configured with "
+                "a keytab (--keytab_file).";
+  return false;
+}
+GROUP_FLAG_VALIDATOR(hive_metastore_sasl_enabled, ValidateHiveMetastoreSaslEnabled);
+
+} // anonymous namespace
+
 Master::Master(const MasterOptions& opts)
   : KuduServer("Master", opts, "kudu.master"),
     state_(kStopped),

Reply via email to