Repository: incubator-usergrid Updated Branches: refs/heads/two-dot-o-dev 4f9650b93 -> 35430a59d
Add migration system plugin into the script. Reformat logging, ensure appinfos are migrated only when they can be. Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5803d58c Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5803d58c Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5803d58c Branch: refs/heads/two-dot-o-dev Commit: 5803d58c972562567645a8871c0d36c1f5ebf133 Parents: e3a4a95 Author: Michael Russo <michaelaru...@gmail.com> Authored: Thu Aug 13 13:12:33 2015 -0700 Committer: Michael Russo <michaelaru...@gmail.com> Committed: Thu Aug 13 13:12:33 2015 -0700 ---------------------------------------------------------------------- stack/scripts/migrate_entity_data.py | 155 +++++++++++++++++++++--------- 1 file changed, 112 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5803d58c/stack/scripts/migrate_entity_data.py ---------------------------------------------------------------------- diff --git a/stack/scripts/migrate_entity_data.py b/stack/scripts/migrate_entity_data.py index fd4d936..13c1b41 100644 --- a/stack/scripts/migrate_entity_data.py +++ b/stack/scripts/migrate_entity_data.py @@ -15,13 +15,30 @@ # specific language governing permissions and limitations # under the License. 
# -# Usage from a machine running Usergrid: # -# python migrate_entity_data.py -u adminuser:adminpass (standard data migration and reindex) -# python migrate_entity_data.py -u adminuser:adminpass -f (force a re-migration ) -# python migrate_entity_data.py -u adminuser:adminpass -d <timestamp> (re-index only from the timestamp specified) # +# Usage from a machine running Usergrid with the new Usergrid version: # +# ###################################################### +# STEP 1 - BEFORE SWITCHING TRAFFIC TO NEW UG VERSION +# ###################################################### +# +# python migrate_entity_data.py --user adminuser:adminpass +# +# The above command performs an appinfo migration and system re-index only. This creates indices in Elasticsearch with +# the updated indexing strategy in the new Usergrid version. +# +# ###################################################### +# STEP 2 - AFTER SWITCHING TRAFFIC TO NEW UG VERSION +# ###################################################### +# +# python migrate_entity_data.py --user adminuser:adminpass --delta --date <timestamp> +# +# The above command performs an appinfo migration, system re-index using a start date, and full data migration which +# includes entity data. This step is necessary to ensure Usergrid starts reading and writing data from the latest +# entity version, including delta indexing of any documents created during the time between STEP 1 and STEP 2. If +# all data has already been migrated (running this a 2nd, 3rd, etc. time), then the appinfo migration will be skipped. 
+ import sys @@ -35,30 +52,37 @@ import json # Version expected in status response post-migration for entity and app-info data TARGET_VERSION = 2 +TARGET_MIGRATION_SYSTEM_VERSION = 1 # Set an interval (in seconds) for checking if re-index and/or migration has finished STATUS_INTERVAL_SECONDS = 2 +# Set plugin names +PLUGIN_MIGRATION_SYSTEM = 'migration-system' +PLUGIN_APPINFO = 'appinfo-migration' +PLUGIN_ENTITYDATA = 'collections-entity-data' + + def parse_args(): parser = argparse.ArgumentParser(description='Usergrid Migration Tool') - parser.add_argument('-d', '--date', + parser.add_argument('--date', help='A date from which to start the migration', type=str) - parser.add_argument('-e', '--endpoint', + parser.add_argument('--endpoint', help='The endpoint to use for making API requests.', type=str, default='http://localhost:8080') - parser.add_argument('-u', '--user', + parser.add_argument('--user', help='System Admin Credentials used to authenticate with Usergrid <user:pass>', type=str, required=True) - parser.add_argument('-f', '--force', - help='Force a delta migration.', + parser.add_argument('--delta', + help='Run a delta migration.', action='store_true', default=False) @@ -91,7 +115,7 @@ class Migrate: self.logger = init_logging(self.__class__.__name__) self.admin_user = self.args['user'] self.admin_pass = self.args['pass'] - self.force_migration = self.args['force'] + self.delta_migration = self.args['delta'] def run(self): self.logger.info('Initializing...') @@ -104,24 +128,40 @@ class Migrate: try: - # Always run an app info migration first - if self.is_appinfo_migrated(): - self.logger.info('AppInfo already migrated. 
Resetting version for re-migration.') - self.reset_appinfo_migration() - time.sleep(STATUS_INTERVAL_SECONDS) + # We need to check and roll the migration system to 1 if not already + migration_system_updated = self.is_migration_system_updated() - self.start_appinfo_migration() - self.logger.info('AppInfo Migration Started.') - self.metrics['appinfo_migration_start'] = get_current_time() + if not migration_system_updated: + self.logger.info('Migration system needs to updated. Updating migration system..') + self.start_migration_system_update() + while not migration_system_updated: + time.sleep(STATUS_INTERVAL_SECONDS) + migration_system_updated = self.is_migration_system_updated() + if migration_system_updated: + break - is_appinfo_migrated = False - while not is_appinfo_migrated: - is_appinfo_migrated = self.is_appinfo_migrated() - time.sleep(STATUS_INTERVAL_SECONDS) - if is_appinfo_migrated: - self.metrics['appinfo_migration_end'] = get_current_time() - break - self.logger.info('AppInfo Migration Ended.') + # Run AppInfo migration only when both appinfos and collection entity data have not been migrated + if not self.is_data_migrated(): + + if self.is_appinfo_migrated(): + self.logger.info('AppInfo already migrated. Resetting version for re-migration.') + self.reset_appinfo_migration() + time.sleep(STATUS_INTERVAL_SECONDS) + + self.start_appinfo_migration() + self.logger.info('AppInfo Migration Started.') + self.metrics['appinfo_migration_start'] = get_current_time() + + is_appinfo_migrated = False + while not is_appinfo_migrated: + is_appinfo_migrated = self.is_appinfo_migrated() + time.sleep(STATUS_INTERVAL_SECONDS) + if is_appinfo_migrated: + self.metrics['appinfo_migration_end'] = get_current_time() + break + self.logger.info('AppInfo Migration Ended.') + else: + self.logger.info('Full Data Migration previously ran... 
skipping AppInfo migration.') # Perform system re-index (it will grab date from input if provided) job = self.start_reindex() @@ -137,10 +177,10 @@ class Migrate: self.logger.info("Finished Re-index. Job=[%s]", job) self.metrics['reindex_end'] = get_current_time() - # Only when we do a delta (force migration) do we run the full data migration (includes entity data) - if self.force_migration: + # Only when we do a delta migration do we run the full data migration (includes appinfo and entity data) + if self.delta_migration: - self.logger.info('Force option provided. Performing full data migration...') + self.logger.info('Delta option provided. Performing full data migration...') if self.is_data_migrated(): self.reset_data_migration() time.sleep(STATUS_INTERVAL_SECONDS) @@ -191,9 +231,19 @@ class Migrate: self.logger.error('Failed to start migration, %s', e) exit_on_error(str(e)) + def start_migration_system_update(self): + try: + migrateUrl = self.get_migration_url() + '/' + PLUGIN_MIGRATION_SYSTEM + r = requests.put(url=migrateUrl, auth=(self.admin_user, self.admin_pass)) + response = r.json() + return response + except requests.exceptions.RequestException as e: + self.logger.error('Failed to start migration, %s', e) + exit_on_error(str(e)) + def start_appinfo_migration(self): try: - migrateUrl = self.get_migration_url() + '/' + 'appinfo-migration' + migrateUrl = self.get_migration_url() + '/' + PLUGIN_APPINFO r = requests.put(url=migrateUrl, auth=(self.admin_user, self.admin_pass)) response = r.json() return response @@ -203,12 +253,12 @@ class Migrate: def reset_data_migration(self): version = TARGET_VERSION - 1 - body = json.dumps({'collections-entity-data': version, 'appinfo-migration': version}) + body = json.dumps({PLUGIN_ENTITYDATA: version, PLUGIN_APPINFO: version}) try: r = requests.put(url=self.get_reset_migration_url(), data=body, auth=(self.admin_user, self.admin_pass)) response = r.json() - self.logger.info('Resetting data migration versions to 
collections-entity-data=[v%s] ' - 'and appinfo-migration=[v%s]', version, version) + self.logger.info('Resetting data migration versions to %s=[%s] ' + 'and %s=[%s]', PLUGIN_ENTITYDATA, version, PLUGIN_APPINFO, version) return response except requests.exceptions.RequestException as e: self.logger.error('Failed to reset full data migration versions, %s', e) @@ -216,11 +266,11 @@ class Migrate: def reset_appinfo_migration(self): version = TARGET_VERSION - 1 - body = json.dumps({'appinfo-migration': version}) + body = json.dumps({PLUGIN_APPINFO: version}) try: r = requests.put(url=self.get_reset_migration_url(), data=body, auth=(self.admin_user, self.admin_pass)) response = r.json() - self.logger.info('Resetting appinfo migration versions to appinfo-migration=[v%s]', version) + self.logger.info('Resetting appinfo migration versions to %s=[%s]', PLUGIN_APPINFO, version) return response except requests.exceptions.RequestException as e: self.logger.error('Failed to reset appinfo migration version, %s', e) @@ -229,13 +279,15 @@ class Migrate: def is_data_migrated(self): status = self.check_data_migration_status() if status is not None: - entity_version = status['data']['collections-entity-data'] - appinfo_version = status['data']['appinfo-migration'] + entity_version = status['data'][PLUGIN_ENTITYDATA] + appinfo_version = status['data'][PLUGIN_APPINFO] if entity_version == TARGET_VERSION and appinfo_version == TARGET_VERSION: - self.logger.info('Full Data Migration status=[COMPLETE], collections-entity-data=[v%s], ' - 'appinfo-migration=[v%s]', + self.logger.info('Full Data Migration status=[COMPLETE], %s=[%s], ' + '%s=[%s]', + PLUGIN_ENTITYDATA, entity_version, + PLUGIN_APPINFO, appinfo_version) return True else: @@ -245,17 +297,34 @@ class Migrate: def is_appinfo_migrated(self): status = self.check_data_migration_status() if status is not None: - appinfo_version = status['data']['appinfo-migration'] + appinfo_version = status['data'][PLUGIN_APPINFO] if appinfo_version 
== TARGET_VERSION: self.logger.info('AppInfo Migration status=[COMPLETE],' - 'appinfo-migration=[v%s]', + '%s=[%s]', + PLUGIN_APPINFO, appinfo_version) return True else: self.logger.info('AppInfo Migration status=[NOTSTARTED/INPROGRESS]') return False + def is_migration_system_updated(self): + status = self.check_data_migration_status() + if status is not None: + migration_system_version = status['data'][PLUGIN_MIGRATION_SYSTEM] + + if migration_system_version == TARGET_MIGRATION_SYSTEM_VERSION: + self.logger.info('Migration System CURRENT, %s=[%s]', + PLUGIN_MIGRATION_SYSTEM, + migration_system_version) + return True + else: + self.logger.info('Migration System OLD, %s=[%s]', + PLUGIN_MIGRATION_SYSTEM, + migration_system_version) + return False + def check_data_migration_status(self): try: @@ -348,8 +417,8 @@ def init_logging(name): logger = logging.getLogger(name) log_file_name = './migration.log' - log_formatter = logging.Formatter(fmt='%(asctime)s | %(name)s | %(levelname)s | %(message)s', - datefmt='%m/%d/%Y %I:%M:%S %p') + log_formatter = logging.Formatter(fmt='%(asctime)s [%(name)s] %(levelname)s %(message)s', + datefmt='%Y-%m-%d %H:%M:%S') rotating_file = logging.handlers.RotatingFileHandler(filename=log_file_name, mode='a',