This is an automated email from the ASF dual-hosted git repository. oleewere pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
commit 5960d2a219b5a2bf464c6021a90886b8decf956d Author: Oliver Szabo <[email protected]> AuthorDate: Thu Jun 21 01:16:15 2018 +0200 AMBARI-23945. Infra Solr Migration - check collections are empty or not --- .../src/main/python/migrationHelper.py | 195 +++++++++++++-------- 1 file changed, 126 insertions(+), 69 deletions(-) diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py index 2832691..04d4cd8 100755 --- a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py +++ b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py @@ -757,7 +757,7 @@ def copy_znode(options, config, copy_src, copy_dest, copy_from_local=False, copy sys.stdout.flush() logger.debug(str(out)) -def list_collections(options, config, output_file): +def list_collections(options, config, output_file, include_number_of_docs=False): dump_json_files_list=[] skip_dump=False if options.skip_json_dump_files: @@ -772,7 +772,10 @@ def list_collections(options, config, output_file): print "{0}FAIL{1}: Collection dump file '{2}' does not exist.".format(colors.FAIL, colors.ENDC, output_file) sys.exit(1) else: - solr_cli_command=create_infra_solr_client_command(options, config, '--dump-collections --output {0}'.format(output_file), appendZnode=True) + command_suffix = '--dump-collections --output {0}'.format(output_file) + if include_number_of_docs: + command_suffix+=' --include-doc-number' + solr_cli_command=create_infra_solr_client_command(options, config, command_suffix, appendZnode=True) logger.debug("Solr cli command: {0}".format(solr_cli_command)) sys.stdout.write('Dumping collections data to {0} ... '.format(output_file)) sys.stdout.flush() @@ -937,6 +940,24 @@ def generate_core_pairs(original_collection, collection, config, options): json.dump(core_pairs_data, outfile) return core_pairs_data +def get_number_of_docs_map(collection_dump_file): + collections_data = get_collections_data(collection_dump_file) + doc_num_map={} + for collection in collections_data: + number_of_docs=collections_data[collection]['numberOfDocs'] + doc_num_map[collection]=number_of_docs + return doc_num_map + +def is_collection_empty(docs_map, collection): + result = False + if collection in docs_map: + num_docs=docs_map[collection] + if num_docs == -1: + print "Number of documents: -1. That means the number of docs was not provided in the collection dump." + elif num_docs == 0: + result = True + return result + def update_state_json(original_collection, collection, config, options): solr_znode='/infra-solr' if config.has_section('infra_solr') and config.has_option('infra_solr', 'znode'): @@ -1118,32 +1139,45 @@ def get_atlas_index_location(collection, config, options): return atlas_index_location def backup_collections(options, accessor, parser, config, service_filter): - collections=list_collections(options, config, COLLECTIONS_DATA_JSON_LOCATION.format("backup_collections.json")) + collections=list_collections(options, config, COLLECTIONS_DATA_JSON_LOCATION.format("backup_collections.json"), include_number_of_docs=True) collections=filter_collections(options, collections) + num_docs_map = get_number_of_docs_map(COLLECTIONS_DATA_JSON_LOCATION.format("backup_collections.json")) if is_ranger_available(config, service_filter): collection_name = config.get('ranger_collection', 'ranger_collection_name') if collection_name in collections: - ranger_index_location=get_ranger_index_location(collection_name, config, options) - do_backup_request(options, accessor, parser, config, collection_name, ranger_index_location) + if is_collection_empty(num_docs_map, collection_name): + print "Collection '{0}' is empty. Backup is not required.".format(collection_name) + else: + ranger_index_location=get_ranger_index_location(collection_name, config, options) + do_backup_request(options, accessor, parser, config, collection_name, ranger_index_location) else: print 'Collection {0} does not exist or filtered out. Skipping backup operation.'.format(collection_name) if is_atlas_available(config, service_filter): fulltext_index_collection = config.get('atlas_collections', 'fulltext_index_name') if fulltext_index_collection in collections: - fulltext_index_location = get_atlas_index_location(fulltext_index_collection, config, options) - do_backup_request(options, accessor, parser, config, fulltext_index_collection, fulltext_index_location) + if is_collection_empty(num_docs_map, fulltext_index_collection): + print "Collection '{0}' is empty. Backup is not required.".format(fulltext_index_collection) + else: + fulltext_index_location = get_atlas_index_location(fulltext_index_collection, config, options) + do_backup_request(options, accessor, parser, config, fulltext_index_collection, fulltext_index_location) else: print 'Collection {0} does not exist or filtered out. Skipping backup operation.'.format(fulltext_index_collection) vertex_index_collection = config.get('atlas_collections', 'vertex_index_name') if vertex_index_collection in collections: - vertex_index_location = get_atlas_index_location(vertex_index_collection, config, options) - do_backup_request(options, accessor, parser, config, vertex_index_collection, vertex_index_location) + if is_collection_empty(num_docs_map, vertex_index_collection): + print "Collection '{0}' is empty. Backup is not required.".format(vertex_index_collection) + else: + vertex_index_location = get_atlas_index_location(vertex_index_collection, config, options) + do_backup_request(options, accessor, parser, config, vertex_index_collection, vertex_index_location) else: print 'Collection {0} does not exist or filtered out. Skipping backup operation.'.format(vertex_index_collection) edge_index_collection = config.get('atlas_collections', 'edge_index_name') if edge_index_collection in collections: - edge_index_location = get_atlas_index_location(edge_index_collection, config, options) - do_backup_request(options, accessor, parser, config, edge_index_collection, edge_index_location) + if is_collection_empty(num_docs_map, edge_index_collection): + print "Collection '{0}' is empty. Backup is not required.".format(edge_index_collection) + else: + edge_index_location = get_atlas_index_location(edge_index_collection, config, options) + do_backup_request(options, accessor, parser, config, edge_index_collection, edge_index_location) else: print 'Collection {0} does not exist or filtered out. Skipping backup operation.'.format(edge_index_collection) @@ -1154,83 +1188,106 @@ def migrate_snapshots(options, accessor, parser, config, service_filter): ranger_index_location=get_ranger_index_location(collection_name, config, options) do_migrate_request(options, accessor, parser, config, collection_name, ranger_index_location) else: - print "Collection ('{0}') backup index has filtered out. Skipping migrate operation.".format(collection_name) + print "Collection '{0}' backup index has filtered out. Skipping migrate operation.".format(collection_name) if is_atlas_available(config, service_filter): fulltext_index_collection = config.get('atlas_collections', 'fulltext_index_name') if options.collection is None or options.collection == fulltext_index_collection: fulltext_index_location=get_atlas_index_location(fulltext_index_collection, config, options) do_migrate_request(options, accessor, parser, config, fulltext_index_collection, fulltext_index_location) else: - print "Collection ('{0}') backup index has filtered out. Skipping migrate operation.".format(fulltext_index_collection) + print "Collection '{0}' backup index has filtered out. Skipping migrate operation.".format(fulltext_index_collection) vertex_index_collection = config.get('atlas_collections', 'vertex_index_name') if options.collection is None or options.collection == vertex_index_collection: vertex_index_location=get_atlas_index_location(vertex_index_collection, config, options) - do_migrate_request(options, accessor, parser, config, fulltext_index_collection, vertex_index_location) + do_migrate_request(options, accessor, parser, config, vertex_index_collection, vertex_index_location) else: - print "Collection ('{0}') backup index has filtered out. Skipping migrate operation.".format(vertex_index_collection) + print "Collection '{0}' backup index has filtered out. Skipping migrate operation.".format(vertex_index_collection) edge_index_collection = config.get('atlas_collections', 'edge_index_name') if options.collection is None or options.collection == edge_index_collection: edge_index_location=get_atlas_index_location(edge_index_collection, config, options) do_migrate_request(options, accessor, parser, config, edge_index_collection, edge_index_location) else: - print "Collection ('{0}') backup index has filtered out. Skipping migrate operation.".format(edge_index_collection) + print "Collection '{0}' backup index has filtered out. Skipping migrate operation.".format(edge_index_collection) def create_backup_collections(options, accessor, parser, config, service_filter): collections_json_location = COLLECTIONS_DATA_JSON_LOCATION.format("before_restore_collections.json") + num_docs_map = get_number_of_docs_map(COLLECTIONS_DATA_JSON_LOCATION.format("backup_collections.json")) collections=list_collections(options, config, collections_json_location) replica_number = "1" # hard coded if is_ranger_available(config, service_filter): + original_ranger_collection = config.get('ranger_collection', 'ranger_collection_name') backup_ranger_collection = config.get('ranger_collection', 'backup_ranger_collection_name') - if backup_ranger_collection not in collections: - if options.collection is not None and options.collection != backup_ranger_collection: - print "Collection {0} has filtered out. Skipping create operation.".format(backup_ranger_collection) + if original_ranger_collection in collections: + if is_collection_empty(num_docs_map, original_ranger_collection): + print "Collection '{0}' was empty during backup. It won't need a backup collection.".format(original_ranger_collection) else: - solr_urls = get_solr_urls(options, config, backup_ranger_collection, collections_json_location) - backup_ranger_config_set = config.get('ranger_collection', 'backup_ranger_config_set_name') - backup_ranger_shards = config.get('ranger_collection', 'ranger_collection_shards') - backup_ranger_max_shards = config.get('ranger_collection', 'ranger_collection_max_shards_per_node') - retry(create_collection, options, config, solr_urls, backup_ranger_collection, backup_ranger_config_set, + if backup_ranger_collection not in collections: + if options.collection is not None and options.collection != backup_ranger_collection: + print "Collection {0} has filtered out. Skipping create operation.".format(backup_ranger_collection) + else: + solr_urls = get_solr_urls(options, config, backup_ranger_collection, collections_json_location) + backup_ranger_config_set = config.get('ranger_collection', 'backup_ranger_config_set_name') + backup_ranger_shards = config.get('ranger_collection', 'ranger_collection_shards') + backup_ranger_max_shards = config.get('ranger_collection', 'ranger_collection_max_shards_per_node') + retry(create_collection, options, config, solr_urls, backup_ranger_collection, backup_ranger_config_set, backup_ranger_shards, replica_number, backup_ranger_max_shards, context="[Create Solr Collections]") - else: - print "Collection {0} has already exist. Skipping create operation.".format(backup_ranger_collection) + else: + print "Collection {0} has already exist. Skipping create operation.".format(backup_ranger_collection) if is_atlas_available(config, service_filter): backup_atlas_config_set = config.get('atlas_collections', 'config_set') backup_fulltext_index_name = config.get('atlas_collections', 'backup_fulltext_index_name') - if backup_fulltext_index_name not in collections: - if options.collection is not None and options.collection != backup_fulltext_index_name: - print "Collection {0} has filtered out. Skipping create operation.".format(backup_fulltext_index_name) + original_fulltext_index_name = config.get('atlas_collections', 'fulltext_index_name') + if original_fulltext_index_name in collections: + if is_collection_empty(num_docs_map, original_fulltext_index_name): + print "Collection '{0}' was empty during backup. It won't need a backup collection.".format(original_fulltext_index_name) else: - solr_urls = get_solr_urls(options, config, backup_fulltext_index_name, collections_json_location) - backup_fulltext_index_shards = config.get('atlas_collections', 'fulltext_index_shards') - backup_fulltext_index_max_shards = config.get('atlas_collections', 'fulltext_index_max_shards_per_node') - retry(create_collection, options, config, solr_urls, backup_fulltext_index_name, backup_atlas_config_set, + if backup_fulltext_index_name not in collections: + if options.collection is not None and options.collection != backup_fulltext_index_name: + print "Collection {0} has filtered out. Skipping create operation.".format(backup_fulltext_index_name) + else: + solr_urls = get_solr_urls(options, config, backup_fulltext_index_name, collections_json_location) + backup_fulltext_index_shards = config.get('atlas_collections', 'fulltext_index_shards') + backup_fulltext_index_max_shards = config.get('atlas_collections', 'fulltext_index_max_shards_per_node') + retry(create_collection, options, config, solr_urls, backup_fulltext_index_name, backup_atlas_config_set, backup_fulltext_index_shards, replica_number, backup_fulltext_index_max_shards, context="[Create Solr Collections]") - else: - print "Collection {0} has already exist. Skipping create operation.".format(backup_fulltext_index_name) + else: + print "Collection {0} has already exist. Skipping create operation.".format(backup_fulltext_index_name) + backup_edge_index_name = config.get('atlas_collections', 'backup_edge_index_name') - if backup_edge_index_name not in collections: - if options.collection is not None and options.collection != backup_edge_index_name: - print "Collection {0} has filtered out. Skipping create operation.".format(backup_edge_index_name) + original_edge_index_name = config.get('atlas_collections', 'edge_index_name') + if original_edge_index_name in collections: + if is_collection_empty(num_docs_map, original_edge_index_name): + print "Collection '{0}' was empty during backup. It won't need a backup collection.".format(original_edge_index_name) else: - solr_urls = get_solr_urls(options, config, backup_edge_index_name, collections_json_location) - backup_edge_index_shards = config.get('atlas_collections', 'edge_index_shards') - backup_edge_index_max_shards = config.get('atlas_collections', 'edge_index_max_shards_per_node') - retry(create_collection, options, config, solr_urls, backup_edge_index_name, backup_atlas_config_set, + if backup_edge_index_name not in collections: + if options.collection is not None and options.collection != backup_edge_index_name: + print "Collection {0} has filtered out. Skipping create operation.".format(backup_edge_index_name) + else: + solr_urls = get_solr_urls(options, config, backup_edge_index_name, collections_json_location) + backup_edge_index_shards = config.get('atlas_collections', 'edge_index_shards') + backup_edge_index_max_shards = config.get('atlas_collections', 'edge_index_max_shards_per_node') + retry(create_collection, options, config, solr_urls, backup_edge_index_name, backup_atlas_config_set, backup_edge_index_shards, replica_number, backup_edge_index_max_shards, context="[Create Solr Collections]") - else: - print "Collection {0} has already exist. Skipping create operation.".format(backup_edge_index_name) + else: + print "Collection {0} has already exist. Skipping create operation.".format(backup_edge_index_name) + backup_vertex_index_name = config.get('atlas_collections', 'backup_vertex_index_name') - if backup_vertex_index_name not in collections: - if options.collection is not None and options.collection != backup_vertex_index_name: - print "Collection {0} has filtered out. Skipping create operation.".format(backup_vertex_index_name) + original_vertex_index_name = config.get('atlas_collections', 'vertex_index_name') + if original_vertex_index_name in collections: + if is_collection_empty(num_docs_map, original_vertex_index_name): + print "Collection '{0}' was empty during backup. It won't need a backup collection.".format(original_vertex_index_name) else: - solr_urls = get_solr_urls(options, config, backup_vertex_index_name, collections_json_location) - backup_vertex_index_shards = config.get('atlas_collections', 'vertex_index_shards') - backup_vertex_index_max_shards = config.get('atlas_collections', 'vertex_index_max_shards_per_node') - retry(create_collection, options, config, solr_urls, backup_vertex_index_name, backup_atlas_config_set, + if backup_vertex_index_name not in collections: + if options.collection is not None and options.collection != backup_vertex_index_name: + print "Collection {0} has filtered out. Skipping create operation.".format(backup_vertex_index_name) + else: + solr_urls = get_solr_urls(options, config, backup_vertex_index_name, collections_json_location) + backup_vertex_index_shards = config.get('atlas_collections', 'vertex_index_shards') + backup_vertex_index_max_shards = config.get('atlas_collections', 'vertex_index_max_shards_per_node') + retry(create_collection, options, config, solr_urls, backup_vertex_index_name, backup_atlas_config_set, backup_vertex_index_shards, replica_number, backup_vertex_index_max_shards, context="[Create Solr Collections]") - else: - print "Collection {0} has already exist. Skipping create operation.".format(backup_fulltext_index_name) + else: + print "Collection {0} has already exist. Skipping create operation.".format(backup_fulltext_index_name) def restore_collections(options, accessor, parser, config, service_filter): collections=list_collections(options, config, COLLECTIONS_DATA_JSON_LOCATION.format("restore_collections.json")) @@ -1253,7 +1310,7 @@ def restore_collections(options, accessor, parser, config, service_filter): ranger_index_location=get_ranger_index_location(collection_name, config, options) do_restore_request(options, accessor, parser, config, collection_name, backup_ranger_collection, backup_ranger_config_set_name, ranger_index_location, backup_ranger_shards, hdfs_base_path) else: - print "Collection ('{0}') does not exist or filtered out. Skipping restore operation.".format(backup_ranger_collection) + print "Collection '{0}' does not exist or filtered out. Skipping restore operation.".format(backup_ranger_collection) if is_atlas_available(config, service_filter): hdfs_base_path = None @@ -1272,7 +1329,7 @@ def restore_collections(options, accessor, parser, config, service_filter): fulltext_index_location=get_atlas_index_location(fulltext_index_collection, config, options) do_restore_request(options, accessor, parser, config, fulltext_index_collection, backup_fulltext_index_name, atlas_config_set, fulltext_index_location, backup_fulltext_index_shards, hdfs_base_path) else: - print "Collection ('{0}') does not exist or filtered out. Skipping restore operation.".format(fulltext_index_collection) + print "Collection '{0}' does not exist or filtered out. Skipping restore operation.".format(fulltext_index_collection) edge_index_collection = config.get('atlas_collections', 'edge_index_name') backup_edge_index_name = config.get('atlas_collections', 'backup_edge_index_name') @@ -1281,7 +1338,7 @@ def restore_collections(options, accessor, parser, config, service_filter): edge_index_location=get_atlas_index_location(edge_index_collection, config, options) do_restore_request(options, accessor, parser, config, edge_index_collection, backup_edge_index_name, atlas_config_set, edge_index_location, backup_edge_index_shards, hdfs_base_path) else: - print "Collection ('{0}') does not exist or filtered out. Skipping restore operation.".format(edge_index_collection) + print "Collection '{0}' does not exist or filtered out. Skipping restore operation.".format(edge_index_collection) vertex_index_collection = config.get('atlas_collections', 'vertex_index_name') backup_vertex_index_name = config.get('atlas_collections', 'backup_vertex_index_name') @@ -1290,7 +1347,7 @@ def restore_collections(options, accessor, parser, config, service_filter): vertex_index_location=get_atlas_index_location(vertex_index_collection, config, options) do_restore_request(options, accessor, parser, config, vertex_index_collection, backup_vertex_index_name, atlas_config_set, vertex_index_location, backup_vertex_index_shards, hdfs_base_path) else: - print "Collection ('{0}') does not exist or filtered out. Skipping restore operation.".format(vertex_index_collection) + print "Collection '{0}' does not exist or filtered out. Skipping restore operation.".format(vertex_index_collection) def reload_collections(options, accessor, parser, config, service_filter): collections_json_location = config, COLLECTIONS_DATA_JSON_LOCATION.format("reload_collections.json") @@ -1302,26 +1359,26 @@ def reload_collections(options, accessor, parser, config, service_filter): solr_urls = get_solr_urls(options, config, backup_ranger_collection, collections_json_location) retry(reload_collection, options, config, solr_urls, backup_ranger_collection, context="[Reload Solr Collections]") else: - print "Collection ('{0}') does not exist or filtered out. Skipping reload operation.".format(backup_ranger_collection) + print "Collection '{0}' does not exist or filtered out. Skipping reload operation.".format(backup_ranger_collection) if is_atlas_available(config, service_filter): backup_fulltext_index_name = config.get('atlas_collections', 'backup_fulltext_index_name') if backup_fulltext_index_name in collections: solr_urls = get_solr_urls(options, config, backup_fulltext_index_name, collections_json_location) retry(reload_collection, options, config, solr_urls, backup_fulltext_index_name, context="[Reload Solr Collections]") else: - print "Collection ('{0}') does not exist or filtered out. Skipping reload operation.".format(backup_fulltext_index_name) + print "Collection '{0}' does not exist or filtered out. Skipping reload operation.".format(backup_fulltext_index_name) backup_edge_index_name = config.get('atlas_collections', 'backup_edge_index_name') if backup_edge_index_name in collections: solr_urls = get_solr_urls(options, config, backup_edge_index_name, collections_json_location) retry(reload_collection, options, config, solr_urls, backup_edge_index_name, context="[Reload Solr Collections]") else: - print "Collection ('{0}') does not exist or filtered out. Skipping reload operation.".format(backup_edge_index_name) + print "Collection '{0}' does not exist or filtered out. Skipping reload operation.".format(backup_edge_index_name) backup_vertex_index_name = config.get('atlas_collections', 'backup_vertex_index_name') if backup_vertex_index_name in collections: solr_urls = get_solr_urls(options, config, backup_vertex_index_name, collections_json_location) retry(reload_collection, options, config, solr_urls, backup_vertex_index_name, context="[Reload Solr Collections]") else: - print "Collection ('{0}') does not exist or filtered out. Skipping reload operation.".format(backup_fulltext_index_name) + print "Collection '{0}' does not exist or filtered out. Skipping reload operation.".format(backup_fulltext_index_name) def validate_ini_file(options, parser): if options.ini_file is None: @@ -1351,26 +1408,26 @@ def update_state_jsons(options, accessor, parser, config, service_filter): if backup_ranger_collection in collections: update_state_json(original_ranger_collection, backup_ranger_collection, config, options) else: - print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(backup_ranger_collection) + print "Collection '{0}' does not exist or filtered out. Skipping update collection state operation.".format(backup_ranger_collection) if is_atlas_available(config, service_filter): original_fulltext_index_name = config.get('atlas_collections', 'fulltext_index_name') backup_fulltext_index_name = config.get('atlas_collections', 'backup_fulltext_index_name') if backup_fulltext_index_name in collections: update_state_json(original_fulltext_index_name, backup_fulltext_index_name, config, options) else: - print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(backup_fulltext_index_name) + print "Collection '{0}' does not exist or filtered out. Skipping update collection state operation.".format(backup_fulltext_index_name) original_edge_index_name = config.get('atlas_collections', 'edge_index_name') backup_edge_index_name = config.get('atlas_collections', 'backup_edge_index_name') if backup_edge_index_name in collections: update_state_json(original_edge_index_name, backup_edge_index_name, config, options) else: - print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(backup_edge_index_name) + print "Collection '{0}' does not exist or filtered out. Skipping update collection state operation.".format(backup_edge_index_name) original_vertex_index_name = config.get('atlas_collections', 'vertex_index_name') backup_vertex_index_name = config.get('atlas_collections', 'backup_vertex_index_name') if backup_vertex_index_name in collections: update_state_json(original_vertex_index_name, backup_vertex_index_name, config, options) else: - print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(backup_fulltext_index_name) + print "Collection '{0}' does not exist or filtered out. Skipping update collection state operation.".format(backup_fulltext_index_name) def disable_solr_authorization(options, accessor, parser, config): solr_znode='/infra-solr' @@ -1394,23 +1451,23 @@ def check_shards(options, accessor, parser, config): if ranger_collection in collections: check_shard_for_collection(ranger_collection) else: - print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(ranger_collection) + print "Collection '{0}' does not exist or filtered out. Skipping update collection state operation.".format(ranger_collection) if is_atlas_available(config, service_filter): fulltext_index_name = config.get('atlas_collections', 'fulltext_index_name') if fulltext_index_name in collections: check_shard_for_collection(fulltext_index_name) else: - print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(fulltext_index_name) + print "Collection '{0}' does not exist or filtered out. Skipping update collection state operation.".format(fulltext_index_name) edge_index_name = config.get('atlas_collections', 'edge_index_name') if edge_index_name in collections: check_shard_for_collection(edge_index_name) else: - print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(edge_index_name) + print "Collection '{0}' does not exist or filtered out. Skipping update collection state operation.".format(edge_index_name) vertex_index_name = config.get('atlas_collections', 'vertex_index_name') if vertex_index_name in collections: check_shard_for_collection(vertex_index_name) else: - print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(fulltext_index_name) + print "Collection '{0}' does not exist or filtered out. Skipping update collection state operation.".format(fulltext_index_name) if __name__=="__main__":
