http://git-wip-us.apache.org/repos/asf/ambari/blob/7df6bba4/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/balancer-emulator/balancer.log ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/balancer-emulator/balancer.log b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/balancer-emulator/balancer.log new file mode 100644 index 0000000..2010c02 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/balancer-emulator/balancer.log @@ -0,0 +1,29 @@ +Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved +Jul 28, 2014 5:01:49 PM 0 0 B 5.74 GB 9.79 GB +Jul 28, 2014 5:03:00 PM 1 0 B 5.58 GB 9.79 GB +Jul 28, 2014 5:04:07 PM 2 0 B 5.40 GB 9.79 GB +Jul 28, 2014 5:05:14 PM 3 0 B 5.06 GB 9.79 GB +Jul 28, 2014 5:05:50 PM 4 0 B 5.06 GB 9.79 GB +Jul 28, 2014 5:06:56 PM 5 0 B 4.81 GB 9.79 GB +Jul 28, 2014 5:07:33 PM 6 0 B 4.80 GB 9.79 GB +Jul 28, 2014 5:09:11 PM 7 0 B 4.29 GB 9.79 GB +Jul 28, 2014 5:09:47 PM 8 0 B 4.29 GB 9.79 GB +Jul 28, 2014 5:11:24 PM 9 0 B 3.89 GB 9.79 GB +Jul 28, 2014 5:12:00 PM 10 0 B 3.86 GB 9.79 GB +Jul 28, 2014 5:13:37 PM 11 0 B 3.23 GB 9.79 GB +Jul 28, 2014 5:15:13 PM 12 0 B 2.53 GB 9.79 GB +Jul 28, 2014 5:15:49 PM 13 0 B 2.52 GB 9.79 GB +Jul 28, 2014 5:16:25 PM 14 0 B 2.51 GB 9.79 GB +Jul 28, 2014 5:17:01 PM 15 0 B 2.39 GB 9.79 GB +Jul 28, 2014 5:17:37 PM 16 0 B 2.38 GB 9.79 GB +Jul 28, 2014 5:18:14 PM 17 0 B 2.31 GB 9.79 GB +Jul 28, 2014 5:18:50 PM 18 0 B 2.30 GB 9.79 GB +Jul 28, 2014 5:19:26 PM 19 0 B 2.21 GB 9.79 GB +Jul 28, 2014 5:20:02 PM 20 0 B 2.10 GB 9.79 GB +Jul 28, 2014 5:20:38 PM 21 0 B 2.06 GB 9.79 GB +Jul 28, 2014 5:22:14 PM 22 0 B 1.68 GB 9.79 GB +Jul 28, 2014 5:23:20 PM 23 0 B 1.00 GB 9.79 GB +Jul 28, 2014 5:23:56 PM 24 0 B 1016.16 MB 9.79 GB +Jul 28, 2014 5:25:33 PM 25 0 B 30.55 MB 9.79 GB +The cluster is balanced. Exiting... +Balancing took 24.858033333333335 minutes
http://git-wip-us.apache.org/repos/asf/ambari/blob/7df6bba4/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/balancer-emulator/hdfs-command.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/balancer-emulator/hdfs-command.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/balancer-emulator/hdfs-command.py new file mode 100644 index 0000000..88529b4 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/balancer-emulator/hdfs-command.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' +import time +import sys +from threading import Thread + + +def write_function(path, handle, interval): + with open(path) as f: + for line in f: + handle.write(line) + handle.flush() + time.sleep(interval) + +thread = Thread(target = write_function, args = ('balancer.out', sys.stdout, 1.5)) +thread.start() + +threaderr = Thread(target = write_function, args = ('balancer.err', sys.stderr, 1.5 * 0.023)) +threaderr.start() + +thread.join() + + +def rebalancer_out(): + write_function('balancer.out', sys.stdout) + +def rebalancer_err(): + write_function('balancer.err', sys.stdout) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/7df6bba4/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/datanode.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/datanode.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/datanode.py new file mode 100644 index 0000000..130c021 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/datanode.py @@ -0,0 +1,178 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" +import datanode_upgrade +from hdfs_datanode import datanode +from resource_management.libraries.script.script import Script +from resource_management.libraries.functions import conf_select, stack_select +from resource_management.libraries.functions.constants import StackFeature +from resource_management.libraries.functions.stack_features import check_stack_feature +from resource_management.libraries.functions.security_commons import build_expectations, \ + cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, FILE_TYPE_XML +from resource_management.core.logger import Logger +from hdfs import hdfs +from ambari_commons.os_family_impl import OsFamilyImpl +from ambari_commons import OSConst +from utils import get_hdfs_binary + +class DataNode(Script): + + def get_component_name(self): + return "hadoop-hdfs-datanode" + + def get_hdfs_binary(self): + """ + Get the name or path to the hdfs binary depending on the component name. + """ + component_name = self.get_component_name() + return get_hdfs_binary(component_name) + + + def install(self, env): + import params + env.set_params(params) + self.install_packages(env) + + def configure(self, env): + import params + env.set_params(params) + hdfs("datanode") + datanode(action="configure") + + def start(self, env, upgrade_type=None): + import params + env.set_params(params) + self.configure(env) + datanode(action="start") + + def stop(self, env, upgrade_type=None): + import params + env.set_params(params) + # pre-upgrade steps shutdown the datanode, so there's no need to call + + hdfs_binary = self.get_hdfs_binary() + if upgrade_type == "rolling": + stopped = datanode_upgrade.pre_rolling_upgrade_shutdown(hdfs_binary) + if not stopped: + datanode(action="stop") + else: + datanode(action="stop") + + def status(self, env): + import status_params + env.set_params(status_params) + datanode(action = "status") + + +@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT) +class DataNodeDefault(DataNode): + + def pre_upgrade_restart(self, env, upgrade_type=None): + Logger.info("Executing DataNode Stack Upgrade pre-restart") + import params + env.set_params(params) + if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version): + conf_select.select(params.stack_name, "hadoop", params.version) + stack_select.select("hadoop-hdfs-datanode", params.version) + + def post_upgrade_restart(self, env, upgrade_type=None): + Logger.info("Executing DataNode Stack Upgrade post-restart") + import params + env.set_params(params) + hdfs_binary = self.get_hdfs_binary() + # ensure the DataNode has started and rejoined the cluster + datanode_upgrade.post_upgrade_check(hdfs_binary) + + def security_status(self, env): + import status_params + + env.set_params(status_params) + props_value_check = {"hadoop.security.authentication": "kerberos", + "hadoop.security.authorization": "true"} + props_empty_check = ["hadoop.security.auth_to_local"] + props_read_check = None + core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check, + props_read_check) + props_value_check = None + props_empty_check = ['dfs.datanode.keytab.file', + 'dfs.datanode.kerberos.principal'] + props_read_check = ['dfs.datanode.keytab.file'] + hdfs_site_expectations = build_expectations('hdfs-site', props_value_check, props_empty_check, + props_read_check) + + hdfs_expectations = {} + hdfs_expectations.update(core_site_expectations) + hdfs_expectations.update(hdfs_site_expectations) + + security_params = get_params_from_filesystem(status_params.hadoop_conf_dir, + {'core-site.xml': FILE_TYPE_XML, + 'hdfs-site.xml': FILE_TYPE_XML}) + + if 'core-site' in security_params and 'hadoop.security.authentication' in security_params['core-site'] and \ + security_params['core-site']['hadoop.security.authentication'].lower() == 'kerberos': + result_issues = validate_security_config_properties(security_params, hdfs_expectations) + if not result_issues: # If all validations passed successfully + try: + # Double check the dict before calling execute + if ('hdfs-site' not in security_params or + 'dfs.datanode.keytab.file' not in security_params['hdfs-site'] or + 'dfs.datanode.kerberos.principal' not in security_params['hdfs-site']): + self.put_structured_out({"securityState": "UNSECURED"}) + self.put_structured_out( + {"securityIssuesFound": "Keytab file or principal are not set property."}) + return + + cached_kinit_executor(status_params.kinit_path_local, + status_params.hdfs_user, + security_params['hdfs-site']['dfs.datanode.keytab.file'], + security_params['hdfs-site']['dfs.datanode.kerberos.principal'], + status_params.hostname, + status_params.tmp_dir) + self.put_structured_out({"securityState": "SECURED_KERBEROS"}) + except Exception as e: + self.put_structured_out({"securityState": "ERROR"}) + self.put_structured_out({"securityStateErrorInfo": str(e)}) + else: + issues = [] + for cf in result_issues: + issues.append("Configuration file %s did not pass the validation. Reason: %s" % (cf, result_issues[cf])) + self.put_structured_out({"securityIssuesFound": ". ".join(issues)}) + self.put_structured_out({"securityState": "UNSECURED"}) + else: + self.put_structured_out({"securityState": "UNSECURED"}) + + def get_log_folder(self): + import params + return params.hdfs_log_dir + + def get_user(self): + import params + return params.hdfs_user + + def get_pid_files(self): + import status_params + return [status_params.datanode_pid_file] + +@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY) +class DataNodeWindows(DataNode): + def install(self, env): + import install_params + self.install_packages(env) + +if __name__ == "__main__": + DataNode().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/7df6bba4/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/datanode_upgrade.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/datanode_upgrade.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/datanode_upgrade.py new file mode 100644 index 0000000..b55237d --- /dev/null +++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/datanode_upgrade.py @@ -0,0 +1,156 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" +import re + +from resource_management.core.logger import Logger +from resource_management.core.exceptions import Fail +from resource_management.core.resources.system import Execute +from resource_management.core import shell +from resource_management.libraries.functions import format +from resource_management.libraries.functions.decorator import retry +from resource_management.libraries.functions import check_process_status +from resource_management.core import ComponentIsNotRunning +from utils import get_dfsadmin_base_command + + +def pre_rolling_upgrade_shutdown(hdfs_binary): + """ + Runs the "shutdownDatanode {ipc_address} upgrade" command to shutdown the + DataNode in preparation for an upgrade. This will then periodically check + "getDatanodeInfo" to ensure the DataNode has shutdown correctly. + This function will obtain the Kerberos ticket if security is enabled. + :param hdfs_binary: name/path of the HDFS binary to use + :return: Return True if ran ok (even with errors), and False if need to stop the datanode forcefully. + """ + import params + + Logger.info('DataNode executing "shutdownDatanode" command in preparation for upgrade...') + if params.security_enabled: + Execute(params.dn_kinit_cmd, user = params.hdfs_user) + + dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary) + command = format('{dfsadmin_base_command} -shutdownDatanode {dfs_dn_ipc_address} upgrade') + + code, output = shell.call(command, user=params.hdfs_user) + if code == 0: + # verify that the datanode is down + _check_datanode_shutdown(hdfs_binary) + else: + # Due to bug HDFS-7533, DataNode may not always shutdown during stack upgrade, and it is necessary to kill it. + if output is not None and re.search("Shutdown already in progress", output): + Logger.error("Due to a known issue in DataNode, the command {0} did not work, so will need to shutdown the datanode forcefully.".format(command)) + return False + return True + + +def post_upgrade_check(hdfs_binary): + """ + Verifies that the DataNode has rejoined the cluster. This function will + obtain the Kerberos ticket if security is enabled. + :param hdfs_binary: name/path of the HDFS binary to use + :return: + """ + import params + + Logger.info("Checking that the DataNode has rejoined the cluster after upgrade...") + if params.security_enabled: + Execute(params.dn_kinit_cmd, user=params.hdfs_user) + + # verify that the datanode has started and rejoined the HDFS cluster + _check_datanode_startup(hdfs_binary) + + +def is_datanode_process_running(): + import params + try: + check_process_status(params.datanode_pid_file) + return True + except ComponentIsNotRunning: + return False + +@retry(times=24, sleep_time=5, err_class=Fail) +def _check_datanode_shutdown(hdfs_binary): + """ + Checks that a DataNode is down by running "hdfs dfsamin getDatanodeInfo" + several times, pausing in between runs. Once the DataNode stops responding + this method will return, otherwise it will raise a Fail(...) and retry + automatically. + The stack defaults for retrying for HDFS are also way too slow for this + command; they are set to wait about 45 seconds between client retries. As + a result, a single execution of dfsadmin will take 45 seconds to retry and + the DataNode may be marked as dead, causing problems with HBase. + https://issues.apache.org/jira/browse/HDFS-8510 tracks reducing the + times for ipc.client.connect.retry.interval. In the meantime, override them + here, but only for RU. + :param hdfs_binary: name/path of the HDFS binary to use + :return: + """ + import params + + # override stock retry timeouts since after 30 seconds, the datanode is + # marked as dead and can affect HBase during RU + dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary) + command = format('{dfsadmin_base_command} -D ipc.client.connect.max.retries=5 -D ipc.client.connect.retry.interval=1000 -getDatanodeInfo {dfs_dn_ipc_address}') + + try: + Execute(command, user=params.hdfs_user, tries=1) + except: + Logger.info("DataNode has successfully shutdown for upgrade.") + return + + Logger.info("DataNode has not shutdown.") + raise Fail('DataNode has not shutdown.') + + +@retry(times=30, sleep_time=30, err_class=Fail) # keep trying for 15 mins +def _check_datanode_startup(hdfs_binary): + """ + Checks that a DataNode process is running and DataNode is reported as being alive via the + "hdfs dfsadmin -fs {namenode_address} -report -live" command. Once the DataNode is found to be + alive this method will return, otherwise it will raise a Fail(...) and retry + automatically. + :param hdfs_binary: name/path of the HDFS binary to use + :return: + """ + + if not is_datanode_process_running(): + Logger.info("DataNode process is not running") + raise Fail("DataNode process is not running") + + import params + import socket + + try: + dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary) + command = dfsadmin_base_command + ' -report -live' + return_code, hdfs_output = shell.call(command, user=params.hdfs_user) + except: + raise Fail('Unable to determine if the DataNode has started after upgrade.') + + if return_code == 0: + hostname = params.hostname.lower() + hostname_ip = socket.gethostbyname(params.hostname.lower()) + if hostname in hdfs_output.lower() or hostname_ip in hdfs_output.lower(): + Logger.info("DataNode {0} reports that it has rejoined the cluster.".format(params.hostname)) + return + else: + raise Fail("DataNode {0} was not found in the list of live DataNodes".format(params.hostname)) + + # return_code is not 0, fail + raise Fail("Unable to determine if the DataNode has started after upgrade (result code {0})".format(str(return_code))) http://git-wip-us.apache.org/repos/asf/ambari/blob/7df6bba4/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs.py new file mode 100644 index 0000000..d9b62e2 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs.py @@ -0,0 +1,178 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +Ambari Agent + +""" + +from resource_management.libraries.script.script import Script +from resource_management.core.resources.system import Directory, File, Link +from resource_management.core.resources import Package +from resource_management.core.source import Template +from resource_management.core.resources.service import ServiceConfig +from resource_management.libraries.resources.xml_config import XmlConfig +import os +from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl +from ambari_commons import OSConst + +@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT) +def hdfs(name=None): + import params + + if params.create_lib_snappy_symlinks: + install_snappy() + + # On some OS this folder could be not exists, so we will create it before pushing there files + Directory(params.limits_conf_dir, + create_parents = True, + owner='root', + group='root' + ) + + File(os.path.join(params.limits_conf_dir, 'hdfs.conf'), + owner='root', + group='root', + mode=0644, + content=Template("hdfs.conf.j2") + ) + + if params.security_enabled: + tc_mode = 0644 + tc_owner = "root" + else: + tc_mode = None + tc_owner = params.hdfs_user + + if "hadoop-policy" in params.config['configurations']: + XmlConfig("hadoop-policy.xml", + conf_dir=params.hadoop_conf_dir, + configurations=params.config['configurations']['hadoop-policy'], + configuration_attributes=params.config['configuration_attributes']['hadoop-policy'], + owner=params.hdfs_user, + group=params.user_group + ) + + if "ssl-client" in params.config['configurations']: + XmlConfig("ssl-client.xml", + conf_dir=params.hadoop_conf_dir, + configurations=params.config['configurations']['ssl-client'], + configuration_attributes=params.config['configuration_attributes']['ssl-client'], + owner=params.hdfs_user, + group=params.user_group + ) + + Directory(params.hadoop_conf_secure_dir, + create_parents = True, + owner='root', + group=params.user_group, + cd_access='a', + ) + + XmlConfig("ssl-client.xml", + conf_dir=params.hadoop_conf_secure_dir, + configurations=params.config['configurations']['ssl-client'], + configuration_attributes=params.config['configuration_attributes']['ssl-client'], + owner=params.hdfs_user, + group=params.user_group + ) + + if "ssl-server" in params.config['configurations']: + XmlConfig("ssl-server.xml", + conf_dir=params.hadoop_conf_dir, + configurations=params.config['configurations']['ssl-server'], + configuration_attributes=params.config['configuration_attributes']['ssl-server'], + owner=params.hdfs_user, + group=params.user_group + ) + + XmlConfig("hdfs-site.xml", + conf_dir=params.hadoop_conf_dir, + configurations=params.config['configurations']['hdfs-site'], + configuration_attributes=params.config['configuration_attributes']['hdfs-site'], + owner=params.hdfs_user, + group=params.user_group + ) + + XmlConfig("core-site.xml", + conf_dir=params.hadoop_conf_dir, + configurations=params.config['configurations']['core-site'], + configuration_attributes=params.config['configuration_attributes']['core-site'], + owner=params.hdfs_user, + group=params.user_group, + mode=0644 + ) + + File(os.path.join(params.hadoop_conf_dir, 'slaves'), + owner=tc_owner, + content=Template("slaves.j2") + ) + + if params.lzo_enabled and len(params.lzo_packages) > 0: + Package(params.lzo_packages, + retry_on_repo_unavailability=params.agent_stack_retry_on_unavailability, + retry_count=params.agent_stack_retry_count) + +def install_snappy(): + import params + Directory([params.so_target_dir_x86, params.so_target_dir_x64], + create_parents = True, + ) + Link(params.so_target_x86, + to=params.so_src_x86, + ) + Link(params.so_target_x64, + to=params.so_src_x64, + ) + +@OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY) +def hdfs(component=None): + import params + if component == "namenode": + directories = params.dfs_name_dir.split(",") + Directory(directories, + owner=params.hdfs_user, + mode="(OI)(CI)F", + create_parents = True + ) + File(params.exclude_file_path, + content=Template("exclude_hosts_list.j2"), + owner=params.hdfs_user, + mode="f", + ) + if params.service_map.has_key(component): + service_name = params.service_map[component] + ServiceConfig(service_name, + action="change_user", + username=params.hdfs_user, + password=Script.get_password(params.hdfs_user)) + + if "hadoop-policy" in params.config['configurations']: + XmlConfig("hadoop-policy.xml", + conf_dir=params.hadoop_conf_dir, + configurations=params.config['configurations']['hadoop-policy'], + owner=params.hdfs_user, + mode="f", + configuration_attributes=params.config['configuration_attributes']['hadoop-policy'] + ) + + XmlConfig("hdfs-site.xml", + conf_dir=params.hadoop_conf_dir, + configurations=params.config['configurations']['hdfs-site'], + owner=params.hdfs_user, + mode="f", + configuration_attributes=params.config['configuration_attributes']['hdfs-site'] + ) http://git-wip-us.apache.org/repos/asf/ambari/blob/7df6bba4/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs_client.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs_client.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs_client.py new file mode 100644 index 0000000..4dabdbc --- /dev/null +++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs_client.py @@ -0,0 +1,122 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" + +from resource_management.libraries.script.script import Script +from resource_management.libraries.functions import conf_select, stack_select +from resource_management.libraries.functions.constants import StackFeature +from resource_management.libraries.functions.stack_features import check_stack_feature +from resource_management.libraries.functions.security_commons import build_expectations, \ + cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \ + FILE_TYPE_XML +from hdfs import hdfs +from ambari_commons.os_family_impl import OsFamilyImpl +from ambari_commons import OSConst +from resource_management.core.exceptions import ClientComponentHasNoStatus + +class HdfsClient(Script): + + def install(self, env): + import params + env.set_params(params) + self.install_packages(env) + self.configure(env) + + def configure(self, env): + import params + env.set_params(params) + hdfs() + + def start(self, env, upgrade_type=None): + import params + env.set_params(params) + + def stop(self, env, upgrade_type=None): + import params + env.set_params(params) + + def status(self, env): + raise ClientComponentHasNoStatus() + +@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT) +class HdfsClientDefault(HdfsClient): + + def get_component_name(self): + return "hadoop-client" + + def pre_upgrade_restart(self, env, upgrade_type=None): + import params + env.set_params(params) + if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version): + conf_select.select(params.stack_name, "hadoop", params.version) + stack_select.select("hadoop-client", params.version) + + def security_status(self, env): + import status_params + env.set_params(status_params) + + props_value_check = {"hadoop.security.authentication": "kerberos", + "hadoop.security.authorization": "true"} + props_empty_check = ["hadoop.security.auth_to_local"] + props_read_check = None + core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check, + props_read_check) + hdfs_expectations ={} + hdfs_expectations.update(core_site_expectations) + + security_params = get_params_from_filesystem(status_params.hadoop_conf_dir, + {'core-site.xml': FILE_TYPE_XML}) + + if 'core-site' in security_params and 'hadoop.security.authentication' in security_params['core-site'] and \ + security_params['core-site']['hadoop.security.authentication'].lower() == 'kerberos': + result_issues = validate_security_config_properties(security_params, hdfs_expectations) + if not result_issues: # If all validations passed successfully + if status_params.hdfs_user_principal or status_params.hdfs_user_keytab: + try: + cached_kinit_executor(status_params.kinit_path_local, + status_params.hdfs_user, + status_params.hdfs_user_keytab, + status_params.hdfs_user_principal, + status_params.hostname, + status_params.tmp_dir) + self.put_structured_out({"securityState": "SECURED_KERBEROS"}) + except Exception as e: + self.put_structured_out({"securityState": "ERROR"}) + self.put_structured_out({"securityStateErrorInfo": str(e)}) + else: + self.put_structured_out({"securityIssuesFound": "hdfs principal and/or keytab file is not specified"}) + self.put_structured_out({"securityState": "UNSECURED"}) + else: + issues = [] + for cf in result_issues: + issues.append("Configuration file %s did not pass the validation. Reason: %s" % (cf, result_issues[cf])) + self.put_structured_out({"securityIssuesFound": ". ".join(issues)}) + self.put_structured_out({"securityState": "UNSECURED"}) + + else: + self.put_structured_out({"securityState": "UNSECURED"}) + +@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY) +class HdfsClientWindows(HdfsClient): + def install(self, env): + import install_params + self.install_packages(env) + self.configure(env) + +if __name__ == "__main__": + HdfsClient().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/7df6bba4/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs_datanode.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs_datanode.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs_datanode.py new file mode 100644 index 0000000..2d3d4f5 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs_datanode.py @@ -0,0 +1,85 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" +import os +from resource_management.core.resources.system import Directory, Execute, File +from resource_management.libraries.functions.check_process_status import check_process_status +from resource_management.libraries.functions.mounted_dirs_helper import handle_mounted_dirs +from utils import service +from ambari_commons.os_family_impl import OsFamilyImpl, OsFamilyFuncImpl +from ambari_commons import OSConst + + +def create_dirs(data_dir): + """ + :param data_dir: The directory to create + :param params: parameters + """ + import params + Directory(data_dir, + create_parents = True, + cd_access="a", + mode=0755, + owner=params.hdfs_user, + group=params.user_group, + ignore_failures=True + ) + +@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT) +def datanode(action=None): + if action == "configure": + import params + Directory(params.dfs_domain_socket_dir, + create_parents = True, + mode=0751, + owner=params.hdfs_user, + group=params.user_group) + + # handle_mounted_dirs ensures that we don't create dfs data dirs which are temporary unavailable (unmounted), and intended to reside on a different mount. + data_dir_to_mount_file_content = handle_mounted_dirs(create_dirs, params.dfs_data_dirs, params.data_dir_mount_file, params) + # create a history file used by handle_mounted_dirs + File(params.data_dir_mount_file, + owner=params.hdfs_user, + group=params.user_group, + mode=0644, + content=data_dir_to_mount_file_content + ) + + elif action == "start" or action == "stop": + import params + service( + action=action, name="datanode", + user=params.hdfs_user, + create_pid_dir=True, + create_log_dir=True + ) + elif action == "status": + import status_params + check_process_status(status_params.datanode_pid_file) + + +@OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY) +def datanode(action=None): + if action == "configure": + pass + elif(action == "start" or action == "stop"): + import params + Service(params.datanode_win_service_name, action=action) + elif action == "status": + import status_params + check_windows_service_status(status_params.datanode_win_service_name) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/7df6bba4/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs_namenode.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs_namenode.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs_namenode.py new file mode 100644 index 0000000..23119f0 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs_namenode.py @@ -0,0 +1,562 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" +import os.path +import time + +from resource_management.core import shell +from resource_management.core.source import Template +from resource_management.core.resources.system import File, Execute, Directory +from resource_management.core.resources.service import Service +from resource_management.libraries.functions import namenode_ha_utils +from resource_management.libraries.functions.decorator import retry +from resource_management.libraries.functions.format import format +from resource_management.libraries.functions.check_process_status import check_process_status +from resource_management.libraries.resources.execute_hadoop import ExecuteHadoop +from resource_management.libraries.functions import Direction +from ambari_commons import OSCheck, OSConst +from ambari_commons.os_family_impl import OsFamilyImpl, OsFamilyFuncImpl +from utils import get_dfsadmin_base_command + +if OSCheck.is_windows_family(): + from resource_management.libraries.functions.windows_service_utils import check_windows_service_status + +from resource_management.core.exceptions import Fail +from resource_management.core.logger import Logger + +from utils import service, safe_zkfc_op, is_previous_fs_image +from setup_ranger_hdfs import setup_ranger_hdfs, create_ranger_audit_hdfs_directories + +import namenode_upgrade + +def wait_for_safemode_off(hdfs_binary, afterwait_sleep=0, execute_kinit=False): + """ + During NonRolling (aka Express Upgrade), after starting NameNode, which is still in safemode, and then starting + all of the DataNodes, we need for NameNode to receive all of the block reports and leave safemode. + If HA is present, then this command will run individually on each NameNode, which checks for its own address. + """ + import params + + retries = 115 + sleep_seconds = 10 + sleep_minutes = int(sleep_seconds * retries / 60) + + Logger.info("Waiting up to {0} minutes for the NameNode to leave Safemode...".format(sleep_minutes)) + + if params.security_enabled and execute_kinit: + kinit_command = format("{params.kinit_path_local} -kt {params.hdfs_user_keytab} {params.hdfs_principal_name}") + Execute(kinit_command, user=params.hdfs_user, logoutput=True) + + try: + # Note, this fails if namenode_address isn't prefixed with "params." + + dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary, use_specific_namenode=True) + is_namenode_safe_mode_off = dfsadmin_base_command + " -safemode get | grep 'Safe mode is OFF'" + + # Wait up to 30 mins + Execute(is_namenode_safe_mode_off, tries=retries, try_sleep=sleep_seconds, + user=params.hdfs_user, logoutput=True) + + # Wait a bit more since YARN still depends on block reports coming in. + # Also saw intermittent errors with HBASE service check if it was done too soon. + time.sleep(afterwait_sleep) + except Fail: + Logger.error("The NameNode is still in Safemode. Please be careful with commands that need Safemode OFF.") + +@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT) +def namenode(action=None, hdfs_binary=None, do_format=True, upgrade_type=None, + upgrade_suspended=False, env=None): + + if action is None: + raise Fail('"action" parameter is required for function namenode().') + + if action in ["start", "stop"] and hdfs_binary is None: + raise Fail('"hdfs_binary" parameter is required for function namenode().') + + if action == "configure": + import params + #we need this directory to be present before any action(HA manual steps for + #additional namenode) + create_name_dirs(params.dfs_name_dir) + elif action == "start": + Logger.info("Called service {0} with upgrade_type: {1}".format(action, str(upgrade_type))) + setup_ranger_hdfs(upgrade_type=upgrade_type) + import params + if do_format and not params.hdfs_namenode_format_disabled: + format_namenode() + pass + + File(params.exclude_file_path, + content=Template("exclude_hosts_list.j2"), + owner=params.hdfs_user, + group=params.user_group + ) + + if params.dfs_ha_enabled and \ + params.dfs_ha_namenode_standby is not None and \ + params.hostname == params.dfs_ha_namenode_standby: + # if the current host is the standby NameNode in an HA deployment + # run the bootstrap command, to start the NameNode in standby mode + # this requires that the active NameNode is already up and running, + # so this execute should be re-tried upon failure, up to a timeout + success = bootstrap_standby_namenode(params) + if not success: + raise Fail("Could not bootstrap standby namenode") + + if upgrade_type == "rolling" and params.dfs_ha_enabled: + # Most likely, ZKFC is up since RU will initiate the failover command. However, if that failed, it would have tried + # to kill ZKFC manually, so we need to start it if not already running. + safe_zkfc_op(action, env) + + options = "" + if upgrade_type == "rolling": + if params.upgrade_direction == Direction.UPGRADE: + options = "-rollingUpgrade started" + elif params.upgrade_direction == Direction.DOWNGRADE: + options = "-rollingUpgrade downgrade" + elif upgrade_type == "nonrolling": + is_previous_image_dir = is_previous_fs_image() + Logger.info("Previous file system image dir present is {0}".format(str(is_previous_image_dir))) + + if params.upgrade_direction == Direction.UPGRADE: + options = "-rollingUpgrade started" + elif params.upgrade_direction == Direction.DOWNGRADE: + options = "-rollingUpgrade downgrade" + elif upgrade_type is None and upgrade_suspended is True: + # the rollingUpgrade flag must be passed in during a suspended upgrade when starting NN + if os.path.exists(namenode_upgrade.get_upgrade_in_progress_marker()): + options = "-rollingUpgrade started" + else: + Logger.info("The NameNode upgrade marker file {0} does not exist, yet an upgrade is currently suspended. " + "Assuming that the upgrade of NameNode has not occurred yet.".format(namenode_upgrade.get_upgrade_in_progress_marker())) + + Logger.info("Options for start command are: {0}".format(options)) + + service( + action="start", + name="namenode", + user=params.hdfs_user, + options=options, + create_pid_dir=True, + create_log_dir=True + ) + + if params.security_enabled: + Execute(format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_principal_name}"), + user = params.hdfs_user) + + # ___Scenario___________|_Expected safemode state__|_Wait for safemode OFF____| + # no-HA | ON -> OFF | Yes | + # HA and active | ON -> OFF | Yes | + # HA and standby | no change | No | + # RU with HA on active | ON -> OFF | Yes | + # RU with HA on standby | ON -> OFF | Yes | + # EU with HA on active | ON -> OFF | No | + # EU with HA on standby | ON -> OFF | No | + # EU non-HA | ON -> OFF | No | + + # because we do things like create directories after starting NN, + # the vast majority of the time this should be True - it should only + # be False if this is HA and we are the Standby NN + ensure_safemode_off = True + + # True if this is the only NameNode (non-HA) or if its the Active one in HA + is_active_namenode = True + + if params.dfs_ha_enabled: + Logger.info("Waiting for the NameNode to broadcast whether it is Active or Standby...") + + if is_this_namenode_active() is False: + # we are the STANDBY NN + is_active_namenode = False + + # we are the STANDBY NN and this restart is not part of an upgrade + if upgrade_type is None: + ensure_safemode_off = False + + + # During an Express Upgrade, NameNode will not leave SafeMode until the DataNodes are started, + # so always disable the Safemode check + if upgrade_type == "nonrolling": + ensure_safemode_off = False + + # some informative logging separate from the above logic to keep things a little cleaner + if ensure_safemode_off: + Logger.info("Waiting for this NameNode to leave Safemode due to the following conditions: HA: {0}, isActive: {1}, upgradeType: {2}".format( + params.dfs_ha_enabled, is_active_namenode, upgrade_type)) + else: + Logger.info("Skipping Safemode check due to the following conditions: HA: {0}, isActive: {1}, upgradeType: {2}".format( + params.dfs_ha_enabled, is_active_namenode, upgrade_type)) + + + # wait for Safemode to end + if ensure_safemode_off: + wait_for_safemode_off(hdfs_binary) + + # Always run this on the "Active" NN unless Safemode has been ignored + # in the case where safemode was ignored (like during an express upgrade), then + # NN will be in SafeMode and cannot have directories created + if is_active_namenode and ensure_safemode_off: + create_hdfs_directories() + create_ranger_audit_hdfs_directories() + else: + Logger.info("Skipping creation of HDFS directories since this is either not the Active NameNode or we did not wait for Safemode to finish.") + + elif action == "stop": + import params + service( + action="stop", name="namenode", + user=params.hdfs_user + ) + elif action == "status": + import status_params + check_process_status(status_params.namenode_pid_file) + elif action == "decommission": + decommission() + +@OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY) +def namenode(action=None, hdfs_binary=None, do_format=True, upgrade_type=None, + upgrade_suspended=False, env=None): + + if action is None: + raise Fail('"action" parameter is required for function namenode().') + + if action in ["start", "stop"] and hdfs_binary is None: + raise Fail('"hdfs_binary" parameter is required for function namenode().') + + if action == "configure": + pass + elif action == "start": + import params + #TODO: Replace with format_namenode() + namenode_format_marker = os.path.join(params.hadoop_conf_dir,"NN_FORMATTED") + if not os.path.exists(namenode_format_marker): + hadoop_cmd = "cmd /C %s" % (os.path.join(params.hadoop_home, "bin", "hadoop.cmd")) + Execute("%s namenode -format" % (hadoop_cmd)) + open(namenode_format_marker, 'a').close() + Service(params.namenode_win_service_name, action=action) + elif action == "stop": + import params + Service(params.namenode_win_service_name, action=action) + elif action == "status": + import status_params + check_windows_service_status(status_params.namenode_win_service_name) + elif action == "decommission": + decommission() + +def create_name_dirs(directories): + import params + + dirs = directories.split(",") + Directory(dirs, + mode=0755, + owner=params.hdfs_user, + group=params.user_group, + create_parents = True, + cd_access="a", + ) + + +def create_hdfs_directories(): + import params + + params.HdfsResource(params.hdfs_tmp_dir, + type="directory", + action="create_on_execute", + owner=params.hdfs_user, + mode=0777, + ) + params.HdfsResource(params.smoke_hdfs_user_dir, + type="directory", + action="create_on_execute", + owner=params.smoke_user, + mode=params.smoke_hdfs_user_mode, + ) + params.HdfsResource(None, + action="execute", + ) + +def format_namenode(force=None): + import params + + old_mark_dir = params.namenode_formatted_old_mark_dirs + mark_dir = params.namenode_formatted_mark_dirs + dfs_name_dir = params.dfs_name_dir + hdfs_user = params.hdfs_user + hadoop_conf_dir = params.hadoop_conf_dir + + if not params.dfs_ha_enabled: + if force: + ExecuteHadoop('namenode -format', + bin_dir=params.hadoop_bin_dir, + conf_dir=hadoop_conf_dir) + else: + if not is_namenode_formatted(params): + Execute(format("hdfs --config {hadoop_conf_dir} namenode -format -nonInteractive"), + user = params.hdfs_user, + path = [params.hadoop_bin_dir] + ) + for m_dir in mark_dir: + Directory(m_dir, + create_parents = True + ) + else: + if params.dfs_ha_namenode_active is not None and \ + params.hostname == params.dfs_ha_namenode_active: + # check and run the format command in the HA deployment scenario + # only format the "active" namenode in an HA deployment + if force: + ExecuteHadoop('namenode -format', + bin_dir=params.hadoop_bin_dir, + conf_dir=hadoop_conf_dir) + else: + nn_name_dirs = params.dfs_name_dir.split(',') + if not is_namenode_formatted(params): + try: + Execute(format("hdfs --config {hadoop_conf_dir} namenode -format -nonInteractive"), + user = params.hdfs_user, + path = [params.hadoop_bin_dir] + ) + except Fail: + # We need to clean-up mark directories, so we can re-run format next time. + for nn_name_dir in nn_name_dirs: + Execute(format("rm -rf {nn_name_dir}/*"), + user = params.hdfs_user, + ) + raise + for m_dir in mark_dir: + Directory(m_dir, + create_parents = True + ) + +def is_namenode_formatted(params): + old_mark_dirs = params.namenode_formatted_old_mark_dirs + mark_dirs = params.namenode_formatted_mark_dirs + nn_name_dirs = params.dfs_name_dir.split(',') + marked = False + # Check if name directories have been marked as formatted + for mark_dir in mark_dirs: + if os.path.isdir(mark_dir): + marked = True + Logger.info(format("{mark_dir} exists. Namenode DFS already formatted")) + + # Ensure that all mark dirs created for all name directories + if marked: + for mark_dir in mark_dirs: + Directory(mark_dir, + create_parents = True + ) + return marked + + # Move all old format markers to new place + for old_mark_dir in old_mark_dirs: + if os.path.isdir(old_mark_dir): + for mark_dir in mark_dirs: + Execute(('cp', '-ar', old_mark_dir, mark_dir), + sudo = True + ) + marked = True + Directory(old_mark_dir, + action = "delete" + ) + elif os.path.isfile(old_mark_dir): + for mark_dir in mark_dirs: + Directory(mark_dir, + create_parents = True, + ) + Directory(old_mark_dir, + action = "delete" + ) + marked = True + + if marked: + return True + + # Check if name dirs are not empty + for name_dir in nn_name_dirs: + code, out = shell.call(("ls", name_dir)) + dir_exists_and_valid = bool(not code) + + if not dir_exists_and_valid: # situations if disk exists but is crashed at the moment (ls: reading directory ...: Input/output error) + Logger.info(format("NameNode will not be formatted because the directory {name_dir} is missing or cannot be checked for content. {out}")) + return True + + try: + Execute(format("ls {name_dir} | wc -l | grep -q ^0$"), + ) + except Fail: + Logger.info(format("NameNode will not be formatted since {name_dir} exists and contains content")) + return True + + return False + +@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT) +def decommission(): + import params + + hdfs_user = params.hdfs_user + conf_dir = params.hadoop_conf_dir + user_group = params.user_group + nn_kinit_cmd = params.nn_kinit_cmd + + File(params.exclude_file_path, + content=Template("exclude_hosts_list.j2"), + owner=hdfs_user, + group=user_group + ) + + if not params.update_exclude_file_only: + Execute(nn_kinit_cmd, + user=hdfs_user + ) + + if params.dfs_ha_enabled: + # due to a bug in hdfs, refreshNodes will not run on both namenodes so we + # need to execute each command scoped to a particular namenode + nn_refresh_cmd = format('dfsadmin -fs hdfs://{namenode_rpc} -refreshNodes') + else: + nn_refresh_cmd = format('dfsadmin -fs {namenode_address} -refreshNodes') + ExecuteHadoop(nn_refresh_cmd, + user=hdfs_user, + conf_dir=conf_dir, + bin_dir=params.hadoop_bin_dir) + +@OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY) +def decommission(): + import params + hdfs_user = params.hdfs_user + conf_dir = params.hadoop_conf_dir + + File(params.exclude_file_path, + content=Template("exclude_hosts_list.j2"), + owner=hdfs_user + ) + + if params.dfs_ha_enabled: + # due to a bug in hdfs, refreshNodes will not run on both namenodes so we + # need to execute each command scoped to a particular namenode + nn_refresh_cmd = format('cmd /c hadoop dfsadmin -fs hdfs://{namenode_rpc} -refreshNodes') + else: + nn_refresh_cmd = format('cmd /c hadoop dfsadmin -fs {namenode_address} -refreshNodes') + Execute(nn_refresh_cmd, user=hdfs_user) + + +def bootstrap_standby_namenode(params, use_path=False): + mark_dirs = params.namenode_bootstrapped_mark_dirs + bin_path = os.path.join(params.hadoop_bin_dir, '') if use_path else "" + try: + iterations = 50 + bootstrapped = False + bootstrap_cmd = format("{bin_path}hdfs namenode -bootstrapStandby -nonInteractive") + # Blue print based deployments start both NN in parallel and occasionally + # the first attempt to bootstrap may fail. Depending on how it fails the + # second attempt may not succeed (e.g. it may find the folder and decide that + # bootstrap succeeded). The solution is to call with -force option but only + # during initial start + if params.command_phase == "INITIAL_START": + # force bootstrap in INITIAL_START phase + bootstrap_cmd = format("{bin_path}hdfs namenode -bootstrapStandby -nonInteractive -force") + elif is_namenode_bootstrapped(params): + # Once out of INITIAL_START phase bootstrap only if we couldnt bootstrap during cluster deployment + return True + Logger.info("Boostrapping standby namenode: %s" % (bootstrap_cmd)) + for i in range(iterations): + Logger.info('Try %d out of %d' % (i+1, iterations)) + code, out = shell.call(bootstrap_cmd, logoutput=False, user=params.hdfs_user) + if code == 0: + Logger.info("Standby namenode bootstrapped successfully") + bootstrapped = True + break + elif code == 5: + Logger.info("Standby namenode already bootstrapped") + bootstrapped = True + break + else: + Logger.warning('Bootstrap standby namenode failed with %d error code. Will retry' % (code)) + except Exception as ex: + Logger.error('Bootstrap standby namenode threw an exception. Reason %s' %(str(ex))) + if bootstrapped: + for mark_dir in mark_dirs: + Directory(mark_dir, + create_parents = True + ) + return bootstrapped + +def is_namenode_bootstrapped(params): + mark_dirs = params.namenode_bootstrapped_mark_dirs + nn_name_dirs = params.dfs_name_dir.split(',') + marked = False + # Check if name directories have been marked as formatted + for mark_dir in mark_dirs: + if os.path.isdir(mark_dir): + marked = True + Logger.info(format("{mark_dir} exists. Standby Namenode already bootstrapped")) + break + + # Ensure that all mark dirs created for all name directories + if marked: + for mark_dir in mark_dirs: + Directory(mark_dir, + create_parents = True + ) + + return marked + + +@retry(times=125, sleep_time=5, backoff_factor=2, err_class=Fail) +def is_this_namenode_active(): + """ + Gets whether the current NameNode is Active. This function will wait until the NameNode is + listed as being either Active or Standby before returning a value. This is to ensure that + that if the other NameNode is Active, we ensure that this NameNode has fully loaded and + registered in the event that the other NameNode is going to be restarted. This prevents + a situation where we detect the other NameNode as Active before this NameNode has fully booted. + If the other Active NameNode is then restarted, there can be a loss of service if this + NameNode has not entered Standby. + """ + import params + + # returns ([('nn1', 'c6401.ambari.apache.org:50070')], [('nn2', 'c6402.ambari.apache.org:50070')], []) + # 0 1 2 + # or + # returns ([], [('nn1', 'c6401.ambari.apache.org:50070')], [('nn2', 'c6402.ambari.apache.org:50070')], []) + # 0 1 2 + # + namenode_states = namenode_ha_utils.get_namenode_states(params.hdfs_site, params.security_enabled, + params.hdfs_user, times=5, sleep_time=5, backoff_factor=2) + + # unwraps [('nn1', 'c6401.ambari.apache.org:50070')] + active_namenodes = [] if len(namenode_states[0]) < 1 else namenode_states[0] + + # unwraps [('nn2', 'c6402.ambari.apache.org:50070')] + standby_namenodes = [] if len(namenode_states[1]) < 1 else namenode_states[1] + + # check to see if this is the active NameNode + for entry in active_namenodes: + if params.namenode_id in entry: + return True + + # if this is not the active NameNode, then we must wait for it to register as standby + for entry in standby_namenodes: + if params.namenode_id in entry: + return False + + # this this point, this NameNode is neither active nor standby - we must wait to ensure it + # enters at least one of these roles before returning a verdict - the annotation will catch + # this failure and retry the fuction automatically + raise Fail(format("The NameNode {namenode_id} is not listed as Active or Standby, waiting...")) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/7df6bba4/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs_nfsgateway.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs_nfsgateway.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs_nfsgateway.py new file mode 100644 index 0000000..672312a --- /dev/null +++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs_nfsgateway.py @@ -0,0 +1,75 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" + +from resource_management.core.exceptions import Fail +from resource_management.core.logger import Logger +from resource_management.core.resources import Directory +from resource_management.core import shell +from utils import service +import subprocess,os + +# NFS GATEWAY is always started by root using jsvc due to rpcbind bugs +# on Linux such as CentOS6.2. https://bugzilla.redhat.com/show_bug.cgi?id=731542 + +def prepare_rpcbind(): + Logger.info("check if native nfs server is running") + p, output = shell.call("pgrep nfsd") + if p == 0 : + Logger.info("native nfs server is running. shutting it down...") + # shutdown nfs + shell.call("service nfs stop") + shell.call("service nfs-kernel-server stop") + Logger.info("check if the native nfs server is down...") + p, output = shell.call("pgrep nfsd") + if p == 0 : + raise Fail("Failed to shutdown native nfs service") + + Logger.info("check if rpcbind or portmap is running") + p, output = shell.call("pgrep rpcbind") + q, output = shell.call("pgrep portmap") + + if p!=0 and q!=0 : + Logger.info("no portmap or rpcbind running. starting one...") + p, output = shell.call(("service", "rpcbind", "start"), sudo=True) + q, output = shell.call(("service", "portmap", "start"), sudo=True) + if p!=0 and q!=0 : + raise Fail("Failed to start rpcbind or portmap") + + Logger.info("now we are ready to start nfs gateway") + + +def nfsgateway(action=None, format=False): + import params + + if action== "start": + prepare_rpcbind() + + if action == "configure": + Directory(params.nfs_file_dump_dir, + owner = params.hdfs_user, + group = params.user_group, + ) + elif action == "start" or action == "stop": + service( + action=action, + name="nfs3", + user=params.root_user, + create_pid_dir=True, + create_log_dir=True + ) http://git-wip-us.apache.org/repos/asf/ambari/blob/7df6bba4/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs_rebalance.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs_rebalance.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs_rebalance.py new file mode 100644 index 0000000..1dc545e --- /dev/null +++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs_rebalance.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import re + +class HdfsParser(): + def __init__(self): + self.initialLine = None + self.state = None + + def parseLine(self, line): + hdfsLine = HdfsLine() + type, matcher = hdfsLine.recognizeType(line) + if(type == HdfsLine.LineType.HeaderStart): + self.state = 'PROCESS_STARTED' + elif (type == HdfsLine.LineType.Progress): + self.state = 'PROGRESS' + hdfsLine.parseProgressLog(line, matcher) + if(self.initialLine == None): self.initialLine = hdfsLine + + return hdfsLine + elif (type == HdfsLine.LineType.ProgressEnd): + self.state = 'PROCESS_FINISED' + return None + +class HdfsLine(): + + class LineType: + HeaderStart, Progress, ProgressEnd, Unknown = range(4) + + + MEMORY_SUFFIX = ['B','KB','MB','GB','TB','PB','EB'] + MEMORY_PATTERN = '(?P<memmult_%d>(?P<memory_%d>(\d+)(.|,)?(\d+)?) (?P<mult_%d>'+"|".join(MEMORY_SUFFIX)+'))' + + HEADER_BEGIN_PATTERN = re.compile('Time Stamp\w+Iteration#\w+Bytes Already Moved\w+Bytes Left To Move\w+Bytes Being Moved') + PROGRESS_PATTERN = re.compile( + "(?P<date>.*?)\s+" + + "(?P<iteration>\d+)\s+" + + MEMORY_PATTERN % (1,1,1) + "\s+" + + MEMORY_PATTERN % (2,2,2) + "\s+" + + MEMORY_PATTERN % (3,3,3) + ) + PROGRESS_END_PATTERN = re.compile('(The cluster is balanced. Exiting...|The cluster is balanced. Exiting...)') + + def __init__(self): + self.date = None + self.iteration = None + self.bytesAlreadyMoved = None + self.bytesLeftToMove = None + self.bytesBeingMoved = None + self.bytesAlreadyMovedStr = None + self.bytesLeftToMoveStr = None + self.bytesBeingMovedStr = None + + def recognizeType(self, line): + for (type, pattern) in ( + (HdfsLine.LineType.HeaderStart, self.HEADER_BEGIN_PATTERN), + (HdfsLine.LineType.Progress, self.PROGRESS_PATTERN), + (HdfsLine.LineType.ProgressEnd, self.PROGRESS_END_PATTERN) + ): + m = re.match(pattern, line) + if m: + return type, m + return HdfsLine.LineType.Unknown, None + + def parseProgressLog(self, line, m): + ''' + Parse the line of 'hdfs rebalancer' output. The example output being parsed: + + Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved + Jul 28, 2014 5:01:49 PM 0 0 B 5.74 GB 9.79 GB + Jul 28, 2014 5:03:00 PM 1 0 B 5.58 GB 9.79 GB + + Throws AmbariException in case of parsing errors + + ''' + m = re.match(self.PROGRESS_PATTERN, line) + if m: + self.date = m.group('date') + self.iteration = int(m.group('iteration')) + + self.bytesAlreadyMoved = self.parseMemory(m.group('memory_1'), m.group('mult_1')) + self.bytesLeftToMove = self.parseMemory(m.group('memory_2'), m.group('mult_2')) + self.bytesBeingMoved = self.parseMemory(m.group('memory_3'), m.group('mult_3')) + + self.bytesAlreadyMovedStr = m.group('memmult_1') + self.bytesLeftToMoveStr = m.group('memmult_2') + self.bytesBeingMovedStr = m.group('memmult_3') + else: + raise AmbariException("Failed to parse line [%s]") + + def parseMemory(self, memorySize, multiplier_type): + try: + factor = self.MEMORY_SUFFIX.index(multiplier_type) + except ValueError: + raise AmbariException("Failed to memory value [%s %s]" % (memorySize, multiplier_type)) + + return float(memorySize) * (1024 ** factor) + def toJson(self): + return { + 'timeStamp' : self.date, + 'iteration' : self.iteration, + + 'dataMoved': self.bytesAlreadyMovedStr, + 'dataLeft' : self.bytesLeftToMoveStr, + 'dataBeingMoved': self.bytesBeingMovedStr, + + 'bytesMoved': self.bytesAlreadyMoved, + 'bytesLeft' : self.bytesLeftToMove, + 'bytesBeingMoved': self.bytesBeingMoved, + } + def __str__(self): + return "[ date=%s,iteration=%d, bytesAlreadyMoved=%d, bytesLeftToMove=%d, bytesBeingMoved=%d]"%(self.date, self.iteration, self.bytesAlreadyMoved, self.bytesLeftToMove, self.bytesBeingMoved) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/7df6bba4/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs_snamenode.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs_snamenode.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs_snamenode.py new file mode 100644 index 0000000..8d4c40c --- /dev/null +++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/hdfs_snamenode.py @@ -0,0 +1,66 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" + +from utils import service +from resource_management.core.resources.system import Directory, File +from resource_management.core.source import Template +from resource_management.libraries.functions.check_process_status import check_process_status +from ambari_commons.os_family_impl import OsFamilyImpl, OsFamilyFuncImpl +from ambari_commons import OSConst + +@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT) +def snamenode(action=None, format=False): + if action == "configure": + import params + for fs_checkpoint_dir in params.fs_checkpoint_dirs: + Directory(fs_checkpoint_dir, + create_parents = True, + cd_access="a", + mode=0755, + owner=params.hdfs_user, + group=params.user_group) + File(params.exclude_file_path, + content=Template("exclude_hosts_list.j2"), + owner=params.hdfs_user, + group=params.user_group) + elif action == "start" or action == "stop": + import params + service( + action=action, + name="secondarynamenode", + user=params.hdfs_user, + create_pid_dir=True, + create_log_dir=True + ) + elif action == "status": + import status_params + check_process_status(status_params.snamenode_pid_file) + + +@OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY) +def snamenode(action=None, format=False): + if action == "configure": + pass + elif action == "start" or action == "stop": + import params + Service(params.snamenode_win_service_name, action=action) + elif action == "status": + import status_params + check_windows_service_status(status_params.snamenode_win_service_name) + http://git-wip-us.apache.org/repos/asf/ambari/blob/7df6bba4/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/install_params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/install_params.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/install_params.py new file mode 100644 index 0000000..fe488c3 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/install_params.py @@ -0,0 +1,39 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" +from ambari_commons import OSCheck + +# These parameters are supposed to be referenced at installation time, before the Hadoop environment variables have been set +if OSCheck.is_windows_family(): + exclude_packages = [] +else: + from resource_management.libraries.functions.default import default + from resource_management.libraries.functions.get_lzo_packages import get_lzo_packages + from resource_management.libraries.script.script import Script + + _config = Script.get_config() + stack_version_unformatted = str(_config['hostLevelParams']['stack_version']) + + # The logic for LZO also exists in OOZIE's params.py + io_compression_codecs = default("/configurations/core-site/io.compression.codecs", None) + lzo_enabled = io_compression_codecs is not None and "com.hadoop.compression.lzo" in io_compression_codecs.lower() + lzo_packages = get_lzo_packages(stack_version_unformatted) + + exclude_packages = [] + if not lzo_enabled: + exclude_packages += lzo_packages http://git-wip-us.apache.org/repos/asf/ambari/blob/7df6bba4/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/journalnode.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/journalnode.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/journalnode.py new file mode 100644 index 0000000..46df454 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/scripts/journalnode.py @@ -0,0 +1,203 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" + +from resource_management.libraries.script.script import Script +from resource_management.libraries.functions import conf_select, stack_select +from resource_management.libraries.functions.constants import StackFeature +from resource_management.libraries.functions.stack_features import check_stack_feature +from resource_management.libraries.functions.format import format +from resource_management.libraries.functions.check_process_status import check_process_status +from resource_management.libraries.functions.security_commons import build_expectations, \ + cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \ + FILE_TYPE_XML +from resource_management.core.logger import Logger +from resource_management.core.resources.system import Directory +from utils import service +from hdfs import hdfs +import journalnode_upgrade +from ambari_commons.os_family_impl import OsFamilyImpl +from ambari_commons import OSConst + +class JournalNode(Script): + def install(self, env): + import params + env.set_params(params) + self.install_packages(env) + +@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT) +class JournalNodeDefault(JournalNode): + + def get_component_name(self): + return "hadoop-hdfs-journalnode" + + def pre_upgrade_restart(self, env, upgrade_type=None): + Logger.info("Executing Stack Upgrade pre-restart") + import params + env.set_params(params) + + if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version): + conf_select.select(params.stack_name, "hadoop", params.version) + stack_select.select("hadoop-hdfs-journalnode", params.version) + + def start(self, env, upgrade_type=None): + import params + + env.set_params(params) + self.configure(env) + service( + action="start", name="journalnode", user=params.hdfs_user, + create_pid_dir=True, + create_log_dir=True + ) + + def post_upgrade_restart(self, env, upgrade_type=None): + if upgrade_type == "nonrolling": + return + + Logger.info("Executing Stack Upgrade post-restart") + import params + env.set_params(params) + journalnode_upgrade.post_upgrade_check() + + def stop(self, env, upgrade_type=None): + import params + + env.set_params(params) + service( + action="stop", name="journalnode", user=params.hdfs_user, + create_pid_dir=True, + create_log_dir=True + ) + + def configure(self, env): + import params + + Directory(params.jn_edits_dir, + create_parents = True, + cd_access="a", + owner=params.hdfs_user, + group=params.user_group + ) + env.set_params(params) + hdfs() + pass + + def status(self, env): + import status_params + + env.set_params(status_params) + check_process_status(status_params.journalnode_pid_file) + + def security_status(self, env): + import status_params + + env.set_params(status_params) + props_value_check = {"hadoop.security.authentication": "kerberos", + "hadoop.security.authorization": "true"} + props_empty_check = ["hadoop.security.auth_to_local"] + props_read_check = None + core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check, + props_read_check) + + props_value_check = None + props_empty_check = ['dfs.journalnode.keytab.file', + 'dfs.journalnode.kerberos.principal'] + props_read_check = ['dfs.journalnode.keytab.file'] + hdfs_site_expectations = build_expectations('hdfs-site', props_value_check, props_empty_check, + props_read_check) + + hdfs_expectations = {} + hdfs_expectations.update(hdfs_site_expectations) + hdfs_expectations.update(core_site_expectations) + + security_params = get_params_from_filesystem(status_params.hadoop_conf_dir, + {'core-site.xml': FILE_TYPE_XML}) + if 'core-site' in security_params and 'hadoop.security.authentication' in security_params['core-site'] and \ + security_params['core-site']['hadoop.security.authentication'].lower() == 'kerberos': + result_issues = validate_security_config_properties(security_params, hdfs_expectations) + if not result_issues: # If all validations passed successfully + try: + # Double check the dict before calling execute + if ('hdfs-site' not in security_params or + 'dfs.journalnode.kerberos.keytab.file' not in security_params['hdfs-site'] or + 'dfs.journalnode.kerberos.principal' not in security_params['hdfs-site']): + self.put_structured_out({"securityState": "UNSECURED"}) + self.put_structured_out( + {"securityIssuesFound": "Keytab file or principal are not set property."}) + return + + cached_kinit_executor(status_params.kinit_path_local, + status_params.hdfs_user, + security_params['hdfs-site']['dfs.journalnode.kerberos.keytab.file'], + security_params['hdfs-site']['dfs.journalnode.kerberos.principal'], + status_params.hostname, + status_params.tmp_dir) + self.put_structured_out({"securityState": "SECURED_KERBEROS"}) + except Exception as e: + self.put_structured_out({"securityState": "ERROR"}) + self.put_structured_out({"securityStateErrorInfo": str(e)}) + else: + issues = [] + for cf in result_issues: + issues.append("Configuration file %s did not pass the validation. Reason: %s" % (cf, result_issues[cf])) + self.put_structured_out({"securityIssuesFound": ". ".join(issues)}) + self.put_structured_out({"securityState": "UNSECURED"}) + else: + self.put_structured_out({"securityState": "UNSECURED"}) + + def get_log_folder(self): + import params + return params.hdfs_log_dir + + def get_user(self): + import params + return params.hdfs_user + + def get_pid_files(self): + import status_params + return [status_params.journalnode_pid_file] + +@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY) +class JournalNodeWindows(JournalNode): + def install(self, env): + import install_params + self.install_packages(env) + + def start(self, env): + import params + self.configure(env) + Service(params.journalnode_win_service_name, action="start") + + def stop(self, env): + import params + Service(params.journalnode_win_service_name, action="stop") + + def configure(self, env): + import params + env.set_params(params) + hdfs("journalnode") + pass + + def status(self, env): + import status_params + env.set_params(status_params) + check_windows_service_status(status_params.journalnode_win_service_name) + +if __name__ == "__main__": + JournalNode().execute()
