Repository: incubator-hawq Updated Branches: refs/heads/master 921b908ef -> a4f57020c
HAWQ-991. Support registering hash distribution table into random distribution table. Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/a4f57020 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/a4f57020 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/a4f57020 Branch: refs/heads/master Commit: a4f57020c0b88a860fa5cea6060c3c17c7434c08 Parents: 921b908 Author: xunzhang <[email protected]> Authored: Fri Sep 23 15:49:20 2016 +0800 Committer: Lili Ma <[email protected]> Committed: Fri Sep 23 16:41:46 2016 +0800 ---------------------------------------------------------------------- tools/bin/hawqregister | 37 +++++++++++++++++-------------------- 1 file changed, 17 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/a4f57020/tools/bin/hawqregister ---------------------------------------------------------------------- diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister index 3661f3e..b2c85ea 100755 --- a/tools/bin/hawqregister +++ b/tools/bin/hawqregister @@ -117,7 +117,7 @@ def register_yaml_dict_check(D, table_column_num, src_tablename): yml_column_num = len(D['AO_Schema']) if table_column_num != yml_column_num and table_column_num > 0: logger.error('Column number of table in yaml file is not equals to the column number of table %s.' % src_tablename) - sys.exit(1) + sys.exit(1) class FailureHandler(object): @@ -136,7 +136,8 @@ class FailureHandler(object): return ' '.join(lst[:-2] + [lst[-1], lst[-2]]) def rollback(self): - logger.info('Error found, Hawqregister starts to rollback...') + if len(self.operations) != 0: + logger.info('Error found, Hawqregister starts to rollback...') for (typ, cmd) in reversed(self.operations): if typ == 'SQL': sql = self.assemble_SQL(cmd) @@ -153,7 +154,8 @@ class FailureHandler(object): if result != 0: logger.error('Fail to rollback: %s.' % hdfscmd) sys.exit(1) - logger.info('Hawq Register Rollback Succeed.') + if len(self.operations) != 0: + logger.info('Hawq Register Rollback Finished.') class GpRegisterAccessor(object): @@ -221,11 +223,9 @@ class GpRegisterAccessor(object): rows = self.exec_query(qry) if len(rows) == 0: logger.error('Table %s is not an append-only table. There is no record in gp_distribution_policy table.' % tablename) - self.failure_handler.rollback() sys.exit(1) if rows[0]['attrnums']: logger.error('Cannot register file(s) to a table which is hash distributed.') - self.failure_handler.rollback() sys.exit(1) # pg_paqseg_# @@ -236,15 +236,13 @@ class GpRegisterAccessor(object): rows = self.exec_query(query) if len(rows) == 0: logger.error('table "%s" not found in db "%s"' % (tablename, database)) - self.failure_handler.rollback() - sys.exit(1) + return ('', False) relname = rows[0]['relname'] if fmt == 'Parquet': if relname.find('paq') == -1: logger.error("table '%s' is not parquet format" % tablename) - self.failure_handler.rollback() - sys.exit(1) - return relname + return ('', False) + return (relname, True) def get_distribution_policy_info(self, tablename): query = "select oid from pg_class where relname = '%s';" % tablename.split('.')[-1].lower() @@ -353,7 +351,8 @@ class HawqRegister(object): except pg.DatabaseError as e: print e sys.exit(1) - self.failure_handler.commit(('SQL', query)) + if ret: + self.failure_handler.commit(('SQL', query)) return ret def get_seg_name(): @@ -368,12 +367,7 @@ class HawqRegister(object): def check_policy_consistency(): policy = get_distribution_policy() # "" or "{1,3}" if policy is None: - if ' '.join(self.distribution_policy.strip().split()).lower() == 'distributed randomly': - return - else: - logger.error('Distribution policy of %s from yaml file is not consistent with the policy of existing table.' % self.tablename) - self.failure_handler.rollback() - sys.exit(1) + return tmp_dict = {} for i, d in enumerate(self.schema): tmp_dict[d['name']] = i + 1 @@ -535,7 +529,10 @@ class HawqRegister(object): logger.info('Hawq Register Succeed.') sys.exit(0) - self.seg_name = get_seg_name() + (self.seg_name, tmp_ret) = get_seg_name() + if not tmp_ret: + self.failure_handler.rollback() + sys.exit(1) self.firstsegno, self.tabledir = get_metadata() if self.mode == 'repair': @@ -676,9 +673,9 @@ class HawqRegister(object): if out == '0': continue hdfscmd = 'hdfs dfs -cat %s | head -c 4 | grep PAR1' % f - result1 = local_ssh(hdfscmd, logger) + result1 = local_ssh(hdfscmd) hdfscmd = 'hdfs dfs -cat %s | tail -c 4 | grep PAR1' % f - result2 = local_ssh(hdfscmd, logger) + result2 = local_ssh(hdfscmd) if result1 or result2: logger.error('File %s is not parquet format' % f) self.failure_handler.rollback()
