HAWQ-991. Rewrite hawqregister to support registering from yaml file.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/2596be6e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/2596be6e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/2596be6e

Branch: refs/heads/master
Commit: 2596be6e9c87da3d23a13f92e85307b785bee5d5
Parents: 7661dec
Author: xunzhang <[email protected]>
Authored: Tue Aug 9 19:39:13 2016 +0800
Committer: rlei <[email protected]>
Committed: Fri Aug 19 10:57:09 2016 +0800

----------------------------------------------------------------------
 .../ManagementTool/test_hawq_register.cpp |  20 +-
 tools/bin/hawqextract                     |   0
 tools/bin/hawqregister                    | 256 +++++++++----------
 3 files changed, 130 insertions(+), 146 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2596be6e/src/test/feature/ManagementTool/test_hawq_register.cpp
----------------------------------------------------------------------
diff --git a/src/test/feature/ManagementTool/test_hawq_register.cpp b/src/test/feature/ManagementTool/test_hawq_register.cpp
index a7982b3..afc2cb4 100644
--- a/src/test/feature/ManagementTool/test_hawq_register.cpp
+++ b/src/test/feature/ManagementTool/test_hawq_register.cpp
@@ -27,7 +27,7 @@ TEST_F(TestHawqRegister, TestSingleHawqFile) {
   util.execute("create table hawqregister(i int) with (appendonly=true, orientation=parquet);");
   util.query("select * from hawqregister;", 0);
 
-  EXPECT_EQ(0, Command::getCommandStatus("hawq register " + (string) HAWQ_DB + " hawqregister hdfs://localhost:8020/hawq_register_hawq.paq"));
+  EXPECT_EQ(0, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -f hdfs://localhost:8020/hawq_register_hawq.paq hawqregister"));
   util.query("select * from hawqregister;", 3);
 
   util.execute("insert into hawqregister values(1);");
@@ -46,7 +46,7 @@ TEST_F(TestHawqRegister, TestSingleHiveFile) {
   util.execute("create table hawqregister(i int) with (appendonly=true, orientation=parquet);");
   util.query("select * from hawqregister;", 0);
 
-  EXPECT_EQ(0, Command::getCommandStatus("hawq register " + (string) HAWQ_DB + " hawqregister hdfs://localhost:8020/hawq_register_hive.paq"));
+  EXPECT_EQ(0, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -f hdfs://localhost:8020/hawq_register_hive.paq hawqregister"));
   util.query("select * from hawqregister;", 1);
 
   util.execute("insert into hawqregister values(1);");
@@ -67,7 +67,7 @@ TEST_F(TestHawqRegister, TestDataTypes) {
   util.execute("create table hawqregister(a bool, b int2, c int2, d int4, e int8, f date, g float4, h float8, i varchar, j bytea, k char, l varchar) with (appendonly=true, orientation=parquet);");
   util.query("select * from hawqregister;", 0);
 
-  EXPECT_EQ(0, Command::getCommandStatus("hawq register " + (string) HAWQ_DB + " hawqregister hdfs://localhost:8020/hawq_register_data_types.paq"));
+  EXPECT_EQ(0, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -f hdfs://localhost:8020/hawq_register_data_types.paq hawqregister"));
   util.query("select * from hawqregister;", 1);
 
   util.execute("drop table hawqregister;");
@@ -87,7 +87,7 @@ TEST_F(TestHawqRegister, TestAllNULL) {
   util.execute("create table hawqregister(a bool, b int2, c int2, d int4, e int8, f date, g float4, h float8, i varchar, j bytea, k char, l varchar) with (appendonly=true, orientation=parquet);");
   util.query("select * from hawqregister;", 0);
 
-  EXPECT_EQ(0, Command::getCommandStatus("hawq register " + (string) HAWQ_DB + " hawqregister hdfs://localhost:8020/hawq_register_data_types.paq"));
+  EXPECT_EQ(0, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -f hdfs://localhost:8020/hawq_register_data_types.paq hawqregister"));
   util.query("select * from hawqregister;", 1);
 
   util.execute("drop table hawqregister;");
@@ -113,7 +113,7 @@ TEST_F(TestHawqRegister, TestFiles) {
   util.execute("create table hawqregister(i int) with (appendonly=true, orientation=parquet);");
   util.query("select * from hawqregister;", 0);
 
-  EXPECT_EQ(0, Command::getCommandStatus("hawq register " + (string) HAWQ_DB + " hawqregister hdfs://localhost:8020/hawq_register_test"));
+  EXPECT_EQ(0, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -f hdfs://localhost:8020/hawq_register_test hawqregister"));
   util.query("select * from hawqregister;", 12);
 
   util.execute("insert into hawqregister values(1);");
@@ -133,7 +133,7 @@ TEST_F(TestHawqRegister, TestHashDistributedTable) {
   util.execute("create table hawqregister(i int) with (appendonly=true, orientation=parquet) distributed by (i);");
   util.query("select * from hawqregister;", 0);
 
-  EXPECT_EQ(1, Command::getCommandStatus("hawq register " + (string) HAWQ_DB + " hawqregister hdfs://localhost:8020/hawq_register_hawq.paq"));
+  EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -f hdfs://localhost:8020/hawq_register_hawq.paq hawqregister"));
   util.query("select * from hawqregister;", 0);
 
   EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -rm hdfs://localhost:8020/hawq_register_hawq.paq"));
@@ -151,7 +151,7 @@ TEST_F(TestHawqRegister, TestNotParquetFile) {
   util.execute("create table hawqregister(i int) with (appendonly=true, orientation=parquet);");
   util.query("select * from hawqregister;", 0);
 
-  EXPECT_EQ(1, Command::getCommandStatus("hawq register " + (string) HAWQ_DB + " hawqregister hdfs://localhost:8020/hawq_register_test_not_paq"));
+  EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -f hdfs://localhost:8020/hawq_register_test_not_paq hawqregister"));
   util.query("select * from hawqregister;", 0);
 
   EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -rm hdfs://localhost:8020/hawq_register_test_not_paq"));
@@ -169,7 +169,7 @@ TEST_F(TestHawqRegister, TestNotParquetTable) {
   util.execute("create table hawqregister(i int);");
   util.query("select * from hawqregister;", 0);
 
-  EXPECT_EQ(1, Command::getCommandStatus("hawq register " + (string) HAWQ_DB + " hawqregister hdfs://localhost:8020/hawq_register_hawq.paq"));
+  EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -f hdfs://localhost:8020/hawq_register_hawq.paq hawqregister"));
   util.query("select * from hawqregister;", 0);
 
   EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -rm hdfs://localhost:8020/hawq_register_hawq.paq"));
@@ -182,7 +182,7 @@ TEST_F(TestHawqRegister, TestFileNotExist) {
   util.execute("create table hawqregister(i int);");
   util.query("select * from hawqregister;", 0);
 
-  EXPECT_EQ(1, Command::getCommandStatus("hawq register " + (string) HAWQ_DB + " hawqregister /hdfs://localhost:8020hawq_register_file_not_exist"));
+  EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -f /hdfs://localhost:8020hawq_register_file_not_exist hawqregister"));
   util.query("select * from hawqregister;", 0);
 
   util.execute("drop table hawqregister;");
@@ -199,7 +199,7 @@ TEST_F(TestHawqRegister, TestNotHDFSPath) {
   util.execute("create table hawqregister(i int);");
   util.query("select * from hawqregister;", 0);
 
-  EXPECT_EQ(1, Command::getCommandStatus("hawq register " + (string) HAWQ_DB + "hawqregister /hawq_register_hawq.paq"));
+  EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -f /hawq_register_hawq.paq hawqregister"));
   util.query("select * from hawqregister;", 0);
 
   EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -rm hdfs://localhost:8020/hawq_register_hawq.paq"));

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2596be6e/tools/bin/hawqextract
----------------------------------------------------------------------
diff --git a/tools/bin/hawqextract b/tools/bin/hawqextract
old mode 100755
new mode 100644

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2596be6e/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index 380a548..6700f54 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License. You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -17,9 +17,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
-'''
-hawq register [options] database_name table_name file_or_dir_path_in_hdfs
-'''
+# Usage1: hawq register [-h hostname] [-p port] [-U username] [-d database] [-f filepath] tablename
+# Usage2: hawq register [-h hostname] [-p port] [-U username] [-d database] [-c config] tablename
 import os, sys, optparse, getpass, re, urlparse
 try:
     from gppylib.commands.unix import getLocalHostname, getUserName
@@ -40,133 +39,137 @@ EXECNAME = os.path.split(__file__)[-1]
 setup_tool_logging(EXECNAME,getLocalHostname(),getUserName())
 
-def create_opt_parser(version):
+def option_parser():
     parser = OptParser(option_class=OptChecker,
-                       usage='usage: %prog [options] database_name table_name file_or_dir_path_in_hdfs',
-                       version=version)
+                       usage='usage: %prog [options] table_name',
+                       version='%prog version $Revision: #1 $')
     parser.remove_option('-h')
     parser.add_option('-?', '--help', action='help')
-    parser.add_option('-h', '--host', help="host of the target DB")
-    parser.add_option('-p', '--port', help="port of the target DB", type='int', default=0)
-    parser.add_option('-U', '--user', help="username of the target DB")
-    return parser
-
-
-def check_hadoop_command():
-    hdfscmd = "hadoop"
-    result = local_ssh(hdfscmd);
-    if result != 0:
-        logger.error("command 'hadoop' is not available, please set environment variable $PATH to fix this")
+    parser.add_option('-h', '--host', help='host of the target DB')
+    parser.add_option('-p', '--port', help='port of the target DB', type='int', default=0)
+    parser.add_option('-U', '--user', help='username of the target DB')
+    parser.add_option('-d', '--database', default = 'postgres', dest = 'database', help='database name')
+    parser.add_option('-f', '--filepath', dest = 'filepath', help='file name in HDFS')
+    parser.add_option('-c', '--config', dest = 'yml_config', default = '', help='configuration file in YAML format')
+    return parser.parse_args()
+
+
+def option_parser_yml(yml_file):
+    import yaml
+    with open(yml_file, 'r') as f:
+        params = yaml.load(f)
+    if params['FileFormat'] == 'Parquet':
+        offset = params['Parquet_FileLocations']['Files'][0]['path'].rfind('/')
+        filepath = params['DFS_URL'] + params['Parquet_FileLocations']['Files'][0]['path'][:offset] if len(params['Parquet_FileLocations']['Files']) != 1 else params['DFS_URL'] + params['Parquet_FileLocations']['Files'][0]['path']
+        return 'Parquet', filepath, params['Parquet_Schema'], params['Distribution_Policy']
+    offset = params['AO_FileLocations']['Files'][0]['path'].rfind('/')
+    filepath = params['DFS_URL'] + params['AO_FileLocations']['Files'][0]['path'][:offset] if len(params['AO_FileLocations']['Files']) != 1 else params['DFS_URL'] + params['AO_FileLocations']['Files'][0]['path']
+    return 'AO', filepath, params['AO_Schema'], params['Distribution_Policy']
+
+
+def create_table(dburl, tablename, schema_info, fmt, distrbution_policy):
+    try:
+        schema = ','.join([k['name'] + ' ' + k['type'] for k in schema_info])
+        fmt = 'ROW' if fmt == 'AO' else fmt
+        query = 'create table %s(%s) with (appendonly=true, orientation=%s) %s;' % (tablename, schema, fmt, distrbution_policy)
+        conn = dbconn.connect(dburl, False)
+        rows = dbconn.execSQL(conn, query)
+        conn.commit()
+    except DatabaseError, ex:
+        logger.error('Failed to execute query ""%s"' % query)
         sys.exit(1)
 
 
-def get_seg_name(options, databasename, tablename):
+def get_seg_name(dburl, tablename, database):
     try:
-        relfilenode = 0
-        relname = ""
-        query = ("select pg_class2.relname from pg_class as pg_class1, pg_appendonly, pg_class as pg_class2 where pg_class1.relname ='%s' "
-                 "and pg_class1.oid = pg_appendonly.relid and pg_appendonly.segrelid = pg_class2.oid;") % tablename
-        dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user, dbname=databasename)
+        relname = ''
+        query = ("select pg_class2.relname from pg_class as pg_class1, pg_appendonly, pg_class as pg_class2 "
+                 "where pg_class1.relname ='%s' and pg_class1.oid = pg_appendonly.relid and pg_appendonly.segrelid = pg_class2.oid;") % tablename
         conn = dbconn.connect(dburl, True)
         rows = dbconn.execSQL(conn, query)
-        conn.commit()
-        if rows.rowcount == 0:
-            logger.error("table '%s' not found in db '%s'" % (tablename, databasename));
+        conn.commit()
+        if not rows.rowcount:
+            logger.error('table "%s" not found in db "%s"' % (tablename, database))
             sys.exit(1)
         for row in rows:
             relname = row[0]
         conn.close()
     except DatabaseError, ex:
-        logger.error("Failed to connect to database, this script can only be run when the database is up")
-        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = %s" % (options.host, options.port, options.user, databasename, query))
-        sys.exit(1)
-
-    # check whether the target table is parquet format
-    if relname.find("paq") == -1:
-        logger.error("table '%s' is not parquet format" % tablename)
+        logger.error('Failed to run query "%s" with dbname "%s"' % (query, database))
         sys.exit(1)
-
     return relname
 
 
-def check_hash_type(options, databasename, tablename):
+def check_hash_type(dburl, tablename):
+    '''Check whether target table is hash-typed, in that case simple insertion does not work'''
     try:
         query = "select attrnums from gp_distribution_policy, pg_class where pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;" % tablename
-        dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user, dbname=databasename)
         conn = dbconn.connect(dburl, False)
         rows = dbconn.execSQL(conn, query)
-        conn.commit()
-        if rows.rowcount == 0:
-            logger.error("target not found in table gp_distribution_policy")
+        conn.commit()
+        if not rows.rowcount:
+            logger.error('Target not found in table gp_distribution_policy.')
             sys.exit(1)
         for row in rows:
-            if row[0] != None:
-                logger.error("Cannot register file(s) to a table which is hash-typed")
+            if row[0]:
+                logger.error('Cannot register file(s) to a table which is hash-typed.')
                 sys.exit(1)
         conn.close()
     except DatabaseError, ex:
-        logger.error("Failed to connect to database, this script can only be run when the database is up")
-        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = %s" % (options.host, options.port, options.user, databasename, query))
+        logger.error('Failed to execute query "%s"' % query)
         sys.exit(1)
 
 
-def get_metadata_from_database(options, databasename, tablename, seg_name):
+def get_metadata_from_database(dburl, tablename, seg_name):
+    '''Get the metadata to be inserted from hdfs'''
     try:
-        query = "select segno from pg_aoseg.%s;" % seg_name
-        dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user, dbname=databasename)
+        query = 'select segno from pg_aoseg.%s;' % seg_name
         conn = dbconn.connect(dburl, False)
         rows = dbconn.execSQL(conn, query)
-        conn.commit()
+        conn.commit()
         conn.close()
     except DatabaseError, ex:
-        logger.error("Failed to connect to database, this script can only be run when the database is up")
-        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = %s" % (options.host, options.port, options.user, databasename, query))
+        logger.error('Failed to execute query "%s"' % query)
         sys.exit(1)
 
     firstsegno = rows.rowcount + 1
 
-    # get the full path of correspoding file for target table
     try:
+        # get the full path of correspoding file for target table
         query = ("select location, gp_persistent_tablespace_node.tablespace_oid, database_oid, relfilenode from pg_class, gp_persistent_relation_node, "
-                 "gp_persistent_tablespace_node, gp_persistent_filespace_node where relname = '%s' and pg_class.relfilenode = "
-                 "gp_persistent_relation_node.relfilenode_oid and gp_persistent_relation_node.tablespace_oid = gp_persistent_tablespace_node.tablespace_oid "
-                 "and gp_persistent_filespace_node.filespace_oid = gp_persistent_filespace_node.filespace_oid;") % tablename
-        dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user, dbname=databasename)
+                "gp_persistent_tablespace_node, gp_persistent_filespace_node where relname = '%s' and pg_class.relfilenode = "
+                "gp_persistent_relation_node.relfilenode_oid and gp_persistent_relation_node.tablespace_oid = gp_persistent_tablespace_node.tablespace_oid "
+                "and gp_persistent_filespace_node.filespace_oid = gp_persistent_filespace_node.filespace_oid;") % tablename
         conn = dbconn.connect(dburl, False)
         rows = dbconn.execSQL(conn, query)
-        conn.commit()
+        conn.commit()
         conn.close()
     except DatabaseError, ex:
-        logger.error("Failed to connect to database, this script can only be run when the database is up")
-        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = %s" % (options.host, options.port, options.user, databasename, query))
+        logger.error('Failed to execute query "%s"' % query)
         sys.exit(1)
-
     for row in rows:
         tabledir = row[0].strip() + "/" + str(row[1]) + "/" + str(row[2]) + "/" + str(row[3]) + "/"
-
+    #tabledir = '/'.join([row[0], str(row[1]), str(row[2]), str(row[3]), ''])
     return firstsegno, tabledir
 
 
 def check_files_and_table_in_same_hdfs_cluster(filepath, tabledir):
+    '''Check whether all the files refered by 'filepath' and the location corresponding to the table are in the same hdfs cluster'''
     # check whether the files to be registered is in hdfs
     filesystem = filepath.split('://')
     if filesystem[0] != 'hdfs':
-        logger.error("Only support to register file(s) in hdfs")
+        logger.error('Only support to register file(s) in hdfs')
         sys.exit(1)
     fileroot = filepath.split('/')
     tableroot = tabledir.split('/')
     # check the root url of them. eg: for 'hdfs://localhost:8020/temp/tempfile', we check 'hdfs://localohst:8020'
-    if fileroot[0] != tableroot[0] or fileroot[1] != tableroot[1] or fileroot[2] != tableroot[2]:
+    if fileroot[0:3] != tableroot[0:3]:
         logger.error("Files to be registered and the table are not in the same hdfs cluster.\nFile(s) to be registered: '%s'\nTable path in HDFS: '%s'" % (filepath, tabledir))
         sys.exit(1)
 
 
 def get_files_in_hdfs(filepath):
+    '''Get all the files refered by 'filepath', which could be a file or a directory containing all the files'''
     files = []
     sizes = []
     hdfscmd = "hadoop fs -test -e %s" % filepath
@@ -174,52 +177,52 @@
     if result != 0:
         logger.error("Path '%s' does not exist in hdfs" % filepath)
         sys.exit(1)
-    hdfscmd = "hadoop fs -ls -R %s" % filepath
+    print filepath
     result, out, err = local_ssh_output(hdfscmd)
     outlines = out.splitlines()
-    # recursively search all the files under path 'filepath'
-    i = 0
     for line in outlines:
         lineargs = line.split()
         if len(lineargs) == 8 and lineargs[0].find ("d") == -1:
             files.append(lineargs[7])
             sizes.append(int(lineargs[4]))
-
     if len(files) == 0:
         logger.error("Dir '%s' is empty" % filepath)
         sys.exit(1)
-
     return files, sizes
 
 
-def check_parquet_format(options, files):
-    # check whether the files are parquet format by checking the first and last four bytes
-    for file in files:
-        hdfscmd = "hadoop fs -cat %s | head -c 4 | grep PAR1" % file
+def check_parquet_format(files):
+    '''Check whether the file to be registered is parquet format'''
+    for f in files:
+        hdfscmd = 'hadoop fs -du -h %s | head -c 1' % f
+        rc, out, err = local_ssh_output(hdfscmd)
+        if out == '0':
+            continue
+        hdfscmd = 'hadoop fs -cat %s | head -c 4 | grep PAR1' % f
         result1 = local_ssh(hdfscmd)
-        hdfscmd = "hadoop fs -cat %s | tail -c 4 | grep PAR1" % file
+        hdfscmd = 'hadoop fs -cat %s | tail -c 4 | grep PAR1' % f
         result2 = local_ssh(hdfscmd)
         if result1 or result2:
-            logger.error("File %s is not parquet format" % file)
+            logger.error('File %s is not parquet format' % f)
            sys.exit(1)
 
 
-def move_files_in_hdfs(options, databasename, tablename, files, firstsegno, tabledir, normal):
-    # move file(s) in src path into the folder correspoding to the target table
-    if (normal == True):
+def move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, normal):
+    '''Move file(s) in src path into the folder correspoding to the target table'''
+    if normal:
         segno = firstsegno
         for file in files:
             srcfile = file
             dstfile = tabledir + str(segno)
             segno += 1
             if srcfile != dstfile:
-                hdfscmd = "hadoop fs -mv %s %s" % (srcfile, dstfile)
-                sys.stdout.write("hdfscmd: '%s'\n" % hdfscmd)
+                hdfscmd = 'hadoop fs -mv %s %s' % (srcfile, dstfile)
+                sys.stdout.write('hdfscmd: "%s"\n' % hdfscmd)
                 result = local_ssh(hdfscmd)
                 if result != 0:
-                    logger.error("Fail to move '%s' to '%s'" % (srcfile, dstfile))
+                    logger.error('Fail to move %s to %s' % (srcfile, dstfile))
                     sys.exit(1)
     else:
         segno = firstsegno
@@ -228,79 +231,60 @@ def move_files_in_hdfs(options, databasename, tablename, files, firstsegno, tabl
             srcfile = tabledir + str(segno)
             segno += 1
             if srcfile != dstfile:
-                hdfscmd = "hadoop fs -mv %s %s" % (srcfile, dstfile)
-                sys.stdout.write("hdfscmd: '%s'\n" % hdfscmd)
+                hdfscmd = 'hadoop fs -mv %s %s' % (srcfile, dstfile)
+                sys.stdout.write('hdfscmd: "%s"\n' % hdfscmd)
                 result = local_ssh(hdfscmd)
                 if result != 0:
-                    logger.error("Fail to move '%s' to '%s'" % (srcfile, dstfile))
+                    logger.error('Fail to move "%s" to "%s"' % (srcfile, dstfile))
                     sys.exit(1)
 
 
-def insert_metadata_into_database(options, databasename, tablename, seg_name, firstsegno, tabledir, eofs):
+def insert_metadata_into_database(dburl, databasename, tablename, seg_name, firstsegno, tabledir, eofs):
+    '''Insert the metadata into database'''
     try:
         query = "SET allow_system_table_mods='dml';"
         segno = firstsegno
         for eof in eofs:
            query += "insert into pg_aoseg.%s values(%d, %d, %d, %d);" % (seg_name, segno, eof, -1, -1)
            segno += 1
-
-        dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user, dbname=databasename)
         conn = dbconn.connect(dburl, True)
         rows = dbconn.execSQL(conn, query)
-        conn.commit()
+        conn.commit()
         conn.close()
     except DatabaseError, ex:
-        logger.error("Failed to connect to database, this script can only be run when the database is up")
-        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = %s" % (options.host, options.port, options.user, databasename, query))
-        move_files_in_hdfs(options, databasename, tablename, files, firstsegno, tabledir, False)
-
+        logger.error('Failed to connect to database, this script can only be run when the database is up')
+        move_files_in_hdfs(options.database, options.tablename, files, firstsegno, tabledir, False)
         sys.exit(1)
 
 
-def main(args=None):
-    parser = create_opt_parser('%prog version $Revision: #1 $')
-    options, args = parser.parse_args(args)
-    if len(args) != 3:
-        sys.stderr.write('Incorrect number of arguments\n\n')
-        parser.print_help(sys.stderr)
-        return 1
-
-    databasename = args[0]
-    tablename = args[1]
-    filepath = args[2]
-
-    # 1. check whether the path of shell command 'hadoop' is set.
-    check_hadoop_command()
-
-    # 2. get the seg_name from database
-    seg_name = get_seg_name(options, databasename, tablename)
+if __name__ == '__main__':
+    options, args = option_parser()
+    if len(args) != 1 or (options.yml_config and options.filepath):
+        logger.error('Incorrect usage!\n Correct usage: "hawq register [-h hostname] [-p port] [-U username] [-d database] [-f filepath] tablename"\n or "hawq register [-h hostname] [-p port] [-U username] [-d database] [-c config] tablename"\n')
+        sys.exit(1)
+    if local_ssh('hadoop'):
+        logger.error('command "hadoop" is not available.')
+        sys.exit(1)
 
-    # 3. check whether target table is hash-typed, in that case simple insertion does not work
-    result = check_hash_type(options, databasename, tablename)
+    dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user, dbname=options.database)
+    filepath, database, tablename = options.filepath, options.database, args[0]
 
-    # 4. get the metadata to be inserted from hdfs
-    firstsegno, tabledir = get_metadata_from_database(options, databasename, tablename, seg_name)
+    if options.yml_config: # Usage2
+        fileformat, filepath, schema, distribution_policy = option_parser_yml(options.yml_config)
+        create_table(dburl, tablename, schema, fileformat, distribution_policy)
+    else:
+        fileformat = 'Parquet'
+        check_hash_type(dburl, tablename) # Usage1 only support randomly distributed table
 
-    # 5. check whether all the files refered by 'filepath' and the location corresponding to the table are in the same hdfs cluster
+    seg_name = get_seg_name(dburl, tablename, database)
+    firstsegno, tabledir = get_metadata_from_database(dburl, tablename, seg_name)
     check_files_and_table_in_same_hdfs_cluster(filepath, tabledir)
 
-    # 6. get all the files refered by 'filepath', which could be a file or a directory containing all the files
     files, sizes = get_files_in_hdfs(filepath)
-    print "File(s) to be registered:"
-    print files
-
-    # 7. check whether the file to be registered is parquet format
-    check_parquet_format(options, files)
-
-    # 8. move the file in hdfs to proper location
-    move_files_in_hdfs(options, databasename, tablename, files, firstsegno, tabledir, True)
-
-    # 9. insert the metadata into database
-    insert_metadata_into_database(options, databasename, tablename, seg_name, firstsegno, tabledir, sizes)
-
-    # 10. report the final status of hawq register
-    logger.info("Hawq register succeed.")
-
-if __name__ == '__main__':
-    sys.exit(main())
+    print 'File(s) to be registered:', files
+    if fileformat == 'Parquet':
+        check_parquet_format(files)
+    move_files_in_hdfs(database, tablename, files, firstsegno, tabledir, True)
+    insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, sizes)
+    logger.info('Hawq Register Succeed.')
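----------------------------------------------------------------------

For reference, option_parser_yml above expects the -c config file to carry
DFS_URL, FileFormat, Distribution_Policy, and either Parquet_FileLocations
with Parquet_Schema or AO_FileLocations with AO_Schema. A minimal sketch of
such a file in that shape (the path, column name, and the file name
sample.yml are made-up examples; only the keys are taken from the code):

    DFS_URL: hdfs://localhost:8020
    FileFormat: Parquet
    Parquet_Schema:
      - name: i
        type: int4
    Distribution_Policy: DISTRIBUTED RANDOMLY
    Parquet_FileLocations:
      Files:
        - path: /hawq_register_test/sample.paq

Registering from such a file (Usage2) would then look like:

    hawq register -d postgres -c sample.yml hawqregister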
