HAWQ-991. Rewrite hawqregister to support registering from yaml file.

Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/2596be6e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/2596be6e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/2596be6e

Branch: refs/heads/master
Commit: 2596be6e9c87da3d23a13f92e85307b785bee5d5
Parents: 7661dec
Author: xunzhang <[email protected]>
Authored: Tue Aug 9 19:39:13 2016 +0800
Committer: rlei <[email protected]>
Committed: Fri Aug 19 10:57:09 2016 +0800

----------------------------------------------------------------------
 .../ManagementTool/test_hawq_register.cpp       |  20 +-
 tools/bin/hawqextract                           |   0
 tools/bin/hawqregister                          | 256 +++++++++----------
 3 files changed, 130 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2596be6e/src/test/feature/ManagementTool/test_hawq_register.cpp
----------------------------------------------------------------------
diff --git a/src/test/feature/ManagementTool/test_hawq_register.cpp 
b/src/test/feature/ManagementTool/test_hawq_register.cpp
index a7982b3..afc2cb4 100644
--- a/src/test/feature/ManagementTool/test_hawq_register.cpp
+++ b/src/test/feature/ManagementTool/test_hawq_register.cpp
@@ -27,7 +27,7 @@ TEST_F(TestHawqRegister, TestSingleHawqFile) {
        util.execute("create table hawqregister(i int) with (appendonly=true, 
orientation=parquet);");
        util.query("select * from hawqregister;", 0);
 
-       EXPECT_EQ(0, Command::getCommandStatus("hawq register " + (string) 
HAWQ_DB + " hawqregister hdfs://localhost:8020/hawq_register_hawq.paq"));
+       EXPECT_EQ(0, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -f hdfs://localhost:8020/hawq_register_hawq.paq hawqregister"));
 
        util.query("select * from hawqregister;", 3);
        util.execute("insert into hawqregister values(1);");
@@ -46,7 +46,7 @@ TEST_F(TestHawqRegister, TestSingleHiveFile) {
        util.execute("create table hawqregister(i int) with (appendonly=true, 
orientation=parquet);");
        util.query("select * from hawqregister;", 0);
 
-       EXPECT_EQ(0, Command::getCommandStatus("hawq register " + (string) 
HAWQ_DB + " hawqregister hdfs://localhost:8020/hawq_register_hive.paq"));
+       EXPECT_EQ(0, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -f hdfs://localhost:8020/hawq_register_hive.paq hawqregister"));
 
        util.query("select * from hawqregister;", 1);
        util.execute("insert into hawqregister values(1);");
@@ -67,7 +67,7 @@ TEST_F(TestHawqRegister, TestDataTypes) {
        util.execute("create table hawqregister(a bool, b int2, c int2, d int4, 
e int8, f date, g float4, h float8, i varchar, j bytea, k char, l varchar) with 
(appendonly=true, orientation=parquet);");
        util.query("select * from hawqregister;", 0);
 
-       EXPECT_EQ(0, Command::getCommandStatus("hawq register " + (string) 
HAWQ_DB + " hawqregister hdfs://localhost:8020/hawq_register_data_types.paq"));
+       EXPECT_EQ(0, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -f hdfs://localhost:8020/hawq_register_data_types.paq 
hawqregister"));
 
        util.query("select * from hawqregister;", 1);
        util.execute("drop table hawqregister;");
@@ -87,7 +87,7 @@ TEST_F(TestHawqRegister, TestAllNULL) {
        util.execute("create table hawqregister(a bool, b int2, c int2, d int4, 
e int8, f date, g float4, h float8, i varchar, j bytea, k char, l varchar) with 
(appendonly=true, orientation=parquet);");
        util.query("select * from hawqregister;", 0);
 
-       EXPECT_EQ(0, Command::getCommandStatus("hawq register " + (string) 
HAWQ_DB + " hawqregister hdfs://localhost:8020/hawq_register_data_types.paq"));
+       EXPECT_EQ(0, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -f hdfs://localhost:8020/hawq_register_data_types.paq 
hawqregister"));
 
        util.query("select * from hawqregister;", 1);
        util.execute("drop table hawqregister;");
@@ -113,7 +113,7 @@ TEST_F(TestHawqRegister, TestFiles) {
        util.execute("create table hawqregister(i int) with (appendonly=true, 
orientation=parquet);");
        util.query("select * from hawqregister;", 0);
 
-       EXPECT_EQ(0, Command::getCommandStatus("hawq register " + (string) 
HAWQ_DB + " hawqregister hdfs://localhost:8020/hawq_register_test"));
+       EXPECT_EQ(0, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -f hdfs://localhost:8020/hawq_register_test hawqregister"));
 
        util.query("select * from hawqregister;", 12);
        util.execute("insert into hawqregister values(1);");
@@ -133,7 +133,7 @@ TEST_F(TestHawqRegister, TestHashDistributedTable) {
        util.execute("create table hawqregister(i int) with (appendonly=true, 
orientation=parquet) distributed by (i);");
        util.query("select * from hawqregister;", 0);
 
-       EXPECT_EQ(1, Command::getCommandStatus("hawq register " + (string) 
HAWQ_DB + " hawqregister hdfs://localhost:8020/hawq_register_hawq.paq"));
+       EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -f hdfs://localhost:8020/hawq_register_hawq.paq hawqregister"));
        util.query("select * from hawqregister;", 0);
 
        EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -rm 
hdfs://localhost:8020/hawq_register_hawq.paq"));
@@ -151,7 +151,7 @@ TEST_F(TestHawqRegister, TestNotParquetFile) {
        util.execute("create table hawqregister(i int) with (appendonly=true, 
orientation=parquet);");
        util.query("select * from hawqregister;", 0);
 
-       EXPECT_EQ(1, Command::getCommandStatus("hawq register " + (string) 
HAWQ_DB + " hawqregister hdfs://localhost:8020/hawq_register_test_not_paq"));
+       EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -f hdfs://localhost:8020/hawq_register_test_not_paq hawqregister"));
        util.query("select * from hawqregister;", 0);
 
        EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -rm 
hdfs://localhost:8020/hawq_register_test_not_paq"));
@@ -169,7 +169,7 @@ TEST_F(TestHawqRegister, TestNotParquetTable) {
        util.execute("create table hawqregister(i int);");
        util.query("select * from hawqregister;", 0);
 
-       EXPECT_EQ(1, Command::getCommandStatus("hawq register " + (string) 
HAWQ_DB + " hawqregister hdfs://localhost:8020/hawq_register_hawq.paq"));
+       EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -f hdfs://localhost:8020/hawq_register_hawq.paq hawqregister"));
        util.query("select * from hawqregister;", 0);
 
        EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -rm 
hdfs://localhost:8020/hawq_register_hawq.paq"));
@@ -182,7 +182,7 @@ TEST_F(TestHawqRegister, TestFileNotExist) {
        util.execute("create table hawqregister(i int);");
        util.query("select * from hawqregister;", 0);
 
-       EXPECT_EQ(1, Command::getCommandStatus("hawq register " + (string) 
HAWQ_DB + " hawqregister /hdfs://localhost:8020hawq_register_file_not_exist"));
+       EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -f /hdfs://localhost:8020hawq_register_file_not_exist 
hawqregister"));
        util.query("select * from hawqregister;", 0);
 
        util.execute("drop table hawqregister;");
@@ -199,7 +199,7 @@ TEST_F(TestHawqRegister, TestNotHDFSPath) {
        util.execute("create table hawqregister(i int);");
        util.query("select * from hawqregister;", 0);
 
-       EXPECT_EQ(1, Command::getCommandStatus("hawq register " + (string) 
HAWQ_DB + "hawqregister /hawq_register_hawq.paq"));
+       EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -f /hawq_register_hawq.paq hawqregister"));
        util.query("select * from hawqregister;", 0);
 
        EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -rm 
hdfs://localhost:8020/hawq_register_hawq.paq"));

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2596be6e/tools/bin/hawqextract
----------------------------------------------------------------------
diff --git a/tools/bin/hawqextract b/tools/bin/hawqextract
old mode 100755
new mode 100644

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2596be6e/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index 380a548..6700f54 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -17,9 +17,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
-'''
-hawq register [options] database_name table_name file_or_dir_path_in_hdfs
-'''
+# Usage1: hawq register [-h hostname] [-p port] [-U username] [-d database] 
[-f filepath] tablename
+# Usage2: hawq register [-h hostname] [-p port] [-U username] [-d database] 
[-c config] tablename
 import os, sys, optparse, getpass, re, urlparse
 try:
     from gppylib.commands.unix import getLocalHostname, getUserName
@@ -40,133 +39,137 @@ EXECNAME = os.path.split(__file__)[-1]
 setup_tool_logging(EXECNAME,getLocalHostname(),getUserName())
 
 
-def create_opt_parser(version):
+def option_parser():
     parser = OptParser(option_class=OptChecker,
-                       usage='usage: %prog [options] database_name table_name 
file_or_dir_path_in_hdfs',
-                       version=version)
+                       usage='usage: %prog [options] table_name',
+                       version='%prog version $Revision: #1 $')
     parser.remove_option('-h')
     parser.add_option('-?', '--help', action='help')
-    parser.add_option('-h', '--host', help="host of the target DB")
-    parser.add_option('-p', '--port', help="port of the target DB", 
type='int', default=0)
-    parser.add_option('-U', '--user', help="username of the target DB")
-    return parser
-
-
-def check_hadoop_command():
-    hdfscmd = "hadoop"
-    result = local_ssh(hdfscmd);
-    if result != 0:
-        logger.error("command 'hadoop' is not available, please set 
environment variable $PATH to fix this")
+    parser.add_option('-h', '--host', help='host of the target DB')
+    parser.add_option('-p', '--port', help='port of the target DB', 
type='int', default=0)
+    parser.add_option('-U', '--user', help='username of the target DB')
+    parser.add_option('-d', '--database', default = 'postgres', dest = 
'database', help='database name')
+    parser.add_option('-f', '--filepath', dest = 'filepath', help='file name 
in HDFS')
+    parser.add_option('-c', '--config', dest = 'yml_config', default = '', 
help='configuration file in YAML format')
+    return parser.parse_args()
+
+
+def option_parser_yml(yml_file):
+    import yaml
+    with open(yml_file, 'r') as f:
+        params = yaml.load(f)
+    if params['FileFormat'] == 'Parquet':
+        offset = params['Parquet_FileLocations']['Files'][0]['path'].rfind('/')
+        filepath = params['DFS_URL'] + 
params['Parquet_FileLocations']['Files'][0]['path'][:offset] if 
len(params['Parquet_FileLocations']['Files']) != 1 else params['DFS_URL'] + 
params['Parquet_FileLocations']['Files'][0]['path']
+        return 'Parquet', filepath, params['Parquet_Schema'], 
params['Distribution_Policy']
+    offset = params['AO_FileLocations']['Files'][0]['path'].rfind('/')
+    filepath = params['DFS_URL'] + 
params['AO_FileLocations']['Files'][0]['path'][:offset] if 
len(params['AO_FileLocations']['Files']) != 1 else params['DFS_URL'] + 
params['AO_FileLocations']['Files'][0]['path']
+    return 'AO', filepath, params['AO_Schema'], params['Distribution_Policy']
+
+
+def create_table(dburl, tablename, schema_info, fmt, distrbution_policy):
+    try:
+        schema = ','.join([k['name'] + ' ' + k['type'] for k in schema_info])
+        fmt = 'ROW' if fmt == 'AO' else fmt
+        query = 'create table %s(%s) with (appendonly=true, orientation=%s) 
%s;' % (tablename, schema, fmt, distrbution_policy)
+        conn = dbconn.connect(dburl, False)
+        rows = dbconn.execSQL(conn, query)
+        conn.commit()
+    except DatabaseError, ex:
+        logger.error('Failed to execute query ""%s"' % query)
         sys.exit(1)
 
 
-def get_seg_name(options, databasename, tablename):
+def get_seg_name(dburl, tablename, database):
     try:
-        relfilenode = 0
-        relname = ""
-        query = ("select pg_class2.relname from pg_class as pg_class1, 
pg_appendonly, pg_class as pg_class2 where pg_class1.relname ='%s' "
-                 "and  pg_class1.oid = pg_appendonly.relid and 
pg_appendonly.segrelid = pg_class2.oid;") % tablename
-        dburl = dbconn.DbURL(hostname=options.host, port=options.port, 
username=options.user, dbname=databasename)
+        relname = ''
+        query = ("select pg_class2.relname from pg_class as pg_class1, 
pg_appendonly, pg_class as pg_class2 "
+                 "where pg_class1.relname ='%s' and pg_class1.oid = 
pg_appendonly.relid and pg_appendonly.segrelid = pg_class2.oid;") % tablename
         conn = dbconn.connect(dburl, True)
         rows = dbconn.execSQL(conn, query)
-       conn.commit()
-        if rows.rowcount == 0:
-            logger.error("table '%s' not found in db '%s'" % (tablename, 
databasename));
+        conn.commit()
+        if not rows.rowcount:
+            logger.error('table "%s" not found in db "%s"' % (tablename, 
database))
             sys.exit(1)
         for row in rows:
             relname = row[0]
         conn.close()
-
     except DatabaseError, ex:
-        logger.error("Failed to connect to database, this script can only be 
run when the database is up")
-        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = 
%s" % (options.host, options.port, options.user, databasename, query))
-        sys.exit(1)
-
-    # check whether the target table is parquet format
-    if relname.find("paq") == -1:
-        logger.error("table '%s' is not parquet format" % tablename)
+        logger.error('Failed to run query "%s" with dbname "%s"' % (query, 
database))
         sys.exit(1)
-
     return relname
 
 
-def check_hash_type(options, databasename, tablename):
+def check_hash_type(dburl, tablename):
+    '''Check whether target table is hash-typed, in that case simple insertion 
does not work'''
     try:
         query = "select attrnums from gp_distribution_policy, pg_class where 
pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;" % 
tablename
-        dburl = dbconn.DbURL(hostname=options.host, port=options.port, 
username=options.user, dbname=databasename)
         conn = dbconn.connect(dburl, False)
         rows = dbconn.execSQL(conn, query)
-       conn.commit()
-        if rows.rowcount == 0:
-            logger.error("target not found in table gp_distribution_policy")
+        conn.commit()
+        if not rows.rowcount:
+            logger.error('Target not found in table gp_distribution_policy.')
             sys.exit(1)
         for row in rows:
-            if row[0] != None:
-                logger.error("Cannot register file(s) to a table which is 
hash-typed")
+            if row[0]:
+                logger.error('Cannot register file(s) to a table which is 
hash-typed.')
                 sys.exit(1)
-
         conn.close()
-
     except DatabaseError, ex:
-        logger.error("Failed to connect to database, this script can only be 
run when the database is up")
-        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = 
%s" % (options.host, options.port, options.user, databasename, query))
+        logger.error('Failed to execute query "%s"' % query)
         sys.exit(1)
 
 
-def get_metadata_from_database(options, databasename, tablename, seg_name):
+def get_metadata_from_database(dburl, tablename, seg_name):
+    '''Get the metadata to be inserted from hdfs'''
     try:
-        query = "select segno from pg_aoseg.%s;" % seg_name
-        dburl = dbconn.DbURL(hostname=options.host, port=options.port, 
username=options.user, dbname=databasename)
+        query = 'select segno from pg_aoseg.%s;' % seg_name
         conn = dbconn.connect(dburl, False)
         rows = dbconn.execSQL(conn, query)
-       conn.commit()
+        conn.commit()
         conn.close()
-
     except DatabaseError, ex:
-        logger.error("Failed to connect to database, this script can only be 
run when the database is up")
-        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = 
%s" % (options.host, options.port, options.user, databasename, query))
+        logger.error('Failed to execute query "%s"' % query)
         sys.exit(1)
 
     firstsegno = rows.rowcount + 1
 
-    # get the full path of correspoding file for target table
     try:
+        # get the full path of corresponding file for target table
         query = ("select location, 
gp_persistent_tablespace_node.tablespace_oid, database_oid, relfilenode from 
pg_class, gp_persistent_relation_node, "
-                 "gp_persistent_tablespace_node, gp_persistent_filespace_node 
where relname = '%s' and pg_class.relfilenode = "
-                 "gp_persistent_relation_node.relfilenode_oid and 
gp_persistent_relation_node.tablespace_oid = 
gp_persistent_tablespace_node.tablespace_oid "
-                 "and gp_persistent_filespace_node.filespace_oid = 
gp_persistent_filespace_node.filespace_oid;") % tablename
-        dburl = dbconn.DbURL(hostname=options.host, port=options.port, 
username=options.user, dbname=databasename)
+             "gp_persistent_tablespace_node, gp_persistent_filespace_node 
where relname = '%s' and pg_class.relfilenode = "
+             "gp_persistent_relation_node.relfilenode_oid and 
gp_persistent_relation_node.tablespace_oid = 
gp_persistent_tablespace_node.tablespace_oid "
+             "and gp_persistent_filespace_node.filespace_oid = 
gp_persistent_filespace_node.filespace_oid;") % tablename
         conn = dbconn.connect(dburl, False)
         rows = dbconn.execSQL(conn, query)
-       conn.commit()
+        conn.commit()
         conn.close()
-
     except DatabaseError, ex:
-        logger.error("Failed to connect to database, this script can only be 
run when the database is up")
-        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = 
%s" % (options.host, options.port, options.user, databasename, query))
+        logger.error('Failed to execute query "%s"' % query)
         sys.exit(1)
-
     for row in rows:
         tabledir = row[0].strip() + "/" + str(row[1]) + "/" + str(row[2]) + 
"/" + str(row[3]) + "/"
-
+        #tabledir = '/'.join([row[0], str(row[1]), str(row[2]), str(row[3]), 
''])
     return firstsegno, tabledir
 
 
 def check_files_and_table_in_same_hdfs_cluster(filepath, tabledir):
+    '''Check whether all the files referred by 'filepath' and the location 
corresponding to the table are in the same hdfs cluster'''
     # check whether the files to be registered is in hdfs
     filesystem = filepath.split('://')
     if filesystem[0] != 'hdfs':
-        logger.error("Only support to register file(s) in hdfs")
+        logger.error('Only support to register file(s) in hdfs')
         sys.exit(1)
     fileroot = filepath.split('/')
     tableroot = tabledir.split('/')
     # check the root url of them. eg: for 
'hdfs://localhost:8020/temp/tempfile', we check 'hdfs://localohst:8020'
-    if fileroot[0] != tableroot[0] or fileroot[1] != tableroot[1] or 
fileroot[2] != tableroot[2]:
+    if fileroot[0:3] != tableroot[0:3]:
         logger.error("Files to be registered and the table are not in the same 
hdfs cluster.\nFile(s) to be registered: '%s'\nTable path in HDFS: '%s'" % 
(filepath, tabledir))
         sys.exit(1)
 
 
 def get_files_in_hdfs(filepath):
+    '''Get all the files referred by 'filepath', which could be a file or a 
directory containing all the files'''
     files = []
     sizes = []
     hdfscmd = "hadoop fs -test -e %s" % filepath
@@ -174,52 +177,52 @@ def get_files_in_hdfs(filepath):
     if result != 0:
         logger.error("Path '%s' does not exist in hdfs" % filepath)
         sys.exit(1)
-
     hdfscmd = "hadoop fs -ls -R %s" % filepath
+    print filepath
     result, out, err = local_ssh_output(hdfscmd)
     outlines = out.splitlines()
-
     # recursively search all the files under path 'filepath'
-    i = 0
     for line in outlines:
         lineargs = line.split()
         if len(lineargs) == 8 and lineargs[0].find ("d") == -1:
             files.append(lineargs[7])
             sizes.append(int(lineargs[4]))
-
     if len(files) == 0:
         logger.error("Dir '%s' is empty" % filepath)
         sys.exit(1)
-
     return files, sizes
 
 
-def check_parquet_format(options, files):
-    # check whether the files are parquet format by checking the first and 
last four bytes
-    for file in files:
-        hdfscmd = "hadoop fs -cat %s | head -c 4 | grep PAR1" % file
+def check_parquet_format(files):
+    '''Check whether the file to be registered is parquet format'''
+    for f in files:
+        hdfscmd = 'hadoop fs -du -h %s | head -c 1' % f
+        rc, out, err = local_ssh_output(hdfscmd)
+        if out == '0':
+            continue
+        hdfscmd = 'hadoop fs -cat %s | head -c 4 | grep PAR1' % f
         result1 = local_ssh(hdfscmd)
-        hdfscmd = "hadoop fs -cat %s | tail -c 4 | grep PAR1" % file
+        hdfscmd = 'hadoop fs -cat %s | tail -c 4 | grep PAR1' % f
         result2 = local_ssh(hdfscmd)
         if result1 or result2:
-            logger.error("File %s is not parquet format" % file)
+            logger.error('File %s is not parquet format' % f)
             sys.exit(1)
 
 
-def move_files_in_hdfs(options, databasename, tablename, files, firstsegno, 
tabledir, normal):
-    # move file(s) in src path into the folder correspoding to the target table
-    if (normal == True):
+def move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, 
normal):
+    '''Move file(s) in src path into the folder corresponding to the target 
table'''
+    if normal:
         segno = firstsegno
         for file in files:
             srcfile = file
             dstfile = tabledir + str(segno)
             segno += 1
             if srcfile != dstfile:
-                hdfscmd = "hadoop fs -mv %s %s" % (srcfile, dstfile)
-                sys.stdout.write("hdfscmd: '%s'\n" % hdfscmd)
+                hdfscmd = 'hadoop fs -mv %s %s' % (srcfile, dstfile)
+                sys.stdout.write('hdfscmd: "%s"\n' % hdfscmd)
                 result = local_ssh(hdfscmd)
                 if result != 0:
-                    logger.error("Fail to move '%s' to '%s'" % (srcfile, 
dstfile))
+                    logger.error('Fail to move %s to %s' % (srcfile, dstfile))
                     sys.exit(1)
     else:
         segno = firstsegno
@@ -228,79 +231,60 @@ def move_files_in_hdfs(options, databasename, tablename, 
files, firstsegno, tabl
             srcfile = tabledir + str(segno)
             segno += 1
             if srcfile != dstfile:
-                hdfscmd = "hadoop fs -mv %s %s" % (srcfile, dstfile)
-                sys.stdout.write("hdfscmd: '%s'\n" % hdfscmd)
+                hdfscmd = 'hadoop fs -mv %s %s' % (srcfile, dstfile)
+                sys.stdout.write('hdfscmd: "%s"\n' % hdfscmd)
                 result = local_ssh(hdfscmd)
                 if result != 0:
-                    logger.error("Fail to move '%s' to '%s'" % (srcfile, 
dstfile))
+                    logger.error('Fail to move "%s" to "%s"' % (srcfile, 
dstfile))
                     sys.exit(1)
 
 
-def insert_metadata_into_database(options, databasename, tablename, seg_name, 
firstsegno, tabledir, eofs):
+def insert_metadata_into_database(dburl, databasename, tablename, seg_name, 
firstsegno, tabledir, eofs):
+    '''Insert the metadata into database'''
     try:
         query = "SET allow_system_table_mods='dml';"
         segno = firstsegno
         for eof in eofs:
             query += "insert into pg_aoseg.%s values(%d, %d, %d, %d);" % 
(seg_name, segno, eof, -1, -1)
             segno += 1
-
-        dburl = dbconn.DbURL(hostname=options.host, port=options.port, 
username=options.user, dbname=databasename)
         conn = dbconn.connect(dburl, True)
         rows = dbconn.execSQL(conn, query)
-       conn.commit()
+        conn.commit()
         conn.close()
-
     except DatabaseError, ex:
-        logger.error("Failed to connect to database, this script can only be 
run when the database is up")
-        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = 
%s" % (options.host, options.port, options.user, databasename, query))
-       move_files_in_hdfs(options, databasename, tablename, files, firstsegno, 
tabledir, False)
-
+        logger.error('Failed to connect to database, this script can only be 
run when the database is up')
+        move_files_in_hdfs(options.database, options.tablename, files, 
firstsegno, tabledir, False)
         sys.exit(1)
 
 
-def main(args=None):
-    parser = create_opt_parser('%prog version $Revision: #1 $')
-    options, args = parser.parse_args(args)
-    if len(args) != 3:
-        sys.stderr.write('Incorrect number of arguments\n\n')
-        parser.print_help(sys.stderr)
-        return 1
-
-    databasename = args[0]
-    tablename = args[1]
-    filepath = args[2]
-
-    # 1. check whether the path of shell command 'hadoop' is set.
-    check_hadoop_command()
-
-    # 2. get the seg_name from database
-    seg_name = get_seg_name(options, databasename, tablename)
+if __name__ == '__main__':
+    options, args = option_parser()
+    if len(args) != 1 or (options.yml_config and options.filepath):
+        logger.error('Incorrect usage!\n Correct usage: "hawq register [-h 
hostname] [-p port] [-U username] [-d database] [-f filepath] tablename"\n or 
"hawq register [-h hostname] [-p port] [-U username] [-d database] [-c config] 
tablename"\n')
+        sys.exit(1)
+    if local_ssh('hadoop'):
+        logger.error('command "hadoop" is not available.')
+        sys.exit(1)
 
-    # 3. check whether target table is hash-typed, in that case simple 
insertion does not work
-    result = check_hash_type(options, databasename, tablename)
+    dburl = dbconn.DbURL(hostname=options.host, port=options.port, 
username=options.user, dbname=options.database)
+    filepath, database, tablename = options.filepath, options.database, args[0]
 
-    # 4. get the metadata to be inserted from hdfs
-    firstsegno, tabledir = get_metadata_from_database(options, databasename, 
tablename, seg_name)
+    if options.yml_config: # Usage2
+        fileformat, filepath, schema, distribution_policy = 
option_parser_yml(options.yml_config)
+        create_table(dburl, tablename, schema, fileformat, distribution_policy)
+    else:
+        fileformat = 'Parquet'
+        check_hash_type(dburl, tablename) # Usage1 only support randomly 
distributed table
 
-    # 5. check whether all the files refered by 'filepath' and the location 
corresponding to the table are in the same hdfs cluster
+    seg_name = get_seg_name(dburl, tablename, database)
+    firstsegno, tabledir = get_metadata_from_database(dburl, tablename, 
seg_name)
     check_files_and_table_in_same_hdfs_cluster(filepath, tabledir)
 
-    # 6. get all the files refered by 'filepath', which could be a file or a 
directory containing all the files
     files, sizes = get_files_in_hdfs(filepath)
-    print "File(s) to be registered:"
+    print 'File(s) to be registered:', files
+    if fileformat == 'Parquet':
+        check_parquet_format(files)
     print files
-
-    # 7. check whether the file to be registered is parquet format
-    check_parquet_format(options, files)
-
-    # 8. move the file in hdfs to proper location
-    move_files_in_hdfs(options, databasename, tablename, files, firstsegno, 
tabledir, True)
-
-    # 9. insert the metadata into database
-    insert_metadata_into_database(options, databasename, tablename, seg_name, 
firstsegno, tabledir, sizes)
-
-    # 10. report the final status of hawq register
-    logger.info("Hawq register succeed.")
-
-if __name__ == '__main__':
-    sys.exit(main())
+    move_files_in_hdfs(database, tablename, files, firstsegno, tabledir, True)
+    insert_metadata_into_database(dburl, database, tablename, seg_name, 
firstsegno, tabledir, sizes)
+    logger.info('Hawq Register Succeed.')

Reply via email to