Repository: incubator-hawq
Updated Branches:
  refs/heads/master 49ceb69e6 -> be201dcc6
HAWQ-1033. Combine update and insert into one transaction, fix hadoop warning.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/be201dcc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/be201dcc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/be201dcc

Branch: refs/heads/master
Commit: be201dcc6848eb5f65cf8afabfc6de377ae98cd8
Parents: 49ceb69
Author: xunzhang <[email protected]>
Authored: Tue Sep 13 14:41:45 2016 +0800
Committer: rlei <[email protected]>
Committed: Wed Sep 14 10:24:16 2016 +0800

----------------------------------------------------------------------
 tools/bin/hawqregister | 72 ++++++++++++++++++++++++++++++++-------------
 1 file changed, 52 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/be201dcc/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index b3e3493..bbdc946 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -115,7 +115,7 @@ def option_parser_yml(yml_file):
     with open(yml_file, 'r') as f:
         params = yaml.load(f)
     register_yaml_dict_check(params)
-    if params['FileFormat'] == 'Parquet':
+    if params['FileFormat'].lower() == 'parquet':
         if not len(params['Parquet_FileLocations']['Files']):
             return 'Parquet', [], [], params['Parquet_Schema'], params['Distribution_Policy'], params['Parquet_FileLocations'], params['Bucketnum']
         files, sizes = [params['DFS_URL'] + d['path'] for d in params['Parquet_FileLocations']['Files']], [d['size'] for d in params['Parquet_FileLocations']['Files']]
@@ -259,12 +259,12 @@ def get_files_in_hdfs(filepath):
     '''Get all the files refered by 'filepath', which could be a file or a directory containing all the files'''
     files = []
     sizes = []
-    hdfscmd = "hadoop fs -test -e %s" % filepath
+    hdfscmd = "hdfs dfs -test -e %s" % filepath
     result = local_ssh(hdfscmd, logger)
     if result != 0:
         logger.error("Path '%s' does not exist in hdfs" % filepath)
         sys.exit(1)
-    hdfscmd = "hadoop fs -ls -R %s" % filepath
+    hdfscmd = "hdfs dfs -ls -R %s" % filepath
     result, out, err = local_ssh_output(hdfscmd)
     outlines = out.splitlines()
     # recursively search all the files under path 'filepath'
@@ -282,13 +282,13 @@ def check_parquet_format(files):
     '''Check whether the file to be registered is parquet format'''
     for f in files:
-        hdfscmd = 'hadoop fs -du -h %s | head -c 1' % f
+        hdfscmd = 'hdfs dfs -du -h %s | head -c 1' % f
         rc, out, err = local_ssh_output(hdfscmd)
         if out == '0':
             continue
-        hdfscmd = 'hadoop fs -cat %s | head -c 4 | grep PAR1' % f
+        hdfscmd = 'hdfs dfs -cat %s | head -c 4 | grep PAR1' % f
         result1 = local_ssh(hdfscmd, logger)
-        hdfscmd = 'hadoop fs -cat %s | tail -c 4 | grep PAR1' % f
+        hdfscmd = 'hdfs dfs -cat %s | tail -c 4 | grep PAR1' % f
         result2 = local_ssh(hdfscmd, logger)
         if result1 or result2:
             logger.error('File %s is not parquet format' % f)
@@ -304,7 +304,7 @@ def move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, nor
             dstfile = tabledir + str(segno)
             segno += 1
             if srcfile != dstfile:
-                hdfscmd = 'hadoop fs -mv %s %s' % (srcfile, dstfile)
+                hdfscmd = 'hdfs dfs -mv %s %s' % (srcfile, dstfile)
                 sys.stdout.write('hdfscmd: "%s"\n' % hdfscmd)
                 result = local_ssh(hdfscmd, logger)
                 if result != 0:
@@ -317,7 +317,7 @@ def move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, nor
             srcfile = tabledir + str(segno)
             segno += 1
             if srcfile != dstfile:
-                hdfscmd = 'hadoop fs -mv %s %s' % (srcfile, dstfile)
+                hdfscmd = 'hdfs dfs -mv %s %s' % (srcfile, dstfile)
                 sys.stdout.write('hdfscmd: "%s"\n' % hdfscmd)
                 result = local_ssh(hdfscmd, logger)
                 if result != 0:
@@ -365,6 +365,35 @@ def update_metadata_into_database(dburl, seg_name, files, eofs):
         sys.exit(1)
 
 
+def update_insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, eofs, fmt, update_files, update_eofs):
+    '''Insert and update the catalog table in --force case'''
+    try:
+        query = "set allow_system_table_mods='dml';"
+        query += "begin transaction;"
+        if fmt == 'Parquet':
+            query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (seg_name, firstsegno, eofs[0], -1, -1)
+            for k, eof in enumerate(eofs[1:]):
+                query += ',(%d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1)
+        else:
+            query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (seg_name, firstsegno, eofs[0], -1, -1, -1)
+            for k, eof in enumerate(eofs[1:]):
+                query += ',(%d, %d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1, -1)
+        query += ';'
+
+        segno_lst = [f.split('/')[-1] for f in update_files]
+        for i, eof in enumerate(update_eofs):
+            query += "update pg_aoseg.%s set eof = '%s' where segno = '%s';" % (seg_name, eof, segno_lst[i])
+        query += "end transaction;"
+        conn = dbconn.connect(dburl, True)
+        rows = dbconn.execSQL(conn, query)
+        conn.commit()
+        conn.close()
+    except DatabaseError, ex:
+        logger.error('Failed to execute query "%s"' % query)
+        move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, False)
+        sys.exit(1)
+
+
 if __name__ == '__main__':
     parser = option_parser()
 
@@ -373,8 +402,8 @@ if __name__ == '__main__':
     if len(args) != 1 or ((options.yml_config or options.force or options.repair) and options.filepath) or (options.force and options.repair):
         parser.print_help(sys.stderr)
         sys.exit(1)
-    if local_ssh('hadoop', logger):
-        logger.error('command "hadoop" is not available.')
+    if local_ssh('hdfs', logger):
+        logger.error('command "hdfs" is not available.')
         sys.exit(1)
 
     dburl = dbconn.DbURL(hostname = options.host, port = options.port, username = options.user, dbname = options.database)
@@ -431,12 +460,13 @@ if __name__ == '__main__':
             do_not_move, files_update, sizes_update = True, files, sizes
             files, sizes = [], []
         else:
-            for k, f in enumerate(files):
+            files_old, sizes_old = [f for f in files], [sz for sz in sizes]
+            for k, f in enumerate(files_old):
                 if f in existed_files:
-                    files_update.append(files[k])
-                    sizes_update.append(sizes[k])
-                    files.remove(files[k])
-                    sizes.remove(sizes[k])
+                    files_update.append(files_old[k])
+                    sizes_update.append(sizes_old[k])
+                    files.remove(files_old[k])
+                    sizes.remove(sizes_old[k])
 
     check_files_and_table_in_same_hdfs_cluster(filepath, tabledir)
 
@@ -458,10 +488,12 @@ if __name__ == '__main__':
     if not do_not_move:
         move_files_in_hdfs(database, tablename, files, firstsegno, tabledir, True)
 
-    # update catalog table
-    if not do_not_move:
-        insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, sizes, fileformat)
-    if force_mode:
-        update_metadata_into_database(dburl, seg_name, files_update, sizes_update)
+    if (not do_not_move) and force_mode:
+        update_insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, sizes, fileformat, files_update, sizes_update)
+    else:
+        if force_mode:
+            update_metadata_into_database(dburl, seg_name, files_update, sizes_update)
+        else:
+            insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, sizes, fileformat)
 
     logger.info('Hawq Register Succeed.')
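
For reference, below is a minimal standalone sketch (not the shipped function) of the single statement string that the new update_insert_metadata_into_database() assembles in the --force path, so that the multi-row insert for newly registered files and the eof updates for pre-existing files commit or roll back as one transaction. The segment-table name 'pg_paqseg_12345' and the sizes/segnos used in the example are made up for illustration:

    def build_force_query(seg_name, firstsegno, eofs, update_segnos, update_eofs):
        # Allow direct DML on the pg_aoseg catalog table, then open one transaction.
        query = "set allow_system_table_mods='dml';"
        query += "begin transaction;"
        # One multi-row insert covering every newly registered file
        # (Parquet-style row layout: segno, eof, -1, -1); assumes eofs is non-empty.
        values = ",".join("(%d, %d, %d, %d)" % (firstsegno + k, eof, -1, -1)
                          for k, eof in enumerate(eofs))
        query += "insert into pg_aoseg.%s values%s;" % (seg_name, values)
        # One eof update per file that already existed under the table directory.
        for segno, eof in zip(update_segnos, update_eofs):
            query += "update pg_aoseg.%s set eof = '%s' where segno = '%s';" % (seg_name, eof, segno)
        query += "end transaction;"
        return query

    if __name__ == '__main__':
        print(build_force_query('pg_paqseg_12345', 3, [1024, 2048], ['1', '2'], [512, 640]))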
