http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/es_tools/index_iterator_size_checker.py ---------------------------------------------------------------------- diff --git a/utils/usergrid-util-python/es_tools/index_iterator_size_checker.py b/utils/usergrid-util-python/es_tools/index_iterator_size_checker.py new file mode 100644 index 0000000..98d4373 --- /dev/null +++ b/utils/usergrid-util-python/es_tools/index_iterator_size_checker.py @@ -0,0 +1,270 @@ +import json +import re +import traceback +from multiprocessing.pool import Pool +import requests + +index_url_template = 'http://localhost:9200/{index_name}/_search?size={size}&from={from_var}&q=-edgeName:zzzcollzzz|logs' + +index_names = [ + 'es-index-name' +] + +baas_url = 'http://localhost:8080/org/{app_id}/{collection}/{entity_id}' + +field_part_map = { + 'mockdata': 'mockData' +} + + +def update_entity_field(entity, field_name, field_value): + entity_copy = entity.copy() + + worked = True + is_array = False + array_length = 0 + + try: + parts = field_name.split('.') + + if parts[len(parts) - 1] != 'size': + print parts + exit() + + change_me = entity_copy + + for i, field_part in enumerate(parts): + field_part = field_part_map.get(field_part, field_part) + + if field_part == 'size': + break + + if isinstance(change_me, dict): + if field_part not in change_me: + worked = False + # print 'ERROR! field [%s] not in entity: %s' % (field_part, json.dumps(change_me)) + break + + change_me = change_me[field_part] + + elif isinstance(change_me, list): + array_length = len(change_me) + + if i == len(parts) - 2 and len(parts) > i + 1 and parts[i + 1] == 'size': + + for j in xrange(0, len(change_me)): + print 'arrau!' + change_me[j] = update_entity_field(change_me[j], '.'.join(parts[i:]), field_value) + # element['size'] = field_value + + elif len(change_me) == 1: + print 'single array' + change_me = change_me[0][field_part] + else: + print 'WTF!' + try: + change_me['size'] = field_value + except: + if array_length != 1: + print traceback.format_exc() + print 'damn' + + except: + print '---Error updating field [%s] in document: %s' % (field_name, json.dumps(entity)) + print traceback.format_exc() + + if array_length > 1: + print '++++++++ARRAY!!!!! %s' % array_length + + return entity_copy + + +def update_entity_fields(entity, fields): + entity_copy = entity.copy() + + for field in fields: + field_name = field.get('name') + + if 'string' in field: + field_value = field.get('string') + + elif 'long' in field: + field_value = field.get('long') + + else: + print 'WTF! 
%s' % json.dumps(field) + return entity_copy + + entity_copy = update_entity_field(entity_copy, field_name, field_value) + + return entity_copy + + +my = { + 'foo': { + 'bar': { + 'color': 'red' + } + } +} + +fields = [ + { + 'name': 'foo.size', + 'string': '2' + }, + { + 'name': 'foo.bar.size', + 'long': 2 + } +] + + +def work(item): + try: + url = 'http://localhost:8080/org/{app_id}/{collection}/{entity_id}'.format( + app_id=item[0], + collection=item[1], + entity_id=item[2] + ) + print url + r_get = requests.get(url) + + if r_get.status_code != 200: + print 'ERROR GETTING ENTITY AT URL: %s' % url + return + + response_json = r_get.json() + + entities = response_json.get('entities') + + if len(entities) <= 0: + print 'TOO MANY ENTITIES AT URL: %s' % url + return + + entity = entities[0] + + new_entity = update_entity_fields(entity, item[3]) + + with open('/Users/ApigeeCorporation/tmp/hack/%s.json' % item[2], 'w') as f: + json.dump(entity, f, indent=2) + + with open('/Users/ApigeeCorporation/tmp/hack/%s_new.json' % item[2], 'w') as f: + json.dump(new_entity, f, indent=2) + + r_put = requests.put(url, data=json.dumps(new_entity)) + + if r_put.status_code == 200: + print 'PUT [%s]: %s' % (r_put.status_code, url) + pass + elif r_put.status_code: + print 'PUT [%s]: %s | %s' % (r_put.status_code, url, r_put.text) + + except: + print traceback.format_exc() + + +POOL_SIZE = 4 + +counter = 0 +size = POOL_SIZE * 10 +size = 1000 + +total_docs = 167501577 +start_from = 0 +from_var = 0 +page = 0 + +work_items = [] + +pool = Pool(POOL_SIZE) + +keep_going = True + +while keep_going: + work_items = [] + + if from_var > total_docs: + keep_going = False + break + + from_var = start_from + (page * size) + page += 1 + + for index_name in index_names: + + index_url = index_url_template.format(index_name=index_name, size=size, from_var=from_var) + + print 'Getting URL: ' + index_url + + r = requests.get(index_url) + + if r.status_code != 200: + print r.text + exit() + + response = r.json() + + hits = response.get('hits', {}).get('hits') + + re_app_id = re.compile('appId\((.+),') + re_ent_id = re.compile('entityId\((.+),') + re_type = re.compile('entityId\(.+,(.+)\)') + + print 'Index: %s | hits: %s' % (index_name, len(hits)) + + if len(hits) == 0: + keep_going = False + break + + for hit_data in hits: + source = hit_data.get('_source') + + application_id = source.get('applicationId') + + app_id_find = re_app_id.findall(application_id) + + if len(app_id_find) > 0: + app_id = app_id_find[0] + + if app_id != '5f20f423-f2a8-11e4-a478-12a5923b55dc': + print 'SKIPP APP ID: ' + app_id + continue + + entity_id_tmp = source.get('entityId') + + entity_id_find = re_ent_id.findall(entity_id_tmp) + entity_type_find = re_type.findall(entity_id_tmp) + + if len(entity_id_find) > 0 and len(entity_type_find) > 0: + entity_id = entity_id_find[0] + collection = entity_type_find[0] + fields_to_update = [] + + for field in source.get('fields'): + if field.get('name')[-5:] == '.size': + fields_to_update.append(field) + + print json.dumps(source) + + work_items.append((app_id, collection, entity_id, fields_to_update)) + + counter += 1 + + print 'Work Items: %s' % len(work_items) + + try: + pool.map(work, work_items) + + + except: + print traceback.format_exc() + + try: + pool.map(work, work_items) + except: + pass + + print 'Work Done!' + +print 'done: %s' % counter
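The iterator above pages through the index with size/from offsets against a total of ~167M documents, and Elasticsearch handles deep from/size paging poorly once the offset grows large. Below is a minimal sketch of the same read loop using the scroll API instead; it is not part of the commit, and the host, index name, page size and the 1.x-style raw scroll_id request body are assumptions carried over from the script above.

import json
import requests

es_url = 'http://localhost:9200'
index_name = 'es-index-name'
page_size = 1000

# open a scroll context; with no body the default match_all query is used
r = requests.get('%s/%s/_search?scroll=1m&size=%s' % (es_url, index_name, page_size))
response = r.json()
scroll_id = response.get('_scroll_id')
hits = response.get('hits', {}).get('hits', [])

while hits:
    for hit in hits:
        source = hit.get('_source')
        # process the document here, e.g. collect the fields ending in '.size'

    # fetch the next page; ES 1.x accepts the raw scroll_id as the request body
    r = requests.post('%s/_search/scroll?scroll=1m' % es_url, data=scroll_id)
    response = r.json()
    scroll_id = response.get('_scroll_id')
    hits = response.get('hits', {}).get('hits', [])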
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/es_tools/index_prefix_checker.py ---------------------------------------------------------------------- diff --git a/utils/usergrid-util-python/es_tools/index_prefix_checker.py b/utils/usergrid-util-python/es_tools/index_prefix_checker.py new file mode 100644 index 0000000..d72ff3d --- /dev/null +++ b/utils/usergrid-util-python/es_tools/index_prefix_checker.py @@ -0,0 +1,81 @@ +import json +from collections import defaultdict +import requests +import logging + +__author__ = 'Jeff West @ ApigeeCorporation' + +# This script iterates all the indexes in an ES cluster and aggregates the size by the prefix + +url_base = 'http://localhost:9200' + +r = requests.get(url_base + "/_stats") +response = r.json() + +indices = r.json()['indices'] + +print 'retrieved %s indices' % len(indices) + +NUMBER_VALUE = 0 + +includes = [ + # 'b6768a08-b5d5-11e3-a495-11ddb1de66c9', +] + +excludes = [ + # 'b6768a08-b5d5-11e3-a495-11ddb1de66c8', +] + +counter = 0 +process = False + +counts = defaultdict(int) +sizes = defaultdict(int) +indexes = {} + +for index, index_data in indices.iteritems(): + process = False + counter += 1 + + if 'management' in index: + print index + + # print 'index %s of %s' % (counter, len(indices)) + + if len(includes) == 0: + process = True + else: + for include in includes: + + if include in index: + process = True + + if len(excludes) > 0: + for exclude in excludes: + if exclude in index: + process = False + + if process: + # print index + if '__' in index: + index_prefix = index.split('__')[0] + elif '^' in index: + index_prefix = index.split('^')[0] + else: + index_prefix = index.split('_')[0] + + if index_prefix not in indexes: + indexes[index_prefix] = [] + + indexes[index_prefix].append(index) + + counts[index_prefix] += 1 + counts['total'] += 1 + sizes[index_prefix] += (float(index_data.get('total', {}).get('store', {}).get('size_in_bytes')) / 1e+9) + sizes['total'] += (float(index_data.get('total', {}).get('store', {}).get('size_in_bytes')) / 1e+9) + +print 'Number of indices (US-EAST):' +print json.dumps(counts, indent=2) +print 'Size in GB' +print json.dumps(sizes, indent=2) +print json.dumps(indexes, indent=2) http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/es_tools/index_replica_setter.py ---------------------------------------------------------------------- diff --git a/utils/usergrid-util-python/es_tools/index_replica_setter.py b/utils/usergrid-util-python/es_tools/index_replica_setter.py new file mode 100644 index 0000000..7180fed --- /dev/null +++ b/utils/usergrid-util-python/es_tools/index_replica_setter.py @@ -0,0 +1,118 @@ +from multiprocessing import Pool +import requests +import time + +__author__ = 'Jeff West @ ApigeeCorporation' + +# utility for updating the replicas of a set of indexes that are no longer needed. 
Given: +# A) a set of strings to include when evaluating the index names to update +# B) a set of strings to Exclude when evaluating the index names to update +# +# The general logic is: +# 1) If the include set is empty, or if the index name contains a string in the 'include' set, then update +# 2) If the index contains a string in the exclude list, do not update + + +url_base = 'http://localhost:9200' + +# r = requests.get(url_base + "/_cat/indices?v") +# print r.text + +r = requests.get(url_base + "/_stats") + +# print json.dumps(r.json(), indent=2) + +indices = r.json()['indices'] + +print 'retrieved %s indices' % len(indices) + +NUMBER_VALUE = 1 + +payload = { + "index.number_of_replicas": NUMBER_VALUE, +} + +# indices = ['usergrid__a34ad389-b626-11e4-848f-06b49118d7d0__application_manual'] + +includes = [ + # '70be096e-c2e1-11e4-8a55-12b4f5e28868', + # 'b0c640af-bc6c-11e4-b078-12b4f5e28868', + # 'e62e465e-bccc-11e4-b078-12b4f5e28868', + # 'd82b6413-bccc-11e4-b078-12b4f5e28868', + # '45914256-c27f-11e4-8a55-12b4f5e28868', + # '2776a776-c27f-11e4-8a55-12b4f5e28868', + # 'a54f878c-bc6c-11e4-b044-0e4cd56e19cd', + # 'ed5b47ea-bccc-11e4-b078-12b4f5e28868', + # 'bd4874ab-bccc-11e4-b044-0e4cd56e19cd', + # '3d748996-c27f-11e4-8a55-12b4f5e28868', + # '1daab807-c27f-11e4-8a55-12b4f5e28868', + # 'd0c4f0da-d961-11e4-849d-12b4f5e28868', + # '93e756ac-bc4e-11e4-92ae-12b4f5e28868', +] + +excludes = [ + # 'b6768a08-b5d5-11e3-a495-11ddb1de66c8', + # 'b6768a08-b5d5-11e3-a495-10ddb1de66c3', + # 'b6768a08-b5d5-11e3-a495-11ddb1de66c9', + # 'a34ad389-b626-11e4-848f-06b49118d7d0' +] + +counter = 0 +update = False +# print 'sleeping 1200s' +# time.sleep(1200) + +index_names = sorted([index for index in indices]) + + +def update_shards(index_name): + update = False + # counter += 1 + # + # print 'index %s of %s' % (counter, len(indices)) + + if len(includes) == 0: + update = True + else: + for include in includes: + + if include in index_name: + update = True + + if len(excludes) > 0: + for exclude in excludes: + if exclude in index_name: + update = False + + if update: + print index_name + + # url = '%s/%s/_settings' % (url_base, index) + # print url + # + # response = requests.get('%s/%s/_settings' % (url_base, index)) + # settings = response.json() + # + # index_settings = settings[index]['settings']['index'] + # + # current_replicas = int(index_settings.get('number_of_replicas')) + # + # if current_replicas == NUMBER_VALUE: + # continue + + success = False + + while not success: + + response = requests.put('%s/%s/_settings' % (url_base, index_name), data=payload) + + if response.status_code == 200: + success = True + print '200: %s: %s' % (index_name, response.text) + else: + print '%s: %s: %s' % (response.status_code, index_name, response.text) + + +pool = Pool(8) + +pool.map(update_shards, index_names) http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/es_tools/index_shard_allocator.py ---------------------------------------------------------------------- diff --git a/utils/usergrid-util-python/es_tools/index_shard_allocator.py b/utils/usergrid-util-python/es_tools/index_shard_allocator.py new file mode 100644 index 0000000..ecee095 --- /dev/null +++ b/utils/usergrid-util-python/es_tools/index_shard_allocator.py @@ -0,0 +1,148 @@ +import json +from multiprocessing import Pool + +import requests + +__author__ = 'Jeff West @ ApigeeCorporation' + +# The purpose of this script is to update the shard allocation of ElasticSearch for specific indexes to be set to +# 
specific nodes. The reason for doing this is to isolate the nodes on which certain indexes run for specific +# customers due to load, disk size or any other factors. + + +nodes_c32xl = [ + 'res000eu', + 'res001eu', + 'res002eu', + 'res003eu', + 'res004eu', + 'res005eu', + 'res009eu', + 'res010eu', + 'res011eu', + 'res012eu', + 'res013eu', + 'res014eu', +] + +nodes_c34xl = [ + 'res015eu', + 'res018eu', + 'res019eu', + 'res020eu', + 'res021eu', + 'res022eu', + 'res023eu', + 'res024eu', + 'res025eu', + 'res026eu', + 'res027eu', + 'res028eu' +] + +nodes = nodes_c34xl + +url_base = 'http://localhost:9200' + +nodes_string = ",".join(nodes) + +payload = { + "index.routing.allocation.include._host": "", + "index.routing.allocation.exclude._host": nodes_string +} + +# payload = { +# "index.routing.allocation.include._host": "", +# "index.routing.allocation.exclude._host": "" +# } + +print json.dumps(payload ) + + +r = requests.get(url_base + "/_stats") +indices = r.json()['indices'] + +print 'retrieved %s indices' % len(indices) + +includes = [ + # '70be096e-c2e1-11e4-8a55-12b4f5e28868', + # 'b0c640af-bc6c-11e4-b078-12b4f5e28868', + # 'e62e465e-bccc-11e4-b078-12b4f5e28868', + # 'd82b6413-bccc-11e4-b078-12b4f5e28868', + # '45914256-c27f-11e4-8a55-12b4f5e28868', + # '2776a776-c27f-11e4-8a55-12b4f5e28868', + # 'a54f878c-bc6c-11e4-b044-0e4cd56e19cd', + # 'ed5b47ea-bccc-11e4-b078-12b4f5e28868', + # 'bd4874ab-bccc-11e4-b044-0e4cd56e19cd', + # '3d748996-c27f-11e4-8a55-12b4f5e28868', + # '1daab807-c27f-11e4-8a55-12b4f5e28868', + # 'd0c4f0da-d961-11e4-849d-12b4f5e28868', + # '93e756ac-bc4e-11e4-92ae-12b4f5e28868', + # + # 'b6768a08-b5d5-11e3-a495-11ddb1de66c8', + # 'b6768a08-b5d5-11e3-a495-10ddb1de66c3', + # 'b6768a08-b5d5-11e3-a495-11ddb1de66c9', +] + +excludes = [ + # + # '70be096e-c2e1-11e4-8a55-12b4f5e28868', + # 'b0c640af-bc6c-11e4-b078-12b4f5e28868', + # 'e62e465e-bccc-11e4-b078-12b4f5e28868', + # 'd82b6413-bccc-11e4-b078-12b4f5e28868', + # '45914256-c27f-11e4-8a55-12b4f5e28868', + # '2776a776-c27f-11e4-8a55-12b4f5e28868', + # 'a54f878c-bc6c-11e4-b044-0e4cd56e19cd', + # 'ed5b47ea-bccc-11e4-b078-12b4f5e28868', + # 'bd4874ab-bccc-11e4-b044-0e4cd56e19cd', + # '3d748996-c27f-11e4-8a55-12b4f5e28868', + # '1daab807-c27f-11e4-8a55-12b4f5e28868', + # 'd0c4f0da-d961-11e4-849d-12b4f5e28868', + # '93e756ac-bc4e-11e4-92ae-12b4f5e28868', + # + # 'b6768a08-b5d5-11e3-a495-11ddb1de66c8', + # 'b6768a08-b5d5-11e3-a495-10ddb1de66c3', + # 'b6768a08-b5d5-11e3-a495-11ddb1de66c9', +] + +counter = 0 +update = False + +for index_name in indices: + update = False + counter += 1 + + # print 'Checking index %s of %s: %s' % (counter, len(indices), index_name) + + if len(includes) == 0: + update = True + else: + for include in includes: + + if include in index_name: + update = True + + if len(excludes) > 0: + for exclude in excludes: + if exclude in index_name: + update = False + + if not update: + print 'Skipping %s of %s: %s' % (counter, len(indices), index_name) + else: + print '+++++Processing %s of %s: %s' % (counter, len(indices), index_name) + + url_template = '%s/%s/_settings' % (url_base, index_name) + print url_template + + success = False + + while not success: + + response = requests.put('%s/%s/_settings' % (url_base, index_name), data=json.dumps(payload)) + + if response.status_code == 200: + success = True + print '200: %s: %s' % (index_name, response.text) + else: + print '%s: %s: %s' % (response.status_code, index_name, response.text) 
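A 200 from the settings PUT above only confirms that the allocation filter was accepted; the resulting shard moves happen asynchronously. Below is a small sketch, not part of the commit, for watching the relocation drain through the cluster health API; the host and poll interval are assumptions.

import time
import requests

url_base = 'http://localhost:9200'

while True:
    health = requests.get(url_base + '/_cluster/health').json()

    # relocating_shards drops to 0 once the allocation changes have been applied
    print 'status=%s relocating=%s unassigned=%s' % (
        health.get('status'), health.get('relocating_shards'), health.get('unassigned_shards'))

    if health.get('relocating_shards') == 0:
        break

    time.sleep(10)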
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/es_tools/mapping_deleter.py ---------------------------------------------------------------------- diff --git a/utils/usergrid-util-python/es_tools/mapping_deleter.py b/utils/usergrid-util-python/es_tools/mapping_deleter.py new file mode 100644 index 0000000..74ad898 --- /dev/null +++ b/utils/usergrid-util-python/es_tools/mapping_deleter.py @@ -0,0 +1,34 @@ +import json + +import requests + + +__author__ = 'Jeff West @ ApigeeCorporation' + +url_base = 'http://localhost:9200' + +SOURCE_INDEX = '5f20f423-f2a8-11e4-a478-12a5923b55dc__application_v6' + +url_template = '%s/{index_name}/_mapping' % url_base + +source_index_url = '%s/%s' % (url_base, SOURCE_INDEX) + +index_name = SOURCE_INDEX + +index_data = requests.get(url_template.format(index_name=index_name)).json() + +mappings = index_data.get(index_name, {}).get('mappings', {}) + +for type_name, mapping_detail in mappings.iteritems(): + print 'Index: %s | Type: %s: | Properties: %s' % (index_name, type_name, len(mappings[type_name]['properties'])) + + if type_name == '_default_': + continue + + r = requests.delete('%s/%s/_mapping/%s' % (url_base, index_name, type_name)) + + print '%s: %s' % (r.status_code, r.text) + + # print json.dumps(r.json(), indent=2) + # time.sleep(5) + print '---' http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/es_tools/mapping_retriever.py ---------------------------------------------------------------------- diff --git a/utils/usergrid-util-python/es_tools/mapping_retriever.py b/utils/usergrid-util-python/es_tools/mapping_retriever.py new file mode 100644 index 0000000..0da123b --- /dev/null +++ b/utils/usergrid-util-python/es_tools/mapping_retriever.py @@ -0,0 +1,45 @@ +import json +import requests + +__author__ = 'Jeff West @ ApigeeCorporation' + +# Utility to iterate the mappings for an index and save them locally + +url_base = 'http://localhost:9200' + +# r = requests.get(url_base + "/_stats") +# +# indices = r.json()['indices'] + +url_template = '%s/{index_name}/_mapping' % url_base + +SOURCE_INDEX = '5f20f423-f2a8-11e4-a478-12a5923b55dc__application_v7' + +source_index_url = '%s/%s' % (url_base, SOURCE_INDEX) + +index_name = SOURCE_INDEX +print 'Getting ' + url_template.format(index_name=index_name) + +r = requests.get(url_template.format(index_name=index_name)) +index_data = r.json() + +mappings = index_data.get(index_name, {}).get('mappings', {}) + +for type_name, mapping_detail in mappings.iteritems(): + print 'Index: %s | Type: %s: | Properties: %s' % (index_name, type_name, len(mappings[type_name]['properties'])) + + print 'Processing %s' % type_name + + filename = '/Users/ApigeeCorporation/tmp/%s_%s_source_mapping.json' % ( + SOURCE_INDEX, type_name) + + print filename + + with open(filename, 'w') as f: + json.dump({type_name: mapping_detail}, f, indent=2) + + # print '%s' % (r.status_code, r.text) + + # print json.dumps(r.json(), indent=2) + # time.sleep(5) + print 'Done!' 
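The retriever above only saves each type mapping to disk. A rough sketch of the reverse direction, re-applying one of the saved files to another index, follows; it is not part of the commit, and the target index name and type name are hypothetical, with the file path following the naming used by the script above.

import json
import requests

url_base = 'http://localhost:9200'
SOURCE_INDEX = '5f20f423-f2a8-11e4-a478-12a5923b55dc__application_v7'
TARGET_INDEX = 'target-index-name'  # hypothetical destination index
type_name = 'type_name'             # hypothetical type saved by the script above

filename = '/Users/ApigeeCorporation/tmp/%s_%s_source_mapping.json' % (SOURCE_INDEX, type_name)

with open(filename, 'r') as f:
    mapping = json.load(f)

# the saved file already has the {type_name: {...}} shape the mapping API expects
r = requests.put('%s/%s/_mapping/%s' % (url_base, TARGET_INDEX, type_name),
                 data=json.dumps(mapping))

print '%s: %s' % (r.status_code, r.text)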
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/es_tools/monitor_tasks.py ---------------------------------------------------------------------- diff --git a/utils/usergrid-util-python/es_tools/monitor_tasks.py b/utils/usergrid-util-python/es_tools/monitor_tasks.py new file mode 100644 index 0000000..df23d49 --- /dev/null +++ b/utils/usergrid-util-python/es_tools/monitor_tasks.py @@ -0,0 +1,41 @@ +import datetime +import requests +import time + +__author__ = 'Jeff West @ ApigeeCorporation' + +# Utility for monitoring pending tasks in ElasticSearch + +def total_milliseconds(td): + return (td.microseconds + td.seconds * 1000000) / 1000 + + +url_template = "http://localhost:9200/_cat/pending_tasks?v'" + +x = 0 + +SLEEP_TIME = 3 + +while True: + x += 13 + try: + + r = requests.get(url=url_template) + lines = r.text.split('\n') + + print '\n-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-' + print '+++++++++++++++++++++++++++++++++++++++++++++++++++++++++' + print datetime.datetime.utcnow() + if len(lines) > 1: + print r.text + else: + print 'None' + + print '-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-' + print '-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-\n' + + except: + pass + + time.sleep(SLEEP_TIME) + http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/index_test/README.md ---------------------------------------------------------------------- diff --git a/utils/usergrid-util-python/index_test/README.md b/utils/usergrid-util-python/index_test/README.md new file mode 100644 index 0000000..eed7f1c --- /dev/null +++ b/utils/usergrid-util-python/index_test/README.md @@ -0,0 +1 @@ +This set of scripts was intended to test indexing times and sizes for the new indexing scheme \ No newline at end of file http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/index_test/document_creator.py ---------------------------------------------------------------------- diff --git a/utils/usergrid-util-python/index_test/document_creator.py b/utils/usergrid-util-python/index_test/document_creator.py new file mode 100644 index 0000000..fd544c6 --- /dev/null +++ b/utils/usergrid-util-python/index_test/document_creator.py @@ -0,0 +1,254 @@ +from __future__ import print_function +from Queue import Empty +import json +from multiprocessing import JoinableQueue, Process +import random +import re +import uuid +import sys + +import argparse + +import loremipsum + + +def parse_args(): + parser = argparse.ArgumentParser(description='ElasticSearch Index Test 1') + + parser.add_argument('-w', '--workers', + help='The number of worker threads', + type=int, + default=8) + + parser.add_argument('-dc', '--document_count', + help='The number of documents per index', + type=long, + default=100000000) + + parser.add_argument('--output', + help='The filename to write to', + type=str, + default='generated_documents.txt') + + parser.add_argument('--fields_min', + help='The min number of fields per document', + type=long, + default=10) + + parser.add_argument('--fields_max', + help='The max number of fields per document', + type=long, + default=100) + + parser.add_argument('-tp', '--type_prefix', + help='The Prefix to use for type names', + type=str, + default='type_this') + + my_args = parser.parse_args(sys.argv[1:]) + + return vars(my_args) + + +args = parse_args() + +sentence_list = loremipsum.get_sentences(10000) + + +class Worker(Process): + def __init__(self, work_queue, 
response_queue): + super(Worker, self).__init__() + self.work_queue = work_queue + self.response_queue = response_queue + self.sentence_list = loremipsum.get_sentences(1000) + self.re_first_word = re.compile('([A-z]+)') + + def run(self): + print('Starting %s ' % self.name) + + while True: + task = self.work_queue.get(timeout=600) + field_count = random.randint(task['fields_min'], task['fields_max']) + document = self.generate_document(field_count) + flattened_doc = self.process_document(document, + task['uuid'], + task['uuid']) + + self.response_queue.put(flattened_doc) + + self.work_queue.task_done() + + def generate_document(self, fields): + + doc = {} + + my_bool = True + + for i in xrange(fields): + sentence_index = random.randint(0, max((fields / 2) - 1, 1)) + sentence = self.sentence_list[sentence_index] + + if random.random() >= .5: + key = self.re_first_word.findall(sentence)[1] + else: + key = self.re_first_word.findall(sentence)[1] + str(i) + + field_type = random.random() + + if field_type <= 0.3: + doc[key] = sentence + + elif field_type <= 0.5: + doc[key] = random.randint(1, 1000000) + + elif field_type <= 0.6: + doc[key] = random.random() * 1000000000 + + elif field_type == 0.7: + doc[key] = my_bool + my_bool = not my_bool + + elif field_type == 0.8: + doc[key] = self.generate_document(max(fields / 5, 1)) + + elif field_type <= 1.0: + doc['mylocation'] = self.generate_location() + + return doc + + @staticmethod + def get_fields(document, base_name=None): + fields = [] + + for name, value in document.iteritems(): + if base_name: + field_name = '%s.%s' % (base_name, name) + else: + field_name = name + + if isinstance(value, dict): + fields += Worker.get_fields(value, field_name) + else: + value_name = None + if isinstance(value, basestring): + value_name = 'string' + + elif isinstance(value, bool): + value_name = 'boolean' + + elif isinstance(value, (int, long)): + value_name = 'long' + + elif isinstance(value, float): + value_name = 'double' + + if value_name: + field = { + 'name': field_name, + value_name: value + } + else: + field = { + 'name': field_name, + 'string': str(value) + } + + fields.append(field) + + return fields + + + @staticmethod + def process_document(document, application_id, uuid): + response = { + 'entityId': uuid, + 'entityVersion': '1', + 'applicationId': application_id, + 'fields': Worker.get_fields(document) + } + + return response + + def generate_location(self): + response = {} + + lat = random.random() * 90.0 + lon = random.random() * 180.0 + + lat_neg_true = True if lon > .5 else False + lon_neg_true = True if lat > .5 else False + + lat = lat * -1.0 if lat_neg_true else lat + lon = lon * -1.0 if lon_neg_true else lon + + response['location'] = { + 'lat': lat, + 'lon': lon + } + + return response + + +class Writer(Process): + def __init__(self, document_queue): + super(Writer, self).__init__() + self.document_queue = document_queue + + def run(self): + keep_going = True + + with open(args['output'], 'w') as f: + while keep_going: + try: + document = self.document_queue.get(timeout=300) + print(json.dumps(document), file=f) + + except Empty: + print('done!') + keep_going = False + + +def total_milliseconds(td): + return (td.microseconds + td.seconds * 1000000) / 1000 + + +def main(): + work_queue = JoinableQueue() + response_queue = JoinableQueue() + + workers = [Worker(work_queue, response_queue) for x in xrange(args.get('workers'))] + + writer = Writer(response_queue) + writer.start() + + [worker.start() for worker in workers] + + try: + 
total_messages = args.get('document_count') + batch_size = 100000 + message_counter = 0 + + for doc_number in xrange(total_messages): + message_counter += 1 + + for count in xrange(batch_size): + doc_id = str(uuid.uuid1()) + + task = { + 'fields_min': args['fields_min'], + 'fields_max': args['fields_max'], + 'uuid': doc_id + } + + work_queue.put(task) + + print('Joining queues counter=[%s]...' % message_counter) + work_queue.join() + response_queue.join() + print('Done queue counter=[%s]...' % message_counter) + + except KeyboardInterrupt: + [worker.terminate() for worker in workers] + + +main() \ No newline at end of file http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/index_test/index_test_mixed_batch.py ---------------------------------------------------------------------- diff --git a/utils/usergrid-util-python/index_test/index_test_mixed_batch.py b/utils/usergrid-util-python/index_test/index_test_mixed_batch.py new file mode 100644 index 0000000..d1dd40c --- /dev/null +++ b/utils/usergrid-util-python/index_test/index_test_mixed_batch.py @@ -0,0 +1,545 @@ +import json +from multiprocessing import JoinableQueue, Process +import random +import re +import traceback +import uuid +import time +import sys + +import argparse +import loremipsum +import requests +from elasticsearch import Elasticsearch + + +es_hosts = [ + {'host': 'ees000wo', 'port': 9200}, + {'host': 'ees001wo', 'port': 9200}, + {'host': 'ees002wo', 'port': 9200}, + {'host': 'ees003wo', 'port': 9200}, + {'host': 'ees004wo', 'port': 9200}, + {'host': 'ees005wo', 'port': 9200}, + {'host': 'ees006wo', 'port': 9200}, + {'host': 'ees007wo', 'port': 9200}, + {'host': 'ees008wo', 'port': 9200}, + {'host': 'ees009wo', 'port': 9200}, + {'host': 'ees010wo', 'port': 9200}, + {'host': 'ees011wo', 'port': 9200}, + {'host': 'ees012wo', 'port': 9200}, + {'host': 'ees013wo', 'port': 9200}, + {'host': 'ees014wo', 'port': 9200}, + {'host': 'ees015wo', 'port': 9200}, + {'host': 'ees016wo', 'port': 9200}, + {'host': 'ees017wo', 'port': 9200} +] + +def parse_args(): + parser = argparse.ArgumentParser(description='ElasticSearch Index Test 1') + + parser.add_argument('-t', '--type_count', + help='The number of types to produce', + type=int, + default=100) + + parser.add_argument('-ic', '--index_count', + help='The number of indices to create', + type=int, + default=10) + + parser.add_argument('-sc', '--shard_count', + help='The number of indices to create', + type=int, + default=18) + + parser.add_argument('-rc', '--replica_count', + help='The number of indices to create', + type=int, + default=1) + + parser.add_argument('-w', '--workers', + help='The number of worker threads', + type=int, + default=8) + + parser.add_argument('-dc', '--document_count', + help='The number of documents per index', + type=long, + default=100000000) + + parser.add_argument('-bs', '--batch_size', + help='The size of batches to send to ES', + type=long, + default=25) + + parser.add_argument('-ip', '--index_prefix', + help='The Prefix to use for index names', + type=str, + default='apigee_ftw') + + parser.add_argument('-tp', '--type_prefix', + help='The Prefix to use for type names', + type=str, + default='type_this') + + parser.add_argument('-s', '--setup', + help='The Prefix to use for type names', + action='store_true') + + my_args = parser.parse_args(sys.argv[1:]) + + return vars(my_args) + + +args = parse_args() + + +class APIClient(): + def __init__(self, base_url): + self.base_url = base_url + + def put(self, path='/', 
data=None): + if not data: + data = {} + + url = '%s%s' % (self.base_url, path) + r = requests.put(url, json.dumps(data)) + + if r.status_code == 200: + print 'PUT (%s) in %sms' % (r.status_code, total_milliseconds(r.elapsed)) + return r.json() + + raise Exception('HTTP %s calling PUT on URL=[%s]: %s' % (r.status_code, url, r.text)) + + def index_batch(self, batch): + + data = '' + + for element in batch: + index_tuple = element[0] + doc = element[1] + data += '{ "index" : { "_index" : "%s", "_type" : "%s", "_id" : "%s" } }\n' % ( + index_tuple[0], index_tuple[1], doc['entityId']) + data += json.dumps(doc) + data += '\n' + + url = '%s/_bulk' % self.base_url + + # print data + + r = requests.post(url, data) + + # print json.dumps(r.json(), indent=2) + + if r.status_code == 200: + print 'PUT (%s) in %sms' % (r.status_code, total_milliseconds(r.elapsed)) + return r.json() + + raise Exception('HTTP %s calling POST URL=[%s]: %s' % (r.status_code, url, r.text)) + + def delete(self, index): + url = '%s%s' % (self.base_url, index) + r = requests.delete(url) + + if r.status_code == 200: + print 'DELETE (%s) in %sms' % (r.status_code, total_milliseconds(r.elapsed)) + return r.json() + + raise Exception('HTTP %s calling DELETE URL=[%s]: %s' % (r.status_code, url, r.text)) + + def create_index(self, name=None, shards=18 * 3, replicas=1): + data = { + "settings": { + "index": { + "action": { + "write_consistency": "one" + }, + "number_of_shards": shards, + "number_of_replicas": replicas + } + } + } + + try: + print 'Creating index %s' % name + response = self.put('/%s/' % name.lower(), data) + + print response + + except Exception, e: + print traceback.format_exc() + + def delete_index(self, name): + try: + response = self.delete('/%s/' % name.lower()) + + print response + + except Exception, e: + print traceback.format_exc() + + def define_type_mapping(self, index_name, type_name): + try: + url = '/%s/_mapping/%s' % (index_name, type_name) + print url + + response = self.put(url, get_type_mapping(type_name)) + + print response + + except Exception, e: + print traceback.format_exc() + + +class Worker(Process): + def __init__(self, work_queue): + super(Worker, self).__init__() + self.api_client = APIClient('http://%s:9200' % es_hosts[random.randint(0, len(es_hosts) - 1)].get('host')) + self.work_queue = work_queue + self.es = Elasticsearch(es_hosts) + self.sentence_list = loremipsum.get_sentences(1000) + self.re_first_word = re.compile('([A-z]+)') + + def run(self): + print 'Starting %s ' % self.name + counter = 0 + + batch = [] + + while True: + index_batch_size = args.get('batch_size') + task = self.work_queue.get(timeout=600) + counter += 1 + + document = self.generate_document(task['field_count']) + flattened_doc = self.process_document(document, + task['type'], + task['uuid'], + task['uuid']) + + index_type_tuple = (task['index'], task['type']) + + # self.handle_document(task['index'], task['type'], task['uuid'], flattened_doc) + + batch.append((index_type_tuple, flattened_doc)) + + if len(batch) >= index_batch_size: + self.handle_batch(batch) + batch = [] + + self.work_queue.task_done() + + def generate_document(self, fields): + + doc = {} + + my_bool = True + + for i in xrange(fields): + sentence_index = random.randint(0, max((fields / 2) - 1, 1)) + sentence = self.sentence_list[sentence_index] + + if random.random() >= .5: + key = self.re_first_word.findall(sentence)[1] + else: + key = self.re_first_word.findall(sentence)[1] + str(i) + + field_type = random.random() + + if field_type <= 0.3: + 
doc[key] = sentence + + elif field_type <= 0.5: + doc[key] = random.randint(1, 1000000) + + elif field_type <= 0.6: + doc[key] = random.random() * 1000000000 + + elif field_type == 0.7: + doc[key] = my_bool + my_bool = not my_bool + + elif field_type == 0.8: + doc[key] = self.generate_document(max(fields / 5, 1)) + + elif field_type <= 1.0: + doc['mylocation'] = self.generate_location() + + return doc + + @staticmethod + def get_fields(document, base_name=None): + fields = [] + + for name, value in document.iteritems(): + if base_name: + field_name = '%s.%s' % (base_name, name) + else: + field_name = name + + if isinstance(value, dict): + fields += Worker.get_fields(value, field_name) + else: + value_name = None + if isinstance(value, basestring): + value_name = 'string' + + elif isinstance(value, bool): + value_name = 'boolean' + + elif isinstance(value, (int, long)): + value_name = 'long' + + elif isinstance(value, float): + value_name = 'double' + + if value_name: + field = { + 'name': field_name, + value_name: value + } + else: + field = { + 'name': field_name, + 'string': str(value) + } + + fields.append(field) + + return fields + + + @staticmethod + def process_document(document, doc_type, application_id, uuid): + response = { + 'entityId': uuid, + 'entityVersion': '1', + 'entityType': doc_type, + 'applicationId': application_id, + 'fields': Worker.get_fields(document) + } + + return response + + def handle_document(self, index, doc_type, uuid, document): + + res = self.es.create(index=index, + doc_type=doc_type, + id=uuid, + body=document) + + print res + + def generate_location(self): + response = {} + + lat = random.random() * 90.0 + lon = random.random() * 180.0 + + lat_neg_true = True if lon > .5 else False + lon_neg_true = True if lat > .5 else False + + lat = lat * -1.0 if lat_neg_true else lat + lon = lon * -1.0 if lon_neg_true else lon + + response['location'] = { + 'lat': lat, + 'lon': lon + } + + return response + + def handle_batch(self, batch): + print 'HANDLE BATCH size=%s' % len(batch) + # self.api_client.define_type_mapping(index, doc_type) + self.api_client.index_batch(batch) + + +def total_milliseconds(td): + return (td.microseconds + td.seconds * 1000000) / 1000 + + +def get_type_mapping(type_name): + return { + type_name: { + "_routing": { + "path": "entityId", + "required": True + }, + "properties": { + "entityId": { + "type": "string", + "index": "not_analyzed", + "doc_values": True + }, + "entityVersion": { + "type": "string", + "index": "not_analyzed", + "doc_values": True + }, + "entityType": { + "type": "string", + "index": "not_analyzed", + "doc_values": True + }, + "applicationId": { + "type": "string", + "index": "not_analyzed", + "doc_values": True + }, + "nodeId": { + "type": "string", + "index": "not_analyzed", + "doc_values": True + }, + "edgeName": { + "type": "string", + "index": "not_analyzed", + "doc_values": True + }, + "entityNodeType": { + "type": "string", + "index": "not_analyzed", + "doc_values": True + }, + "edgeTimestamp": { + "type": "long", + "doc_values": True + }, + "edgeSearch": { + "type": "string", + "index": "not_analyzed", + "doc_values": True + }, + "fields": { + "type": "nested", + "properties": { + "name": { + "type": "string", + "index": "not_analyzed", + "doc_values": True + }, + "boolean": { + "type": "boolean", + "doc_values": True + }, + "long": { + "type": "long", + "doc_values": True + }, + "double": { + "type": "double", + "doc_values": True + }, + "location": { + "type": "geo_point", + "lat_lon": True, + "geohash": 
True, + "doc_values": True + }, + "string": { + "type": "string", + "norms": { + "enabled": False + }, + "fields": { + "exact": { + "type": "string", + "index": "not_analyzed", + "doc_values": True + } + } + }, + "uuid": { + "type": "string", + "index": "not_analyzed", + "doc_values": True + } + } + } + }, + "_all": { + "enabled": False + } + + } + } + + +def main(): + INDEX_COUNT = args.get('index_count') + TYPE_COUNT = args.get('type_count') + SETUP = args.get('setup') + + indices = [] + types = [] + work_queue = JoinableQueue() + + apiclient = APIClient('http://%s:9200' % es_hosts[random.randint(0, len(es_hosts) - 1)].get('host')) + + workers = [Worker(work_queue) for x in xrange(args.get('workers'))] + [worker.start() for worker in workers] + + try: + # + for x in xrange(TYPE_COUNT): + type_name = '%s_%s' % (args.get('type_prefix'), x) + types.append(type_name) + + for x in xrange(INDEX_COUNT): + index_name = '%s_%s' % (args.get('index_prefix'), x) + indices.append(index_name) + + if SETUP: + print 'Running setup...' + + for index_name in indices: + apiclient.delete_index(index_name) + + time.sleep(1) + + for index_name in indices: + apiclient.create_index( + index_name, + shards=args['shard_count'], + replicas=args['replica_count']) + + # time.sleep(5) + + # for index_name in indices: + # for type_name in types: + # apiclient.define_type_mapping(index_name, type_name) + + # time.sleep(5) + + total_messages = args.get('document_count') + batch_size = 100000 + message_counter = 0 + fields = random.randint(50, 100) + + while message_counter < total_messages: + + for count in xrange(batch_size): + + for index_name in indices: + doc_id = str(uuid.uuid1()) + + task = { + 'field_count': fields, + 'uuid': doc_id, + 'index': index_name, + 'type': types[random.randint(0, len(types) - 1)] + } + + work_queue.put(task) + + print 'Joining queue counter=[%s]...' % message_counter + work_queue.join() + print 'Done queue counter=[%s]...' 
% message_counter + message_counter += batch_size + + except KeyboardInterrupt: + [worker.terminate() for worker in workers] + + +main() \ No newline at end of file http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/index_test/index_test_single_type_batch.py ---------------------------------------------------------------------- diff --git a/utils/usergrid-util-python/index_test/index_test_single_type_batch.py b/utils/usergrid-util-python/index_test/index_test_single_type_batch.py new file mode 100644 index 0000000..e3afdc3 --- /dev/null +++ b/utils/usergrid-util-python/index_test/index_test_single_type_batch.py @@ -0,0 +1,547 @@ +import json +from multiprocessing import JoinableQueue, Process +import random +import re +import traceback +import uuid +import time +import sys + +import argparse +import loremipsum +import requests +from elasticsearch import Elasticsearch + +es_hosts = [ + {'host': 'ees000wo', 'port': 9200}, + {'host': 'ees001wo', 'port': 9200}, + {'host': 'ees002wo', 'port': 9200}, + {'host': 'ees003wo', 'port': 9200}, + {'host': 'ees004wo', 'port': 9200}, + {'host': 'ees005wo', 'port': 9200}, + {'host': 'ees006wo', 'port': 9200}, + {'host': 'ees007wo', 'port': 9200}, + {'host': 'ees008wo', 'port': 9200}, + {'host': 'ees009wo', 'port': 9200}, + {'host': 'ees010wo', 'port': 9200}, + {'host': 'ees011wo', 'port': 9200}, + {'host': 'ees012wo', 'port': 9200}, + {'host': 'ees013wo', 'port': 9200}, + {'host': 'ees014wo', 'port': 9200}, + {'host': 'ees015wo', 'port': 9200}, + {'host': 'ees016wo', 'port': 9200}, + {'host': 'ees017wo', 'port': 9200} +] + + +def parse_args(): + parser = argparse.ArgumentParser(description='ElasticSearch Index Test 1') + + parser.add_argument('-t', '--type_count', + help='The number of types to produce', + type=int, + default=50) + + parser.add_argument('-ic', '--index_count', + help='The number of indices to create', + type=int, + default=50) + + parser.add_argument('-sc', '--shard_count', + help='The number of indices to create', + type=int, + default=50) + + parser.add_argument('-rc', '--replica_count', + help='The number of indices to create', + type=int, + default=1) + + parser.add_argument('-w', '--workers', + help='The number of worker threads', + type=int, + default=8) + + parser.add_argument('-dc', '--document_count', + help='The number of documents per index', + type=long, + default=100000000) + + parser.add_argument('-bs', '--batch_size', + help='The size of batches to send to ES', + type=long, + default=25) + + parser.add_argument('-ip', '--index_prefix', + help='The Prefix to use for index names', + type=str, + default='apigee_ftw') + + parser.add_argument('-tp', '--type_prefix', + help='The Prefix to use for type names', + type=str, + default='type_this') + + parser.add_argument('-s', '--setup', + help='The Prefix to use for type names', + action='store_true') + + my_args = parser.parse_args(sys.argv[1:]) + + return vars(my_args) + + +args = parse_args() + + +class APIClient(): + def __init__(self, base_url): + self.base_url = base_url + + def put(self, path='/', data=None): + if not data: + data = {} + + url = '%s%s' % (self.base_url, path) + r = requests.put(url, json.dumps(data)) + + if r.status_code == 200: + print 'PUT (%s) in %sms' % (r.status_code, total_milliseconds(r.elapsed)) + return r.json() + + raise Exception('HTTP %s calling PUT on URL=[%s]: %s' % (r.status_code, url, r.text)) + + def index_docs(self, index, documents, type): + + data = '' + + for doc in documents: + data += '{ "index" : { 
"_index" : "%s", "_type" : "%s", "_id" : "%s" } }\n' % (index, type, doc['entityId']) + data += json.dumps(doc) + data += '\n' + + url = '%s/_bulk' % self.base_url + + # print data + + r = requests.post(url, data) + + # print json.dumps(r.json(), indent=2) + + if r.status_code == 200: + print 'PUT (%s) in %sms' % (r.status_code, total_milliseconds(r.elapsed)) + return r.json() + + raise Exception('HTTP %s calling POST URL=[%s]: %s' % (r.status_code, url, r.text)) + + def delete(self, index): + url = '%s%s' % (self.base_url, index) + r = requests.delete(url) + + if r.status_code == 200: + print 'DELETE (%s) in %sms' % (r.status_code, total_milliseconds(r.elapsed)) + return r.json() + + raise Exception('HTTP %s calling DELETE URL=[%s]: %s' % (r.status_code, url, r.text)) + + def create_index(self, name=None, shards=18 * 3, replicas=1): + data = { + "settings": { + "index": { + "action": { + "write_consistency": "one" + }, + "number_of_shards": shards, + "number_of_replicas": replicas + } + } + } + + try: + print 'Creating index %s' % name + response = self.put('/%s/' % name.lower(), data) + + print response + + except Exception, e: + print traceback.format_exc() + + def delete_index(self, name): + try: + response = self.delete('/%s/' % name.lower()) + + print response + + except Exception, e: + print traceback.format_exc() + + def define_type_mapping(self, index_name, type_name): + try: + url = '/%s/_mapping/%s' % (index_name, type_name) + print url + + response = self.put(url, get_type_mapping(type_name)) + + print response + + except Exception, e: + print traceback.format_exc() + + +class Worker(Process): + def __init__(self, work_queue): + super(Worker, self).__init__() + self.api_client = APIClient('http://%s:9200' % es_hosts[random.randint(0, len(es_hosts) - 1)].get('host')) + self.work_queue = work_queue + self.es = Elasticsearch(es_hosts) + self.sentence_list = loremipsum.get_sentences(1000) + self.re_first_word = re.compile('([A-z]+)') + + def run(self): + print 'Starting %s ' % self.name + counter = 0 + + docs = {} + + while True: + index_batch_size = args.get('batch_size') + task = self.work_queue.get(timeout=600) + counter += 1 + + document = self.generate_document(task['field_count']) + flattened_doc = self.process_document(document, + task['type'], + task['uuid'], + task['uuid']) + + index_type_tuple = (task['index'], task['type']) + + # self.handle_document(task['index'], task['type'], task['uuid'], flattened_doc) + + doc_array = docs.get(index_type_tuple) + + if doc_array is None: + doc_array = [] + docs[index_type_tuple] = doc_array + + doc_array.append(flattened_doc) + + if len(doc_array) >= index_batch_size: + self.handle_batch(task['index'], task['type'], doc_array) + doc_array = [] + + self.work_queue.task_done() + + def generate_document(self, fields): + + doc = {} + + my_bool = True + + for i in xrange(fields): + sentence_index = random.randint(0, max((fields / 2) - 1, 1)) + sentence = self.sentence_list[sentence_index] + + if random.random() >= .5: + key = self.re_first_word.findall(sentence)[1] + else: + key = self.re_first_word.findall(sentence)[1] + str(i) + + field_type = random.random() + + if field_type <= 0.3: + doc[key] = sentence + + elif field_type <= 0.5: + doc[key] = random.randint(1, 1000000) + + elif field_type <= 0.6: + doc[key] = random.random() * 1000000000 + + elif field_type == 0.7: + doc[key] = my_bool + my_bool = not my_bool + + elif field_type == 0.8: + doc[key] = self.generate_document(max(fields / 5, 1)) + + elif field_type <= 1.0: + 
doc['mylocation'] = self.generate_location() + + return doc + + @staticmethod + def get_fields(document, base_name=None): + fields = [] + + for name, value in document.iteritems(): + if base_name: + field_name = '%s.%s' % (base_name, name) + else: + field_name = name + + if isinstance(value, dict): + fields += Worker.get_fields(value, field_name) + else: + value_name = None + if isinstance(value, basestring): + value_name = 'string' + + elif isinstance(value, bool): + value_name = 'boolean' + + elif isinstance(value, (int, long)): + value_name = 'long' + + elif isinstance(value, float): + value_name = 'double' + + if value_name: + field = { + 'name': field_name, + value_name: value + } + else: + field = { + 'name': field_name, + 'string': str(value) + } + + fields.append(field) + + return fields + + @staticmethod + def process_document(document, doc_type, application_id, uuid): + response = { + 'entityId': uuid, + 'entityVersion': '1', + 'entityType': doc_type, + 'applicationId': application_id, + 'fields': Worker.get_fields(document) + } + + return response + + def handle_document(self, index, doc_type, uuid, document): + + res = self.es.create(index=index, + doc_type=doc_type, + id=uuid, + body=document) + + print res + + def generate_location(self): + response = {} + + lat = random.random() * 90.0 + lon = random.random() * 180.0 + + lat_neg_true = True if lon > .5 else False + lon_neg_true = True if lat > .5 else False + + lat = lat * -1.0 if lat_neg_true else lat + lon = lon * -1.0 if lon_neg_true else lon + + response['location'] = { + 'lat': lat, + 'lon': lon + } + + return response + + def handle_batch(self, index, doc_type, docs): + print 'HANDLE BATCH' + self.api_client.define_type_mapping(index, doc_type) + self.api_client.index_docs(index, docs, doc_type) + + +def total_milliseconds(td): + return (td.microseconds + td.seconds * 1000000) / 1000 + + +def get_type_mapping(type_name): + return { + type_name: { + "_routing": { + "path": "entityId", + "required": True + }, + "properties": { + "entityId": { + "type": "string", + "index": "not_analyzed", + "doc_values": True + }, + "entityVersion": { + "type": "string", + "index": "not_analyzed", + "doc_values": True + }, + "entityType": { + "type": "string", + "index": "not_analyzed", + "doc_values": True + }, + "applicationId": { + "type": "string", + "index": "not_analyzed", + "doc_values": True + }, + "nodeId": { + "type": "string", + "index": "not_analyzed", + "doc_values": True + }, + "edgeName": { + "type": "string", + "index": "not_analyzed", + "doc_values": True + }, + "entityNodeType": { + "type": "string", + "index": "not_analyzed", + "doc_values": True + }, + "edgeTimestamp": { + "type": "long", + "doc_values": True + }, + "edgeSearch": { + "type": "string", + "index": "not_analyzed", + "doc_values": True + }, + "fields": { + "type": "nested", + "properties": { + "name": { + "type": "string", + "index": "not_analyzed", + "doc_values": True + }, + "boolean": { + "type": "boolean", + "doc_values": True + }, + "long": { + "type": "long", + "doc_values": True + }, + "double": { + "type": "double", + "doc_values": True + }, + "location": { + "type": "geo_point", + "lat_lon": True, + "geohash": True, + "doc_values": True + }, + "string": { + "type": "string", + "norms": { + "enabled": False + }, + "fields": { + "exact": { + "type": "string", + "index": "not_analyzed", + "doc_values": True + } + } + }, + "uuid": { + "type": "string", + "index": "not_analyzed", + "doc_values": True + } + } + } + }, + "_all": { + "enabled": False + } 
+ + } + } + + +def main(): + INDEX_COUNT = args.get('index_count') + TYPE_COUNT = args.get('type_count') + SETUP = args.get('setup') + + indices = [] + types = [] + work_queue = JoinableQueue() + + apiclient = APIClient('http://%s:9200' % es_hosts[random.randint(1, len(es_hosts) - 1)].get('host')) + + workers = [Worker(work_queue) for x in xrange(args.get('workers'))] + [worker.start() for worker in workers] + + try: + # + for x in xrange(TYPE_COUNT): + type_name = '%s_%s' % (args.get('type_prefix'), x) + types.append(type_name) + + for x in xrange(INDEX_COUNT): + index_name = '%s_%s' % (args.get('index_prefix'), x) + indices.append(index_name) + + if SETUP: + print 'Running setup...' + + for index_name in indices: + apiclient.delete_index(index_name) + + time.sleep(5) + + for index_name in indices: + apiclient.create_index( + index_name, + shards=args['shard_count'], + replicas=args['replica_count']) + + # time.sleep(5) + + # for index_name in indices: + # for type_name in types: + # apiclient.define_type_mapping(index_name, type_name) + + # time.sleep(5) + + total_messages = args.get('document_count') + batch_size = 100000 + message_counter = 0 + fields = random.randint(50, 100) + + while message_counter < total_messages: + + for count in xrange(batch_size): + + for index_name in indices: + doc_id = str(uuid.uuid1()) + + task = { + 'field_count': fields, + 'uuid': doc_id, + 'index': index_name, + 'type': types[random.randint(0, len(types) - 1)] + } + + work_queue.put(task) + + print 'Joining queue counter=[%s]...' % message_counter + work_queue.join() + print 'Done queue counter=[%s]...' % message_counter + message_counter += batch_size + + except KeyboardInterrupt: + [worker.terminate() for worker in workers] + + +main() http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/requirements.txt ---------------------------------------------------------------------- diff --git a/utils/usergrid-util-python/requirements.txt b/utils/usergrid-util-python/requirements.txt new file mode 100644 index 0000000..d15d7be --- /dev/null +++ b/utils/usergrid-util-python/requirements.txt @@ -0,0 +1,4 @@ +urllib3 +usergrid +requests +redis http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/samples/activity_streams/activity_streams.py ---------------------------------------------------------------------- diff --git a/utils/usergrid-util-python/samples/activity_streams/activity_streams.py b/utils/usergrid-util-python/samples/activity_streams/activity_streams.py new file mode 100644 index 0000000..ce38544 --- /dev/null +++ b/utils/usergrid-util-python/samples/activity_streams/activity_streams.py @@ -0,0 +1,132 @@ +# docs page: http://docs.apigee.com/api-baas/content/creating-activity + +# create user 1 +# post event for user 1 +# check feed for user 1 + +# create user 2 +# user 2 follows user 1 +# post event for user 1 + +# check feed for user 1 +# check feed for user 2 +import json + +import requests + +collection_url_template = "{api_url}/{org}/{app}/{collection}" +entity_url_template = "{api_url}/{org}/{app}/{collection}/{entity_id}" +connection_query_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/{verb}" +connection_create_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/{verb}/{target_uuid}" + +user_url_template = "{api_url}/{org}/{app}/users/{username}" +user_feed_url_template = "{api_url}/{org}/{app}/users/{username}/feed" +user_activity_url_template = "{api_url}/{org}/{app}/users/{username}/activities" 
+user_follows_url_template = "{api_url}/{org}/{app}/users/{user2}/following/users/{user1}" + +url_data = { + 'api_url': 'https://amer-apibaas-prod.apigee.net/appservices', + 'org': 'jwest-samples', + 'app': 'feed-example' +} + +session = requests.Session() + + +def create_user(user): + data = { + 'username': user, + 'email': '%[email protected]' % user + } + + url = collection_url_template.format(collection='users', **url_data) + + r = session.post(url, json.dumps(data)) + + if r.status_code != 200: + print 'Error creating user [%s] at URL=[%s]: %s' % (user, url, r.text) + + +def post_activity(user, text): + activity = { + "actor": { + "displayName": user, + "username": user, + "image": { + "duration": 0, + "height": 80, + "url": "http://www.gravatar.com/avatar/", "width": 80}, + "email": "%[email protected]" % user + }, + "verb": "post", + "content": text + } + + url = user_activity_url_template.format(username=user, **url_data) + + r = session.post(url, json.dumps(activity)) + + if r.status_code != 200: + print 'Error creating activity for user [%s] at URL=[%s]: %s' % (user, url, r.text) + + +def get_feed(user): + url = user_feed_url_template.format(username=user, **url_data) + + r = session.get(url) + + if r.status_code != 200: + print 'Error getting feed for user [%s] at URL=[%s]: %s' % (user, url, r.text) + + else: + print '----- START' + print json.dumps(r.json(), indent=2) + print '----- END' + + +def create_follows(user, user_to_follow): + url = user_follows_url_template.format(user1=user, user2=user_to_follow, **url_data) + + r = session.post(url) + + print r.text + + if r.status_code != 200: + print 'Error getting creating follows from user [%s] to user [%s] at URL=[%s]: %s' % ( + user, user_to_follow, url, r.text) + + +def delete_user(username): + url = user_url_template.format(username=username, **url_data) + + r = session.post(url) + + # print r.text + + if r.status_code != 200: + print 'Error deleting user [%s] at URL=[%s]: %s' % (username, url, r.text) + + +user_base = 'natgeo' + +user1 = '%s_%s' % (user_base, 1) +user2 = '%s_%s' % (user_base, 2) + +create_user(user1) +post_activity(user1, 'Hello World!') + +get_feed(user1) + +create_user(user2) +create_follows(user2, user1) +post_activity(user2, "I'm here!") +get_feed(user2) + +post_activity(user1, 'SEE YA!!') + +get_feed(user2) + +get_feed(user1) + +delete_user(user1) +delete_user(user2) http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/samples/beacon-event-example.py ---------------------------------------------------------------------- diff --git a/utils/usergrid-util-python/samples/beacon-event-example.py b/utils/usergrid-util-python/samples/beacon-event-example.py new file mode 100644 index 0000000..fc05cdc --- /dev/null +++ b/utils/usergrid-util-python/samples/beacon-event-example.py @@ -0,0 +1,196 @@ +# URL Templates for Usergrid +import json +import random + +import requests +from multiprocessing import Process, Pool + +import time + +collection_url_template = "{api_url}/{org}/{app}/{collection}" +entity_url_template = "{api_url}/{org}/{app}/{collection}/{entity_id}" +connection_query_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/{verb}" +connection_create_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/{verb}/{target_uuid}" + +url_data = { + 'api_url': 'https://amer-apibaas-prod.apigee.net/appservices', + 'org': 'jwest-samples', + 'app': 'event-example' +} + +url_data = { + 'api_url': 'http://usergrid_app.cfapps-01.haas-26.pez.pivotal.io', + 'org': 
'jwest', + 'app': 'sandbox' +} + +session = requests.Session() + + +class EventGenerator(Process): + def __init__(self, store_id, event_count, user_array, beacons): + super(EventGenerator, self).__init__() + + self.store_id = store_id + self.user_array = user_array + self.event_count = event_count + self.beacons = beacons + self.session = requests.Session() + self.create_store(self.store_id) + self.create_users(self.user_array) + + def create_store(self, store_id): + url = entity_url_template.format(collection='stores', entity_id=store_id, **url_data) + + r = self.session.put(url, data=json.dumps({"name": store_id})) + + if r.status_code != 200: + print 'Error creating store [%s] at URL=[%s]: %s' % (store_id, url, r.text) + + def create_event(self, user, event): + print 'creating event: %s' % json.dumps(event) + + url = collection_url_template.format(collection='general-events', **url_data) + + r = self.session.post(url, data=json.dumps(event)) + + if r.status_code == 200: + res = r.json() + entity = res.get('entities')[0] + event_uuid = entity.get('uuid') + + # link to user + create_connection_url = connection_create_url_template.format(collection='users', + uuid=user, + verb='events', + target_uuid=event_uuid, + **url_data) + + r_connect = self.session.post(create_connection_url) + + if r_connect.status_code == 200: + print 'created connection: %s' % create_connection_url + + # link to store + create_connection_url = connection_create_url_template.format(collection='stores', + uuid=event.get('storeId'), + verb='events', + target_uuid=event_uuid, + **url_data) + + r_connect = self.session.post(create_connection_url) + + if r_connect.status_code == 200: + print 'created connection: %s' % create_connection_url + + if event.get('eventType') == 'beacon': + # link to beacon + create_connection_url = connection_create_url_template.format(collection='beacons', + uuid=event.get('beaconId'), + verb='events', + target_uuid=event_uuid, + **url_data) + + r_connect = self.session.post(create_connection_url) + + if r_connect.status_code == 200: + print 'created connection: %s' % create_connection_url + else: + print 'Error creating connection at URL=[%s]: %s' % (create_connection_url, r.text) + + def run(self): + + for user in self.user_array: + + # store 123 + self.create_event(user, { + 'storeId': self.store_id, + 'eventType': 'enterStore' + }) + + for x in xrange(0, self.event_count): + beacon_number = random.randint(0, len(self.beacons) - 1) + beacon_name = self.beacons[beacon_number] + + event = { + 'beaconId': '%s-%s' % (self.store_id, beacon_name), + 'storeId': self.store_id, + 'eventType': 'beacon' + } + + self.create_event(user, event) + + self.create_event(user, { + 'storeId': self.store_id, + 'eventType': 'exitStore' + }) + + def create_users(self, user_array): + for user in user_array: + self.create_user(user) + + def create_user(self, user): + data = { + 'username': user, + 'email': '%[email protected]' % user + } + + url = collection_url_template.format(collection='users', **url_data) + + r = self.session.post(url, json.dumps(data)) + + if r.status_code != 200: + print 'Error creating user [%s] at URL=[%s]: %s' % (user, url, r.text) + + +def create_entity(entity_type, entity_name): + url = entity_url_template.format(collection=entity_type, entity_id=entity_name, **url_data) + r = session.put(url, data=json.dumps({'name': entity_name})) + + if r.status_code != 200: + print 'Error creating %s [%s] at URL=[%s]: %s' % (entity_type, entity_name, url, r.text) + + +def 
create_beacon(beacon_name): + create_entity('beacons', beacon_name) + + +def create_store(store_name): + create_entity('stores', store_name) + + +def main(): + beacons = ["b1", "b2", "b3", "b4", "b5", "b6"] + + stores = ['store_123', 'store_456', 'store_789', 'store_901'] + + beacon_names = [] + + for store in stores: + for beacon in beacons: + beacon_names.append('%s-%s' % (store, beacon)) + + pool = Pool(16) + + pool.map(create_beacon, beacon_names) + pool.map(create_store, stores) + + processes = [ + EventGenerator(stores[0], 100, ['jeff', 'julie'], beacons=beacons), + EventGenerator(stores[0], 100, ['russo', 'dunker'], beacons=beacons), + EventGenerator(stores[2], 100, ['jeff', 'julie'], beacons=beacons), + EventGenerator(stores[2], 100, ['russo', 'dunker'], beacons=beacons), + EventGenerator(stores[3], 100, ['jeff', 'julie'], beacons=beacons), + EventGenerator(stores[3], 100, ['russo', 'dunker'], beacons=beacons), + EventGenerator(stores[1], 100, ['bala', 'shankar'], beacons=beacons), + EventGenerator(stores[1], 100, ['chet', 'anant'], beacons=beacons) + ] + + [p.start() for p in processes] + + while len([p for p in processes if p.is_alive()]) > 0: + print 'Processors active, waiting' + time.sleep(1) + + +main() http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/samples/counter_test.py ---------------------------------------------------------------------- diff --git a/utils/usergrid-util-python/samples/counter_test.py b/utils/usergrid-util-python/samples/counter_test.py new file mode 100644 index 0000000..7852b26 --- /dev/null +++ b/utils/usergrid-util-python/samples/counter_test.py @@ -0,0 +1,31 @@ +import datetime +import time +import json + +import requests + +tstamp = time.gmtime() * 1000 + +s = requests.Session() + +s.headers.update({'authorization': 'Bearer YWMt7AHANAKcEeaVR-EahuX8EgAAAVQ7Q56jxQjUsmhJn8rGLTth0XtRrBSIzDA'}) +s.headers.update({'content-type': 'application/json'}) + +url = 'https://host/appservices-new/usergrid/pushtest/events' + +body = { + "timestamp": tstamp, + "counters": { + "counters.jeff.west": 1 + } +} + +r = s.post(url, data=json.dumps(body)) + +print r.status_code + +time.sleep(30) + +r = s.get('https://host/appservices-new/usergrid/pushtest//counters?counter=counters.jeff.west') + +print r.text http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/setup.py ---------------------------------------------------------------------- diff --git a/utils/usergrid-util-python/setup.py b/utils/usergrid-util-python/setup.py new file mode 100755 index 0000000..1f19cb2 --- /dev/null +++ b/utils/usergrid-util-python/setup.py @@ -0,0 +1,40 @@ +from setuptools import setup, find_packages + +__author__ = 'Jeff West @ ApigeeCorporation' + +VERSION = '0.5.13' + +setup( + name='usergrid-tools', + version=VERSION, + description='Tools for working with Apache Usergrid', + url='http://usergrid.apache.org', + download_url="https://codeload.github.com/jwest-apigee/usergrid-util-python/zip/%s" % VERSION, + author='Jeff West', + author_email='[email protected]', + + # packages=['usergrid_tools', 'es_tools'], + packages=find_packages(exclude=["*.tests", "*.tests.*", "tests.*", "tests", "sandbox"]), + + install_requires=[ + 'requests', + 'usergrid>=0.1.3', + 'time_uuid', + 'argparse', + 'redis', + 'ConcurrentLogHandler', + ], + + entry_points={ + 'console_scripts': [ + 'usergrid_iterator = usergrid_tools.iterators.simple_iterator:main', + 'usergrid_data_migrator = 
usergrid_tools.migration.usergrid_data_migrator:main', + 'usergrid_data_exporter = usergrid_tools.migration.usergrid_data_exporter:main', + 'usergrid_entity_index_test = usergrid_tools.indexing.entity_index_test:main', + 'usergrid_batch_index_test = usergrid_tools.indexing.batch_index_test:main', + 'usergrid_parse_importer = usergrid_tools.parse_importer.parse_importer:main', + 'usergrid_deleter = usergrid_tools.parse_importer.parse_importer:main', + 'usergrid_library_check = usergrid_tools.library_check:main', + ] + } +) http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/usergrid_tools/__init__.py ---------------------------------------------------------------------- diff --git a/utils/usergrid-util-python/usergrid_tools/__init__.py b/utils/usergrid-util-python/usergrid_tools/__init__.py new file mode 100644 index 0000000..beed654 --- /dev/null +++ b/utils/usergrid-util-python/usergrid_tools/__init__.py @@ -0,0 +1,4 @@ +import migration +import iterators +import indexing +import general http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/usergrid_tools/general/__init__.py ---------------------------------------------------------------------- diff --git a/utils/usergrid-util-python/usergrid_tools/general/__init__.py b/utils/usergrid-util-python/usergrid_tools/general/__init__.py new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/usergrid_tools/general/deleter.py ---------------------------------------------------------------------- diff --git a/utils/usergrid-util-python/usergrid_tools/general/deleter.py b/utils/usergrid-util-python/usergrid_tools/general/deleter.py new file mode 100644 index 0000000..3c53cae --- /dev/null +++ b/utils/usergrid-util-python/usergrid_tools/general/deleter.py @@ -0,0 +1,151 @@ +import json +import traceback +import requests + +__author__ = 'Jeff West @ ApigeeCorporation' + + +def total_milliseconds(td): + return (td.microseconds + td.seconds * 1000000) / 1000 + + +# for Apigee Developer, leave this as is. For paid BaaS instances change this to https://{your_api_url}/[appservices] +api_url = 'https://api.usergrid.com' + +# specify the org[] / app[] / collection[] to delete +# Org and App level are required. 
If no collections are specified, all collections will be deleted +# you also need to specify the client_id and secret of each org + +data_map = { + "orgs": + { + "myOrg": { + "apps": { + "myApp": { + "collections": [ + 'examples' + ] + } + }, + "credentials": { + "client_id": "foo", + "client_secret": "bar" + } + } + } +} +# it is generally not a good idea to delete more than 100 at a time due to latency and resource utilization +url_template = '{api_url}/{org}/{app}/{collection}?limit=250' + +session = requests.Session() + + +def check_response_status(response, message='', exit_on_error=True): + if response.status_code != 200: + print 'ERROR: ' + message + print response.text + + if exit_on_error: + exit() + + +def delete_all_collections(org, app, token): + url = '{api_url}/{org}/{app}'.format(api_url=api_url, org=org, app=app) + + print 'Listing collections at URL: %s' % url + + r = session.get(url) + + if r.status_code != 200: + print r.text + + collections = [] + + delete_collections(org, app, collections, token) + + +def delete_collections(org, app, collections, token): + print 'Deleting [%s] collections: %s' % (len(collections), collections) + + for collection in collections: + print 'Deleting collection [%s]...' % collection + + keep_going = True + + count_with_zero = 0 + + while keep_going: + + url = url_template.format(api_url=api_url, org=org, app=app, collection=collection) + + try: + response = session.get(url) + check_response_status(response, message='Unable to GET URL: %s' % url) + + count = len(response.json().get('entities')) + total_ms = total_milliseconds(response.elapsed) + + print 'GET %s from collection %s in %s' % (count, collection, total_ms) + print 'Deleting...' + + response = session.delete(url) + + check_response_status(response, message='UNABLE TO DELETE on URL: %s' % url) + + try: + count = len(response.json().get('entities')) + total_ms = total_milliseconds(response.elapsed) + + print 'Deleted %s from collection %s in %s' % (count, collection, total_ms) + + if count == 0: + count_with_zero += 1 + print 'Count with ZERO: %s' % count_with_zero + + # if there are 10 in a row with zero entities returned, we're done + if count_with_zero >= 10: + keep_going = False + else: + count_with_zero = 0 + except: + print 'Error! 
HTTP Status: %s response: %s' % (response.status_code, response.text) + + except KeyboardInterrupt: + exit() + + except: + print traceback.format_exc() + + +# iterate the orgs specified in the configuration above +for org, org_data in data_map.get('orgs', {}).iteritems(): + + credentials = org_data.get('credentials', {}) + + token_request = { + 'grant_type': 'client_credentials', + 'client_id': credentials.get('client_id'), + 'client_secret': credentials.get('client_secret'), + } + + token_url = '{api_url}/management/token'.format(api_url=api_url) + + r = session.post(token_url, data=json.dumps(token_request)) + + check_response_status(r, message='Unable to get Token at URL %s' % token_url) + + token = r.json().get('access_token') + session.headers.update({'Authorization': 'Bearer ' + token}) + + # iterate the apps specified in the config above + for app, app_data in org_data.get('apps', {}).iteritems(): + + collections = app_data.get('collections', []) + + # if the list of collections is empty, delete all collections + if len(collections) == 0: + delete_all_collections(org, app, token) + + # Otherwise, delete the specified collections + else: + delete_collections(org, app, collections, token) http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/usergrid_tools/general/duplicate_name_checker.py ---------------------------------------------------------------------- diff --git a/utils/usergrid-util-python/usergrid_tools/general/duplicate_name_checker.py b/utils/usergrid-util-python/usergrid_tools/general/duplicate_name_checker.py new file mode 100644 index 0000000..3682d18 --- /dev/null +++ b/utils/usergrid-util-python/usergrid_tools/general/duplicate_name_checker.py @@ -0,0 +1,25 @@ +from usergrid import UsergridQueryIterator + +### This iterates a collection using GRAPH and checks whether there are more than on entity with the same name + +url = 'https://host/org/app/collection?access_token=foo&limit=1000' + +q = UsergridQueryIterator(url) + +name_tracker = {} +counter = 0 +for e in q: + counter += 1 + + if counter % 1000 == 1: + print 'Count: %s' % counter + + name = e.get('name') + + if name in name_tracker: + name_tracker[name].append(e.get('uuid')) + + print 'duplicates for name=[%s]: %s' % (name, name_tracker[name]) + + else: + name_tracker[name] = [e.get('uuid')]
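The duplicate name checker above only reports duplicates as it encounters them. A small follow-up helper (a sketch; print_duplicate_summary is a new name, and it assumes the name_tracker dict and counter built by the loop above) could summarize every duplicated name once the iteration finishes:

def print_duplicate_summary(name_tracker, counter):
    # name_tracker: entity name -> list of UUIDs, as built by the iteration above
    duplicates = dict((name, uuids) for name, uuids in name_tracker.items() if len(uuids) > 1)

    print 'Scanned %s entities, found %s duplicated names' % (counter, len(duplicates))

    for name, uuids in duplicates.items():
        print 'name=[%s] used by %s entities: %s' % (name, len(uuids), uuids)

print_duplicate_summary(name_tracker, counter)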
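In deleter.py, delete_all_collections fetches the application but then hands an empty collections list to delete_collections, so the delete-everything path is effectively a stub. One way it could be filled in, sketched under the assumption that the application GET response exposes its collection names under entities[0].metadata.collections (the usual Usergrid application response shape); list_collections is a new helper name, and api_url, session and check_response_status are reused from the file above:

def list_collections(org, app):
    url = '{api_url}/{org}/{app}'.format(api_url=api_url, org=org, app=app)

    r = session.get(url)
    check_response_status(r, message='Unable to list collections at URL: %s' % url)

    entities = r.json().get('entities', [])

    if len(entities) == 0:
        return []

    # assumption: collection names live under metadata.collections on the application entity
    return list(entities[0].get('metadata', {}).get('collections', {}).keys())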
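One detail worth flagging in counter_test.py: time.gmtime() returns a struct_time, so time.gmtime() * 1000 raises a TypeError rather than producing a millisecond timestamp. A minimal sketch of the likely intent (milliseconds since the epoch, posted to the same counters endpoint; the host and bearer token are placeholders, just as in the original):

import json
import time

import requests

# struct_time cannot be multiplied; time.time() gives seconds since the epoch as a float
tstamp = int(time.time() * 1000)

s = requests.Session()
s.headers.update({'authorization': 'Bearer <token>'})
s.headers.update({'content-type': 'application/json'})

body = {
    'timestamp': tstamp,
    'counters': {
        'counters.jeff.west': 1
    }
}

r = s.post('https://host/appservices-new/usergrid/pushtest/events', data=json.dumps(body))

print r.status_code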
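The beacon sample defines connection_query_url_template but never exercises it; reading events back through the graph would verify the connections that EventGenerator.create_event creates. A short sketch (get_store_events is a new helper name; connection_query_url_template, url_data and session are the ones defined in that sample) that lists the events connected to a store:

def get_store_events(store_id, limit=10):
    # follows the 'events' connection: GET {api_url}/{org}/{app}/stores/{store_id}/events
    url = connection_query_url_template.format(collection='stores',
                                                uuid=store_id,
                                                verb='events',
                                                **url_data)

    r = session.get(url, params={'limit': limit})

    if r.status_code != 200:
        print 'Error reading events for store [%s] at URL=[%s]: %s' % (store_id, url, r.text)
        return []

    return r.json().get('entities', [])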
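Finally, in the user feed / follows sample at the top of this section, delete_user issues session.post(url) rather than an HTTP DELETE, so the two users created by the script are never actually removed. A minimal sketch of the presumably intended call, reusing user_url_template, url_data and session from that sample:

def delete_user(username):
    url = user_url_template.format(username=username, **url_data)

    # DELETE the entity; the original POSTed to this URL, which does not remove it
    r = session.delete(url)

    if r.status_code != 200:
        print 'Error deleting user [%s] at URL=[%s]: %s' % (username, url, r.text)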