Repository: incubator-hawq
Updated Branches:
  refs/heads/master 28d192d23 -> c7d6a7f52
HAWQ-991. Refator HAWQ Register code for partition table.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/c7d6a7f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/c7d6a7f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/c7d6a7f5

Branch: refs/heads/master
Commit: c7d6a7f52b03ece32d10b1dd9088a91e14565384
Parents: 28d192d
Author: Chunling Wang <[email protected]>
Authored: Sat Oct 1 22:06:51 2016 +0800
Committer: Wen Lin <[email protected]>
Committed: Mon Oct 10 17:44:06 2016 +0800

----------------------------------------------------------------------
 tools/bin/hawqregister | 461 +++++++++++++++++++++----------------------
 1 file changed, 225 insertions(+), 236 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/c7d6a7f5/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index 69809f7..29b3a30 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -252,11 +252,6 @@ class GpRegisterAccessor(object):
         rows = self.exec_query(query)
         return rows[0]['attrnums']
 
-    def get_partition_info(self, tablename):
-        ''' Get partition information from pg_partitions, return a constraint-tablename dictionary '''
-        query = "SELECT partitiontablename, partitionboundary FROM pg_partitions WHERE tablename = '%s'" % tablename
-        return self.exec_query(query)
-
     def get_bucket_number(self, tablename):
         query = "select oid from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
         rows = self.exec_query(query)
@@ -299,17 +294,20 @@ class HawqRegister(object):
         self.yml = options.yml_config
         self.filepath = options.filepath
         self.database = options.database
+        self.dst_table_name = table.lower()
         self.tablename = table.lower()
         self.filesize = options.filesize
         self.accessor = GpRegisterAccessor(conn)
         self.utility_accessor = GpRegisterAccessor(utility_conn)
         self.failure_handler = failure_handler
         self.mode = self._init_mode(options.force, options.repair)
+        self.srcfiles = []
+        self.dstfiles = []
         self._init()
 
     def _init_mode(self, force, repair):
         def table_existed():
-            return self.accessor.get_table_existed(self.tablename)
+            return self.accessor.get_table_existed(self.dst_table_name)
 
         if self.yml:
             if force:
@@ -325,231 +323,221 @@ class HawqRegister(object):
                 return 'usage2_table_not_exist'
         else:
             if not table_existed():
-                logger.error('Table %s does not exist.\nYou should create table before registering the data.' % self.tablename)
+                logger.error('Table %s does not exist.\nYou should create table before registering the data.' % self.dst_table_name)
                 sys.exit(1)
             else:
                 return 'usage1'
 
-    def _init(self):
-        def check_hash_type():
-            self.accessor.check_hash_type(self.tablename)
-
-        # check conflicting distributed policy
-        def check_distribution_policy():
-            if self.distribution_policy.startswith('DISTRIBUTED BY'):
-                if len(self.files) % self.bucket_number != 0:
-                    logger.error('Files to be registered must be multiple times to the bucket number of hash table.')
-                    self.failure_handler.rollback()
-                    sys.exit(1)
-
-        def check_database_encoding():
-            encoding_indx = self.accessor.get_database_encoding_indx(self.database)
-            encoding = self.accessor.get_database_encoding(encoding_indx)
-            if self.encoding.strip() != encoding:
-                logger.error('Database encoding from yaml configuration file(%s) is not consistent with encoding from input args(%s).' % (self.encoding, encoding))
-                sys.exit(1)
+    def _check_hash_type(self):
+        self.accessor.check_hash_type(self.dst_table_name)
 
-        def create_table():
-            try:
-                (ret, query) = self.accessor.do_create_table(self.src_table_name, self.tablename, self.schema, self.file_format, self.distribution_policy, self.file_locations, self.bucket_number,
+    def _create_table(self):
+        try:
+            (ret, query) = self.accessor.do_create_table(self.src_table_name, self.dst_table_name, self.schema, self.file_format, self.distribution_policy, self.file_locations, self.bucket_number,
                                                           self.partitionby, self.partitions_constraint, self.partitions_name)
-            except pg.DatabaseError as e:
-                print e
+        except pg.DatabaseError as e:
+            print e
+            sys.exit(1)
+        if ret:
+            self.failure_handler.commit(('SQL', query))
+        return ret
+
+    def _check_database_encoding(self):
+        encoding_indx = self.accessor.get_database_encoding_indx(self.database)
+        encoding = self.accessor.get_database_encoding(encoding_indx)
+        if self.encoding.strip() != encoding:
+            logger.error('Database encoding from yaml configuration file(%s) is not consistent with encoding from input args(%s).' % (self.encoding, encoding))
+            sys.exit(1)
+
+    def _check_policy_consistency(self):
+        policy = self._get_distribution_policy() # "" or "{1,3}"
+        if policy is None:
+            return
+        if self.distribution_policy == 'DISTRIBUTED RANDOMLY':
+            logger.error('Distribution policy of %s from yaml is not consistent with the policy of existing table.' % self.tablename)
+            self.failure_handler.rollback()
+            sys.exit(1)
+        tmp_dict = {}
+        for i, d in enumerate(self.schema):
+            tmp_dict[d['name']] = i + 1
+        # 'DISTRIBUETD BY (1,3)' -> {1,3}
+        cols = self.distribution_policy.strip().split()[-1].strip('(').strip(')').split(',')
+        original_policy = ','.join([str(tmp_dict[col]) for col in cols])
+        if policy.strip('{').strip('}') != original_policy:
+            logger.error('Distribution policy of %s from yaml file is not consistent with the policy of existing table.' % self.dst_table_name)
+            self.failure_handler.rollback()
+            sys.exit(1)
+
+    def _set_yml_dataa(self, file_format, files, sizes, tablename, schema, distribution_policy, file_locations,\
+                       bucket_number, partitionby, partitions_constraint, partitions_name, partitions_compression_level,\
+                       partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, encoding):
+        self.file_format = file_format
+        self.files = files
+        self.sizes = sizes
+        self.src_table_name = tablename
+        self.schema = schema
+        self.distribution_policy = distribution_policy
+        self.file_locations = file_locations
+        self.bucket_number = bucket_number
+        self.partitionby = partitionby
+        self.partitions_constraint = partitions_constraint
+        self.partitions_name = partitions_name
+        self.partitions_compression_level = partitions_compression_level
+        self.partitions_compression_type = partitions_compression_type
+        self.partitions_checksum = partitions_checksum
+        self.partitions_filepaths = partitions_filepaths
+        self.partitions_filesizes = partitions_filesizes
+        self.encoding = encoding
+
+    def _option_parser_yml(self, yml_file):
+        import yaml
+        try:
+            with open(yml_file, 'r') as f:
+                params = yaml.load(f)
+        except yaml.scanner.ScannerError as e:
+            print e
+            self.failure_handler.rollback()
+            sys.exit(1)
+        table_column_num = self.accessor.get_table_column_num(self.tablename)
+        register_yaml_dict_check(params, table_column_num, self.tablename)
+        partitions_filepaths = []
+        partitions_filesizes = []
+        partitions_constraint = []
+        partitions_name = []
+        partitions_checksum = []
+        partitions_compression_level = []
+        partitions_compression_type = []
+        files, sizes = [], []
+
+        if params['FileFormat'].lower() == 'parquet':
+            Format = 'Parquet'
+        else: #AO format
+            Format = 'AO'
+        Format_FileLocations = '%s_FileLocations' % Format
+        partitionby = params.get(Format_FileLocations).get('PartitionBy')
+        if partitionby:
+            logger.info('Partition table is not supported in current release of hawq register.')
+            sys.exit(0)
+        if params.get(Format_FileLocations).get('Partitions') and len(params[Format_FileLocations]['Partitions']):
+            partitions_checksum = [d['Checksum'] for d in params[Format_FileLocations]['Partitions']]
+            partitions_compression_level = [d['CompressionLevel'] for d in params[Format_FileLocations]['Partitions']]
+            partitions_compression_type = [d['CompressionType'] for d in params[Format_FileLocations]['Partitions']]
+            partitions_constraint = [d['Constraint'] for d in params[Format_FileLocations]['Partitions']]
+            partitions_files = [d['Files'] for d in params[Format_FileLocations]['Partitions']]
+            if len(partitions_files):
+                for pfile in partitions_files:
+                    partitions_filepaths.append([params['DFS_URL'] + item['path'] for item in pfile])
+                    partitions_filesizes.append([item['size'] for item in pfile])
+            partitions_name = [d['Name'] for d in params[Format_FileLocations]['Partitions']]
+        if len(params[Format_FileLocations]['Files']):
+            files, sizes = [params['DFS_URL'] + d['path'] for d in params[Format_FileLocations]['Files']], [d['size'] for d in params[Format_FileLocations]['Files']]
+        encoding = params['Encoding']
+        self._set_yml_dataa(Format, files, sizes, params['TableName'], params['%s_Schema' % Format], params['Distribution_Policy'], params[Format_FileLocations], params['Bucketnum'], partitionby,\
+                            partitions_constraint, partitions_name, partitions_compression_level, partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, encoding)
+
+
+    # check conflicting distributed policy
+    def _check_distribution_policy(self):
+        if self.distribution_policy.startswith('DISTRIBUTED BY'):
+            if len(self.files) % self.bucket_number != 0:
+                logger.error('Files to be registered must be multiple times to the bucket number of hash table.')
+                self.failure_handler.rollback()
                 sys.exit(1)
-            if ret:
-                self.failure_handler.commit(('SQL', query))
-            return ret
 
-        def get_seg_name():
-            return self.utility_accessor.get_seg_name(self.tablename, self.database, self.file_format)
+    def _get_seg_name(self):
+        return self.utility_accessor.get_seg_name(self.tablename, self.database, self.file_format)
 
-        def get_metadata():
-            return self.accessor.get_metadata_from_database(self.tablename, self.seg_name)
+    def _get_metadata(self):
+        return self.accessor.get_metadata_from_database(self.tablename, self.seg_name)
 
-        def get_metadata_from_table():
-            return self.accessor.get_metadata_from_seg_name(self.seg_name)
+    def _get_metadata_from_table(self):
+        return self.accessor.get_metadata_from_seg_name(self.seg_name)
 
-        def get_distribution_policy():
-            return self.accessor.get_distribution_policy_info(self.tablename)
+    def _get_distribution_policy(self):
+        return self.accessor.get_distribution_policy_info(self.tablename)
 
-        def check_policy_consistency():
-            policy = get_distribution_policy() # "" or "{1,3}"
-            if policy is None:
-                return
-            if self.distribution_policy == 'DISTRIBUTED RANDOMLY':
-                logger.error('Distribution policy of %s from yaml is not consistent with the policy of existing table.' % self.tablename)
-                self.failure_handler.rollback()
-                sys.exit(1)
-            tmp_dict = {}
-            for i, d in enumerate(self.schema):
-                tmp_dict[d['name']] = i + 1
-            # 'DISTRIBUETD BY (1,3)' -> {1,3}
-            cols = self.distribution_policy.strip().split()[-1].strip('(').strip(')').split(',')
-            original_policy = ','.join([str(tmp_dict[col]) for col in cols])
-            if policy.strip('{').strip('}') != original_policy:
-                logger.error('Distribution policy of %s from yaml file is not consistent with the policy of existing table.' % self.tablename)
-                self.failure_handler.rollback()
+    def _check_bucket_number(self):
+        def get_bucket_number():
+            return self.accessor.get_bucket_number(self.tablename)
+
+        if self.bucket_number != get_bucket_number():
+            logger.error('Bucket number of %s is not consistent with previous bucket number.' % self.tablename)
+            self.failure_handler.rollback()
+            sys.exit(1)
+
+    def _check_file_not_folder(self):
+        for fn in self.files:
+            hdfscmd = 'hadoop fs -test -f %s' % fn
+            if local_ssh(hdfscmd, logger):
+                logger.info('%s is not a file in hdfs, please check the yaml configuration file.' % fn)
                 sys.exit(1)
 
-        def check_bucket_number():
-            def get_bucket_number():
-                return self.accessor.get_bucket_number(self.tablename)
+    def _is_folder(self, filepath):
+        hdfscmd = 'hadoop fs -test -d %s' % filepath
+        if local_ssh(hdfscmd, logger):
+            return False
+        else:
+            return True
 
-            if self.bucket_number != get_bucket_number():
-                logger.error('Bucket number of %s is not consistent with previous bucket number.' % self.tablename)
+    def _check_sizes_valid(self):
+        for sz in self.sizes:
+            if type(sz) != type(1):
+                logger.error('File size(%s) in yaml configuration file should be int type.' % sz)
                 self.failure_handler.rollback()
                 sys.exit(1)
-
-        def set_yml_dataa(file_format, files, sizes, tablename, schema, distribution_policy, file_locations,\
-                          bucket_number, partitionby, partitions_constraint, partitions_name, partitions_compression_level,\
-                          partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, encoding):
-            self.file_format = file_format
-            self.files = files
-            self.sizes = sizes
-            self.src_table_name = tablename
-            self.schema = schema
-            self.distribution_policy = distribution_policy
-            self.file_locations = file_locations
-            self.bucket_number = bucket_number
-            self.partitionby = partitionby
-            self.partitions_constraint = partitions_constraint
-            self.partitions_name = partitions_name
-            self.partitions_compression_level = partitions_compression_level
-            self.partitions_compression_type = partitions_compression_type
-            self.partitions_checksum = partitions_checksum
-            self.partitions_filepaths = partitions_filepaths
-            self.partitions_filesizes = partitions_filesizes
-            self.encoding = encoding
-
-        def option_parser_yml(yml_file):
-            import yaml
-            try:
-                with open(yml_file, 'r') as f:
-                    params = yaml.load(f)
-            except yaml.scanner.ScannerError as e:
-                print e
+            if sz < 0:
+                logger.error('File size(%s) in yaml configuration file should not be less than 0.' % sz)
+                self.failure_handler.rollback()
+                sys.exit(1)
+        for k, fn in enumerate(self.files):
+            hdfscmd = 'hadoop fs -du %s' % fn
+            _, out, _ = local_ssh_output(hdfscmd)
+            if self.sizes[k] > int(out.strip().split()[0]):
+                logger.error('File size(%s) in yaml configuration file should not exceed actual length(%s) of file %s.' % (self.sizes[k], out.strip().split()[0], fn))
                 self.failure_handler.rollback()
                 sys.exit(1)
-
-            table_column_num = self.accessor.get_table_column_num(self.tablename)
-            register_yaml_dict_check(params, table_column_num, self.tablename)
-            partitions_filepaths = []
-            partitions_filesizes = []
-            partitions_constraint = []
-            partitions_name = []
-            partitions_checksum = []
-            partitions_compression_level = []
-            partitions_compression_type = []
-            files, sizes = [], []
-
-            if params['FileFormat'].lower() == 'parquet':
-                partitionby = params.get('Parquet_FileLocations').get('PartitionBy')
-                if partitionby:
-                    logger.info('Partition table is not supported in current release of hawq register.')
-                    sys.exit(0)
-                if params.get('Parquet_FileLocations').get('Partitions') and len(params['Parquet_FileLocations']['Partitions']):
-                    partitions_checksum = [d['Checksum'] for d in params['Parquet_FileLocations']['Partitions']]
-                    partitions_compression_level = [d['CompressionLevel'] for d in params['Parquet_FileLocations']['Partitions']]
-                    partitions_compression_type = [d['CompressionType'] for d in params['Parquet_FileLocations']['Partitions']]
-                    partitions_constraint = [d['Constraint'] for d in params['Parquet_FileLocations']['Partitions']]
-                    partitions_files = [d['Files'] for d in params['Parquet_FileLocations']['Partitions']]
-                    if len(partitions_files):
-                        for pfile in partitions_files:
-                            partitions_filepaths.append([params['DFS_URL'] + item['path'] for item in pfile])
-                            partitions_filesizes.append([item['size'] for item in pfile])
-                    partitions_name = [d['Name'] for d in params['Parquet_FileLocations']['Partitions']]
-                if len(params['Parquet_FileLocations']['Files']):
-                    files, sizes = [params['DFS_URL'] + d['path'] for d in params['Parquet_FileLocations']['Files']], [d['size'] for d in params['Parquet_FileLocations']['Files']]
-                encoding = params['Encoding']
-                set_yml_dataa('Parquet', files, sizes, params['TableName'], params['Parquet_Schema'], params['Distribution_Policy'], params['Parquet_FileLocations'], params['Bucketnum'], partitionby,\
-                              partitions_constraint, partitions_name, partitions_compression_level, partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, encoding)
-
-            else: #AO format
-                partitionby = params.get('AO_FileLocations').get('PartitionBy')
-                if partitionby:
-                    logger.info('Partition table is not supported in current release of hawq register.')
-                    sys.exit(0)
-                if params.get('AO_FileLocations').get('Partitions') and len(params['AO_FileLocations']['Partitions']):
-                    partitions_checksum = [d['Checksum'] for d in params['AO_FileLocations']['Partitions']]
-                    partitions_compressionLevel = [d['CompressionLevel'] for d in params['AO_FileLocations']['Partitions']]
-                    partitions_compressionType = [d['CompressionType'] for d in params['AO_FileLocations']['Partitions']]
-                    partitions_constraint = [d['Constraint'] for d in params['AO_FileLocations']['Partitions']]
-                    partitions_files = [d['Files'] for d in params['AO_FileLocations']['Partitions']]
-                    if len(partitions_files):
-                        for pfile in partitions_files:
-                            partitions_filepaths.append([params['DFS_URL'] + item['path'] for item in pfile])
-                            partitions_filesizes.append([item['size'] for item in pfile])
-                    partitions_name = [d['Name'] for d in params['AO_FileLocations']['Partitions']]
-                if len(params['AO_FileLocations']['Files']):
-                    files, sizes = [params['DFS_URL'] + d['path'] for d in params['AO_FileLocations']['Files']], [d['size'] for d in params['AO_FileLocations']['Files']]
-                encoding = params['Encoding']
-                set_yml_dataa('AO', files, sizes, params['TableName'], params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum'], partitionby, partitions_constraint,\
-                              partitions_name, partitions_compression_level, partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, encoding)
-
-        def check_file_not_folder():
-            for fn in self.files:
-                hdfscmd = 'hadoop fs -test -f %s' % fn
-                if local_ssh(hdfscmd, logger):
-                    logger.info('%s is not a file in hdfs, please check the yaml configuration file.' % fn)
-                    sys.exit(1)
-
-        def is_folder(filepath):
-            hdfscmd = 'hadoop fs -test -d %s' % filepath
-            if local_ssh(hdfscmd, logger):
-                return False
-            else:
-                return True
-
-        def check_sizes_valid():
-            for sz in self.sizes:
-                if type(sz) != type(1):
-                    logger.error('File size(%s) in yaml configuration file should be int type.' % sz)
-                    self.failure_handler.rollback()
-                    sys.exit(1)
-                if sz < 0:
-                    logger.error('File size(%s) in yaml configuration file should not be less than 0.' % sz)
+
+    def _check_no_regex_filepath(self, files):
+        for fn in files:
+            tmp_lst = fn.split('/')
+            for v in tmp_lst:
+                if v == '.':
+                    logger.error('Hawq register does not support file path with regex: %s.' % fn)
                     self.failure_handler.rollback()
                     sys.exit(1)
-            for k, fn in enumerate(self.files):
-                hdfscmd = 'hadoop fs -du %s' % fn
-                _, out, _ = local_ssh_output(hdfscmd)
-                if self.sizes[k] > int(out.strip().split()[0]):
-                    logger.error('File size(%s) in yaml configuration file should not exceed actual length(%s) of file %s.' % (self.sizes[k], out.strip().split()[0], fn))
+            for ch in ['..', '*']:
+                if fn.find(ch) != -1:
+                    logger.error('Hawq register does not support file path with regex: %s.' % fn)
                     self.failure_handler.rollback()
                     sys.exit(1)
 
-        def check_no_regex_filepath(files):
-            for fn in files:
-                tmp_lst = fn.split('/')
-                for v in tmp_lst:
-                    if v == '.':
-                        logger.error('Hawq register does not support file path with regex: %s.' % fn)
-                        self.failure_handler.rollback()
-                        sys.exit(1)
-                for ch in ['..', '*']:
-                    if fn.find(ch) != -1:
-                        logger.error('Hawq register does not support file path with regex: %s.' % fn)
-                        self.failure_handler.rollback()
-                        sys.exit(1)
-
+    def _init(self):
         if self.yml:
-            option_parser_yml(options.yml_config)
+            self._option_parser_yml(options.yml_config)
             self.filepath = self.files[0][:self.files[0].rfind('/')] if self.files else ''
-            check_file_not_folder()
-            check_database_encoding()
+            self._check_file_not_folder()
+            self._check_database_encoding()
             if self.mode != 'repair':
-                if not create_table() and self.mode != 'force':
+                if not self._create_table() and self.mode != 'force':
                     self.mode = 'usage2_table_exist'
-            check_bucket_number()
-            check_distribution_policy()
-            check_policy_consistency()
-            check_no_regex_filepath(self.files)
         else:
-            if is_folder(self.filepath) and self.filesize:
+            if self._is_folder(self.filepath) and self.filesize:
                 logger.error('-e option is only supported with single file case.')
                 sys.exit(1)
             self.file_format = 'Parquet'
-            check_hash_type() # Usage1 only support randomly distributed table
+            self._check_hash_type() # Usage1 only support randomly distributed table
+        self.queries = "set allow_system_table_mods='dml';"
+        self.queries += "begin transaction;"
+        self._do_check()
+        self._prepare_register()
+        self.queries += "end transaction;"
+
+    def _do_check(self):
+        if self.yml:
+            self._check_bucket_number()
+            self._check_distribution_policy()
+            self._check_policy_consistency()
+            self._check_no_regex_filepath(self.files)
         if not self.filepath:
             if self.mode == 'usage1':
                 logger.info('Please specify filepath with -f option.')
@@ -557,18 +545,18 @@ class HawqRegister(object):
             logger.info('Hawq Register Succeed.')
             sys.exit(0)
 
-        (self.seg_name, tmp_ret) = get_seg_name()
+        (self.seg_name, tmp_ret) = self._get_seg_name()
        if not tmp_ret:
             self.failure_handler.rollback()
             sys.exit(1)
 
-        self.firstsegno, self.tabledir = get_metadata()
+        self.firstsegno, self.tabledir = self._get_metadata()
         if self.mode == 'repair':
             if self.tabledir.strip('/') != self.filepath.strip('/'):
                 logger.error("In repair mode, file path from yaml file should be the same with table's path.")
                 self.failure_handler.rollback()
                 sys.exit(1)
-            seg_list, existed_sizes = get_metadata_from_table()
+            seg_list, existed_sizes = self._get_metadata_from_table()
             existed_files = [self.tabledir + seg for seg in seg_list]
             existed_info = {}
             for k, fn in enumerate(existed_files):
@@ -590,13 +578,13 @@ class HawqRegister(object):
                 sys.exit(1)
 
         if not self.yml:
-            check_no_regex_filepath([self.filepath])
+            self._check_no_regex_filepath([self.filepath])
             self.files, self.sizes = self._get_files_in_hdfs(self.filepath)
         self.do_not_move, self.files_update, self.sizes_update = False, [], []
         self.newfiles, self.newsizes = [f for f in self.files], [sz for sz in self.sizes]
 
         if self.mode == 'force':
-            seg_list, _ = get_metadata_from_table()
+            seg_list, _ = self._get_metadata_from_table()
             existed_files = [self.tabledir + seg for seg in seg_list]
             if len(self.files) == len(existed_files):
                 if sorted(self.files) != sorted(existed_files):
@@ -641,17 +629,11 @@ class HawqRegister(object):
                 self.failure_handler.rollback()
                 sys.exit(1)
             self.sizes = [self.filesize]
-        check_sizes_valid()
+        self._check_sizes_valid()
         if self.file_format == 'Parquet':
             self._check_parquet_format(self.files)
 
-    def _get_partition_info(self):
-        dic = {}
-        for ele in self.accessor.get_partition_info(self.tablename):
-            dic[ele['partitionboundary']] = ele['partitiontablename']
-        return dic
-
     def _check_files_and_table_in_same_hdfs_cluster(self, filepath, tabledir):
         '''Check whether all the files refered by 'filepath' and the location corresponding to the table are in the same hdfs cluster'''
         if not filepath:
@@ -714,13 +696,17 @@ class HawqRegister(object):
             self.failure_handler.rollback()
             sys.exit(1)
 
-    def _move_files_in_hdfs(self):
-        '''Move file(s) in src path into the folder correspoding to the target table'''
+    def _set_move_files_in_hdfs(self):
         segno = self.firstsegno
         for f in self.newfiles:
-            srcfile = f
-            dstfile = self.tabledir + str(segno)
+            self.srcfiles.append(f)
+            self.dstfiles.append(self.tabledir + str(segno))
            segno += 1
+
+    def _move_files_in_hdfs(self):
+        '''Move file(s) in src path into the folder correspoding to the target table'''
+        for k, srcfile in enumerate(self.srcfiles):
+            dstfile = self.dstfiles[k]
            if srcfile != dstfile:
                 hdfscmd = 'hadoop fs -mv %s %s' % (srcfile, dstfile)
                 sys.stdout.write('hdfscmd: "%s"\n' % hdfscmd)
@@ -733,7 +719,7 @@ class HawqRegister(object):
 
     def _delete_files_in_hdfs(self):
         for fn in self.files_delete:
-            hdfscmd = 'hadoop dfs -rm %s' % fn
+            hdfscmd = 'hadoop fs -rm %s' % fn
             sys.stdout.write('hdfscmd: "%s"\n' % hdfscmd)
             result = local_ssh(hdfscmd, logger)
             if result != 0:
@@ -741,10 +727,10 @@ class HawqRegister(object):
                 self.failure_handler.rollback()
                 sys.exit(1)
 
-    def _modify_metadata(self, mode):
+    def _set_modify_metadata(self, mode):
         if mode == 'insert':
             eofs = self.sizes
-            query = "set allow_system_table_mods='dml';"
+            query = ""
             if self.file_format == 'Parquet':
                 query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1)
                 for k, eof in enumerate(eofs[1:]):
@@ -754,12 +740,11 @@ class HawqRegister(object):
                 for k, eof in enumerate(eofs[1:]):
                     query += ',(%d, %d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1, -1)
             query += ';'
+            self.queries += query
         elif mode == 'force':
             eofs = self.sizes
-            query = "set allow_system_table_mods='dml';"
-            query += "begin transaction;"
             segno_lst = [f.split('/')[-1] for f in self.files]
-            query += "delete from pg_aoseg.%s;" % (self.seg_name)
+            query = "delete from pg_aoseg.%s;" % (self.seg_name)
             firstsegno = 1
             if self.file_format == 'Parquet':
                 query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, firstsegno, eofs[0], -1, -1)
@@ -770,11 +755,10 @@ class HawqRegister(object):
                 for k, eof in enumerate(eofs[1:]):
                     query += ',(%d, %d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1, -1)
             query += ';'
-            query += "end transaction;"
+            self.queries += query
         elif mode == 'update':
             eofs = self.sizes_update
-            query = "set allow_system_table_mods='dml';"
-            query += "begin transaction;"
+            query = ""
             segno_lst = [f.split('/')[-1] for f in self.files_update]
             if self.file_format == 'Parquet':
                 for i, eof in enumerate(eofs):
@@ -782,11 +766,10 @@ class HawqRegister(object):
             else:
                 for i, eof in enumerate(eofs):
                     query += "update pg_aoseg.%s set eof = '%s', tupcount = '%s', varblockcount = '%s', eofuncompressed = '%s' where segno = '%s';" % (self.seg_name, eof, -1, -1, -1, segno_lst[i])
-            query += "end transaction;"
+            self.queries += query
         else: # update_and_insert
             eofs = self.sizes
-            query = "set allow_system_table_mods='dml';"
-            query += "begin transaction;"
+            query = ""
             if self.file_format == 'Parquet':
                 query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1)
                 for k, eof in enumerate(eofs[1:]):
@@ -804,9 +787,11 @@ class HawqRegister(object):
             else:
                 for i, eof in enumerate(self.sizes_update):
                     query += "update pg_aoseg.%s set eof = '%s', tupcount = '%s', varblockcount = '%s', eofuncompressed = '%s' where segno = '%s';" % (self.seg_name, eof, -1, -1, -1, segno_lst[i])
-            query += "end transaction;"
+            self.queries += query
+
+    def _modify_metadata(self, mode):
         try:
-            self.utility_accessor.update_catalog(query)
+            self.utility_accessor.update_catalog(self.queries)
         except pg.DatabaseError as e:
             print e
             self.failure_handler.rollback()
@@ -821,14 +806,18 @@ class HawqRegister(object):
         query += "end transaction;"
         return self.utility_accessor.update_catalog(query)
 
-    def _mapping_tablename_from_yml(self, partitions):
-        ''' Mapping table name from yml file, return a list of (table_name,(file_path, file_size)) '''
-        mappings = []
-        for pos, constraint in enumerate(self.partitions_constraint):
-            if partitions.has_key(constraint):
-                mappings.extend([(partitions[constraint], (self.partitions_filepaths[pos][i], self.partitions_filesizes[pos][i]))
-                                 for i in xrange(len(self.partitions_filepaths[pos]))])
-        return mappings
+    def _prepare_register(self):
+        if not self.do_not_move:
+            self._set_move_files_in_hdfs()
+        if (not self.do_not_move) and self.mode == 'force':
+            self._set_modify_metadata('force')
+        else:
+            if self.mode == 'force':
+                self._set_modify_metadata('force')
+            elif self.mode == 'repair':
+                self._set_modify_metadata('update')
+            else:
+                self._set_modify_metadata('insert')
 
     def register(self):
         if not self.do_not_move:
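The rewritten _option_parser_yml folds the former duplicated Parquet/AO branches into one path by deriving the YAML key prefix first (Format = 'Parquet' or 'AO') and then indexing params with '%s_FileLocations' % Format and '%s_Schema' % Format. A toy, self-contained illustration of that lookup follows; the params dict values here are invented for the example and are not taken from the commit:

    def file_locations_key(params):
        # Same prefix selection rule as the refactored _option_parser_yml.
        fmt = 'Parquet' if params['FileFormat'].lower() == 'parquet' else 'AO'
        return fmt, '%s_FileLocations' % fmt

    params = {'FileFormat': 'AO',
              'DFS_URL': 'hdfs://localhost:8020',
              'AO_FileLocations': {'Files': [{'path': '/hawq_default/16385/35883/1', 'size': 100}]}}
    fmt, loc_key = file_locations_key(params)
    files = [params['DFS_URL'] + d['path'] for d in params[loc_key]['Files']]
    sizes = [d['size'] for d in params[loc_key]['Files']]
    print fmt, files, sizes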
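The other structural change is that the per-mode set allow_system_table_mods='dml' / begin transaction / end transaction wrappers are gone from _modify_metadata: _init now opens a single query string (self.queries), _set_modify_metadata appends the pg_aoseg statements to it, and _modify_metadata submits the whole batch through update_catalog once. The sketch below shows that accumulate-then-submit pattern in isolation; it is illustrative only, and run_catalog_update merely stands in for utility_accessor.update_catalog:

    import sys

    class CatalogBatch(object):
        '''Collect catalog statements and submit them as a single transaction.'''
        def __init__(self, run_catalog_update):
            self.run_catalog_update = run_catalog_update
            self.queries = "set allow_system_table_mods='dml';"
            self.queries += "begin transaction;"

        def add(self, statement):
            # mirrors the `self.queries += query` lines in _set_modify_metadata
            self.queries += statement

        def submit(self):
            self.queries += "end transaction;"
            return self.run_catalog_update(self.queries)

    if __name__ == '__main__':
        batch = CatalogBatch(lambda sql: sys.stdout.write(sql + '\n'))
        batch.add("delete from pg_aoseg.pg_paqseg_16385;")      # example segment relation name
        batch.add("insert into pg_aoseg.pg_paqseg_16385 values(1, 1024, -1, -1);")
        batch.submit()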
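_move_files_in_hdfs used to compute the destination segment files and run hadoop fs -mv in the same loop; it is now split into _set_move_files_in_hdfs, which only records self.srcfiles/self.dstfiles, and _move_files_in_hdfs, which walks those lists. A reduced sketch of the plan-then-execute pattern, using os.system in place of the tool's local_ssh helper and invented paths:

    import os

    def plan_moves(newfiles, tabledir, firstsegno):
        '''Pair every source file with its target segment file (the planning step).'''
        srcfiles, dstfiles = [], []
        segno = firstsegno
        for f in newfiles:
            srcfiles.append(f)
            dstfiles.append(tabledir + str(segno))
            segno += 1
        return srcfiles, dstfiles

    def execute_moves(srcfiles, dstfiles):
        '''The execution step; the real tool also checks the return code and rolls back on failure.'''
        for src, dst in zip(srcfiles, dstfiles):
            if src != dst:
                os.system('hadoop fs -mv %s %s' % (src, dst))

    srcs, dsts = plan_moves(['hdfs://nn:8020/tmp/a.paq'], 'hdfs://nn:8020/hawq_default/16385/35883/', 2)
    print zip(srcs, dsts)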
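For reference, the 'insert' branch of _set_modify_metadata builds one multi-row INSERT into the table's pg_aoseg relation, starting at firstsegno, with a four-value row per Parquet file (segno, eof, tupcount, eofuncompressed) and a five-value row per AO file. A small standalone helper that reproduces the Parquet form; the seg_name argument is only an example value:

    def parquet_insert_sql(seg_name, firstsegno, eofs):
        query = 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (seg_name, firstsegno, eofs[0], -1, -1)
        for k, eof in enumerate(eofs[1:]):
            query += ',(%d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1)
        query += ';'
        return query

    print parquet_insert_sql('pg_paqseg_16385', 1, [1024, 2048])
    # -> insert into pg_aoseg.pg_paqseg_16385 values(1, 1024, -1, -1),(2, 2048, -1, -1);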
