This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
commit d10f8cc061ee1206983ef1a4ac6ec657a568067e
Author: Oliver Szabo <[email protected]>
AuthorDate: Wed Jun 27 15:05:53 2018 +0200

    AMBARI-23945. Simplify old Solr data transport
---
 .../src/main/python/migrationHelper.py             | 111 +++++++++++++++++++--
 .../src/main/python/solrDataManager.py             |  55 ++++++----
 .../src/main/resources/ambariSolrMigration.sh      |  13 ++-
 3 files changed, 146 insertions(+), 33 deletions(-)

diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
index fb82ac2..56ab9ad 100755
--- a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
+++ b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
@@ -31,7 +31,9 @@ import socket
 import time
 import traceback
 import ConfigParser
+import solrDataManager as solr_data_manager
 
+from datetime import datetime, timedelta
 from random import randrange
 from subprocess import Popen, PIPE
@@ -133,11 +135,10 @@ def retry(func, *args, **kwargs):
       logger.info("\n{0}: waiting for {1} seconds before retyring again (retry count: {2})".format(context, delay, r+1))
       time.sleep(delay)
   print '{0} operation {1}FAILED{2}'.format(context, colors.FAIL, colors.ENDC)
-  exit(1)
+  sys.exit(1)
 
-def create_solr_api_request_command(request_url, config, output=None):
-  user='infra-solr'
-  kerberos_enabled='false'
+def get_keytab_and_principal(config):
+  kerberos_enabled = 'false'
   keytab=None
   principal=None
   if config.has_section('cluster') and config.has_option('cluster', 'kerberos_enabled'):
@@ -151,7 +152,14 @@ def create_solr_api_request_command(request_url, config, output=None):
       keytab=config.get('infra_solr', 'keytab')
     if config.has_option('infra_solr', 'principal'):
       principal=config.get('infra_solr', 'principal')
+  return keytab, principal
 
+def create_solr_api_request_command(request_url, config, output=None):
+  user='infra-solr'
+  kerberos_enabled='false'
+  if config.has_section('cluster') and config.has_option('cluster', 'kerberos_enabled'):
+    kerberos_enabled=config.get('cluster', 'kerberos_enabled')
+  keytab, principal=get_keytab_and_principal(config)
   use_infra_solr_user="sudo -u {0}".format(user)
   curl_prefix = "curl -k"
   if output is not None:
@@ -202,7 +210,7 @@ def create_infra_solr_client_command(options, config, command, appendZnode=False
 
   return solr_cli_cmd
 
-def get_random_solr_url(solr_urls, options):
+def get_random_solr_url(solr_urls, options = None):
   random_index = randrange(0, len(solr_urls))
   result = solr_urls[random_index]
   logger.debug("Use {0} solr address for next request.".format(result))
@@ -753,7 +761,7 @@ def get_solr_urls(options, config, collection, collections_json):
   solr_hosts = config.get('infra_solr', 'hosts')
   splitted_solr_hosts = solr_hosts.split(',')
 
-  filter_solr_hosts_if_match_any(splitted_solr_hosts, collection, collections_json)
+  splitted_solr_hosts = filter_solr_hosts_if_match_any(splitted_solr_hosts, collection, collections_json)
   if options.include_solr_hosts:
     # keep only included ones, do not override any
     include_solr_hosts_list = options.include_solr_hosts.split(',')
@@ -778,6 +786,26 @@ def get_solr_urls(options, config, collection, collections_json):
 
   return solr_urls
 
+def get_input_output_solr_url(src_solr_urls, target_solr_urls):
+  """
+  Choose random solr urls for the source and target collections, prefer localhost and common urls
+  """
+  def intersect(a, b):
+    return list(set(a) & set(b))
+  input_solr_urls = src_solr_urls
+  output_solr_urls = target_solr_urls
+  hostname = socket.getfqdn()
+  if any(hostname in s for s in input_solr_urls):
+    input_solr_urls = filter(lambda x: hostname in x, input_solr_urls)
+  if any(hostname in s for s in output_solr_urls):
+    output_solr_urls = filter(lambda x: hostname in x, output_solr_urls)
+  common_url_list = intersect(input_solr_urls, output_solr_urls)
+  if common_url_list:
+    input_solr_urls = common_url_list
+    output_solr_urls = common_url_list
+
+  return get_random_solr_url(input_solr_urls), get_random_solr_url(output_solr_urls)
+
 def is_atlas_available(config, service_filter):
   return 'ATLAS' in service_filter and config.has_section('atlas_collections') \
          and config.has_option('atlas_collections', 'enabled') and config.get('atlas_collections', 'enabled') == 'true'
@@ -1040,6 +1068,7 @@ def evaluate_check_shard_result(collection, result, skip_index_size = False):
   all_shards = result['all_shards']
   warnings = 0
   print 30 * "-"
+  print "Number of shards: {0}".format(str(len(all_shards)))
   for shard in all_shards:
     if shard in active_shards:
       print "{0}OK{1}: Found active leader replica for {2}" \
@@ -1241,7 +1270,6 @@ def update_state_json(original_collection, collection, config, options):
     copy_znode(options, config, "{0}/new_state.json".format(coll_data_dir),
                "{0}/collections/{1}/state.json".format(solr_znode, collection), copy_from_local=True)
 
-
 def delete_znodes(options, config, service_filter):
   solr_znode='/infra-solr'
   if is_logsearch_available(config, service_filter):
@@ -1726,13 +1754,65 @@ def check_docs(options, accessor, parser, config):
   else:
     print "Check number of documents - Not found any collections."
 
+def run_solr_data_manager_on_collection(options, config, collections, src_collection, target_collection,
+                                        collections_json_location, num_docs, skip_date_usage = True):
+  if target_collection in collections and src_collection in collections:
+    source_solr_urls = get_solr_urls(options, config, src_collection, collections_json_location)
+    target_solr_urls = get_solr_urls(options, config, target_collection, collections_json_location)
+    if is_collection_empty(num_docs, src_collection):
+      print "Collection '{0}' is empty. Skipping transport data operation.".format(target_collection)
+    else:
+      src_solr_url, target_solr_url = get_input_output_solr_url(source_solr_urls, target_solr_urls)
+      keytab, principal = get_keytab_and_principal(config)
+      date_format = "%Y-%m-%dT%H:%M:%S.%fZ"
+      d = datetime.now() + timedelta(days=365)
+      end = d.strftime(date_format)
+      print "Running solrDataManager.py (solr input collection: {0}, solr output collection: {1})"\
+        .format(src_collection, target_collection)
+      solr_data_manager.verbose = options.verbose
+      solr_data_manager.set_log_level(True)
+      solr_data_manager.save("archive", src_solr_url, src_collection, "evtTime", "id", end,
+                             options.transport_read_block_size, options.transport_write_block_size,
+                             False, None, None, keytab, principal, False, "none", None, None, None,
+                             None, None, None, None, None, target_collection,
+                             target_solr_url, "_version_", skip_date_usage)
+  else:
+    print "Collection '{0}' or {1} does not exist or filtered out. Skipping transport data operation.".format(target_collection, src_collection)
+
+def transfer_old_data(options, accessor, parser, config):
+  collections_json_location = COLLECTIONS_DATA_JSON_LOCATION.format("transport_collections.json")
+  collections=list_collections(options, config, collections_json_location, include_number_of_docs=True)
+  collections=filter_collections(options, collections)
+  docs_map = get_number_of_docs_map(collections_json_location) if collections else {}
+  if is_ranger_available(config, service_filter):
+    original_ranger_collection = config.get('ranger_collection', 'ranger_collection_name')
+    backup_ranger_collection = config.get('ranger_collection', 'backup_ranger_collection_name')
+    run_solr_data_manager_on_collection(options, config, collections, backup_ranger_collection,
+                                        original_ranger_collection, collections_json_location, docs_map, skip_date_usage=False)
+  if is_atlas_available(config, service_filter):
+    original_fulltext_index_name = config.get('atlas_collections', 'fulltext_index_name')
+    backup_fulltext_index_name = config.get('atlas_collections', 'backup_fulltext_index_name')
+    run_solr_data_manager_on_collection(options, config, collections, backup_fulltext_index_name,
+                                        original_fulltext_index_name, collections_json_location, docs_map)
+
+    original_edge_index_name = config.get('atlas_collections', 'edge_index_name')
+    backup_edge_index_name = config.get('atlas_collections', 'backup_edge_index_name')
+    run_solr_data_manager_on_collection(options, config, collections, backup_edge_index_name,
+                                        original_edge_index_name, collections_json_location, docs_map)
+
+    original_vertex_index_name = config.get('atlas_collections', 'vertex_index_name')
+    backup_vertex_index_name = config.get('atlas_collections', 'backup_vertex_index_name')
+    run_solr_data_manager_on_collection(options, config, collections, backup_vertex_index_name,
+                                        original_vertex_index_name, collections_json_location, docs_map)
+
+
 if __name__=="__main__":
   parser = optparse.OptionParser("usage: %prog [options]")
   parser.add_option("-a", "--action", dest="action", type="string", help="delete-collections | backup | cleanup-znodes | backup-and-cleanup | migrate | restore |' \
     ' rolling-restart-solr | rolling-restart-atlas | rolling-restart-ranger | check-shards | check-backup-shards | enable-solr-authorization | disable-solr-authorization |'\
     ' fix-solr5-kerberos-config | fix-solr7-kerberos-config | upgrade-solr-clients | upgrade-solr-instances | upgrade-logsearch-portal | upgrade-logfeeders | stop-logsearch |'\
-    ' restart-solr |restart-logsearch | restart-ranger | restart-atlas")
+    ' restart-solr |restart-logsearch | restart-ranger | restart-atlas | transport-old-data")
   parser.add_option("-i", "--ini-file", dest="ini_file", type="string", help="Config ini file to parse (required)")
   parser.add_option("-f", "--force", dest="force", default=False, action="store_true", help="force index upgrade even if it's the right version")
   parser.add_option("-v", "--verbose", dest="verbose", action="store_true", help="use for verbose logging")
@@ -1747,6 +1827,8 @@ if __name__=="__main__":
   parser.add_option("--request-tries", dest="request_tries", type="int", help="number of tries for BACKUP/RESTORE status api calls in the request")
   parser.add_option("--request-time-interval", dest="request_time_interval", type="int", help="time interval between BACKUP/RESTORE status api calls in the request")
   parser.add_option("--request-async", dest="request_async", action="store_true", default=False, help="skip BACKUP/RESTORE status api calls from the command")
+  parser.add_option("--transport-read-block-size", dest="transport_read_block_size", type="string", help="block size to use for reading from solr during transport",default=10000)
+  parser.add_option("--transport-write-block-size", dest="transport_write_block_size", type="string", help="number of records in the output files during transport", default=100000)
   parser.add_option("--include-solr-hosts", dest="include_solr_hosts", type="string", help="comma separated list of included solr hosts")
   parser.add_option("--exclude-solr-hosts", dest="exclude_solr_hosts", type="string", help="comma separated list of excluded solr hosts")
   parser.add_option("--disable-solr-host-check", dest="disable_solr_host_check", action="store_true", default=False, help="Disable to check solr hosts are good for the collection backups")
@@ -1790,6 +1872,13 @@ if __name__=="__main__":
     username = config.get('ambari_server', 'username')
     password = config.get('ambari_server', 'password')
     accessor = api_accessor(host, username, password, protocol, port)
+
+  if config.has_section('infra_solr') and config.has_option('infra_solr', 'hosts'):
+    local_host = socket.getfqdn()
+    solr_hosts = config.get('infra_solr', 'hosts')
+    if solr_hosts and local_host not in solr_hosts.split(","):
+      print "{0}WARNING{1}: Host '{2}' is not found in Infra Solr hosts ({3}). Migration commands won't work from here." \
+        .format(colors.WARNING, colors.ENDC, local_host, solr_hosts)
   if options.action.lower() == 'backup':
     backup_ranger_configs(options, config, service_filter)
     backup_collections(options, accessor, parser, config, service_filter)
@@ -1881,13 +1970,17 @@ if __name__=="__main__":
       check_shards(options, accessor, parser, config, backup_shards=True)
     elif options.action.lower() == 'check-docs':
       check_docs(options, accessor, parser, config)
+    elif options.action.lower() == 'transport-old-data':
+      check_docs(options, accessor, parser, config)
+      transfer_old_data(options, accessor, parser, config)
+      check_docs(options, accessor, parser, config)
     else:
       parser.print_help()
       print 'action option is invalid (available actions: delete-collections | backup | cleanup-znodes | backup-and-cleanup | migrate | restore |' \
             ' rolling-restart-solr | rolling-restart-ranger | rolling-restart-atlas | check-shards | check-backup-shards | check-docs | enable-solr-authorization |'\
             ' disable-solr-authorization | fix-solr5-kerberos-config | fix-solr7-kerberos-config | upgrade-solr-clients | upgrade-solr-instances | upgrade-logsearch-portal |' \
             ' upgrade-logfeeders | stop-logsearch | restart-solr |' \
-            ' restart-logsearch | restart-ranger | restart-atlas)'
+            ' restart-logsearch | restart-ranger | restart-atlas | transport-old-data )'
       sys.exit(1)
     command_elapsed_time = time.time() - command_start_time
     time_to_print = time.strftime("%H:%M:%S", time.gmtime(command_elapsed_time))
diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py b/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
index b34873d..e02c491 100755
--- a/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
+++ b/ambari-infra/ambari-infra-solr-client/src/main/python/solrDataManager.py
@@ -47,7 +47,7 @@ def parse_arguments():
   parser = optparse.OptionParser("usage: %prog [options]", version="Solr Data Manager {0}".format(VERSION))
 
   parser.add_option("-m", "--mode", dest="mode", type="string", help="archive | delete | save")
-  parser.add_option("-s", "--solr-url", dest="solr_url", type="string", help="the url of the solr server including the port")
+  parser.add_option("-s", "--solr-url", dest="solr_url", type="string", help="the url of the solr server including the port and protocol")
   parser.add_option("-c", "--collection", dest="collection", type="string", help="the name of the solr collection")
   parser.add_option("-f", "--filter-field", dest="filter_field", type="string", help="the name of the field to filter on")
   parser.add_option("-r", "--read-block-size", dest="read_block_size", type="int", help="block size to use for reading from solr",
@@ -90,6 +90,7 @@ def parse_arguments():
   parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False)
 
   parser.add_option("--solr-output-collection", dest="solr_output_collection", help="target output solr collection for archive", type="string", default=None)
+  parser.add_option("--solr-output-url", dest="solr_output_url", default=None, type="string", help="the url of the output solr server including the port and protocol")
   parser.add_option("--exclude-fields", dest="exclude_fields", help="Comma separated list of excluded fields from json response", type="string", default=None)
   parser.add_option("--skip-date-usage", dest="skip_date_usage", action="store_true", default=False, help="datestamp field won't be used for queries (sort based on id field)")
@@ -208,6 +209,8 @@ def parse_arguments():
     print("  compression: " + options.compression)
     if options.__dict__["solr_output_collection"] is not None:
       print("  solr output collection: " + options.solr_output_collection)
+    if options.__dict__["solr_output_url"] is not None:
+      print("  solr output url: " + options.solr_output_url)
     if (options.__dict__["hdfs_keytab"] is not None):
       print("  hdfs-keytab: " + options.hdfs_keytab)
       print("  hdfs-principal: " + options.hdfs_principal)
@@ -237,11 +240,13 @@ def parse_arguments():
 
   return options
 
-def set_log_level():
+def set_log_level(disable=False):
   if verbose:
     logger.setLevel(logging.DEBUG)
   else:
     logger.setLevel(logging.INFO)
+  if disable:
+    logger.removeHandler(handler)
 
 def get_end(options):
   if options.end:
@@ -272,7 +277,7 @@ def delete(solr_url, collection, filter_field, end, solr_keytab, solr_principal,
 def save(mode, solr_url, collection, filter_field, id_field, range_end, read_block_size, write_block_size,
          ignore_unfinished_uploading, additional_filter, name, solr_keytab, solr_principal, json_file, compression,
          hdfs_keytab, hdfs_principal, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path,
-         solr_output_collection, exclude_fields, skip_date_usage):
+         solr_output_collection, solr_output_url, exclude_fields, skip_date_usage):
   solr_kinit_command = None
   if solr_keytab:
     solr_kinit_command = "kinit -kt {0} {1}".format(solr_keytab, solr_principal)
@@ -284,17 +289,18 @@ def save(mode, solr_url, collection, filter_field, id_field, range_end, read_blo
   if hdfs_keytab:
     hdfs_kinit_command = "sudo -u {0} kinit -kt {1} {2}".format(hdfs_user, hdfs_keytab, hdfs_principal)
 
-  if options.hdfs_path:
+  if hdfs_path:
     ensure_hdfs_path(hdfs_kinit_command, hdfs_user, hdfs_path)
 
   working_dir = get_working_dir(solr_url, collection)
   if mode == "archive":
-    handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_prefix, working_dir, ignore_unfinished_uploading)
+    handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_prefix, working_dir,
+                                ignore_unfinished_uploading, skip_date_usage)
   save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
             range_end, read_block_size, write_block_size, working_dir, additional_filter, name, json_file, compression,
-            hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path, solr_output_collection, exclude_fields,
-            skip_date_usage)
+            hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path, solr_output_collection, solr_output_url,
+            exclude_fields, skip_date_usage)
 
 def ensure_hdfs_path(hdfs_kinit_command, hdfs_user, hdfs_path):
   if hdfs_kinit_command:
@@ -329,7 +335,7 @@ def get_working_dir(solr_url, collection):
   logger.debug("Working directory is %s", working_dir)
   return working_dir
 
-def handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_prefix, working_dir, ignore_unfinished_uploading):
+def handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_prefix, working_dir, ignore_unfinished_uploading, skip_date_usage):
   command_json_path = "{0}/command.json".format(working_dir)
   if os.path.isfile(command_json_path):
     with open(command_json_path) as command_file:
@@ -345,7 +351,8 @@ def handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_pre
       logger.info("You may try to run the program with '-g' or '--ignore-unfinished-uploading' to ignore it if it keeps on failing")
 
       if command["upload"]["type"] == "solr":
-        upload_file_to_solr(solr_kinit_command, curl_prefix, command["upload"]["command"], command["upload"]["upload_file_path"], command["upload"]["solr_output_collection"])
+        upload_file_to_solr(solr_kinit_command, curl_prefix, command["upload"]["command"], command["upload"]["upload_file_path"],
+                            command["upload"]["solr_output_collection"])
       elif command["upload"]["type"] == "hdfs":
         upload_file_hdfs(hdfs_kinit_command, command["upload"]["command"], command["upload"]["upload_file_path"],
                          command["upload"]["hdfs_path"], command["upload"]["hdfs_user"])
@@ -361,14 +368,14 @@ def handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_pre
     if "delete" in command.keys():
       delete_data(solr_kinit_command, curl_prefix, command["delete"]["command"], command["delete"]["collection"],
                   command["delete"]["filter_field"], command["delete"]["id_field"], command["delete"]["prev_lot_end_value"],
-                  command["delete"]["prev_lot_end_id"])
+                  command["delete"]["prev_lot_end_id"], skip_date_usage)
 
     os.remove(command_json_path)
 
 def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
               range_end, read_block_size, write_block_size, working_dir, additional_filter, name, json_file,
               compression, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path, solr_output_collection,
-              exclude_fields, skip_date_usage):
+              solr_output_url, exclude_fields, skip_date_usage):
   logger.info("Starting to save data")
 
   tmp_file_path = "{0}/tmp.json".format(working_dir)
@@ -410,7 +417,7 @@ def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_ur
       upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field,
                    id_field, working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user,
                    hdfs_path, key_file_path, bucket, key_prefix, local_path, compression, solr_output_collection,
-                   skip_date_usage)
+                   solr_output_url, skip_date_usage)
       total_records += records
       logger.info("A total of %d records are saved", total_records)
@@ -493,7 +500,8 @@ def finish_file(tmp_file, json_file):
 
 def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field,
                  id_field, working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
-                 key_file_path, bucket, key_prefix, local_path, compression, solr_output_collection, skip_date_usage):
+                 key_file_path, bucket, key_prefix, local_path, compression, solr_output_collection, solr_output_url,
+                 skip_date_usage):
   if name:
     file_name = "{0}_-_{1}_-_{2}_-_{3}".format(collection, name, prev_lot_end_value, prev_lot_end_id).replace(':', '_')
   else:
@@ -503,7 +511,8 @@ def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr
 
   upload_command = create_command_file(mode, True, working_dir, upload_file_path, solr_url, collection,
                                        filter_field, id_field, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
-                                       key_file_path, bucket, key_prefix, local_path, solr_output_collection, skip_date_usage)
+                                       key_file_path, bucket, key_prefix, local_path, solr_output_collection, solr_output_url,
+                                       skip_date_usage)
   if solr_output_collection:
     upload_file_to_solr(solr_kinit_command, curl_prefix, upload_command, upload_file_path, solr_output_collection)
   elif hdfs_user:
@@ -518,10 +527,10 @@ def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr
 
   delete_command = create_command_file(mode, False, working_dir, upload_file_path, solr_url, collection, filter_field,
                                        id_field, prev_lot_end_value, prev_lot_end_id, None, None, None, None, None, None, None,
-                                       skip_date_usage)
+                                       None, skip_date_usage)
   if mode == "archive":
     delete_data(solr_kinit_command, curl_prefix, delete_command, collection, filter_field, id_field, prev_lot_end_value,
-                prev_lot_end_id)
+                prev_lot_end_id, skip_date_usage)
 
   os.remove("{0}/command.json".format(working_dir))
 
 def compress_file(working_dir, tmp_file_path, file_name, compression):
@@ -567,7 +576,7 @@ def compress_file(working_dir, tmp_file_path, file_name, compression):
 
 def create_command_file(mode, upload, working_dir, upload_file_path, solr_url, collection, filter_field, id_field,
                         prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix,
-                        local_path, solr_output_collection, skip_date_usage):
+                        local_path, solr_output_collection, solr_output_url, skip_date_usage):
   commands = {}
 
   if upload:
@@ -577,8 +586,9 @@ def create_command_file(mode, upload, working_dir, upload_file_path, solr_url, c
 
   if upload:
     if solr_output_collection:
+      command_url = solr_output_url if solr_output_url else solr_url
       upload_command = "{0}/{1}/update/json/docs?commit=true&wt=json --data-binary @{2}"\
-        .format(solr_url, solr_output_collection, upload_file_path)
+        .format(command_url, solr_output_collection, upload_file_path)
       upload_command_data = {}
       upload_command_data["type"] = "solr"
       upload_command_data["command"] = upload_command
@@ -726,11 +736,14 @@ def upload_file_to_solr(solr_kinit_command, curl_prefix, upload_command, upload_
   logger.info("Save data to collection: %s", collection)
 
 def delete_data(solr_kinit_command, curl_prefix, delete_command, collection, filter_field, id_field, prev_lot_end_value,
-                prev_lot_end_id):
+                prev_lot_end_id, skip_date_usage):
   delete_cmd = delete_command.split(" --data-binary")[0]
   delete_query_data = delete_command.split("--data-binary ")[1].replace("+", " ")
   query_solr(solr_kinit_command, delete_cmd, "{0} -H Content-Type:text/xml {1}".format(curl_prefix, delete_cmd), "Deleting", delete_query_data)
-  logger.info("Deleted data from collection %s where %s,%s < %s,%s", collection, filter_field, id_field, prev_lot_end_value,
+  if skip_date_usage:
+    logger.info("Deleted data from collection %s where %s < %s", collection, id_field, prev_lot_end_id)
+  else:
+    logger.info("Deleted data from collection %s where %s,%s < %s,%s", collection, filter_field, id_field, prev_lot_end_value,
               prev_lot_end_id)
 
 def query_solr(solr_kinit_command, url, curl_command, action, data=None):
@@ -800,7 +813,7 @@ if __name__ == '__main__':
          options.additional_filter, options.name, options.solr_keytab, options.solr_principal, options.json_file,
          options.compression, options.hdfs_keytab, options.hdfs_principal, options.hdfs_user, options.hdfs_path,
          options.key_file_path, options.bucket, options.key_prefix, options.local_path, options.solr_output_collection,
-         options.exclude_fields, options.skip_date_usage)
+         options.solr_output_url, options.exclude_fields, options.skip_date_usage)
   else:
     logger.warn("Unknown mode: %s", options.mode)
diff --git a/ambari-infra/ambari-infra-solr-client/src/main/resources/ambariSolrMigration.sh b/ambari-infra/ambari-infra-solr-client/src/main/resources/ambariSolrMigration.sh
index 938d649..e054a89 100755
--- a/ambari-infra/ambari-infra-solr-client/src/main/resources/ambariSolrMigration.sh
+++ b/ambari-infra/ambari-infra-solr-client/src/main/resources/ambariSolrMigration.sh
@@ -18,7 +18,7 @@ function print_help() {
 cat << EOF
    Usage: /usr/lib/ambari-infra-solr-client/ambariSolrMigration.sh --mode <MODE> --ini-file <ini_file> [additional options]
 
-   -m, --mode <MODE>                          available migration modes: delete-only | backup-only | migrate-restore | all
+   -m, --mode <MODE>                          available migration modes: delete-only | backup-only | migrate-restore | all | transport
   -i, --ini-file <INI_FILE>                  ini-file location (used by migrationHelper.py)
   -s, --migration-script-location <file>     migrateHelper.py location (default: /usr/lib/ambari-infra-solr-client/migrationHelper.py)
   -w, --wait-between-steps <seconds>         wait between different migration steps in seconds (default: 15)
@@ -89,6 +89,13 @@ function run_migrate_commands() {
 
   start_date=$(date +%s)
 
+  # execute on: transport
+  if [[ "$mode" == "transport" ]] ; then
+    log_command "$python_location $script_location --ini-file $ini_file --action transport-old-data $verbose_val"
+    $python_location $script_location --ini-file $ini_file --action transport-old-data $verbose_val
+    handle_result "$?" "Transport Old Solr Data" "$python_location" "$start_date"
+  fi
+
   # execute on: backup - all
   if [[ "$mode" == "backup" || "$mode" == "all" ]] ; then
     log_command "$python_location $script_location --ini-file $ini_file --action check-shards $verbose_val $skip_warnings_val"
@@ -97,7 +104,7 @@ function run_migrate_commands() {
   fi
 
   # execute on: backup - delete - all
-  if [[ "$mode" != "migrate-restore" ]] ; then
+  if [[ "$mode" == "delete" || "$mode" == "backup" || "$mode" == "all" ]] ; then
     if [[ "$skip_solr_client_upgrade" != "true" ]]; then
       log_command "$python_location $script_location --ini-file $ini_file --action upgrade-solr-clients $verbose_val"
       $python_location $script_location --ini-file $ini_file --action upgrade-solr-clients $verbose_val
@@ -302,7 +309,7 @@ function main() {
       print_help
       exit 1
     else
-      if [[ "$MODE" == "delete" || "$MODE" == "backup" || "$MODE" == "migrate-restore" || "$MODE" == "all" ]]; then
+      if [[ "$MODE" == "delete" || "$MODE" == "backup" || "$MODE" == "migrate-restore" || "$MODE" == "all" || "$MODE" == "transport" ]]; then
        run_migrate_commands "$MODE" "$SCRIPT_LOCATION" "$PYTHON_PATH_FOR_MIGRATION" "$INI_FILE" "$WAIT" "$SKIP_SOLR_CLIENT_UPGRADE" "$SKIP_SOLR_SERVER_UPGRADE" "$SKIP_LOGSEARCH_UPGRADE" "$SKIP_WARNINGS" "$BATCH_INTERVAL" "$KEEP_BACKUP" "$VERBOSE"
       else
         echo "mode '$MODE' is not supported"

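The new --solr-output-url option also makes solrDataManager.py usable on its
own for copying documents between two Solr instances. A sketch with
illustrative urls and collection names (exact required options depend on the
script's argument validation):

    # Archive documents from a backup collection on the source instance into a
    # live collection behind a different Solr url; hosts and collections below
    # are examples only.
    /usr/lib/ambari-infra-solr-client/solrDataManager.py \
      --mode archive \
      --solr-url http://source-host.example.com:8886/solr \
      --collection old_audit_logs \
      --filter-field evtTime \
      --solr-output-collection audit_logs \
      --solr-output-url http://target-host.example.com:8886/solr \
      --skip-date-usage

Note that in archive mode the transported documents are deleted from the
source collection after a successful upload (see delete_data above), which is
exactly what the transport-old-data action relies on.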