First and untested stab at multi-tenant migration.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d79bf4c3
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d79bf4c3
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d79bf4c3

Branch: refs/heads/USERGRID-909
Commit: d79bf4c36810fdab578a86e4de0ae725f1aa6e75
Parents: 2686054
Author: Dave Johnson <[email protected]>
Authored: Thu Oct 29 08:56:14 2015 -0400
Committer: Dave Johnson <[email protected]>
Committed: Thu Oct 29 08:56:14 2015 -0400

----------------------------------------------------------------------
 stack/scripts/create_test_data.py    | 213 +++++++++++++++++++++
 stack/scripts/migrate_entity_data.py | 301 +++++++++++++++++++++---------
 2 files changed, 429 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/d79bf4c3/stack/scripts/create_test_data.py
----------------------------------------------------------------------
diff --git a/stack/scripts/create_test_data.py b/stack/scripts/create_test_data.py
new file mode 100644
index 0000000..de85da0
--- /dev/null
+++ b/stack/scripts/create_test_data.py
@@ -0,0 +1,213 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import sys
import logging
from logging.handlers import RotatingFileHandler
import argparse
import time
import requests
import json

# Creates two organizations, each with two apps, each with three collections,
# each with 100 entities (plus two users per app and connections between the
# entities of adjacent collections). The caller must provide a "slug" string
# which will be used as a prefix for all names.
#
# For example, if the slug is mytest then:
#   Orgs will be named mytest_org0 and mytest_org1
#   Apps will be named mytest_org0_app0 and so on
#   Collections will be named mytest_org0_app0_col0 and so on
#   Entities will be named mytest_org0_app0_col0_entity0 and so on
#   Org admins will be named mytest_org0_admin and mytest_org1_admin (both with password test)


def parse_args():
    parser = argparse.ArgumentParser(description="Usergrid Test Data Creation Tool")

    parser.add_argument("--endpoint",
                        help="The endpoint to use for making API requests.",
                        type=str,
                        default="http://localhost:8080")

    parser.add_argument("--user",
                        help="System Admin Credentials used to authenticate with Usergrid <user:pass>",
                        type=str,
                        required=True)

    parser.add_argument("--slug",
                        help="Unique string to be used to name the organizations, applications and other things created",
                        type=str,
                        required=True)

    my_args = parser.parse_args(sys.argv[1:])

    arg_vars = vars(my_args)
    creds = arg_vars["user"].split(":")
    if len(creds) != 2:
        print("Credentials not properly specified. Must be '--user <user:pass>'. Exiting...")
        exit_on_error()
    else:
        arg_vars["user"] = creds[0]
        arg_vars["pass"] = creds[1]

    return arg_vars

class Creator:
    def __init__(self):
        self.args = parse_args()
        self.endpoint = self.args["endpoint"]
        self.logger = init_logging(self.__class__.__name__)
        self.admin_user = self.args["user"]
        self.admin_pass = self.args["pass"]
        self.slug = self.args["slug"]

    def run(self):
        self.logger.info("Initializing...")

        if not self.is_endpoint_available():
            exit_on_error("Endpoint is not available, aborting")

        for orgIndex in range(2):
            orgName = self.slug + "_org" + str(orgIndex)
            orgUser = orgName + "_admin"
            orgEmail = orgUser + "@example.com"

            url = self.endpoint + "/management/orgs"
            body = json.dumps({"username": orgUser, "email": orgEmail, "password": "test", "organization": orgName})
            r = requests.post(url=url, data=body, auth=(self.admin_user, self.admin_pass))
            if r.status_code >= 400:
                print("Error creating organization " + orgName + ": " + r.text)
                return

            print("Created org " + orgName)

            url = self.endpoint + "/management/token"
            body = json.dumps({"grant_type": "password", "username": orgUser, "password": "test"})
            r = requests.post(url=url, data=body)
            if r.status_code != 200:
                print("Error logging into organization " + orgName + ": " + r.text)
                return

            accessToken = r.json()["access_token"]

            for appIndex in range(2):
                appName = orgName + "_app" + str(appIndex)

                url = self.endpoint + "/management/orgs/" + orgName + "/apps?access_token=" + accessToken
                body = json.dumps({"name": appName})
                r = requests.post(url=url, data=body, auth=(self.admin_user, self.admin_pass))
                if r.status_code >= 400:
                    print("Error creating application " + appName + ": " + r.text)
                    return

                print("  Created app: " + orgName + "/" + appName)
                appUrl = self.endpoint + "/" + orgName + "/" + appName
                time.sleep(2)

                for userIndex in range(2):
                    userName = appName + "_user" + str(userIndex)
                    email = userName + "@example.com"

                    url = appUrl + "/users?access_token=" + accessToken
                    body = json.dumps({"name": userName, "username": userName, "email": email, "password": "test"})
                    r = requests.post(url=url, data=body)
                    if r.status_code >= 400:
                        print("Error creating user " + userName + ": " + r.text)
                        return

                for colIndex in range(3):
                    colName = appName + "_col" + str(colIndex)
                    print("    Creating collection: " + colName)

                    for entityIndex in range(100):
                        entityName = colName + "_entity" + str(entityIndex)

                        url = appUrl + "/" + colName + "s?access_token=" + accessToken
                        body = json.dumps({"name": entityName})
                        r = requests.post(url=url, data=body)
                        if r.status_code >= 400:
                            print("Error creating entity " + entityName + ": " + r.text)
                            return

                # connect entities in collection 0 to collection 1
                for entityIndex in range(100):
                    sourceCollection = appName + "_col0s"
                    sourceName = appName + "_col0_entity" + str(entityIndex)
                    targetName = appName + "_col1_entity" + str(entityIndex)
                    targetType = appName + "_col1"
                    url = appUrl + "/" + sourceCollection + "/" + sourceName + "/has/" + targetType + "/" + targetName
                    r = requests.post(url=url + "?access_token=" + accessToken)
                    if r.status_code >= 400:
                        print("Error connecting entity " + sourceName + " to " + targetName + ": " + r.text)
                        print("url is: " + url)
                        return

                # connect entities in collection 1 to collection 2
                for entityIndex in range(100):
                    sourceCollection = appName + "_col1s"
                    sourceName = appName + "_col1_entity" + str(entityIndex)
                    targetName = appName + "_col2_entity" + str(entityIndex)
                    targetType = appName + "_col2"
                    url = appUrl + "/" + sourceCollection + "/" + sourceName + "/has/" + targetType + "/" + targetName
                    r = requests.post(url=url + "?access_token=" + accessToken)
                    if r.status_code >= 400:
                        print("Error connecting entity " + sourceName + " to " + targetName + ": " + r.text)
                        print("url is: " + url)
                        return

    def is_endpoint_available(self):
        try:
            r = requests.get(url=self.endpoint + "/status")
            if r.status_code == 200:
                return True
        except requests.exceptions.RequestException as e:
            self.logger.error("Endpoint is unavailable, %s", str(e))
        return False


def exit_on_error(e=""):
    print("Exiting script due to error: " + str(e))
    sys.exit(1)


def init_logging(name):
    logger = logging.getLogger(name)
    log_file_name = "./create-test-data.log"
    log_formatter = logging.Formatter(fmt="%(asctime)s [%(name)s] %(levelname)s %(message)s",
                                      datefmt="%Y-%m-%d %H:%M:%S")

    rotating_file = RotatingFileHandler(filename=log_file_name,
                                        mode="a",
                                        maxBytes=104857600,
                                        backupCount=10)
    rotating_file.setFormatter(log_formatter)
    rotating_file.setLevel(logging.INFO)
    logger.addHandler(rotating_file)
    logger.setLevel(logging.INFO)

    stdout_logger = logging.StreamHandler(sys.stdout)
    stdout_logger.setFormatter(log_formatter)
    stdout_logger.setLevel(logging.INFO)
    logger.addHandler(stdout_logger)

    return logger


if __name__ == "__main__":
    creator = Creator()
    creator.run()
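
For reference, a typical invocation of the new script (the endpoint, credentials and slug below are placeholders, not values from the commit) would look like:

    python create_test_data.py --endpoint http://localhost:8080 --user superuser:superpass --slug mytest

With slug "mytest" this yields the mytest_org0/mytest_org1 layout described in the header comment.
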
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d79bf4c3/stack/scripts/migrate_entity_data.py
----------------------------------------------------------------------
diff --git a/stack/scripts/migrate_entity_data.py b/stack/scripts/migrate_entity_data.py
index 36f73c0..f576108 100644
--- a/stack/scripts/migrate_entity_data.py
+++ b/stack/scripts/migrate_entity_data.py
@@ -16,30 +16,50 @@
 # under the License.
 #
 #
-# Usage from a machine running Usergrid with the new Usergrid version:
-#
-# ######################################################
-# STEP 1 - BEFORE SWITCHING TRAFFIC TO NEW UG VERSION
-# ######################################################
-#
-# python migrate_entity_data.py --user adminuser:adminpass
-#
-# The above command performs an appinfo migration and system re-index only. This creates indices in
-# Elasticsearch with the updated indexing strategy in the new Usergrid version.
-#
-# ######################################################
-# STEP 2 - AFTER SWITCHING TRAFFIC TO NEW UG VERSION
-# ######################################################
-#
-# python migrate_entity_data.py --user adminuser:adminpass --delta --date <timestamp>
-#
-# The above command performs an appinfo migration, system re-index using a start date, and full data
-# migration which includes entity data. This step is necessary to ensure Usergrid starts reading and
-# writing data from the latest entity version, including delta indexing of any documents created
-# during the time between STEP 1 and STEP 2. If all data has already been migrated (running this a
-# 2nd, 3rd, etc. time), then the appinfo migration will be skipped.
-
-
+# To migrate multiple tenants within one cluster:
+#
+# STEP 1 - SET UP TENANT ONE TOMCAT RUNNING 2.1, NOT IN SERVICE, AND INIT MIGRATION
+#
+#   python migrate_entity_data.py --org <org1name> --user <superuser>:<superpass> --init
+#
+# This command will set up and bootstrap the database, set up the migration system and update
+# index mappings:
+#   - /system/database/setup
+#   - /system/database/bootstrap
+#   - /system/migrate/run/migration-system
+#   - /system/migrate/run/index_mapping_migration
+#
+# Then it will migrate appinfos, re-index the management app and, for each of the specified org's
+# apps, de-dup connections and re-index the app.
+#
+# STEP 2 - PUT TENANT ONE TOMCATS IN SERVICE AND DO DELTA MIGRATION
+#
+#   python migrate_entity_data.py --org <org1name> --user <superuser>:<superpass> --date <timestamp>
+#
+# This command will migrate appinfos, re-index the management app and, for each of the specified
+# org's apps, de-dup connections and re-index the app with a start date specified, so only data
+# modified since STEP 1 will be re-indexed.
+#
+# STEP 3 - SET UP TENANT TWO TOMCAT RUNNING 2.1, NOT IN SERVICE
+#
+#   python migrate_entity_data.py --org <org2name> --user <superuser>:<superpass>
+#
+# This command will migrate appinfos, re-index the management app and, for each of the specified
+# org's apps, de-dup connections and re-index the app.
+#
+# STEP 4 - PUT TENANT TWO TOMCATS IN SERVICE AND DO DELTA MIGRATION
+#
+#   python migrate_entity_data.py --org <org2name> --user <superuser>:<superpass> --date <timestamp>
+#
+# This command will migrate appinfos, re-index the management app and, for each of the specified
+# org's apps, de-dup connections and re-index the app with a start date specified, so only data
+# modified since STEP 3 will be re-indexed.
+#
+# STEP 5 - FULL DATA MIGRATION
+#
+#   python migrate_entity_data.py --user <superuser>:<superpass> --full
+#
+# This command will run the full data migration.
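
Each step above kicks off asynchronous work; progress can be watched out-of-band with the same status endpoint the script polls internally via is_data_migrated() and friends. A minimal check, assuming a local Tomcat and placeholder superuser credentials:

    import requests
    print(requests.get('http://localhost:8080/system/migrate/status',
                       auth=('superuser', 'superpass')).json())
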
 
 import sys
 import logging
@@ -72,10 +92,6 @@ PLUGIN_CORE_DATA = 'core-data'
 
 def parse_args():
     parser = argparse.ArgumentParser(description='Usergrid Migration Tool')
 
-    parser.add_argument('--date',
-                        help='A date from which to start the migration',
-                        type=str)
-
     parser.add_argument('--endpoint',
                         help='The endpoint to use for making API requests.',
                         type=str,
@@ -86,8 +102,22 @@ def parse_args():
                         type=str,
                         required=True)
 
-    parser.add_argument('--delta',
-                        help='Run a delta migration.',
+    parser.add_argument('--init',
+                        help='Init system and start first migration.',
+                        action='store_true',
+                        default=False)
+
+    parser.add_argument('--org',
+                        help='Name of organization on which to run migration.',
+                        type=str,
+                        required=False)
+
+    parser.add_argument('--date',
+                        help='A date from which to start the migration',
+                        type=str)
+
+    parser.add_argument('--full',
+                        help='Run full data migration (last step in cluster migration).',
                         action='store_true',
                         default=False)
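
Note that --date is forwarded as the 'updated' field of the re-index request body (see start_app_reindex below), which suggests an epoch-milliseconds timestamp of the same form get_current_time() returns; e.g. a hypothetical delta run:

    python migrate_entity_data.py --org <org1name> --user <superuser>:<superpass> --date 1446123374000
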
@@ -120,7 +150,9 @@ class Migrate:
         self.logger = init_logging(self.__class__.__name__)
         self.admin_user = self.args['user']
         self.admin_pass = self.args['pass']
-        self.delta_migration = self.args['delta']
+        self.org = self.args['org']
+        self.init = self.args['init']
+        self.full = self.args['full']
 
     def run(self):
         self.logger.info('Initializing...')
@@ -133,62 +165,82 @@ class Migrate:
 
         try:
 
-            self.run_database_setup()
-
-            # We need to check and roll the migration system to 1 if not already
-            migration_system_updated = self.is_migration_system_updated()
-
-            if not migration_system_updated:
-                self.logger.info('Migration system needs to be updated. Updating migration system..')
-                self.start_migration_system_update()
-                while not migration_system_updated:
-                    time.sleep(STATUS_INTERVAL_SECONDS)
-                    migration_system_updated = self.is_migration_system_updated()
-                    if migration_system_updated:
-                        break
-
-            # We need to check and roll index mapping version to 1 if not already there
-            index_mapping_updated = self.is_index_mapping_updated()
-
-            if not index_mapping_updated:
-                self.logger.info('Index Mapping needs to be updated. Updating index mapping..')
-                self.start_index_mapping_migration()
-                while not index_mapping_updated:
-                    time.sleep(STATUS_INTERVAL_SECONDS)
-                    index_mapping_updated = self.is_index_mapping_updated()
-                    if index_mapping_updated:
-                        break
-
+            if self.full:
+
+                # Do full data migration and exit
+
+                self.start_fulldata_migration()
+
+                self.metrics['full_data_migration_start'] = get_current_time()
+                self.logger.info("Full Data Migration Started")
+                is_migrated = False
+                while not is_migrated:
+                    time.sleep(STATUS_INTERVAL_SECONDS)
+                    is_migrated = self.is_data_migrated()
+                    if is_migrated:
+                        break
+
+                self.metrics['full_data_migration_end'] = get_current_time()
+                self.logger.info("Full Data Migration completed")
+
+                self.log_metrics()
+                self.logger.info("Finished...")
+
+                return
-            # Run AppInfo migration only when both appinfos and collection entity data have not been migrated
-            if not self.is_data_migrated():
-
-                # Migrate app info
-                if self.is_appinfo_migrated():
-                    self.logger.info('AppInfo already migrated. Resetting version for re-migration.')
-                    self.reset_appinfo_migration()
-                    time.sleep(STATUS_INTERVAL_SECONDS)
-
-                self.start_appinfo_migration()
-                self.logger.info('AppInfo Migration Started.')
-                self.metrics['appinfo_migration_start'] = get_current_time()
-
-                is_appinfo_migrated = False
-                while not is_appinfo_migrated:
-                    is_appinfo_migrated = self.is_appinfo_migrated()
-                    time.sleep(STATUS_INTERVAL_SECONDS)
-                    if is_appinfo_migrated:
-                        self.metrics['appinfo_migration_end'] = get_current_time()
-                        break
-                self.logger.info('AppInfo Migration Ended.')
-
-            else:
-                self.logger.info('Full Data Migration previously ran... skipping AppInfo migration.')
-
+            if self.init:
+
+                # Init the migration system as this is the first migration done on the cluster
+                self.run_database_setup()
+                self.run_database_bootstrap()
+
+                migration_system_updated = self.is_migration_system_updated()
+                if not migration_system_updated:
+                    self.logger.info('Migration system needs to be updated. Updating migration system..')
+                    self.start_migration_system_update()
+                    while not migration_system_updated:
+                        time.sleep(STATUS_INTERVAL_SECONDS)
+                        migration_system_updated = self.is_migration_system_updated()
+                        if migration_system_updated:
+                            break
+
+                index_mapping_updated = self.is_index_mapping_updated()
+                if not index_mapping_updated:
+                    self.logger.info('Index Mapping needs to be updated. Updating index mapping..')
+                    self.start_index_mapping_migration()
+                    while not index_mapping_updated:
+                        time.sleep(STATUS_INTERVAL_SECONDS)
+                        index_mapping_updated = self.is_index_mapping_updated()
+                        if index_mapping_updated:
+                            break
+
+            # Migrate app info
+
+            if self.is_appinfo_migrated():
+                self.logger.info('AppInfo already migrated. Resetting version for re-migration.')
+                self.reset_appinfo_migration()
+                time.sleep(STATUS_INTERVAL_SECONDS)
+
+            self.start_appinfo_migration()
+            self.logger.info('AppInfo Migration Started.')
+            self.metrics['appinfo_migration_start'] = get_current_time()
+
+            is_appinfo_migrated = False
+            while not is_appinfo_migrated:
+                is_appinfo_migrated = self.is_appinfo_migrated()
+                time.sleep(STATUS_INTERVAL_SECONDS)
+                if is_appinfo_migrated:
+                    self.metrics['appinfo_migration_end'] = get_current_time()
+                    break
+            self.logger.info('AppInfo Migration Ended.')
+
+            # Reindex management app
 
-            # Perform system re-index (it will grab date from input if provided)
-            job = self.start_reindex()
+            job = self.start_app_reindex('management')
             self.metrics['reindex_start'] = get_current_time()
             self.logger.info('Started Re-index. Job=[%s]', job)
@@ -202,33 +254,44 @@ class Migrate:
             self.logger.info("Finished Re-index. Job=[%s]", job)
             self.metrics['reindex_end'] = get_current_time()
 
-            # Only when we do a delta migration do we run the full data migration (includes appinfo and entity data)
-            if self.delta_migration:
-                self.logger.info('Delta option provided. Performing full data migration...')
-                if self.is_data_migrated():
-                    self.reset_data_migration()
-                    time.sleep(STATUS_INTERVAL_SECONDS)
-                    self.is_data_migrated()
-
-                # self.start_core_data_migration()
-                self.start_fulldata_migration()
-
-                self.metrics['full_data_migration_start'] = get_current_time()
-                self.logger.info("Full Data Migration Started")
-                is_migrated = False
-                while not is_migrated:
-                    time.sleep(STATUS_INTERVAL_SECONDS)
-                    is_migrated = self.is_data_migrated()
-                    if is_migrated:
-                        break
-
-                self.metrics['full_data_migration_end'] = get_current_time()
-                self.logger.info("Full Data Migration completed")
+            # Dedup and re-index all of the organization's apps
+
+            app_ids = self.get_app_ids()
+            for app_id in app_ids:
+
+                # De-dup app
+                job = self.start_dedup(app_id)
+                self.metrics['dedup_start_' + app_id] = get_current_time()
+                self.logger.info('Started dedup. App=[%s], Job=[%s]', app_id, job)
+                is_running = True
+                while is_running:
+                    time.sleep(STATUS_INTERVAL_SECONDS)
+                    is_running = self.is_dedup_running(job)
+                    if not is_running:
+                        break
+
+                self.logger.info("Finished dedup. App=[%s], Job=[%s]", app_id, job)
+                self.metrics['dedup_end_' + app_id] = get_current_time()
+
+                # Re-index app
+                job = self.start_app_reindex(app_id)
+                self.metrics['reindex_start_' + app_id] = get_current_time()
+                self.logger.info('Started Re-index. App=[%s], Job=[%s]', app_id, job)
+                is_running = True
+                while is_running:
+                    time.sleep(STATUS_INTERVAL_SECONDS)
+                    is_running = self.is_reindex_running(job)
+                    if not is_running:
+                        break
+
+                self.logger.info("Finished Re-index. App=[%s], Job=[%s]", app_id, job)
+                self.metrics['reindex_end_' + app_id] = get_current_time()
 
             self.log_metrics()
             self.logger.info("Finished...")
 
         except KeyboardInterrupt:
             self.log_metrics()
             self.logger.error('Keyboard interrupted migration. Please run again to ensure the migration finished.')
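
The per-app loop above drives the new connection de-dup endpoints (see get_dedup_url() below): a POST to /system/connection/dedup/<appId> starts a job and returns a jobId, and a GET on /system/connection/dedup/<jobId> reports its status until it reads COMPLETE. One cycle, sketched with the requests library and placeholder host, credentials and ids:

    r = requests.post('http://localhost:8080/system/connection/dedup/' + app_id,
                      auth=('superuser', 'superpass'))
    job = r.json()['jobId']
    # poll until COMPLETE, as is_dedup_running() does
    status = requests.get('http://localhost:8080/system/connection/dedup/' + job,
                          auth=('superuser', 'superpass')).json()['status']
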
@@ -237,6 +300,10 @@ class Migrate:
         url = self.endpoint + '/system/database/setup'
         return url
 
+    def get_database_bootstrap_url(self):
+        url = self.endpoint + '/system/database/bootstrap'
+        return url
+
     def get_migration_url(self):
         url = self.endpoint + '/system/migrate/run'
         return url
@@ -249,6 +316,10 @@ class Migrate:
         url = self.endpoint + '/system/migrate/status'
         return url
 
+    def get_dedup_url(self):
+        url = self.endpoint + '/system/connection/dedup'
+        return url
+
     def get_reindex_url(self):
         url = self.endpoint + '/system/index/rebuild'
         return url
@@ -257,7 +328,6 @@ class Migrate:
         url = self.get_reindex_url() + "/management"
         return url
 
-
     def start_core_data_migration(self):
         try:
             r = requests.put(url=self.get_migration_url(), auth=(self.admin_user, self.admin_pass))
@@ -267,7 +337,6 @@ class Migrate:
             self.logger.error('Failed to start migration, %s', e)
             exit_on_error(str(e))
 
-
     def start_fulldata_migration(self):
         try:
             r = requests.put(url=self.get_migration_url(), auth=(self.admin_user, self.admin_pass))
@@ -279,7 +348,7 @@ class Migrate:
 
     def start_migration_system_update(self):
         try:
-            #TODO fix this URL
+            # TODO fix this URL
             migrateUrl = self.get_migration_url() + '/' + PLUGIN_MIGRATION_SYSTEM
             r = requests.put(url=migrateUrl, auth=(self.admin_user, self.admin_pass))
             response = r.json()
@@ -299,6 +368,17 @@ class Migrate:
             self.logger.error('Failed to run database setup, %s', e)
             exit_on_error(str(e))
 
+    def run_database_bootstrap(self):
+        try:
+            setupUrl = self.get_database_bootstrap_url()
+            r = requests.put(url=setupUrl, auth=(self.admin_user, self.admin_pass))
+            if r.status_code != 200:
+                exit_on_error('Database Bootstrap Failed')
+
+        except requests.exceptions.RequestException as e:
+            self.logger.error('Failed to run database bootstrap, %s', e)
+            exit_on_error(str(e))
+
     def start_index_mapping_migration(self):
         try:
             migrateUrl = self.get_migration_url() + '/' + PLUGIN_INDEX_MAPPING
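
run_database_bootstrap() wraps the new /system/database/bootstrap endpoint with a PUT, mirroring run_database_setup(); invoked standalone (placeholder host and credentials), it amounts to:

    requests.put('http://localhost:8080/system/database/bootstrap',
                 auth=('superuser', 'superpass'))
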
@@ -437,7 +517,7 @@ class Migrate:
             self.logger.error('Failed to get reindex status, %s', e)
             # exit_on_error()
 
-    def start_reindex(self):
+    def start_app_reindex(self, appId):
         body = ""
         if self.start_date is not None:
             body = json.dumps({'updated': self.start_date})
@@ -463,6 +543,39 @@ class Migrate:
         else:
             return False
 
+    def get_dedup_status(self, job):
+        status_url = self.get_dedup_url() + '/' + job
+        try:
+            r = requests.get(url=status_url, auth=(self.admin_user, self.admin_pass))
+            response = r.json()
+            return response['status']
+        except requests.exceptions.RequestException as e:
+            self.logger.error('Failed to get dedup status, %s', e)
+            # exit_on_error()
+
+    def start_dedup(self, app_id):
+        body = ""
+        try:
+            r = requests.post(url=self.get_dedup_url() + "/" + app_id, data=body, auth=(self.admin_user, self.admin_pass))
+            if r.status_code == 200:
+                response = r.json()
+                return response['jobId']
+            else:
+                self.logger.error('Failed to start dedup, %s', r)
+                exit_on_error(str(r))
+
+        except requests.exceptions.RequestException as e:
+            self.logger.error('Unable to make API request for dedup, %s', e)
+            exit_on_error(str(e))
+
+    def is_dedup_running(self, job):
+        status = self.get_dedup_status(job)
+        self.logger.info('Dedup status=[%s]', status)
+        if status != "COMPLETE":
+            return True
+        else:
+            return False
+
     def is_endpoint_available(self):
 
         try:
@@ -490,6 +603,24 @@ class Migrate:
         )
 
+    def get_app_ids(self):
+
+        try:
+            url = self.endpoint + "/management/orgs/" + self.org + "/apps"
+            r = requests.get(url=url, auth=(self.admin_user, self.admin_pass))
+            if r.status_code != 200:
+                exit_on_error('Failed to get list of apps for org ' + self.org)
+
+            apps = r.json()["data"]
+            app_ids = []
+            for appId in apps.values():
+                app_ids.append(appId)
+
+            return app_ids
+
+        except requests.exceptions.RequestException as e:
+            self.logger.error('Unable to get list of application ids, %s', e)
+            exit_on_error(str(e))
 
 def get_current_time():
     return str(int(time.time()*1000))
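
get_app_ids() assumes the management apps listing returns a JSON object whose 'data' member maps qualified app names to UUIDs, so apps.values() yields the ids fed to the dedup and re-index endpoints. A hypothetical response, for illustration only:

    {
      "action": "get organization application",
      "data": {
        "myorg/myorg_app0": "3400ba10-0000-0000-0000-000000009f3a",
        "myorg/myorg_app1": "3400ba11-0000-0000-0000-000000009f3b"
      }
    }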
