This is an automated email from the ASF dual-hosted git repository.
mahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new c3d0502 HIVE-21841 : Leader election in HMS to run housekeeping
tasks. (Ashutosh Bapat reviewed by Mahesh Kumar Behera)
c3d0502 is described below
commit c3d05022f79ed8ef6c61ca9c088f6a8a992d6e8d
Author: Ashutosh Bapat <[email protected]>
AuthorDate: Thu Jun 20 09:48:36 2019 +0530
HIVE-21841 : Leader election in HMS to run housekeeping tasks. (Ashutosh
Bapat reviewed by Mahesh Kumar Behera)
Signed-off-by: mbehera <[email protected]>
---
.../MetastoreHousekeepingLeaderTestBase.java | 192 +++++++++++++++++++++
.../MetastoreTaskThreadAlwaysTestImpl.java | 60 +++++++
.../RemoteMetastoreTaskThreadTestImpl1.java | 60 +++++++
.../RemoteMetastoreTaskThreadTestImpl2.java | 60 +++++++
.../metastore/TestMetastoreHousekeepingLeader.java | 61 +++++++
...TestMetastoreHousekeepingLeaderEmptyConfig.java | 62 +++++++
.../TestMetastoreHousekeepingNonLeader.java | 71 ++++++++
.../hadoop/hive/ql/stats/StatsUpdaterThread.java | 5 +-
.../hadoop/hive/metastore/ReplChangeManager.java | 3 +-
.../hadoop/hive/metastore/conf/MetastoreConf.java | 9 +
.../hadoop/hive/metastore/HiveMetaStore.java | 103 ++++++++---
.../hadoop/hive/metastore/MetaStoreTestUtils.java | 35 +++-
.../hive/metastore/TestMarkPartitionRemote.java | 2 +-
13 files changed, 686 insertions(+), 37 deletions(-)
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java
new file mode 100644
index 0000000..d89d67c
--- /dev/null
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
+import org.apache.hadoop.hive.ql.stats.StatsUpdaterThread;
+import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
+import org.apache.hadoop.hive.ql.txn.compactor.Initiator;
+import org.apache.hadoop.hive.ql.txn.compactor.Worker;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class for HMS leader config testing.
+ */
+class MetastoreHousekeepingLeaderTestBase {
+ private static final Logger LOG =
LoggerFactory.getLogger(MetastoreHousekeepingLeaderTestBase.class);
+ private static HiveMetaStoreClient client;
+ private static Configuration conf = MetastoreConf.newMetastoreConf();
+ private static Warehouse warehouse;
+ private static boolean isServerStarted = false;
+ private static int port;
+ private static MiniDFSCluster miniDFS;
+ // How long should we wait for the housekeeping threads to start in ms.
+ private static final long SLEEP_INTERVAL_FOR_THREADS_TO_START = 10000;
+ // Threads using ThreadPool will start after the configured interval. So,
start them some time
+ // before we check the existence of threads.
+ private static final long REMOTE_TASKS_INTERVAL =
SLEEP_INTERVAL_FOR_THREADS_TO_START - 3000;
+ static final String METASTORE_THREAD_TASK_FREQ_CONF =
"metastore.leader.test.task.freq";
+
+ static Map<String, Boolean> threadNames = new HashMap<>();
+ static Map<Class, Boolean> threadClasses = new HashMap<>();
+
+ void internalSetup(final String leaderHostName) throws Exception {
+ MetaStoreTestUtils.setConfForStandloneMode(conf);
+ MetastoreConf.setVar(conf, ConfVars.THRIFT_BIND_HOST, "localhost");
+ MetastoreConf.setVar(conf,
ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME, leaderHostName);
+
+ addHouseKeepingThreadConfigs();
+
+ warehouse = new Warehouse(conf);
+
+ if (isServerStarted) {
+ Assert.assertNotNull("Unable to connect to the MetaStore server",
client);
+ return;
+ }
+
+ port =
MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(),
+ conf, true);
+ System.out.println("Starting MetaStore Server on port " + port);
+ isServerStarted = true;
+
+ // If the client connects, the metastore service has started. This is used
as a signal to
+ // start tests.
+ client = createClient();
+ }
+
+ private HiveMetaStoreClient createClient() throws Exception {
+ MetastoreConf.setVar(conf, ConfVars.THRIFT_URIS, "thrift://localhost:" +
port);
+ MetastoreConf.setBoolVar(conf, ConfVars.EXECUTE_SET_UGI, false);
+ return new HiveMetaStoreClient(conf);
+ }
+
+ private void addHouseKeepingThreadConfigs() throws Exception {
+ conf.setTimeDuration(METASTORE_THREAD_TASK_FREQ_CONF,
REMOTE_TASKS_INTERVAL,
+ TimeUnit.MILLISECONDS);
+ addStatsUpdaterThreadConfigs();
+ addReplChangeManagerConfigs();
+ addCompactorConfigs();
+ long numTasks = addRemoteOnlyTasksConfigs();
+ numTasks = numTasks + addAlwaysTasksConfigs();
+ MetastoreConf.setLongVar(conf, ConfVars.THREAD_POOL_SIZE, numTasks);
+ }
+ private void addStatsUpdaterThreadConfigs() {
+ MetastoreConf.setLongVar(conf, ConfVars.STATS_AUTO_UPDATE_WORKER_COUNT, 1);
+ MetastoreConf.setVar(conf, ConfVars.STATS_AUTO_UPDATE, "all");
+ threadClasses.put(StatsUpdaterThread.class, false);
+ threadNames.put(StatsUpdaterThread.WORKER_NAME_PREFIX, false);
+ }
+
+ private void addReplChangeManagerConfigs() throws Exception {
+ miniDFS = new MiniDFSCluster.Builder(new
Configuration()).numDataNodes(1).format(true).build();
+ MetastoreConf.setBoolVar(conf, ConfVars.REPLCMENABLED, true);
+ String cmroot = "hdfs://" + miniDFS.getNameNode().getHostAndPort() +
"/cmroot";
+ MetastoreConf.setVar(conf, ConfVars.REPLCMDIR, cmroot);
+ threadNames.put(ReplChangeManager.CM_THREAD_NAME_PREFIX, false);
+ }
+
+ private void addCompactorConfigs() {
+ MetastoreConf.setBoolVar(conf, ConfVars.COMPACTOR_INITIATOR_ON, true);
+ MetastoreConf.setVar(conf, ConfVars.HIVE_METASTORE_RUNWORKER_IN,
"metastore");
+ MetastoreConf.setLongVar(conf, ConfVars.COMPACTOR_WORKER_THREADS, 1);
+ threadClasses.put(Initiator.class, false);
+ threadClasses.put(Worker.class, false);
+ threadClasses.put(Cleaner.class, false);
+ }
+
+ private long addRemoteOnlyTasksConfigs() {
+ String remoteTaskClassPaths =
+ RemoteMetastoreTaskThreadTestImpl1.class.getCanonicalName() + "," +
+
RemoteMetastoreTaskThreadTestImpl2.class.getCanonicalName();
+
+ MetastoreConf.setVar(conf, ConfVars.TASK_THREADS_REMOTE_ONLY,
remoteTaskClassPaths);
+
+ threadNames.put(RemoteMetastoreTaskThreadTestImpl1.TASK_NAME, false);
+ threadNames.put(RemoteMetastoreTaskThreadTestImpl2.TASK_NAME, false);
+
+ return 2;
+ }
+
+ private long addAlwaysTasksConfigs() throws Exception {
+ String alwaysTaskClassPaths =
MetastoreTaskThreadAlwaysTestImpl.class.getCanonicalName();
+ MetastoreConf.setVar(conf, ConfVars.TASK_THREADS_ALWAYS,
alwaysTaskClassPaths);
+ threadNames.put(MetastoreTaskThreadAlwaysTestImpl.TASK_NAME, false);
+ return 1;
+ }
+
+ private static String getAllThreadsAsString() {
+ Map<Thread, StackTraceElement[]> threadStacks = Thread.getAllStackTraces();
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry<Thread, StackTraceElement[]> entry :
threadStacks.entrySet()) {
+ Thread t = entry.getKey();
+ sb.append(System.lineSeparator());
+ sb.append("Name: ").append(t.getName()).append(" State:
").append(t.getState())
+ .append(" Class name: ").append(t.getClass().getCanonicalName());
+ }
+ return sb.toString();
+ }
+
+ void searchHousekeepingThreads() throws Exception {
+ // Client has been created, so the metastore has started serving. Sleep for
a few seconds for
+ // the housekeeping threads to start.
+ Thread.sleep(SLEEP_INTERVAL_FOR_THREADS_TO_START);
+
+ LOG.info(getAllThreadsAsString());
+
+ // Check if all the housekeeping threads have been started.
+ Set<Thread> threads = Thread.getAllStackTraces().keySet();
+ for (Thread thread : threads) {
+ // All house keeping threads should be alive.
+ if (!thread.isAlive()) {
+ continue;
+ }
+
+ // Account for the threads identifiable by their classes.
+ if (threadClasses.get(thread.getClass()) != null) {
+ threadClasses.put(thread.getClass(), true);
+ }
+
+ // Account for the threads identifiable by their names
+ String threadName = thread.getName();
+ if (threadName == null) {
+ continue;
+ }
+
+ if (threadName.startsWith(StatsUpdaterThread.WORKER_NAME_PREFIX)) {
+ threadNames.put(StatsUpdaterThread.WORKER_NAME_PREFIX, true);
+ } else if
(threadName.startsWith(ReplChangeManager.CM_THREAD_NAME_PREFIX)) {
+ threadNames.put(ReplChangeManager.CM_THREAD_NAME_PREFIX, true);
+ } else if (threadNames.get(threadName) != null) {
+ threadNames.put(threadName, true);
+ }
+ }
+ }
+}
+
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreTaskThreadAlwaysTestImpl.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreTaskThreadAlwaysTestImpl.java
new file mode 100644
index 0000000..4cd2c58
--- /dev/null
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreTaskThreadAlwaysTestImpl.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An implementation of MetastoreTaskThread for testing metastore leader
config.
+ */
+public class MetastoreTaskThreadAlwaysTestImpl implements MetastoreTaskThread {
+ static final String TASK_NAME = "metastore_task_thread_test_impl_3";
+ public static final Logger LOG =
LoggerFactory.getLogger(MetastoreTaskThreadAlwaysTestImpl.class);
+ private Configuration conf;
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public long runFrequency(TimeUnit unit) {
+ return
conf.getTimeDuration(MetastoreHousekeepingLeaderTestBase.METASTORE_THREAD_TASK_FREQ_CONF,
+ 0, unit);
+ }
+
+ @Override
+ public void run() {
+ LOG.info("Name of thread " + Thread.currentThread().getName() + " changed
to " + TASK_NAME);
+ Thread.currentThread().setName(TASK_NAME);
+ try {
+ Thread.sleep(runFrequency(TimeUnit.MILLISECONDS));
+ } catch (InterruptedException ie) {
+ LOG.error("Task " + TASK_NAME + " interrupted: " + ie.getMessage(), ie);
+ }
+ }
+}
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/RemoteMetastoreTaskThreadTestImpl1.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/RemoteMetastoreTaskThreadTestImpl1.java
new file mode 100644
index 0000000..c590b6a
--- /dev/null
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/RemoteMetastoreTaskThreadTestImpl1.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An implementation of MetastoreTaskThread for testing metastore leader
config.
+ */
+public class RemoteMetastoreTaskThreadTestImpl1 implements MetastoreTaskThread
{
+ static final String TASK_NAME = "metastore_task_thread_test_impl_1";
+ public static final Logger LOG =
LoggerFactory.getLogger(RemoteMetastoreTaskThreadTestImpl1.class);
+ private Configuration conf;
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public long runFrequency(TimeUnit unit) {
+ return
conf.getTimeDuration(MetastoreHousekeepingLeaderTestBase.METASTORE_THREAD_TASK_FREQ_CONF,
+ 0, unit);
+ }
+
+ @Override
+ public void run() {
+ LOG.info("Name of thread " + Thread.currentThread().getName() + " changed
to " + TASK_NAME);
+ Thread.currentThread().setName(TASK_NAME);
+ try {
+ Thread.sleep(runFrequency(TimeUnit.MILLISECONDS));
+ } catch (InterruptedException ie) {
+ LOG.error("Task " + TASK_NAME + " interrupted: " + ie.getMessage(), ie);
+ }
+ }
+}
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/RemoteMetastoreTaskThreadTestImpl2.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/RemoteMetastoreTaskThreadTestImpl2.java
new file mode 100644
index 0000000..5b50f66
--- /dev/null
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/RemoteMetastoreTaskThreadTestImpl2.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An implementation of MetastoreTaskThread for testing metastore leader
config.
+ */
+public class RemoteMetastoreTaskThreadTestImpl2 implements MetastoreTaskThread
{
+ static final String TASK_NAME = "metastore_task_thread_test_impl_2";
+ public static final Logger LOG =
LoggerFactory.getLogger(RemoteMetastoreTaskThreadTestImpl2.class);
+ private Configuration conf;
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public long runFrequency(TimeUnit unit) {
+ return
conf.getTimeDuration(MetastoreHousekeepingLeaderTestBase.METASTORE_THREAD_TASK_FREQ_CONF,
+ 0, unit);
+ }
+
+ @Override
+ public void run() {
+ LOG.info("Name of thread " + Thread.currentThread().getName() + " changed
to " + TASK_NAME);
+ Thread.currentThread().setName(TASK_NAME);
+ try {
+ Thread.sleep(runFrequency(TimeUnit.MILLISECONDS));
+ } catch (InterruptedException ie) {
+ LOG.error("Task " + TASK_NAME + " interrupted: " + ie.getMessage(), ie);
+ }
+ }
+}
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeader.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeader.java
new file mode 100644
index 0000000..e8b820d
--- /dev/null
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeader.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Test for specifying a valid hostname as HMS leader.
+ */
+public class TestMetastoreHousekeepingLeader extends
MetastoreHousekeepingLeaderTestBase {
+ private static final Logger LOG =
LoggerFactory.getLogger(TestMetastoreHousekeepingLeader.class);
+
+ @Before
+ public void setUp() throws Exception {
+ internalSetup("localhost");
+ }
+
+ @Test
+ public void testHouseKeepingThreadExistence() throws Exception {
+ searchHousekeepingThreads();
+
+ // Verify existence of threads
+ for (Map.Entry<String, Boolean> entry : threadNames.entrySet()) {
+ if (entry.getValue()) {
+ LOG.info("Found thread with name " + entry.getKey());
+ }
+ Assert.assertTrue("No thread with name " + entry.getKey() + " found.",
entry.getValue());
+ }
+
+ for (Map.Entry<Class, Boolean> entry : threadClasses.entrySet()) {
+ if (entry.getValue()) {
+ LOG.info("Found thread for " + entry.getKey().getSimpleName());
+ }
+ Assert.assertTrue("No thread found for class " +
entry.getKey().getSimpleName(),
+ entry.getValue());
+ }
+ }
+}
+
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeaderEmptyConfig.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeaderEmptyConfig.java
new file mode 100644
index 0000000..202a677
--- /dev/null
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeaderEmptyConfig.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Test for specifying empty HMS leader.
+ */
+public class TestMetastoreHousekeepingLeaderEmptyConfig extends
MetastoreHousekeepingLeaderTestBase {
+ private static final Logger LOG =
+
LoggerFactory.getLogger(TestMetastoreHousekeepingLeaderEmptyConfig.class);
+
+ @Before
+ public void setUp() throws Exception {
+ // Empty string for leader indicates that the HMS is leader.
+ internalSetup("");
+ }
+
+ @Test
+ public void testHouseKeepingThreadExistence() throws Exception {
+ searchHousekeepingThreads();
+
+ // Verify existence of threads
+ for (Map.Entry<String, Boolean> entry : threadNames.entrySet()) {
+ if (entry.getValue()) {
+ LOG.info("Found thread with name " + entry.getKey());
+ }
+ Assert.assertTrue("No thread with name " + entry.getKey() + " found.",
entry.getValue());
+ }
+
+ for (Map.Entry<Class, Boolean> entry : threadClasses.entrySet()) {
+ if (entry.getValue()) {
+ LOG.info("Found thread for " + entry.getKey().getSimpleName());
+ }
+ Assert.assertTrue("No thread found for class " +
entry.getKey().getSimpleName(),
+ entry.getValue());
+ }
+ }
+}
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingNonLeader.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingNonLeader.java
new file mode 100644
index 0000000..0341d3c
--- /dev/null
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingNonLeader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.hive.ql.txn.compactor.Worker;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Test for specifying HMS leader other than the current one.
+ */
+public class TestMetastoreHousekeepingNonLeader extends
MetastoreHousekeepingLeaderTestBase {
+ private static final Logger LOG =
+
LoggerFactory.getLogger(TestMetastoreHousekeepingNonLeader.class);
+
+ @Before
+ public void setUp() throws Exception {
+ // A leader hostname other than this host makes this HMS a non-leader.
+ internalSetup("some_non_leader_host.domain1.domain");
+ }
+
+ @Test
+ public void testHouseKeepingThreadExistence() throws Exception {
+ searchHousekeepingThreads();
+
+ // Verify that no name-identified housekeeping thread is running
+ for (Map.Entry<String, Boolean> entry : threadNames.entrySet()) {
+ if (!entry.getValue()) {
+ LOG.info("No thread found with name " + entry.getKey());
+ }
+ Assert.assertFalse("Thread with name " + entry.getKey() + " found.",
entry.getValue());
+ }
+
+ for (Map.Entry<Class, Boolean> entry : threadClasses.entrySet()) {
+ // A non-leader HMS will still run the configured number of Compaction
worker threads.
+ if (entry.getKey() == Worker.class) {
+ if (entry.getValue()) {
+ LOG.info("Thread found for " + entry.getKey().getSimpleName());
+ }
+ Assert.assertTrue("No thread found for " +
entry.getKey().getSimpleName(), entry.getValue());
+ } else {
+ if (!entry.getValue()) {
+ LOG.info("No thread found for " + entry.getKey().getSimpleName());
+ }
+ Assert.assertFalse("Thread found for class " +
entry.getKey().getSimpleName(),
+ entry.getValue());
+ }
+ }
+ }
+}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
index 7c1944f..8acb1c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
@@ -67,6 +67,7 @@ import com.google.common.collect.Lists;
public class StatsUpdaterThread extends Thread implements MetaStoreThread {
public static final String SKIP_STATS_AUTOUPDATE_PROPERTY =
"skip.stats.autoupdate";
+ public static final String WORKER_NAME_PREFIX = "Stats updater worker ";
private static final Logger LOG =
LoggerFactory.getLogger(StatsUpdaterThread.class);
protected Configuration conf;
@@ -144,12 +145,13 @@ public class StatsUpdaterThread extends Thread implements
MetaStoreThread {
for (int i = 0; i < workers.length; ++i) {
workers[i] = new Thread(new WorkerRunnable(conf, user));
workers[i].setDaemon(true);
- workers[i].setName("Stats updater worker " + i);
+ workers[i].setName(WORKER_NAME_PREFIX + i);
}
}
@Override
public void run() {
+ LOG.info("Stats updater thread started");
startWorkers();
while (!stop.get()) {
boolean hadUpdates = runOneIteration();
@@ -167,6 +169,7 @@ public class StatsUpdaterThread extends Thread implements
MetaStoreThread {
@VisibleForTesting
void startWorkers() {
for (int i = 0; i < workers.length; ++i) {
+ LOG.info("Stats updater worker thread " + workers[i].getName() + "
started");
workers[i].start();
}
}
diff --git
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
index d6c6d50..54289af 100644
---
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
+++
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -60,6 +60,7 @@ public class ReplChangeManager {
private static final String URI_FRAGMENT_SEPARATOR = "#";
public static final String SOURCE_OF_REPLICATION = "repl.source.for";
private static final String TXN_WRITE_EVENT_FILE_SEPARATOR = "]";
+ static final String CM_THREAD_NAME_PREFIX = "cmclearer-";
public enum RecycleType {
MOVE,
@@ -457,7 +458,7 @@ public class ReplChangeManager {
if (MetastoreConf.getBoolVar(conf, ConfVars.REPLCMENABLED)) {
ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor(
new BasicThreadFactory.Builder()
- .namingPattern("cmclearer-%d")
+ .namingPattern(CM_THREAD_NAME_PREFIX + "%d")
.daemon(true)
.build());
executor.scheduleAtFixedRate(new CMClearer(MetastoreConf.getVar(conf,
ConfVars.REPLCMDIR),
diff --git
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index c0d8919..927324e 100644
---
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -386,6 +386,15 @@ public class MetastoreConf {
new RangeValidator(1, 20), "Number of consecutive compaction failures
(per table/partition) " +
"after which automatic compactions will not be scheduled any more.
Note that this must be less " +
"than hive.compactor.history.retention.failed."),
+
METASTORE_HOUSEKEEPING_LEADER_HOSTNAME("metastore.housekeeping.leader.hostname",
+ "hive.metastore.housekeeping.leader.hostname", "",
+"If there are multiple Thrift metastore services running, the hostname of
Thrift metastore " +
+ "service to run housekeeping tasks at. By default this value is
empty, which " +
+ "means that the current metastore will run the housekeeping tasks. If
configuration " +
+ "metastore.thrift.bind.host is set on the intended leader metastore,
this value should " +
+ "match that configuration. Otherwise it should be the same as the hostname
returned by " +
+ "InetAddress#getLocalHost#getHostName(). Given the uncertainty in the
latter, " +
+ "it is desirable to configure metastore.thrift.bind.host on the
intended leader HMS."),
COMPACTOR_INITIATOR_ON("metastore.compactor.initiator.on",
"hive.compactor.initiator.on", false,
"Whether to run the initiator and cleaner threads on this metastore
instance or not.\n" +
"Set this to true on one instance of the Thrift metastore service
as part of turning\n" +
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index a5286b5..7e97f8d 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -609,22 +609,16 @@ public class HiveMetaStore extends ThriftHiveMetastore {
partitionValidationPattern = null;
}
- // We only initialize once the tasks that need to be run periodically
- if (alwaysThreadsInitialized.compareAndSet(false, true)) {
- ThreadPool.initialize(conf);
- Collection<String> taskNames =
- MetastoreConf.getStringCollection(conf,
ConfVars.TASK_THREADS_ALWAYS);
- for (String taskName : taskNames) {
- MetastoreTaskThread task =
- JavaUtils.newInstance(JavaUtils.getClass(taskName,
MetastoreTaskThread.class));
- task.setConf(conf);
- long freq = task.runFrequency(TimeUnit.MILLISECONDS);
- // For backwards compatibility, since some threads used to be hard
coded but only run if
- // frequency was > 0
- if (freq > 0) {
- ThreadPool.getPool().scheduleAtFixedRate(task, freq, freq,
TimeUnit.MILLISECONDS);
- }
- }
+ // We only initialize once the tasks that need to be run periodically.
For remote metastore
+ // these threads are started along with the other housekeeping threads
only in the leader
+ // HMS.
+ String leaderHost = MetastoreConf.getVar(conf,
+ MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME);
+ if (!isMetaStoreRemote() && ((leaderHost == null) ||
leaderHost.trim().isEmpty())) {
+ startAlwaysTaskThreads(conf);
+ } else if (!isMetaStoreRemote()) {
+ LOG.info("Not starting tasks specified by " +
ConfVars.TASK_THREADS_ALWAYS.getVarname() +
+ " since " + leaderHost + " is configured to run these tasks.");
}
expressionProxy = PartFilterExprUtil.createExpressionProxy(conf);
fileMetadataManager = new FileMetadataManager(this.getMS(), conf);
@@ -653,6 +647,27 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
}
+ private static void startAlwaysTaskThreads(Configuration conf) throws
MetaException {
+ if (alwaysThreadsInitialized.compareAndSet(false, true)) {
+ ThreadPool.initialize(conf);
+ Collection<String> taskNames =
+ MetastoreConf.getStringCollection(conf,
ConfVars.TASK_THREADS_ALWAYS);
+ for (String taskName : taskNames) {
+ MetastoreTaskThread task =
+ JavaUtils.newInstance(JavaUtils.getClass(taskName,
MetastoreTaskThread.class));
+ task.setConf(conf);
+ long freq = task.runFrequency(TimeUnit.MILLISECONDS);
+ LOG.info("Scheduling for " + task.getClass().getCanonicalName() + "
service with " +
+ "frequency " + freq + "ms.");
+ // For backwards compatibility, since some threads used to be hard
coded but only run if
+ // frequency was > 0
+ if (freq > 0) {
+ ThreadPool.getPool().scheduleAtFixedRate(task, freq, freq,
TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+ }
+
/**
*
* Filter is actually enabled only when the configured filter hook is
configured, not default, and
@@ -9723,7 +9738,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
Lock startLock = new ReentrantLock();
Condition startCondition = startLock.newCondition();
AtomicBoolean startedServing = new AtomicBoolean();
- startMetaStoreThreads(conf, startLock, startCondition, startedServing);
startMetaStore(cli.getPort(), HadoopThriftAuthBridge.getBridge(), conf,
startLock,
startCondition, startedServing);
} catch (Throwable t) {
@@ -9928,6 +9942,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
HMSHandler.LOG.info("Direct SQL optimization = {}", directSqlEnabled);
if (startLock != null) {
+ startMetaStoreThreads(conf, startLock, startCondition, startedServing,
+ isMetastoreHousekeepingLeader(conf, getServerHostName()));
signalOtherThreadsToStart(tServer, startLock, startCondition,
startedServing);
}
@@ -9950,6 +9966,23 @@ public class HiveMetaStore extends ThriftHiveMetastore {
tServer.serve();
}
+ private static boolean isMetastoreHousekeepingLeader(Configuration conf,
String serverHost) {
+ String leaderHost =
+ MetastoreConf.getVar(conf,
+
MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME);
+
+ // For the sake of backward compatibility, the current HMS becomes
the leader when no
+ // leader is specified.
+ if (leaderHost == null || leaderHost.isEmpty()) {
+ LOG.info(ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME + " is empty.
Start all the " +
+ "housekeeping threads.");
+ return true;
+ }
+
+ LOG.info(ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME + " is set to " +
leaderHost);
+ return leaderHost.trim().equals(serverHost);
+ }
+
/**
* @param port where metastore server is running
* @return metastore server instance URL. If the metastore server was bound
to a configured
@@ -9958,13 +9991,15 @@ public class HiveMetaStore extends ThriftHiveMetastore {
* @throws Exception
*/
private static String getServerInstanceURI(int port) throws Exception {
- String hostName;
+ return getServerHostName() + ":" + port;
+ }
+
+ private static String getServerHostName() throws Exception {
if (msHost != null && !msHost.trim().isEmpty()) {
- hostName = msHost;
+ return msHost.trim();
} else {
- hostName = InetAddress.getLocalHost().getHostName();
+ return InetAddress.getLocalHost().getHostName();
}
- return hostName + ":" + port;
}
private static void cleanupRawStore() {
@@ -10014,14 +10049,15 @@ public class HiveMetaStore extends
ThriftHiveMetastore {
t.start();
}
-
/**
* Start threads outside of the thrift service, such as the compactor
threads.
* @param conf Hive configuration object
+ * @param isLeader true if this metastore is a leader. Most of the
housekeeping threads are
+ * started only in a leader HMS.
*/
private static void startMetaStoreThreads(final Configuration conf, final
Lock startLock,
final Condition startCondition,
final
- AtomicBoolean startedServing) {
+ AtomicBoolean startedServing,
boolean isLeader) {
// A thread is spun up to start these other threads. That's because we
can't start them
// until after the TServer has started, but once TServer.serve is called
we aren't given back
// control.
@@ -10050,13 +10086,21 @@ public class HiveMetaStore extends
ThriftHiveMetastore {
while (!startedServing.get()) {
startCondition.await();
}
- startCompactorInitiator(conf);
+
+ if (isLeader) {
+ startCompactorInitiator(conf);
+ startCompactorCleaner(conf);
+ startRemoteOnlyTasks(conf);
+ startStatsUpdater(conf);
+ HiveMetaStore.HMSHandler.startAlwaysTaskThreads(conf);
+ }
+
+ // The leader HMS may not necessarily have sufficient compute
capacity required to run
+ // actual compaction work. So it can run on a non-leader HMS with
sufficient capacity
+ // or a configured HS2 instance.
if (MetastoreConf.getVar(conf,
MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN).equals("metastore")) {
startCompactorWorkers(conf);
}
- startCompactorCleaner(conf);
- startRemoteOnlyTasks(conf);
- startStatsUpdater(conf);
} catch (Throwable e) {
LOG.error("Failure when starting the compactor, compactions may not
happen, " +
StringUtils.stringifyException(e));
@@ -10064,7 +10108,9 @@ public class HiveMetaStore extends ThriftHiveMetastore {
startLock.unlock();
}
- ReplChangeManager.scheduleCMClearer(conf);
+ if (isLeader) {
+ ReplChangeManager.scheduleCMClearer(conf);
+ }
}
};
t.setDaemon(true);
@@ -10141,6 +10187,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
JavaUtils.newInstance(JavaUtils.getClass(taskName,
MetastoreTaskThread.class));
task.setConf(conf);
long freq = task.runFrequency(TimeUnit.MILLISECONDS);
+ LOG.info("Scheduling for " + task.getClass().getCanonicalName() + "
service.");
ThreadPool.getPool().scheduleAtFixedRate(task, freq, freq,
TimeUnit.MILLISECONDS);
}
}
diff --git
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java
index 92ce351..2702e69 100644
---
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java
+++
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java
@@ -29,6 +29,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -65,7 +69,7 @@ public class MetaStoreTestUtils {
* @throws Exception
*/
public static void startMetaStore(final int port,
- final HadoopThriftAuthBridge bridge, Configuration conf)
+ final HadoopThriftAuthBridge bridge, Configuration conf, boolean
withHouseKeepingThreads)
throws Exception{
if (conf == null) {
conf = MetastoreConf.newMetastoreConf();
@@ -75,7 +79,16 @@ public class MetaStoreTestUtils {
@Override
public void run() {
try {
- HiveMetaStore.startMetaStore(port, bridge, finalConf);
+ Lock startLock = null;
+ Condition startCondition = null;
+ AtomicBoolean startedServing = null;
+ if (withHouseKeepingThreads) {
+ startLock = new ReentrantLock();
+ startCondition = startLock.newCondition();
+ startedServing = new AtomicBoolean();
+ }
+ HiveMetaStore.startMetaStore(port, bridge, finalConf, startLock,
startCondition,
+ startedServing);
} catch (Throwable e) {
LOG.error("Metastore Thrift Server threw an exception...", e);
}
@@ -111,7 +124,7 @@ public class MetaStoreTestUtils {
boolean keepWarehousePath)
throws Exception {
return
MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(),
conf,
- keepJdbcUri, keepWarehousePath);
+ keepJdbcUri, keepWarehousePath, false);
}
public static int startMetaStoreWithRetry() throws Exception {
@@ -121,7 +134,13 @@ public class MetaStoreTestUtils {
public static int startMetaStoreWithRetry(HadoopThriftAuthBridge bridge,
Configuration conf) throws
Exception {
- return MetaStoreTestUtils.startMetaStoreWithRetry(bridge, conf, false,
false);
+ return MetaStoreTestUtils.startMetaStoreWithRetry(bridge, conf, false,
false, false);
+ }
+
+ public static int startMetaStoreWithRetry(HadoopThriftAuthBridge bridge,
+ Configuration conf, boolean
withHouseKeepingThreads)
+ throws Exception {
+ return MetaStoreTestUtils.startMetaStoreWithRetry(bridge, conf, false,
false, withHouseKeepingThreads);
}
/**
@@ -133,11 +152,14 @@ public class MetaStoreTestUtils {
* @param conf The configuration to use
* @param keepJdbcUri If set to true, then the JDBC url is not changed
* @param keepWarehousePath If set to true, then the Warehouse directory is
not changed
+ * @param withHouseKeepingThreads If set to true, then the housekeeping threads are also started
* @return The port on which the MetaStore finally started
* @throws Exception
*/
public static int startMetaStoreWithRetry(HadoopThriftAuthBridge bridge,
- Configuration conf, boolean keepJdbcUri, boolean keepWarehousePath)
throws Exception {
+ Configuration conf, boolean
keepJdbcUri,
+ boolean keepWarehousePath,
+ boolean withHouseKeepingThreads)
throws Exception {
Exception metaStoreException = null;
String warehouseDir = MetastoreConf.getVar(conf, ConfVars.WAREHOUSE);
@@ -164,7 +186,8 @@ public class MetaStoreTestUtils {
if (MetastoreConf.getVar(conf,
ConfVars.THRIFT_SERVICE_DISCOVERY_MODE).trim().isEmpty()) {
MetastoreConf.setVar(conf, ConfVars.THRIFT_URIS,
"thrift://localhost:" + metaStorePort);
}
- MetaStoreTestUtils.startMetaStore(metaStorePort, bridge, conf);
+
+ MetaStoreTestUtils.startMetaStore(metaStorePort, bridge, conf,
withHouseKeepingThreads);
// Creating warehouse dir, if not exists
Warehouse wh = new Warehouse(conf);
diff --git
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestMarkPartitionRemote.java
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestMarkPartitionRemote.java
index df474e6..2837845 100644
---
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestMarkPartitionRemote.java
+++
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestMarkPartitionRemote.java
@@ -29,6 +29,6 @@ public class TestMarkPartitionRemote extends
TestMarkPartition {
@Before
public void startServer() throws Exception {
MetaStoreTestUtils.setConfForStandloneMode(conf);
-
MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(),
conf);
+
MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(),
conf, true);
}
}