Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-dev 4f9650b93 -> 35430a59d


Add migration system plugin into the script.  Reformat logging, ensure appinfos 
are migrated only when they can be.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5803d58c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5803d58c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5803d58c

Branch: refs/heads/two-dot-o-dev
Commit: 5803d58c972562567645a8871c0d36c1f5ebf133
Parents: e3a4a95
Author: Michael Russo <michaelaru...@gmail.com>
Authored: Thu Aug 13 13:12:33 2015 -0700
Committer: Michael Russo <michaelaru...@gmail.com>
Committed: Thu Aug 13 13:12:33 2015 -0700

----------------------------------------------------------------------
 stack/scripts/migrate_entity_data.py | 155 +++++++++++++++++++++---------
 1 file changed, 112 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5803d58c/stack/scripts/migrate_entity_data.py
----------------------------------------------------------------------
diff --git a/stack/scripts/migrate_entity_data.py 
b/stack/scripts/migrate_entity_data.py
index fd4d936..13c1b41 100644
--- a/stack/scripts/migrate_entity_data.py
+++ b/stack/scripts/migrate_entity_data.py
@@ -15,13 +15,30 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-# Usage from a machine running Usergrid:
 #
-# python migrate_entity_data.py -u adminuser:adminpass    (standard data 
migration and reindex)
-# python migrate_entity_data.py -u adminuser:adminpass -f    (force a 
re-migration )
-# python migrate_entity_data.py -u adminuser:adminpass -d <timestamp>    
(re-index only from the timestamp specified)
 #
+# Usage from a machine running Usergrid with the new Usergrid version:
 #
+# ######################################################
+# STEP 1 - BEFORE SWITCHING TRAFFIC TO NEW UG VERSION
+# ######################################################
+#
+# python migrate_entity_data.py --user adminuser:adminpass
+#
+# The above command performs an appinfo migration and system re-index only.  
This creates indices in Elasticsearch with
+# the updated indexing strategy in the new Usergrid version.
+#
+# ######################################################
+# STEP 2 - AFTER SWITCHING TRAFFIC TO NEW UG VERSION
+# ######################################################
+#
+# python migrate_entity_data.py --user adminuser:adminpass --delta --date 
<timestamp>
+#
+# The above command performs an appinfo migration, system re-index using a 
start date, and full data migration which
+# includes entity data.  This step is necessary to ensure Usergrid starts 
reading and writing data from the latest
+# entity version, including delta indexing of any documents created during the 
time between STEP 1 and STEP 2.  If
+# all data has already been migrated (running this a 2nd, 3rd, etc. time), 
then the appinfo migration will be skipped.
+
 
 
 import sys
@@ -35,30 +52,37 @@ import json
 
 # Version expected in status response post-migration for entity and app-info 
data
 TARGET_VERSION = 2
+TARGET_MIGRATION_SYSTEM_VERSION = 1
 
 # Set an interval (in seconds) for checking if re-index and/or migration has 
finished
 STATUS_INTERVAL_SECONDS = 2
 
+# Set plugin names
+PLUGIN_MIGRATION_SYSTEM = 'migration-system'
+PLUGIN_APPINFO = 'appinfo-migration'
+PLUGIN_ENTITYDATA = 'collections-entity-data'
+
+
 
 def parse_args():
     parser = argparse.ArgumentParser(description='Usergrid Migration Tool')
 
-    parser.add_argument('-d', '--date',
+    parser.add_argument('--date',
                         help='A date from which to start the migration',
                         type=str)
 
-    parser.add_argument('-e', '--endpoint',
+    parser.add_argument('--endpoint',
                         help='The endpoint to use for making API requests.',
                         type=str,
                         default='http://localhost:8080')
 
-    parser.add_argument('-u', '--user',
+    parser.add_argument('--user',
                         help='System Admin Credentials used to authenticate 
with Usergrid  <user:pass>',
                         type=str,
                         required=True)
 
-    parser.add_argument('-f', '--force',
-                        help='Force a delta migration.',
+    parser.add_argument('--delta',
+                        help='Run a delta migration.',
                         action='store_true',
                         default=False)
 
@@ -91,7 +115,7 @@ class Migrate:
         self.logger = init_logging(self.__class__.__name__)
         self.admin_user = self.args['user']
         self.admin_pass = self.args['pass']
-        self.force_migration = self.args['force']
+        self.delta_migration = self.args['delta']
 
     def run(self):
         self.logger.info('Initializing...')
@@ -104,24 +128,40 @@ class Migrate:
 
         try:
 
-            # Always run an app info migration first
-            if self.is_appinfo_migrated():
-                self.logger.info('AppInfo already migrated. Resetting version 
for re-migration.')
-                self.reset_appinfo_migration()
-                time.sleep(STATUS_INTERVAL_SECONDS)
+            # Check the migration system version and roll it to 1 if it is not 
already there
+            migration_system_updated = self.is_migration_system_updated()
 
-            self.start_appinfo_migration()
-            self.logger.info('AppInfo Migration Started.')
-            self.metrics['appinfo_migration_start'] = get_current_time()
+            if not migration_system_updated:
+                self.logger.info('Migration system needs to updated.  Updating 
migration system..')
+                self.start_migration_system_update()
+                while not migration_system_updated:
+                    time.sleep(STATUS_INTERVAL_SECONDS)
+                    migration_system_updated = 
self.is_migration_system_updated()
+                    if migration_system_updated:
+                        break
 
-            is_appinfo_migrated = False
-            while not is_appinfo_migrated:
-                is_appinfo_migrated = self.is_appinfo_migrated()
-                time.sleep(STATUS_INTERVAL_SECONDS)
-                if is_appinfo_migrated:
-                    self.metrics['appinfo_migration_end'] = get_current_time()
-                    break
-            self.logger.info('AppInfo Migration Ended.')
+            # Run AppInfo migration only when both appinfos and collection 
entity data have not been migrated
+            if not self.is_data_migrated():
+
+                if self.is_appinfo_migrated():
+                    self.logger.info('AppInfo already migrated. Resetting 
version for re-migration.')
+                    self.reset_appinfo_migration()
+                    time.sleep(STATUS_INTERVAL_SECONDS)
+
+                self.start_appinfo_migration()
+                self.logger.info('AppInfo Migration Started.')
+                self.metrics['appinfo_migration_start'] = get_current_time()
+
+                is_appinfo_migrated = False
+                while not is_appinfo_migrated:
+                    is_appinfo_migrated = self.is_appinfo_migrated()
+                    time.sleep(STATUS_INTERVAL_SECONDS)
+                    if is_appinfo_migrated:
+                        self.metrics['appinfo_migration_end'] = 
get_current_time()
+                        break
+                self.logger.info('AppInfo Migration Ended.')
+            else:
+                self.logger.info('Full Data Migration previously ran... 
skipping AppInfo migration.')
 
             # Perform system re-index (it will grab date from input if 
provided)
             job = self.start_reindex()
@@ -137,10 +177,10 @@ class Migrate:
             self.logger.info("Finished Re-index. Job=[%s]", job)
             self.metrics['reindex_end'] = get_current_time()
 
-            # Only when we do a delta (force migration) do we run the full 
data migration (includes entity data)
-            if self.force_migration:
+            # Only when we do a delta migration do we run the full data 
migration (includes appinfo and entity data)
+            if self.delta_migration:
 
-                self.logger.info('Force option provided. Performing full data 
migration...')
+                self.logger.info('Delta option provided. Performing full data 
migration...')
                 if self.is_data_migrated():
                     self.reset_data_migration()
                 time.sleep(STATUS_INTERVAL_SECONDS)
@@ -191,9 +231,19 @@ class Migrate:
             self.logger.error('Failed to start migration, %s', e)
             exit_on_error(str(e))
 
+    def start_migration_system_update(self):
+        try:
+            migrateUrl = self.get_migration_url() + '/' + 
PLUGIN_MIGRATION_SYSTEM
+            r = requests.put(url=migrateUrl, auth=(self.admin_user, 
self.admin_pass))
+            response = r.json()
+            return response
+        except requests.exceptions.RequestException as e:
+            self.logger.error('Failed to start migration, %s', e)
+            exit_on_error(str(e))
+
     def start_appinfo_migration(self):
         try:
-            migrateUrl = self.get_migration_url() + '/' + 'appinfo-migration'
+            migrateUrl = self.get_migration_url() + '/' + PLUGIN_APPINFO
             r = requests.put(url=migrateUrl, auth=(self.admin_user, 
self.admin_pass))
             response = r.json()
             return response
@@ -203,12 +253,12 @@ class Migrate:
 
     def reset_data_migration(self):
         version = TARGET_VERSION - 1
-        body = json.dumps({'collections-entity-data': version, 
'appinfo-migration': version})
+        body = json.dumps({PLUGIN_ENTITYDATA: version, PLUGIN_APPINFO: 
version})
         try:
             r = requests.put(url=self.get_reset_migration_url(), data=body, 
auth=(self.admin_user, self.admin_pass))
             response = r.json()
-            self.logger.info('Resetting data migration versions to 
collections-entity-data=[v%s] '
-                             'and appinfo-migration=[v%s]', version, version)
+            self.logger.info('Resetting data migration versions to %s=[%s] '
+                             'and %s=[%s]', PLUGIN_ENTITYDATA, version, 
PLUGIN_APPINFO, version)
             return response
         except requests.exceptions.RequestException as e:
             self.logger.error('Failed to reset full data migration versions, 
%s', e)
@@ -216,11 +266,11 @@ class Migrate:
 
     def reset_appinfo_migration(self):
         version = TARGET_VERSION - 1
-        body = json.dumps({'appinfo-migration': version})
+        body = json.dumps({PLUGIN_APPINFO: version})
         try:
             r = requests.put(url=self.get_reset_migration_url(), data=body, 
auth=(self.admin_user, self.admin_pass))
             response = r.json()
-            self.logger.info('Resetting appinfo migration versions to 
appinfo-migration=[v%s]', version)
+            self.logger.info('Resetting appinfo migration versions to 
%s=[%s]', PLUGIN_APPINFO, version)
             return response
         except requests.exceptions.RequestException as e:
             self.logger.error('Failed to reset appinfo migration version, %s', 
e)
@@ -229,13 +279,15 @@ class Migrate:
     def is_data_migrated(self):
         status = self.check_data_migration_status()
         if status is not None:
-            entity_version = status['data']['collections-entity-data']
-            appinfo_version = status['data']['appinfo-migration']
+            entity_version = status['data'][PLUGIN_ENTITYDATA]
+            appinfo_version = status['data'][PLUGIN_APPINFO]
 
             if entity_version == TARGET_VERSION and appinfo_version == 
TARGET_VERSION:
-                self.logger.info('Full Data Migration status=[COMPLETE], 
collections-entity-data=[v%s], '
-                                 'appinfo-migration=[v%s]',
+                self.logger.info('Full Data Migration status=[COMPLETE], 
%s=[%s], '
+                                 '%s=[%s]',
+                                 PLUGIN_ENTITYDATA,
                                  entity_version,
+                                 PLUGIN_APPINFO,
                                  appinfo_version)
                 return True
             else:
@@ -245,17 +297,34 @@ class Migrate:
     def is_appinfo_migrated(self):
         status = self.check_data_migration_status()
         if status is not None:
-            appinfo_version = status['data']['appinfo-migration']
+            appinfo_version = status['data'][PLUGIN_APPINFO]
 
             if appinfo_version == TARGET_VERSION:
                 self.logger.info('AppInfo Migration status=[COMPLETE],'
-                                 'appinfo-migration=[v%s]',
+                                 '%s=[%s]',
+                                 PLUGIN_APPINFO,
                                  appinfo_version)
                 return True
             else:
                 self.logger.info('AppInfo Migration 
status=[NOTSTARTED/INPROGRESS]')
         return False
 
+    def is_migration_system_updated(self):
+        status = self.check_data_migration_status()
+        if status is not None:
+            migration_system_version = status['data'][PLUGIN_MIGRATION_SYSTEM]
+
+            if migration_system_version == TARGET_MIGRATION_SYSTEM_VERSION:
+                self.logger.info('Migration System CURRENT, %s=[%s]',
+                                 PLUGIN_MIGRATION_SYSTEM,
+                                 migration_system_version)
+                return True
+            else:
+                self.logger.info('Migration System OLD, %s=[%s]',
+                                 PLUGIN_MIGRATION_SYSTEM,
+                                 migration_system_version)
+        return False
+
     def check_data_migration_status(self):
 
         try:
@@ -348,8 +417,8 @@ def init_logging(name):
 
     logger = logging.getLogger(name)
     log_file_name = './migration.log'
-    log_formatter = logging.Formatter(fmt='%(asctime)s | %(name)s | 
%(levelname)s | %(message)s',
-                                      datefmt='%m/%d/%Y %I:%M:%S %p')
+    log_formatter = logging.Formatter(fmt='%(asctime)s [%(name)s] 
%(levelname)s %(message)s',
+                                      datefmt='%Y-%m-%d %H:%M:%S')
 
     rotating_file = 
logging.handlers.RotatingFileHandler(filename=log_file_name,
                                                          mode='a',

Reply via email to