Repository: incubator-hawq Updated Branches: refs/heads/master 7ab73156f -> 45adcbdb3
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/45adcbdb/tools/bin/hawqregister ---------------------------------------------------------------------- diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister index 492b151..67a188d 100755 --- a/tools/bin/hawqregister +++ b/tools/bin/hawqregister @@ -123,6 +123,25 @@ def register_yaml_dict_check(D, table_column_num, src_tablename): logger.error('Column number of table in yaml file is not equals to the column number of table %s.' % src_tablename) sys.exit(1) +def ispartition(yml_file): + import yaml + try: + with open(yml_file, 'r') as f: + params = yaml.load(f) + except yaml.scanner.ScannerError as e: + print e + sys.exit(1) + + if params['FileFormat'].lower() == 'parquet': + Format = 'Parquet' + else: #AO format + Format = 'AO' + Format_FileLocations = '%s_FileLocations' % Format + if params.get(Format_FileLocations): + partitionby = params.get(Format_FileLocations).get('PartitionBy') + if partitionby: + return True + return False class FailureHandler(object): def __init__(self, conn): @@ -256,6 +275,23 @@ class GpRegisterAccessor(object): rows = self.exec_query(query) return rows[0]['attrnums'] + def get_partition_info(self, tablename): + ''' Get partition information from pg_partitions, return a constraint-tablename dictionary ''' + query = "SELECT partitiontablename, partitionboundary FROM pg_partitions WHERE tablename = '%s'" % tablename + return self.exec_query(query) + + def get_partitionby(self, tablename): + query = "SELECT partitionschemaname, partitiontablename, partitionname, partitiontype, parentpartitiontablename, partitionboundary FROM pg_partitions WHERE tablename='%s';" % tablename + parition_type = self.exec_query(query)[0]['partitiontype'] + query = "SELECT columnname, partitionlevel FROM pg_partition_columns WHERE tablename='%s' ORDER BY position_in_partition_key;" % tablename + partition_columnname = self.exec_query(query)[0]['columnname'] + partitionby = 'PARTITION BY %s (%s)' % (parition_type, partition_columnname) + return partitionby + + def get_partition_num(self, tablename): + query = "SELECT partitionschemaname from pg_partitions WHERE tablename='%s';" % tablename + return len(self.exec_query(query)) + def get_bucket_number(self, tablename): query = "select oid from pg_class where relname = '%s';" % tablename.split('.')[-1].lower() rows = self.exec_query(query) @@ -307,7 +343,13 @@ class HawqRegister(object): self.mode = self._init_mode(options.force, options.repair) self.srcfiles = [] self.dstfiles = [] - self._init() + self.files_same_path = [] + self.sizes_same_path = [] + self.segnos_same_path = [] + self.tupcounts_same_path = [] + self.varblockcounts_same_path = [] + self.eofuncompresseds_same_path = [] + self.segnos_same_path = [] def _init_mode(self, force, repair): def table_existed(): @@ -374,7 +416,8 @@ class HawqRegister(object): def _set_yml_data(self, file_format, files, sizes, tupcounts, eofuncompresseds, varblockcounts, tablename, schema, distribution_policy, file_locations,\ bucket_number, partitionby, partitions_constraint, partitions_name, partitions_compression_level,\ - partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, encoding): + partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, \ + partitions_tupcounts, partitions_eofuncompresseds, partitions_varblockcounts, encoding): self.file_format = file_format self.files = files self.sizes = sizes @@ -394,6 +437,9 @@ class HawqRegister(object): self.partitions_checksum = partitions_checksum self.partitions_filepaths = partitions_filepaths self.partitions_filesizes = partitions_filesizes + self.partitions_tupcounts = partitions_tupcounts + self.partitions_eofuncompresseds = partitions_eofuncompresseds + self.partitions_varblockcounts = partitions_varblockcounts self.encoding = encoding def _option_parser_yml(self, yml_file): @@ -414,6 +460,9 @@ class HawqRegister(object): partitions_checksum = [] partitions_compression_level = [] partitions_compression_type = [] + partitions_tupcounts = [] + partitions_eofuncompresseds = [] + partitions_varblockcounts = [] files, sizes, tupcounts, eofuncompresseds, varblockcounts = [], [], [], [], [] if params['FileFormat'].lower() == 'parquet': @@ -422,9 +471,6 @@ class HawqRegister(object): Format = 'AO' Format_FileLocations = '%s_FileLocations' % Format partitionby = params.get(Format_FileLocations).get('PartitionBy') - if partitionby: - logger.info('Partition table is not supported in current release of hawq register.') - sys.exit(0) if params.get(Format_FileLocations).get('Partitions') and len(params[Format_FileLocations]['Partitions']): partitions_checksum = [d['Checksum'] for d in params[Format_FileLocations]['Partitions']] partitions_compression_level = [d['CompressionLevel'] for d in params[Format_FileLocations]['Partitions']] @@ -435,6 +481,9 @@ class HawqRegister(object): for pfile in partitions_files: partitions_filepaths.append([params['DFS_URL'] + item['path'] for item in pfile]) partitions_filesizes.append([item['size'] for item in pfile]) + partitions_tupcounts.append([item['tupcount'] if item.has_key('tupcount') else -1 for item in pfile]) + partitions_eofuncompresseds.append([item['eofuncompressed'] if item.has_key('eofuncompressed') else -1 for item in pfile]) + partitions_varblockcounts.append([item['varblockcount'] if item.has_key('varblockcount') else -1 for item in pfile]) partitions_name = [d['Name'] for d in params[Format_FileLocations]['Partitions']] if len(params[Format_FileLocations]['Files']): for ele in params[Format_FileLocations]['Files']: @@ -446,7 +495,7 @@ class HawqRegister(object): encoding = params['Encoding'] bucketNum = params['Bucketnum'] if params['Distribution_Policy'].startswith('DISTRIBUTED BY') else 6 - self._set_yml_data(Format, files, sizes, tupcounts, eofuncompresseds, varblockcounts, params['TableName'], params['%s_Schema' % Format], params['Distribution_Policy'], params[Format_FileLocations], bucketNum, partitionby, partitions_constraint, partitions_name, partitions_compression_level, partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, encoding) + self._set_yml_data(Format, files, sizes, tupcounts, eofuncompresseds, varblockcounts, params['TableName'], params['%s_Schema' % Format], params['Distribution_Policy'], params[Format_FileLocations], bucketNum, partitionby, partitions_constraint, partitions_name, partitions_compression_level, partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, partitions_tupcounts, partitions_eofuncompresseds, partitions_varblockcounts, encoding) # check conflicting distributed policy @@ -528,13 +577,7 @@ class HawqRegister(object): self.failure_handler.rollback() sys.exit(1) - def _init(self): - self.files_same_path = [] - self.sizes_same_path = [] - self.tupcounts_same_path = [] - self.varblockcounts_same_path = [] - self.eofuncompresseds_same_path = [] - self.segnos_same_path = [] + def prepare(self): if self.yml: self._option_parser_yml(options.yml_config) self.filepath = self.files[0][:self.files[0].rfind('/')] if self.files else '' @@ -673,6 +716,27 @@ class HawqRegister(object): if self.file_format == 'Parquet': self._check_parquet_format(self.files) + def test_set_move_files_in_hdfs(self): + ''' Output of print shoud be: + self.files_update = ['1', '2', '3'] + self.files_same_path = ['5', '6', 'a'] + self.srcfiles=['5', '6', 'a', '1', '2', '3'] + self.dstfiles=['5', '6', '4', '7' , '8', '9'] + ''' + self.firstsegno = 4 + self.files_update = ['1', '2', '3', '5', '6', 'a'] + self.sizes_update = [1, 2, 3, 4, 5, 6] + self.files_append = ['1', '2', '3'] + self.tupcounts_update = [1, 2, 3, 4, 5, 6] + self.eofuncompresseds_update = [1, 2, 3, 4, 5, 6] + self.varblockcounts_update = [1, 2, 3, 4, 5, 6] + self.tabledir = '' + self._set_move_files_in_hdfs() + print self.files_update + print self.files_same_path + print self.srcfiles + print self.dstfiles + def _check_files_and_table_in_same_hdfs_cluster(self, filepath, tabledir): '''Check whether all the files refered by 'filepath' and the location corresponding to the table are in the same hdfs cluster''' if not filepath: @@ -915,6 +979,88 @@ class HawqRegister(object): logger.info('Hawq Register Succeed.') +class HawqRegisterPartition(HawqRegister): + def __init__(self, options, table, utility_conn, conn, failure_handler): + HawqRegister.__init__(self, options, table, utility_conn, conn, failure_handler) + + def _get_partition_info(self): + dic = {} + for ele in self.accessor.get_partition_info(self.dst_table_name.split('.')[-1]): + dic[ele['partitionboundary']] = ele['partitiontablename'] + return dic + + def _check_partitionby(self): + def get_partitionby(): + return self.accessor.get_partitionby(self.dst_table_name.split('.')[-1]) + + if self.partitionby != get_partitionby(): + logger.error('PartitionBy of %s is not consistent with previous partitionby.' % self.tablename) + self.failure_handler.rollback() + sys.exit(1) + + def _check_partition_num(self): + def get_partition_num(): + return self.accessor.get_partition_num(self.dst_table_name.split('.')[-1]) + + if get_partition_num() < len(self.partitions_name): + logger.error('Partition Number of %s is not consistent with previous partition number.' % self.tablename) + self.failure_handler.rollback() + sys.exit(1) + + def _check_duplicate_constraint(self): + partitions_constraint = sorted(self.partitions_constraint) + for k, _ in enumerate(partitions_constraint): + if k < len(partitions_constraint) - 1 and partitions_constraint[k] == partitions_constraint[k+1]: + logger.error('Partition Constraint "%s" in table %s is duplicated' % (partitions_constraint[k], self.tablename)) + self.failure_handler.rollback() + sys.exit(1) + + def prepare(self): + if self.yml: + self._option_parser_yml(options.yml_config) + else: + if self._is_folder(self.filepath) and self.filesize: + logger.error('-e option is only supported with single file case.') + sys.exit(1) + self.file_format = 'Parquet' + self._check_hash_type() # Usage1 only support randomly distributed table + for k, pn in enumerate(self.partitions_name): + self.tablename = pn + self.files = self.partitions_filepaths[k] + self.sizes = self.partitions_filesizes[k] + if self.yml: + self.filepath = self.files[0][:self.files[0].rfind('/')] if self.files else '' + self._check_file_not_folder() + if self.yml: + self._check_database_encoding() + if self.mode != 'repair': + if not self._create_table() and self.mode != 'force': + self.mode = 'usage2_table_exist' + self._check_partitionby() + self._check_partition_num() + partitions = self._get_partition_info() + self.queries = "set allow_system_table_mods='dml';" + self.queries += "begin transaction;" + self._check_duplicate_constraint() + for k, pn in enumerate(self.partitions_name): + self.constraint = self.partitions_constraint[k] + if not partitions.has_key(self.constraint): + logger.error('Partition Constraint "%s" is not in table %s' % (self.constraint, self.tablename)) + self.failure_handler.rollback() + sys.exit(1) + self.tablename = partitions[self.constraint] + self.files = self.partitions_filepaths[k] + self.sizes = self.partitions_filesizes[k] + self.tupcounts = self.partitions_tupcounts[k] + self.eofuncompresseds = self.partitions_eofuncompresseds[k] + self.varblockcounts = self.partitions_varblockcounts[k] + self._do_check() + self._prepare_register() + self.queries += "end transaction;" + + def register(self): + HawqRegister.register(self) + def main(options, args): def connectdb(options): ''' @@ -939,10 +1085,41 @@ def main(options, args): failure_handler = FailureHandler(conn) # register - ins = HawqRegister(options, args[0], utility_conn, conn, failure_handler) + if options.yml_config and ispartition(options.yml_config): + ins = HawqRegisterPartition(options, args[0], utility_conn, conn, failure_handler) + else: + ins = HawqRegister(options, args[0], utility_conn, conn, failure_handler) + ins.prepare() ins.register() conn.close() +def test(options, args): + def connectdb(options): + ''' + Trying to connect database, return a connection object. + If failed to connect, raise a pg.InternalError + ''' + url = dbconn.DbURL(hostname=options.host, port=options.port, + dbname=options.database, username=options.user) + logger.info('try to connect database %s:%s %s' % (url.pghost, url.pgport, url.pgdb)) + utility_conn = pg.connect(dbname=url.pgdb, host=url.pghost, port=url.pgport, + user=url.pguser, passwd=url.pgpass, opt='-c gp_session_role=utility') + conn = pg.connect(dbname=url.pgdb, host=url.pghost, port=url.pgport, + user=url.pguser, passwd=url.pgpass) + return utility_conn, conn + + # connect db + try: + utility_conn, conn = connectdb(options) + except pg.InternalError: + logger.error('Fail to connect to database, this script can only be run when database is up.') + return 1 + + failure_handler = FailureHandler(conn) + # register + ins = HawqRegister(options, args[0], utility_conn, conn, failure_handler) + ins.test_set_move_files_in_hdfs() + if __name__ == '__main__': parser = option_parser()
