Repository: incubator-hawq
Updated Branches:
  refs/heads/master 981c0a939 -> ef2aef879


HAWQ-1034. Implement --repair option for hawq register.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/ef2aef87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/ef2aef87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/ef2aef87

Branch: refs/heads/master
Commit: ef2aef87958941082a016afeea45b7bbcccb9779
Parents: 637f9d5
Author: xunzhang <xunzhang...@gmail.com>
Authored: Sat Sep 17 20:20:43 2016 +0800
Committer: Lili Ma <ictmal...@gmail.com>
Committed: Sun Sep 18 14:39:54 2016 +0800

----------------------------------------------------------------------
 tools/bin/hawqregister | 115 ++++++++++++++++++++++++++++++++++++++------
 1 file changed, 99 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ef2aef87/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index ee5275b..ffae437 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -54,8 +54,8 @@ def option_parser():
     parser.add_option('-f', '--filepath', dest='filepath', help='file name in 
HDFS')
     parser.add_option('-e', '--eof', dest='filesize', type='int', default=0, 
help='eof of the file to be registered')
     parser.add_option('-c', '--config', dest='yml_config', default='', 
help='configuration file in YAML format')
-    parser.add_option('--force', action='store_true', default=False)
-    parser.add_option('--repair', action='store_true', default=False)
+    parser.add_option('-F', '--force', dest='force', action='store_true', 
default=False)
+    parser.add_option('-R', '--repair', dest='repair', action='store_true', 
default=False)
     return parser
 
 
@@ -166,7 +166,7 @@ class GpRegisterAccessor(object):
         qry = """select attrnums from gp_distribution_policy, pg_class where 
pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;""" 
% tablename
         rows = self.exec_query(qry)
         if len(rows) == 0:
-            logger.error('Table %s not found in table gp_distribution_policy.' 
% tablename)
+            logger.error('Table %s is not an append-only table. There is no 
record in gp_distribution_policy table.' % tablename)
             sys.exit(1)
         if rows[0]['attrnums']:
             logger.error('Cannot register file(s) to a table which is hash 
distributed.')
@@ -196,6 +196,14 @@ class GpRegisterAccessor(object):
         rows = self.exec_query(query)
         return rows[0]['attrnums']
 
+    def get_bucket_number(self, tablename):
+        query = "select oid from pg_class where relname = '%s';" % 
tablename.split('.')[-1].lower()
+        rows = self.exec_query(query)
+        oid = rows[0]['oid']
+        query = "select * from gp_distribution_policy where localoid = '%s';" 
% oid
+        rows = self.exec_query(query)
+        return rows[0]['bucketnum']
+
     def get_metadata_from_database(self, tablename, seg_name):
         query = 'select segno from pg_aoseg.%s;' % seg_name
         firstsegno = len(self.exec_query(query)) + 1
@@ -233,7 +241,7 @@ class HawqRegister(object):
                 return 'force'
             elif repair:
                 if not table_existed():
-                    logger.error('--repair mode asserts the table is already 
create.')
+                    logger.error('--repair mode asserts the table has already 
been created.')
                     sys.exit(1)
                 return 'repair'
             else:
@@ -261,32 +269,76 @@ class HawqRegister(object):
         def get_metadata():
             return self.accessor.get_metadata_from_database(self.tablename, 
self.seg_name)
 
+        def get_distribution_policy():
+            return self.accessor.get_distribution_policy_info(self.tablename)
+
+        def check_policy_consistency():
+            policy = get_distribution_policy() # "" or "{1,3}"
+            if policy is None:
+                if ' '.join(self.distribution_policy.strip().split()).lower() 
== 'distributed randomly':
+                    return
+                else:
+                    logger.error('Distribution policy of %s from yaml file is 
not consistent with the policy of existing table.' % self.tablename)
+                    sys.exit(1)
+            tmp_dict = {}
+            for i, d in enumerate(self.schema):
+                tmp_dict[d['name']] = i + 1
+            # 'DISTRIBUTED BY (1,3)' -> {1,3}
+            cols = 
self.distribution_policy.strip().split()[-1].strip('(').strip(')').split(',')
+            original_policy = ','.join([str(tmp_dict[col]) for col in cols])
+            if policy.strip('{').strip('}') != original_policy:
+                logger.error('Distribution policy of %s from yaml file is not 
consistent with the policy of existing table.' % self.tablename)
+                sys.exit(1)
+
+        def check_bucket_number():
+            def get_bucket_number():
+                return self.accessor.get_bucket_number(self.tablename)
+
+            if self.bucket_number != get_bucket_number():
+                logger.error('Bucket number of %s is not consistent with 
previous bucket number.' % self.tablename)
+                sys.exit(1)
+
         if self.yml:
             self.file_format, self.files, self.sizes, self.schema, 
self.distribution_policy, self.file_locations, self.bucket_number = 
option_parser_yml(self.yml)
             self.filepath = self.files[0][:self.files[0].rfind('/')] if 
self.files else ''
             check_distribution_policy()
-            if self.mode != 'force':
+            if self.mode != 'force' and self.mode != 'repair':
                 if not create_table():
                     self.mode = 'second_exist'
         else:
             self.file_format = 'Parquet'
             check_hash_type() # Usage1 only support randomly distributed table
         if not self.filepath:
+            if self.mode == 'first':
+                logger.error('Please specify filepath with -f option.')
+            else:
+                logger.info('Hawq Register Succeed.')
             sys.exit(0)
 
-        if self.mode == 'repair':
-            # TODO
-            # check distribution policy consistency
-            # check bucketnum, pagesize, rowgroupsize, etc
-            # check filesize smaller
-            pass
-
         self.seg_name = get_seg_name()
         self.firstsegno, self.tabledir = get_metadata()
 
+        if self.mode == 'repair':
+            if self.tabledir.strip('/') != self.filepath.strip('/'):
+                logger.error("In repair mode, file path from yaml file should 
be the same with table's path.")
+                sys.exit(1)
+            check_policy_consistency()
+            check_bucket_number()
+            existed_files, existed_sizes = 
self._get_files_in_hdfs(self.filepath)
+            existed_info = {}
+            for k, fn in enumerate(existed_files):
+                existed_info[fn] = existed_sizes[k]
+            for k, fn in enumerate(self.files):
+                if fn not in existed_files:
+                    logger.error('Can not register in repair mode since giving 
non-existing file: %s.' % fn)
+                    sys.exit(1)
+                if self.sizes[k] > existed_info[fn]:
+                    logger.error('Can not register in repair mode since giving 
larger file size: %s' % self.sizes[k])
+                    sys.exit(1)
+
         if self.mode == 'second_exist':
             if self.tabledir.strip('/') == self.filepath.strip('/'):
-                logger.error('Files to be registeted in this case should not 
be the same with table path.')
+                logger.error('Files to be registered should not be the same 
with table path.')
                 sys.exit(1)
 
         self.do_not_move, self.files_update, self.sizes_update = False, [], []
@@ -294,7 +346,7 @@ class HawqRegister(object):
             existed_files, _ = self._get_files_in_hdfs(self.tabledir)
             if len(self.files) == len(existed_files):
                 if sorted(self.files) != sorted(existed_files):
-                    logger.error('In this case, you should include previous 
table files.\nOtherwise you should drop the previous table before registering 
--force.')
+                    logger.error('In force mode, you should include existing 
table files in yaml configuration file. Otherwise you should drop the previous 
table before register --force.')
                     sys.exit(1)
                 else:
                     self.do_not_move, self.files_update, self.sizes_update = 
True, self.files, self.sizes
@@ -307,6 +359,14 @@ class HawqRegister(object):
                         self.sizes_update.append(sizes_old[k])
                         self.files.remove(files_old[k])
                         self.sizes.remove(sizes_old[k])
+        elif self.mode == 'repair':
+            self.do_not_move = True
+            self.files_update, self.sizes_update = [fn for fn in self.files], 
[sz for sz in self.sizes]
+            self.files_delete = []
+            for fn in existed_files:
+                if fn not in self.files:
+                    self.files_delete.append(fn)
+            self.files, self.sizes = [], []
 
         self._check_files_and_table_in_same_hdfs_cluster(self.filepath, 
self.tabledir)
 
@@ -332,13 +392,13 @@ class HawqRegister(object):
         # check whether the files to be registered is in hdfs
         filesystem = filepath.split('://')
         if filesystem[0] != 'hdfs':
-            logger.error('Only support to register file(s) in hdfs')
+            logger.error('Only support registering file(s) in hdfs.')
             sys.exit(1)
         fileroot = filepath.split('/')
         tableroot = tabledir.split('/')
         # check the root url of them. eg: for 
'hdfs://localhost:8020/temp/tempfile', we check 'hdfs://localohst:8020'
         if fileroot[0:3] != tableroot[0:3]:
-            logger.error("Files to be registered and the table are not in the 
same hdfs cluster.\nFile(s) to be registered: '%s'\nTable path in HDFS: '%s'" % 
(filepath, tabledir))
+            logger.error("Files to be registered and the table are not in the 
same hdfs cluster.\nFile(s) to be registered: '%s'\nTable path in HDFS: '%s'." 
% (filepath, tabledir))
             sys.exit(1)
 
     def _get_files_in_hdfs(self, filepath):
@@ -393,6 +453,15 @@ class HawqRegister(object):
                     logger.error('Fail to move %s to %s' % (srcfile, dstfile))
                     sys.exit(1)
 
+    def _delete_files_in_hdfs(self):
+        for fn in self.files_delete:
+            hdfscmd = 'hdfs dfs -rm %s' % fn
+            sys.stdout.write('hdfscmd: "%s"\n' % hdfscmd)
+            result = local_ssh(hdfscmd, logger)
+            if result != 0:
+                logger.error('Fail to delete %s ' % fn)
+                sys.exit(1)
+
     def _modify_metadata(self, mode):
         if mode == 'insert':
             eofs = self.sizes
@@ -434,6 +503,15 @@ class HawqRegister(object):
             query += "end transaction;"
         return self.utility_accessor.update_catalog(query)
 
+    def _delete_metadata(self):
+        query = "set allow_system_table_mods='dml';"
+        query += "begin transaction;"
+        segno_lst = [fn.strip().split('/')[-1] for fn in self.files_delete]
+        for seg in segno_lst:
+            query += "delete from pg_aoseg.%s where segno = '%s';" % 
(self.seg_name, seg)
+        query += "end transaction;"
+        return self.utility_accessor.update_catalog(query)
+
     def register(self):
         if not self.do_not_move:
             self._move_files_in_hdfs()
@@ -442,6 +520,11 @@ class HawqRegister(object):
         else:
             if self.mode == 'force':
                 self._modify_metadata('update')
+            elif self.mode == 'repair':
+                self._modify_metadata('update')
+                if self.files_delete:
+                    self._delete_files_in_hdfs()
+                    self._delete_metadata()
             else:
                 self._modify_metadata('insert')
         logger.info('Hawq Register Succeed.')

Reply via email to