This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git

commit c17b9f8d093447274ac2a9b90f705f3b5a943b17
Author: Oliver Szabo <[email protected]>
AuthorDate: Sun May 13 19:12:56 2018 +0200

    AMBARI-23822. Infra Solr: Add restore support if index is on HDFS.
---
 .../src/main/python/migrationHelper.py        |   7 ++
 .../AMBARI_INFRA_SOLR/0.1.0/metainfo.xml      |   3 +-
 .../0.1.0/package/scripts/collection.py       | 121 ++++++++++++++++-----
 .../0.1.0/package/scripts/command_commons.py  |  96 +++++++++++++++-
 4 files changed, 194 insertions(+), 33 deletions(-)

diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
index 8c7f6b1..4d7f1d2 100755
--- a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
+++ b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
@@ -148,6 +148,10 @@ def fill_parameters(options):
     params['solr_skip_cores'] = options.skip_cores
   if options.solr_shards:
     params['solr_shards'] = options.solr_shards
+  if options.solr_hdfs_path:
+    params['solr_hdfs_path'] = options.solr_hdfs_path
+  if options.solr_keep_backup:
+    params['solr_keep_backup'] = True
   return params

 def validte_common_options(options, parser):
@@ -235,6 +239,9 @@ if __name__=="__main__":
   parser.add_option("--core-filter", dest="core_filter", default=None, type="string", help="core filter for replica folders")
   parser.add_option("--skip-cores", dest="skip_cores", default=None, type="string", help="specific cores to skip (comma separated)")
   parser.add_option("--shards", dest="solr_shards", type="int", default=0, help="number of shards (required to set properly for restore)")
+  parser.add_option("--solr-hdfs-path", dest="solr_hdfs_path", type="string", default=None, help="Base path of Solr (where collections are located) if HDFS is used (like /user/infra-solr)")
+  parser.add_option("--solr-keep-backup", dest="solr_keep_backup", default=False, action="store_true", help="If it is turned on, Snapshot Solr data will not be deleted from the filesystem during restore.")
+
   (options, args) = parser.parse_args()

   protocol = 'https' if options.ssl else 'http'
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/metainfo.xml b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/metainfo.xml
index 015a7bb..de75d05 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/metainfo.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/metainfo.xml
@@ -58,7 +58,8 @@
         <commandScript>
           <script>scripts/infra_solr.py</script>
           <scriptType>PYTHON</scriptType>
-          <timeout>1200</timeout>
+          <background>true</background>
+          <timeout>36000</timeout>
         </commandScript>
       </customCommand>
       <customCommand>
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py
index e32bc1f..4f51071 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/collection.py
@@ -105,9 +105,11 @@ def restore_collection(env):

   if command_commons.collection in ["ranger_audits", "history", "hadoop_logs", "audit_logs", "vertex_index", "edge_index", "fulltext_index"]:
     # Make sure ambari wont delete an important collection
-    raise Exeption(format(
+    raise Exception(format(
      "Selected collection for restore is: {collection}. It is not recommended to restore on default collections."))

+  hdfs_cores_on_host=[]
+
   for core_data in core_pairs:
     src_core = core_data['src_core']
     target_core = core_data['target_core']
@@ -124,16 +126,26 @@ def restore_collection(env):
     core_root_dir = format("{solr_datadir}/backup_{target_core}")
     core_root_without_backup_dir = format("{solr_datadir}/{target_core}")

-    Directory([format("{core_root_dir}/data/index"),
-               format("{core_root_dir}/data/tlog"),
-               format("{core_root_dir}/data/snapshot_metadata")],
-              mode=0755,
-              cd_access='a',
-              create_parents=True,
-              owner=params.infra_solr_user,
-              group=params.user_group,
-              only_if=only_if_cmd
-              )
+    if command_commons.solr_hdfs_path:
+      Directory([core_root_dir],
+                mode=0755,
+                cd_access='a',
+                create_parents=True,
+                owner=params.infra_solr_user,
+                group=params.user_group,
+                only_if=only_if_cmd
+                )
+    else:
+      Directory([format("{core_root_dir}/data/index"),
+                 format("{core_root_dir}/data/tlog"),
+                 format("{core_root_dir}/data/snapshot_metadata")],
+                mode=0755,
+                cd_access='a',
+                create_parents=True,
+                owner=params.infra_solr_user,
+                group=params.user_group,
+                only_if=only_if_cmd
+                )

     core_details = core_data[target_core]
     core_properties = {}
@@ -144,15 +156,47 @@ def restore_collection(env):
     core_properties['collection'] = command_commons.collection
     core_properties['coreNodeName'] = core_details['node']
     core_properties['shard'] = core_details['shard']
+    if command_commons.solr_hdfs_path:
+      hdfs_solr_node_folder=command_commons.solr_hdfs_path + format("/backup_{collection}/") + core_details['node']
+      source_folder=format("{index_location}/snapshot.{src_core}/")
+      if command_commons.check_folder_exists(source_folder):
+        hdfs_cores_on_host.append(target_core)
+        command_commons.HdfsResource(format("{hdfs_solr_node_folder}/data/index/"),
+                                     type="directory",
+                                     action="create_on_execute",
+                                     source=source_folder,
+                                     owner=params.infra_solr_user,
+                                     mode=0755,
+                                     recursive_chown=True,
+                                     recursive_chmod=True
+                                     )
+        command_commons.HdfsResource(format("{hdfs_solr_node_folder}/data/tlog"),
+                                     type="directory",
+                                     action="create_on_execute",
+                                     owner=params.infra_solr_user,
+                                     mode=0755
+                                     )
+        command_commons.HdfsResource(format("{hdfs_solr_node_folder}/data/snapshot_metadata"),
+                                     type="directory",
+                                     action="create_on_execute",
+                                     owner=params.infra_solr_user,
+                                     mode=0755
+                                     )
+        if command_commons.solr_keep_backup:
+          Directory(format("{index_location}/snapshot.{src_core}"),
+                    action="delete",
+                    only_if=only_if_cmd,
+                    owner=params.infra_solr_user)
+    else:
+      copy_cmd = format(
+        "mv {index_location}/snapshot.{src_core}/* {core_root_dir}/data/index/") if command_commons.solr_keep_backup \
+        else format("cp -r {index_location}/snapshot.{src_core}/* {core_root_dir}/data/index/")
+      Execute(
+        copy_cmd, only_if=only_if_cmd,
+        user=params.infra_solr_user,
+        logoutput=True
+      )
-    copy_cmd = format(
-      "mv {index_location}/snapshot.{src_core}/* {core_root_dir}/data/index/") if command_commons.solr_keep_backup \
-      else format("cp -r {index_location}/snapshot.{src_core}/* {core_root_dir}/data/index/")
-    Execute(
-      copy_cmd, only_if=only_if_cmd,
-      user=params.infra_solr_user,
-      logoutput=True
-    )

     PropertiesFile(
       core_root_dir + '/core.properties',
       properties=core_properties,
@@ -165,7 +209,6 @@ def restore_collection(env):
   Execute(format("rm -rf {solr_datadir}/{collection}*"),
           user=params.infra_solr_user,
           logoutput=True)
-
   for core_data in core_pairs:
     src_core = core_data['src_core']
     target_core = core_data['target_core']
@@ -179,6 +222,25 @@ def restore_collection(env):
     core_root_dir = format("{solr_datadir}/backup_{target_core}")
     core_root_without_backup_dir = format("{solr_datadir}/{target_core}")

+
+    if command_commons.solr_hdfs_path:
+      if target_core in hdfs_cores_on_host:
+        Logger.info(format("Core data '{target_core}' is located on this host, processing..."))
+        core_data=host_cores_map[command_commons.CORE_DATA]
+        core_details=core_data[target_core]
+        core_node=core_details['node']
+        collection_core_dir=command_commons.solr_hdfs_path + format("/{collection}/{core_node}")
+        backup_collection_core_dir=command_commons.solr_hdfs_path + format("/backup_{collection}/{core_node}")
+        command_commons.HdfsResource(collection_core_dir,
+                                     type="directory",
+                                     action="delete_on_execute",
+                                     owner=params.infra_solr_user
+                                     )
+        if command_commons.check_hdfs_folder_exists(backup_collection_core_dir):
+          command_commons.move_hdfs_folder(backup_collection_core_dir, collection_core_dir)
+      else:
+        Logger.info(format("Core data '{target_core}' is not located on this host, skipping..."))
+
     Execute(
       format("mv {core_root_dir} {core_root_without_backup_dir}"),
       user=params.infra_solr_user,
@@ -186,12 +248,13 @@ def restore_collection(env):
       only_if=format("test -d {core_root_dir}")
     )

-    Directory([format("{core_root_without_backup_dir}")],
-              mode=0755,
-              cd_access='a',
-              create_parents=True,
-              owner=params.infra_solr_user,
-              group=params.user_group,
-              recursive_ownership=True,
-              only_if=format("test -d {core_root_without_backup_dir}")
-              )
+    Directory(
+      [format("{core_root_without_backup_dir}")],
+      mode=0755,
+      cd_access='a',
+      create_parents=True,
+      owner=params.infra_solr_user,
+      group=params.user_group,
+      recursive_ownership=True,
+      only_if=format("test -d {core_root_without_backup_dir}")
+    )
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py
index f7dc92e..a8a17e7 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_INFRA_SOLR/0.1.0/package/scripts/command_commons.py
@@ -24,10 +24,12 @@ import socket
 import time
 import traceback

+from resource_management.core.shell import call
 from resource_management.core.logger import Logger
 from resource_management.core.resources.system import Execute
 from resource_management.libraries.functions.default import default
 from resource_management.libraries.functions.format import format
+from resource_management.libraries.resources.hdfs_resource import HdfsResource

 index_helper_script = '/usr/lib/ambari-infra-solr-client/solrIndexHelper.sh'

@@ -81,8 +83,54 @@ solr_keep_backup=default("/commandParams/solr_keep_backup", False)

 solr_num_shards = int(default("/commandParams/solr_shards", "0"))

+solr_hdfs_path=default("/commandParams/solr_hdfs_path", None)
+
 if solr_num_shards == 0:
-  raise Exeption(format("The 'solr_shards' command parameter is required to set."))
+  raise Exception(format("The 'solr_shards' command parameter is required to set."))
+
+if solr_hdfs_path:
+
+  import functools
+  from resource_management.libraries.functions import conf_select
+  from resource_management.libraries.functions import stack_select
+  from resource_management.libraries.functions import get_klist_path
+  from resource_management.libraries.functions import get_kinit_path
+  from resource_management.libraries.functions.get_not_managed_resources import get_not_managed_resources
+
+  klist_path_local = get_klist_path(default('/configurations/kerberos-env/executable_search_paths', None))
+  kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None))
+
+  # hadoop default parameters
+  hdfs_user = params.config['configurations']['hadoop-env']['hdfs_user']
+  hadoop_bin = stack_select.get_hadoop_dir("sbin")
+  hadoop_bin_dir = stack_select.get_hadoop_dir("bin")
+  hadoop_conf_dir = conf_select.get_hadoop_conf_dir()
+  hadoop_conf_secure_dir = os.path.join(hadoop_conf_dir, "secure")
+  hadoop_lib_home = stack_select.get_hadoop_dir("lib")
+  hdfs_principal_name = default('/configurations/hadoop-env/hdfs_principal_name', None)
+  hdfs_user_keytab = params.config['configurations']['hadoop-env']['hdfs_user_keytab']
+
+  dfs_type = default("/commandParams/dfs_type", "")
+
+  hdfs_site = params.config['configurations']['hdfs-site']
+  default_fs = params.config['configurations']['core-site']['fs.defaultFS']
+  #create partial functions with common arguments for every HdfsResource call
+  #to create/delete/copyfromlocal hdfs directories/files we need to call params.HdfsResource in code
+  HdfsResource = functools.partial(
+    HdfsResource,
+    user=params.infra_solr_user,
+    hdfs_resource_ignore_file = "/var/lib/ambari-agent/data/.hdfs_resource_ignore",
+    security_enabled = params.security_enabled,
+    keytab = hdfs_user_keytab,
+    kinit_path_local = kinit_path_local,
+    hadoop_bin_dir = hadoop_bin_dir,
+    hadoop_conf_dir = hadoop_conf_dir,
+    principal_name = hdfs_principal_name,
+    hdfs_site = hdfs_site,
+    default_fs = default_fs,
+    immutable_paths = get_not_managed_resources(),
+    dfs_type = dfs_type
+  )

 if params.security_enabled:
   keytab = params.infra_solr_kerberos_keytab
@@ -255,7 +303,10 @@ def __read_host_cores_from_clusterstate_json(json_zk_state_path, json_host_cores
         core_host_map[core]=domain
         core_data_map[core]['host']=domain
         core_data_map[core]['node']=replica
-        core_data_map[core]['type']=core_data['type']
+        if 'type' in core_data:
+          core_data_map[core]['type']=core_data['type']
+        else:
+          core_data_map[core]['type']='NRT'
         core_data_map[core]['shard']=shard
         Logger.info(format("Found leader/active replica: {replica} (core '{core}') in {shard} on {domain}"))
       else:
@@ -334,4 +385,43 @@ def resolve_ip_to_hostname(ip):
     return host_name if host_name == fqdn_name else fqdn_name
   except socket.error:
     pass
-  return ip
\ No newline at end of file
+  return ip
+
+def create_command(command):
+  """
+  Create hdfs command. Append kinit to the command if required.
+  """
+  kinit_cmd = "{0} -kt {1} {2};".format(kinit_path_local, params.infra_solr_kerberos_keytab, params.infra_solr_kerberos_principal) if params.security_enabled else ""
+  return kinit_cmd + command
+
+def execute_commad(command):
+  """
+  Run hdfs command by infra-solr user
+  """
+  return call(command, user=params.infra_solr_user, timeout=300)
+
+def move_hdfs_folder(source_dir, target_dir):
+  cmd=create_command(format("hdfs dfs -mv {source_dir} {target_dir}"))
+  returncode, stdout = execute_commad(cmd)
+  if returncode:
+    raise Fail("Unable to move HDFS dir '{0}' to '{1}' (return code: {2})".format(source_dir, target_dir, str(returncode)))
+  return stdout.strip()
+
+def check_hdfs_folder_exists(hdfs_dir):
+  """
+  Check that hdfs folder exists or not
+  """
+  cmd=create_command(format("hdfs dfs -ls {hdfs_dir}"))
+  returncode, stdout = execute_commad(cmd)
+  if returncode:
+    return False
+  return True
+
+def check_folder_exists(dir):
+  """
+  Check that folder exists or not
+  """
+  returncode, stdout = call(format("test -d {dir}"), user=params.infra_solr_user, timeout=300)
+  if returncode:
+    return False
+  return True
-- 
To stop receiving notification emails like this one, please
contact [email protected].
