Repository: ambari Updated Branches: refs/heads/trunk a10d56f0b -> fcce59e8c
AMBARI-9163. Intermittent Preparing NAMENODE fails during RU due to JOURNALNODE quorum not established (alejandro) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/fcce59e8 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/fcce59e8 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/fcce59e8 Branch: refs/heads/trunk Commit: fcce59e8ca5410ea1ac893b21406861c3d003a80 Parents: a10d56f Author: Alejandro Fernandez <[email protected]> Authored: Thu Jan 15 14:05:33 2015 -0800 Committer: Alejandro Fernandez <[email protected]> Committed: Thu Jan 15 14:44:24 2015 -0800 ---------------------------------------------------------------------- .../common-services/HDFS/2.1.0.2.0/metainfo.xml | 9 ++ .../2.1.0.2.0/package/scripts/journalnode.py | 7 + .../package/scripts/journalnode_upgrade.py | 147 +++++++++++++++++++ .../package/scripts/namenode_upgrade.py | 5 +- .../HDFS/2.1.0.2.0/package/scripts/utils.py | 39 ++++- 5 files changed, 202 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/fcce59e8/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/metainfo.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/metainfo.xml b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/metainfo.xml index ce0ab29..ab7c95c 100644 --- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/metainfo.xml +++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/metainfo.xml @@ -124,6 +124,15 @@ <scriptType>PYTHON</scriptType> <timeout>1200</timeout> </commandScript> + <dependencies> + <dependency> + <name>HDFS/HDFS_CLIENT</name> + <scope>host</scope> + <auto-deploy> + <enabled>true</enabled> + </auto-deploy> + </dependency> + </dependencies> </component> <component> 
http://git-wip-us.apache.org/repos/asf/ambari/blob/fcce59e8/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode.py index 15e0689..d3bb27a 100644 --- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode.py +++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode.py @@ -27,6 +27,7 @@ from resource_management.libraries.functions.security_commons import build_expec from utils import service from hdfs import hdfs +import journalnode_upgrade class JournalNode(Script): @@ -64,6 +65,12 @@ class JournalNode(Script): create_log_dir=True ) + def post_rolling_restart(self, env): + Logger.info("Executing Rolling Upgrade post-restart") + import params + env.set_params(params) + journalnode_upgrade.post_upgrade_check() + def stop(self, env, rolling_restart=False): import params http://git-wip-us.apache.org/repos/asf/ambari/blob/fcce59e8/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode_upgrade.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode_upgrade.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode_upgrade.py new file mode 100644 index 0000000..d8455e8 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode_upgrade.py @@ -0,0 +1,147 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
def post_upgrade_check():
  """
  Ensure all Journalnodes are up and quorum is established.

  Rolls the Namenode edit log, reads LastAppliedOrWrittenTxId from the Namenode's
  JournalTransactionInfo JMX metric, then waits until every Journalnode reports a
  LastWrittenTxId at least that high.

  Relies on the module-level imports of this file: time, Logger, default, Fail,
  get_jmx_data.

  :raise Fail: if fewer than 3 Journalnodes are configured, if dfs.http.policy or the
               Namenode http(s) address cannot be read, if the JMX data is missing, or
               if the Journalnodes do not reach the expected transaction id.
  :return: None
  """
  import params
  Logger.info("Ensuring Journalnode quorum is established")

  # Short pauses around the roll; presumably to let the daemons settle - TODO confirm.
  time.sleep(5)
  hdfs_roll_edits()
  time.sleep(5)

  all_journal_node_hosts = default("/clusterHostInfo/journalnode_hosts", [])

  if len(all_journal_node_hosts) < 3:
    raise Fail("Need at least 3 Journalnodes to maintain a quorum")

  # TODO, test with HTTPS
  policy = default("/configurations/hdfs-site/dfs.http.policy", None)
  if not policy:
    raise Fail("Could not retrieve dfs.http.policy")
  # upper must be *called*; comparing the bound method itself to a string is always False.
  encrypted = policy.upper() == "HTTPS_ONLY"

  if encrypted:
    nn_address = default("/configurations/hdfs-site/dfs.namenode.https-address", None)
  else:
    nn_address = default("/configurations/hdfs-site/dfs.namenode.http-address", None)

  if not nn_address:
    raise Fail("Could not retrieve dfs.namenode.http(s)-address for policy %s" % str(policy))

  nn_data = get_jmx_data(nn_address, 'org.apache.hadoop.hdfs.server.namenode.FSNamesystem',
                         'JournalTransactionInfo', encrypted)
  if not nn_data:
    raise Fail("Could not retrieve JournalTransactionInfo from JMX")

  # Keep the try body limited to the key lookup so that a KeyError raised while polling
  # the Journalnodes is not misreported as a missing JMX key.
  try:
    last_txn_id = int(nn_data['LastAppliedOrWrittenTxId'])
  except KeyError:
    raise Fail("JournalTransactionInfo does not have key LastAppliedOrWrittenTxId from JMX info")

  if not ensure_jns_have_new_txn(all_journal_node_hosts, last_txn_id):
    raise Fail("Could not ensure that all Journal nodes have a new log transaction id")


def hdfs_roll_edits():
  """
  Roll the Namenode edit log by running 'hdfs dfsadmin -rollEdits' as the HDFS user.

  HDFS_CLIENT needs to be a dependency of JOURNALNODE so that the 'hdfs' command is
  available on this host. When security is enabled, params.dn_kinit_cmd is executed first.

  :return: None
  """
  import params

  # TODO, this will need to be doc'ed since existing HDP 2.2 clusters will need HDFS_CLIENT
  # on all JOURNALNODE hosts
  if params.security_enabled:
    Execute(params.dn_kinit_cmd, user=params.hdfs_user)

  command = 'hdfs dfsadmin -rollEdits'
  Execute(command, user=params.hdfs_user, tries=1)


def _journalnode_has_txn(url, node, last_txn_id):
  """
  Query one Journalnode's JMX endpoint and report whether it reached last_txn_id.

  :param url: Base URL of the Journalnode, e.g. http://host:8480
  :param node: Journalnode host name, used for logging only
  :param last_txn_id: Integer transaction id the Journalnode must have reached
  :return: True if the reported LastWrittenTxId is >= last_txn_id, False otherwise
           (including when the Journalnode cannot be queried yet)
  """
  try:
    data = get_jmx_data(url, 'Journal-', 'LastWrittenTxId')
  except (IOError, ValueError) as e:
    # IOError covers urllib2.URLError and socket errors; ValueError covers an unparsable
    # JMX payload. Treat both as "not ready yet" so that the caller retries.
    Logger.info("Could not retrieve LastWrittenTxId from Journalnode %s: %s" % (node, str(e)))
    return False

  if not data:
    return False

  if int(data) >= last_txn_id:
    Logger.info("Journalnode %s has a higher transaction id: %s" % (node, str(data)))
    return True

  Logger.info("Journalnode %s is still on transaction id: %s" % (node, str(data)))
  return False


def ensure_jns_have_new_txn(nodes, last_txn_id):
  """
  Poll every Journalnode until each reports a transaction id of at least last_txn_id.

  Polls for up to 3 minutes in 10 second steps and returns as soon as all nodes are updated.

  :param nodes: List of Journalnodes
  :param last_txn_id: Integer of last transaction id
  :raise Fail: if the Journalnode address or port cannot be read from hdfs-site
  :return: Return true on success, false otherwise
  """
  import params

  # NOTE(review): the protocol is chosen by the mere presence of
  # dfs.journalnode.https-address, not by dfs.http.policy - confirm this is intended for
  # clusters that define both addresses.
  journal_node_address = default("/configurations/hdfs-site/dfs.journalnode.https-address", None)
  if journal_node_address:
    protocol = "https"
  else:
    protocol = "http"
    journal_node_address = default("/configurations/hdfs-site/dfs.journalnode.http-address", None)

  if not journal_node_address:
    raise Fail("Could not retrieve Journal node address")

  jn_port = get_port(journal_node_address)  # default is 8480, encrypted is 8481
  if not jn_port:
    raise Fail("Could not retrieve Journalnode port")

  time_out_secs = 3 * 60
  step_time_secs = 10
  iterations = time_out_secs // step_time_secs

  # Journalnodes that have not yet been seen at (or past) last_txn_id.
  pending = list(nodes)

  Logger.info("Checking if all Journalnodes are updated.")
  for i in range(iterations):
    Logger.info('Try %d out of %d' % (i + 1, iterations))

    pending = [node for node in pending
               if not _journalnode_has_txn('%s://%s:%s' % (protocol, node, jn_port), node, last_txn_id)]

    # Check right after polling so that success does not cost an extra sleep.
    if not pending:
      Logger.info("All journal nodes are updated")
      return True

    if i + 1 < iterations:
      Logger.info("Sleeping for %d secs" % step_time_secs)
      time.sleep(step_time_secs)

  return False
{SAFEMODE.ON: "enter", SAFEMODE.OFF: "leave"} -@retry(times=3, sleep_time=6, err_class=Fail) def reach_safemode_state(user, safemode_state, in_ha): """ Enter or leave safemode for the Namenode. @@ -68,6 +66,7 @@ def reach_safemode_state(user, safemode_state, in_ha): def prepare_rolling_upgrade(): """ Rolling Upgrade for HDFS Namenode requires the following. + 0. Namenode must be up 1. Leave safemode if the safemode status is not OFF 2. Execute a rolling upgrade "prepare" 3. Execute a rolling upgrade "query" @@ -83,7 +82,7 @@ def prepare_rolling_upgrade(): safemode_transition_successful = reach_safemode_state(user, SAFEMODE.OFF, True) if not safemode_transition_successful: - raise Fail("Could leave safemode") + raise Fail("Could not transition to safemode state %s. Please check logs to make sure namenode is up." % str(SAFEMODE.OFF)) prepare = "hdfs dfsadmin -rollingUpgrade prepare" query = "hdfs dfsadmin -rollingUpgrade query" http://git-wip-us.apache.org/repos/asf/ambari/blob/fcce59e8/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py index f185ea0..2634ce8 100644 --- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py +++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py @@ -18,6 +18,8 @@ limitations under the License. 
""" import os import re +import urllib2 +import json from resource_management import * from resource_management.libraries.functions.format import format @@ -83,7 +85,7 @@ def failover_namenode(): else: Execute(check_standby_cmd, user=params.hdfs_user, - tries=30, + tries=50, try_sleep=6, logoutput=True) @@ -216,6 +218,39 @@ def service(action=None, name=None, user=None, options="", create_pid_dir=False, ) +def get_jmx_data(nn_address, modeler_type, metric, encrypted=False): + """ + :param nn_address: Namenode Address, e.g., host:port, ** MAY ** be preceded with "http://" or "https://" already. + If not preceded, will use the encrypted param to determine. + :param modeler_type: Modeler type to query using startswith function + :param metric: Metric to return + :return: Return an object representation of the metric, or None if it does not exist + """ + if not nn_address or not modeler_type or not metric: + return None + + nn_address = nn_address.strip() + if not nn_address.startswith("http"): + nn_address = ("https://" if encrypted else "http://") + nn_address + if not nn_address.endswith("/"): + nn_address = nn_address + "/" + + nn_address = nn_address + "jmx" + Logger.info("Retrieve modeler: %s, metric: %s from JMX endpoint %s" % (modeler_type, metric, nn_address)) + + data = urllib2.urlopen(nn_address).read() + data_dict = json.loads(data) + my_data = None + if data_dict: + for el in data_dict['beans']: + if el is not None and el['modelerType'] is not None and el['modelerType'].startswith(modeler_type): + if metric in el: + my_data = el[metric] + if my_data: + my_data = json.loads(str(my_data)) + break + return my_data + def get_port(address): """ Extracts port from the address like 0.0.0.0:1019 @@ -223,7 +258,7 @@ def get_port(address): if address is None: return None m = re.search(r'(?:http(?:s)?://)?([\w\d.]*):(\d{1,5})', address) - if m is not None: + if m is not None and len(m.groups()) >= 2: return int(m.group(2)) else: return None
