HAWQ-1060. Refactor hawq register with better readability and quality.

Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/637f9d57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/637f9d57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/637f9d57

Branch: refs/heads/master
Commit: 637f9d5787aec24d72a3159b356d388ba116991e
Parents: 981c0a9
Author: xunzhang <xunzhang...@gmail.com>
Authored: Fri Sep 16 18:48:39 2016 +0800
Committer: Lili Ma <ictmal...@gmail.com>
Committed: Sun Sep 18 14:39:54 2016 +0800

----------------------------------------------------------------------
 tools/bin/hawqregister | 680 ++++++++++++++++++++++----------------------
 1 file changed, 335 insertions(+), 345 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/637f9d57/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index bbdc946..ee5275b 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -20,80 +20,81 @@
 # Usage1: hawq register [-h hostname] [-p port] [-U username] [-d database] [-f filepath] [-e eof] <tablename>
 # Usage2: hawq register [-h hostname] [-p port] [-U username] [-d database] [-c config] [--force] [--repair] <tablename>
 
-import os, sys, optparse, getpass, re, urlparse
+import os
+import sys
 try:
     from gppylib.commands.unix import getLocalHostname, getUserName
     from gppylib.db import dbconn
     from gppylib.gplog import get_default_logger, setup_tool_logging
     from gppylib.gpparseopts import OptParser, OptChecker
     from pygresql import pg
-    from pygresql.pgdb import DatabaseError
     from hawqpylib.hawqlib import local_ssh, local_ssh_output
 except ImportError, e:
     print e
-    sys.stderr.write('cannot import module, please check that you have source 
greenplum_path.sh\n')
+    sys.stderr.write('Cannot import module, please check that you have source 
greenplum_path.sh\n')
     sys.exit(2)
 
 # setup logging
 logger = get_default_logger()
 EXECNAME = os.path.split(__file__)[-1]
-setup_tool_logging(EXECNAME,getLocalHostname(),getUserName())
+setup_tool_logging(EXECNAME, getLocalHostname(), getUserName())
 
 
 def option_parser():
+    '''option parser'''
     parser = OptParser(option_class=OptChecker,
                        usage='usage: %prog [options] table_name',
                        version='%prog version $Revision: #1 $')
     parser.remove_option('-h')
-    parser.add_option('-?', '--help', action = 'help')
-    parser.add_option('-h', '--host', help = 'host of the target DB')
-    parser.add_option('-p', '--port', help = 'port of the target DB', type = 
'int', default = 0)
-    parser.add_option('-U', '--user', help = 'username of the target DB')
-    parser.add_option('-d', '--database', default = 'postgres', dest = 
'database', help='database name')
-    parser.add_option('-f', '--filepath', dest = 'filepath', help = 'file name 
in HDFS')
-    parser.add_option('-e', '--eof', dest = 'filesize', type = 'int', default 
= 0, help = 'eof of the file to be registered')
-    parser.add_option('-c', '--config', dest = 'yml_config', default = '', 
help = 'configuration file in YAML format')
-    parser.add_option('--force', action = 'store_true', default = False)
-    parser.add_option('--repair', action = 'store_true', default = False)
+    parser.add_option('-?', '--help', action='help')
+    parser.add_option('-h', '--host', help='host of the target DB')
+    parser.add_option('-p', '--port', help='port of the target DB', 
type='int', default=0)
+    parser.add_option('-U', '--user', help='username of the target DB')
+    parser.add_option('-d', '--database', default='postgres', dest='database', 
help='database name')
+    parser.add_option('-f', '--filepath', dest='filepath', help='file name in 
HDFS')
+    parser.add_option('-e', '--eof', dest='filesize', type='int', default=0, 
help='eof of the file to be registered')
+    parser.add_option('-c', '--config', dest='yml_config', default='', 
help='configuration file in YAML format')
+    parser.add_option('--force', action='store_true', default=False)
+    parser.add_option('--repair', action='store_true', default=False)
     return parser
 
 
 def register_yaml_dict_check(D):
-    # check exists
+    '''check exists'''
     check_list = ['DFS_URL', 'Distribution_Policy', 'FileFormat', 'TableName', 
'Bucketnum']
     for attr in check_list:
-        if D.get(attr) == None:
+        if D.get(attr) is None:
             logger.error('Wrong configuration yaml file format: "%s" attribute 
does not exist.\n See example in "hawq register --help".' % attr)
             sys.exit(1)
     if D['FileFormat'] in ['Parquet', 'AO']:
         prefix = D['FileFormat']
         local_check_list = ['%s_FileLocations' % prefix, '%s_Schema' % prefix]
         for attr in local_check_list:
-            if D.get(attr) == None:
+            if D.get(attr) is None:
                 logger.error('Wrong configuration yaml file format: "%s" 
attribute does not exist.\n See example in "hawq register --help".' % attr)
                 sys.exit(1)
-        if D['%s_FileLocations' % prefix].get('Files') == None:
+        if D['%s_FileLocations' % prefix].get('Files') is None:
             logger.error('Wrong configuration yaml file format: "%s" attribute 
does not exist.\n See example in "hawq register --help".' % 
'%s_FileLocations.Files' % prefix)
             sys.exit(1)
         for d in D['%s_FileLocations' % prefix]['Files']:
-            if d.get('path') == None:
+            if d.get('path') is None:
                 logger.error('Wrong configuration yaml file format: "%s" 
attribute does not exist.\n See example in "hawq register --help".' % 
'%s_FileLocations.Files.path' % prefix)
                 sys.exit(1)
-            if d.get('size') == None:
+            if d.get('size') is None:
                 logger.error('Wrong configuration yaml file format: "%s" 
attribute does not exist.\n See example in "hawq register --help".' % 
'%s_FileLocations.Files.size' % prefix)
                 sys.exit(1)
     else:
         logger.error('hawq register only support Parquet and AO formats. 
Format %s is not supported.' % D['FileFormat'])
         sys.exit(1)
     prefix = D['FileFormat']
-    if D.get('%s_Schema' % prefix) == None:
+    if D.get('%s_Schema' % prefix) is None:
         logger.error('Wrong configuration yaml file format: "%s" attribute 
does not exist.\n See example in "hawq register --help".' % '%s_Schema' % 
prefix)
         sys.exit(1)
     for d in D['%s_Schema' % prefix]:
-        if d.get('name') == None:
+        if d.get('name') is None:
             logger.error('Wrong configuration yaml file format: "%s" attribute 
does not exist.\n See example in "hawq register --help".' % '%s_Schema.name' % 
prefix)
             sys.exit(1)
-        if d.get('type') == None:
+        if d.get('type') is None:
             logger.error('Wrong configuration yaml file format: "%s" attribute 
does not exist.\n See example in "hawq register --help".' % '%s_Schema.type' % 
prefix)
             sys.exit(1)
     if D['FileFormat'] == 'Parquet':
@@ -126,182 +127,263 @@ def option_parser_yml(yml_file):
     return 'AO', files, sizes, params['AO_Schema'], 
params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum']
 
 
-def create_table(dburl, tablename, schema_info, fmt, distrbution_policy, 
file_locations, bucket_number):
-    try:
-        query = "select count(*) from pg_class where relname = '%s';" % 
tablename.split('.')[-1].lower()
-        conn = dbconn.connect(dburl, False)
-        rows = dbconn.execSQL(conn, query)
-        conn.commit()
-        conn.close()
-        for row in rows:
-            if row[0] != 0:
-                return False
-    except DatabaseError, ex:
-        logger.error('Failed to execute query "%s"' % query)
-        sys.exit(1)
-
-    try:
+class GpRegisterAccessor(object):
+    def __init__(self, conn):
+        self.conn = conn
+        rows = self.exec_query("""
+        SELECT oid, datname, dat2tablespace,
+               pg_encoding_to_char(encoding) encoding
+        FROM pg_database WHERE datname=current_database()""")
+        self.dbid = rows[0]['oid']
+        self.dbname = rows[0]['datname']
+        self.spcid = rows[0]['dat2tablespace']
+        self.dbencoding = rows[0]['encoding']
+        self.dbversion = self.exec_query('select version()')[0]['version']
+
+    def exec_query(self, sql):
+        '''execute query and return dict result'''
+        return self.conn.query(sql).dictresult()
+
+    def get_table_existed(self, tablename):
+        qry = """select count(*) from pg_class where relname = '%s';""" % 
tablename.split('.')[-1].lower()
+        return self.exec_query(qry)[0]['count'] == 1
+
+    def do_create_table(self, tablename, schema_info, fmt, distrbution_policy, 
file_locations, bucket_number):
+        if self.get_table_existed(tablename):
+            return False
         schema = ','.join([k['name'] + ' ' + k['type'] for k in schema_info])
         fmt = 'ROW' if fmt == 'AO' else fmt
         if fmt == 'ROW':
             query = ('create table %s(%s) with (appendonly=true, 
orientation=%s, compresstype=%s, compresslevel=%s, checksum=%s, bucketnum=%s) 
%s;'
-                    % (tablename, schema, fmt, 
file_locations['CompressionType'], file_locations['CompressionLevel'], 
file_locations['Checksum'], bucket_number, distrbution_policy))
+                     % (tablename, schema, fmt, 
file_locations['CompressionType'], file_locations['CompressionLevel'], 
file_locations['Checksum'], bucket_number, distrbution_policy))
         else: # Parquet
             query = ('create table %s(%s) with (appendonly=true, 
orientation=%s, compresstype=%s, compresslevel=%s, pagesize=%s, 
rowgroupsize=%s, bucketnum=%s) %s;'
-                    % (tablename, schema, fmt, 
file_locations['CompressionType'], file_locations['CompressionLevel'], 
file_locations['PageSize'], file_locations['RowGroupSize'], bucket_number, 
distrbution_policy))
-        conn = dbconn.connect(dburl, False)
-        rows = dbconn.execSQL(conn, query)
-        conn.commit()
-        conn.close()
+                     % (tablename, schema, fmt, 
file_locations['CompressionType'], file_locations['CompressionLevel'], 
file_locations['PageSize'], file_locations['RowGroupSize'], bucket_number, 
distrbution_policy))
+        self.conn.query(query)
         return True
-    except DatabaseError, ex:
-        print DatabaseError, ex
-        logger.error('Failed to execute query "%s"' % query)
-        sys.exit(1)
 
+    def check_hash_type(self, tablename):
+        qry = """select attrnums from gp_distribution_policy, pg_class where 
pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;""" 
% tablename
+        rows = self.exec_query(qry)
+        if len(rows) == 0:
+            logger.error('Table %s not found in table gp_distribution_policy.' 
% tablename)
+            sys.exit(1)
+        if rows[0]['attrnums']:
+            logger.error('Cannot register file(s) to a table which is hash 
distribuetd.')
+            sys.exit(1)
 
-def get_seg_name(dburl, tablename, database, fmt):
-    try:
-        relname = ''
+    # pg_paqseg_#
+    def get_seg_name(self, tablename, database, fmt):
         tablename = tablename.split('.')[-1]
         query = ("select pg_class2.relname from pg_class as pg_class1, 
pg_appendonly, pg_class as pg_class2 "
                  "where pg_class1.relname ='%s' and pg_class1.oid = 
pg_appendonly.relid and pg_appendonly.segrelid = pg_class2.oid;") % tablename
-        conn = dbconn.connect(dburl, True)
-        rows = dbconn.execSQL(conn, query)
-        conn.commit()
-        if not rows.rowcount:
+        rows = self.exec_query(query)
+        if len(rows) == 0:
             logger.error('table "%s" not found in db "%s"' % (tablename, 
database))
             sys.exit(1)
-        for row in rows:
-            relname = row[0]
-        conn.close()
-    except DatabaseError, ex:
-        logger.error('Failed to run query "%s" with dbname "%s"' % (query, 
database))
-        sys.exit(1)
-    if fmt == 'Parquet':
-        if relname.find("paq") == -1:
-            logger.error("table '%s' is not parquet format" % tablename)
-            sys.exit(1)
-
-    return relname
-
-
-def check_hash_type(dburl, tablename):
-    '''Check whether target table is hash distributed, in that case simple insertion does not work'''
-    try:
-        query = "select attrnums from gp_distribution_policy, pg_class where 
pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;" % 
tablename
-        conn = dbconn.connect(dburl, False)
-        rows = dbconn.execSQL(conn, query)
-        conn.commit()
-        if not rows.rowcount:
-            logger.error('Table %s not found in table gp_distribution_policy.' 
% tablename)
-            sys.exit(1)
-        for row in rows:
-            if row[0]:
-                logger.error('Cannot register file(s) to a table which is hash 
distribuetd.')
+        relname = rows[0]['relname']
+        if fmt == 'Parquet':
+            if relname.find('paq') == -1:
+                logger.error("table '%s' is not parquet format" % tablename)
                 sys.exit(1)
-        conn.close()
-    except DatabaseError, ex:
-        logger.error('Failed to execute query "%s"' % query)
-        sys.exit(1)
+        return relname
 
+    def get_distribution_policy_info(self, tablename):
+        query = "select oid from pg_class where relname = '%s';" % 
tablename.split('.')[-1].lower()
+        rows = self.exec_query(query)
+        oid = rows[0]['oid']
+        query = "select * from gp_distribution_policy where localoid = '%s';" 
% oid
+        rows = self.exec_query(query)
+        return rows[0]['attrnums']
 
-def get_metadata_from_database(dburl, tablename, seg_name):
-    '''Get the metadata to be inserted from hdfs'''
-    try:
+    def get_metadata_from_database(self, tablename, seg_name):
         query = 'select segno from pg_aoseg.%s;' % seg_name
-        conn = dbconn.connect(dburl, False)
-        rows = dbconn.execSQL(conn, query)
-        conn.commit()
-        conn.close()
-    except DatabaseError, ex:
-        logger.error('Failed to execute query "%s"' % query)
-        sys.exit(1)
-
-    firstsegno = rows.rowcount + 1
-
-    try:
+        firstsegno = len(self.exec_query(query)) + 1
         # get the full path of correspoding file for target table
         query = ("select location, 
gp_persistent_tablespace_node.tablespace_oid, database_oid, relfilenode from 
pg_class, gp_persistent_relation_node, "
                  "gp_persistent_tablespace_node, gp_persistent_filespace_node 
where relname = '%s' and pg_class.relfilenode = "
                  "gp_persistent_relation_node.relfilenode_oid and 
gp_persistent_relation_node.tablespace_oid = 
gp_persistent_tablespace_node.tablespace_oid "
                  "and gp_persistent_filespace_node.filespace_oid = 
gp_persistent_filespace_node.filespace_oid;") % tablename.split('.')[-1]
-        conn = dbconn.connect(dburl, False)
-        rows = dbconn.execSQL(conn, query)
-        conn.commit()
-        conn.close()
-    except DatabaseError, ex:
-        logger.error('Failed to execute query "%s"' % query)
-        sys.exit(1)
-    for row in rows:
-        tabledir = '/'.join([row[0].strip(), str(row[1]), str(row[2]), 
str(row[3]), ''])
-    return firstsegno, tabledir
-
-
-def check_files_and_table_in_same_hdfs_cluster(filepath, tabledir):
-    '''Check whether all the files refered by 'filepath' and the location corresponding to the table are in the same hdfs cluster'''
-    if not filepath:
-        return
-    # check whether the files to be registered is in hdfs
-    filesystem = filepath.split('://')
-    if filesystem[0] != 'hdfs':
-        logger.error('Only support to register file(s) in hdfs')
-        sys.exit(1)
-    fileroot = filepath.split('/')
-    tableroot = tabledir.split('/')
-    # check the root url of them. eg: for 'hdfs://localhost:8020/temp/tempfile', we check 'hdfs://localohst:8020'
-    if fileroot[0:3] != tableroot[0:3]:
-        logger.error("Files to be registered and the table are not in the same 
hdfs cluster.\nFile(s) to be registered: '%s'\nTable path in HDFS: '%s'" % 
(filepath, tabledir))
-        sys.exit(1)
+        D = self.exec_query(query)[0]
+        tabledir = '/'.join([D['location'].strip(), str(D['tablespace_oid']), 
str(D['database_oid']), str(D['relfilenode']), ''])
+        return firstsegno, tabledir
+
+    def update_catalog(self, query):
+        self.conn.query(query)
+
+
+class HawqRegister(object):
+    def __init__(self, options, table, utility_conn, conn):
+        self.yml = options.yml_config
+        self.filepath = options.filepath
+        self.database = options.database
+        self.tablename = table
+        self.filesize = options.filesize
+        self.accessor = GpRegisterAccessor(conn)
+        self.utility_accessor = GpRegisterAccessor(utility_conn)
+        self.mode = self._init_mode(options.force, options.repair)
+        self._init()
+
+    def _init_mode(self, force, repair):
+        def table_existed():
+            return self.accessor.get_table_existed(self.tablename)
+
+        if self.yml:
+            if force:
+                return 'force'
+            elif repair:
+                if not table_existed():
+                    logger.error('--repair mode asserts the table is already 
create.')
+                    sys.exit(1)
+                return 'repair'
+            else:
+                return 'second_normal'
+        else:
+            return 'first'
 
+    def _init(self):
+        def check_hash_type():
+            self.accessor.check_hash_type(self.tablename)
 
-def get_files_in_hdfs(filepath):
-    '''Get all the files refered by 'filepath', which could be a file or a directory containing all the files'''
-    files = []
-    sizes = []
-    hdfscmd = "hdfs dfs -test -e %s" % filepath
-    result = local_ssh(hdfscmd, logger)
-    if result != 0:
-        logger.error("Path '%s' does not exist in hdfs" % filepath)
-        sys.exit(1)
-    hdfscmd = "hdfs dfs -ls -R %s" % filepath
-    result, out, err = local_ssh_output(hdfscmd)
-    outlines = out.splitlines()
-    # recursively search all the files under path 'filepath'
-    for line in outlines:
-        lineargs = line.split()
-        if len(lineargs) == 8 and lineargs[0].find ("d") == -1:
-            files.append(lineargs[7])
-            sizes.append(int(lineargs[4]))
-    if len(files) == 0:
-        logger.error("Dir '%s' is empty" % filepath)
-        sys.exit(1)
-    return files, sizes
-
-
-def check_parquet_format(files):
-    '''Check whether the file to be registered is parquet format'''
-    for f in files:
-        hdfscmd = 'hdfs dfs -du -h %s | head -c 1' % f
-        rc, out, err = local_ssh_output(hdfscmd)
-        if out == '0':
-            continue
-        hdfscmd = 'hdfs dfs -cat %s | head -c 4 | grep PAR1' % f
-        result1 = local_ssh(hdfscmd, logger)
-        hdfscmd = 'hdfs dfs -cat %s | tail -c 4 | grep PAR1' % f
-        result2 = local_ssh(hdfscmd, logger)
-        if result1 or result2:
-            logger.error('File %s is not parquet format' % f)
+        # check conflicting distributed policy
+        def check_distribution_policy():
+            if self.distribution_policy.startswith('DISTRIBUTED BY'):
+                if len(self.files) % self.bucket_number != 0:
+                    logger.error('Files to be registered must be multiple 
times to the bucket number of hash table.')
+                    sys.exit(1)
+
+        def create_table():
+            return self.accessor.do_create_table(self.tablename, self.schema, 
self.file_format, self.distribution_policy, self.file_locations, 
self.bucket_number)
+
+        def get_seg_name():
+            return self.utility_accessor.get_seg_name(self.tablename, 
self.database, self.file_format)
+
+        def get_metadata():
+            return self.accessor.get_metadata_from_database(self.tablename, 
self.seg_name)
+
+        if self.yml:
+            self.file_format, self.files, self.sizes, self.schema, 
self.distribution_policy, self.file_locations, self.bucket_number = 
option_parser_yml(self.yml)
+            self.filepath = self.files[0][:self.files[0].rfind('/')] if 
self.files else ''
+            check_distribution_policy()
+            if self.mode != 'force':
+                if not create_table():
+                    self.mode = 'second_exist'
+        else:
+            self.file_format = 'Parquet'
+            check_hash_type() # Usage1 only support randomly distributed table
+        if not self.filepath:
+            sys.exit(0)
+
+        if self.mode == 'repair':
+            # TODO
+            # check distribution policy consistency
+            # check bucketnum, pagesize, rowgroupsize, etc
+            # check filesize smaller
+            pass
+
+        self.seg_name = get_seg_name()
+        self.firstsegno, self.tabledir = get_metadata()
+
+        if self.mode == 'second_exist':
+            if self.tabledir.strip('/') == self.filepath.strip('/'):
+                logger.error('Files to be registeted in this case should not 
be the same with table path.')
+                sys.exit(1)
+
+        self.do_not_move, self.files_update, self.sizes_update = False, [], []
+        if self.mode == 'force':
+            existed_files, _ = self._get_files_in_hdfs(self.tabledir)
+            if len(self.files) == len(existed_files):
+                if sorted(self.files) != sorted(existed_files):
+                    logger.error('In this case, you should include previous 
table files.\nOtherwise you should drop the previous table before registering 
--force.')
+                    sys.exit(1)
+                else:
+                    self.do_not_move, self.files_update, self.sizes_update = 
True, self.files, self.sizes
+                    self.files, self.sizes = [], []
+            else:
+                files_old, sizes_old = [f for f in self.files], [sz for sz in 
self.sizes]
+                for k, f in enumerate(files_old):
+                    if f in existed_files:
+                        self.files_update.append(files_old[k])
+                        self.sizes_update.append(sizes_old[k])
+                        self.files.remove(files_old[k])
+                        self.sizes.remove(sizes_old[k])
+
+        self._check_files_and_table_in_same_hdfs_cluster(self.filepath, 
self.tabledir)
+
+        if not self.yml:
+            self.files, self.sizes = self._get_files_in_hdfs(self.filepath)
+        print 'New file(s) to be registered: ', self.files
+        if self.files_update:
+            print 'Catalog info need to be updated for these files: ', 
self.files_update
+
+        if self.filesize:
+            if len(self.files) != 1:
+                logger.error('-e option is only supported with single file 
case.')
+                sys.exit(1)
+            self.sizes = [self.filesize]
+
+        if self.file_format == 'Parquet':
+            self._check_parquet_format(self.files)
+
+    def _check_files_and_table_in_same_hdfs_cluster(self, filepath, tabledir):
+        '''Check whether all the files refered by 'filepath' and the location corresponding to the table are in the same hdfs cluster'''
+        if not filepath:
+            return
+        # check whether the files to be registered is in hdfs
+        filesystem = filepath.split('://')
+        if filesystem[0] != 'hdfs':
+            logger.error('Only support to register file(s) in hdfs')
+            sys.exit(1)
+        fileroot = filepath.split('/')
+        tableroot = tabledir.split('/')
+        # check the root url of them. eg: for 'hdfs://localhost:8020/temp/tempfile', we check 'hdfs://localohst:8020'
+        if fileroot[0:3] != tableroot[0:3]:
+            logger.error("Files to be registered and the table are not in the 
same hdfs cluster.\nFile(s) to be registered: '%s'\nTable path in HDFS: '%s'" % 
(filepath, tabledir))
             sys.exit(1)
 
+    def _get_files_in_hdfs(self, filepath):
+        '''Get all the files refered by 'filepath', which could be a file or a directory containing all the files'''
+        files, sizes = [], []
+        hdfscmd = "hdfs dfs -test -e %s" % filepath
+        result = local_ssh(hdfscmd, logger)
+        if result != 0:
+            logger.error("Path '%s' does not exist in hdfs" % filepath)
+            sys.exit(1)
+        hdfscmd = "hdfs dfs -ls -R %s" % filepath
+        result, out, err = local_ssh_output(hdfscmd)
+        outlines = out.splitlines()
+        # recursively search all the files under path 'filepath'
+        for line in outlines:
+            lineargs = line.split()
+            if len(lineargs) == 8 and lineargs[0].find("d") == -1:
+                files.append(lineargs[7])
+                sizes.append(int(lineargs[4]))
+        if len(files) == 0:
+            logger.error("Dir '%s' is empty" % filepath)
+            sys.exit(1)
+        return files, sizes
 
-def move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, 
normal):
-    '''Move file(s) in src path into the folder correspoding to the target table'''
-    if normal:
-        segno = firstsegno
+    def _check_parquet_format(self, files):
+        '''Check whether the file to be registered is parquet format'''
         for f in files:
+            hdfscmd = 'hdfs dfs -du -h %s | head -c 1' % f
+            rc, out, err = local_ssh_output(hdfscmd)
+            if out == '0':
+                continue
+            hdfscmd = 'hdfs dfs -cat %s | head -c 4 | grep PAR1' % f
+            result1 = local_ssh(hdfscmd, logger)
+            hdfscmd = 'hdfs dfs -cat %s | tail -c 4 | grep PAR1' % f
+            result2 = local_ssh(hdfscmd, logger)
+            if result1 or result2:
+                logger.error('File %s is not parquet format' % f)
+                sys.exit(1)
+
+    def _move_files_in_hdfs(self):
+        '''Move file(s) in src path into the folder correspoding to the target table'''
+        segno = self.firstsegno
+        for f in self.files:
             srcfile = f
-            dstfile = tabledir + str(segno)
+            dstfile = self.tabledir + str(segno)
             segno += 1
             if srcfile != dstfile:
                 hdfscmd = 'hdfs dfs -mv %s %s' % (srcfile, dstfile)
@@ -310,190 +392,98 @@ def move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, nor
                 if result != 0:
                     logger.error('Fail to move %s to %s' % (srcfile, dstfile))
                     sys.exit(1)
-    else:
-        segno = firstsegno
-        for f in files:
-            dstfile = f
-            srcfile = tabledir + str(segno)
-            segno += 1
-            if srcfile != dstfile:
-                hdfscmd = 'hdfs dfs -mv %s %s' % (srcfile, dstfile)
-                sys.stdout.write('hdfscmd: "%s"\n' % hdfscmd)
-                result = local_ssh(hdfscmd, logger)
-                if result != 0:
-                    logger.error('Fail to move "%s" to "%s"' % (srcfile, dstfile))
-                    sys.exit(1)
-
 
-def insert_metadata_into_database(dburl, databasename, tablename, seg_name, firstsegno, tabledir, eofs, fmt):
-    '''Insert the metadata into database'''
-    try:
-        query = "set allow_system_table_mods='dml';"
-        if fmt == 'Parquet':
-            query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (seg_name, firstsegno, eofs[0], -1, -1)
-            for k, eof in enumerate(eofs[1:]):
-                query += ',(%d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1)
+    def _modify_metadata(self, mode):
+        if mode == 'insert':
+            eofs = self.sizes
+            query = "set allow_system_table_mods='dml';"
+            if self.file_format == 'Parquet':
+                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1)
+                for k, eof in enumerate(eofs[1:]):
+                    query += ',(%d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1)
+            else:
+                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1, -1)
+                for k, eof in enumerate(eofs[1:]):
+                    query += ',(%d, %d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1, -1)
+            query += ';'
+        elif mode == 'update':
+            eofs = self.sizes_update
+            query = "set allow_system_table_mods='dml';"
+            query += "begin transaction;"
+            segno_lst = [f.split('/')[-1] for f in self.files_update]
+            for i, eof in enumerate(eofs):
+                query += "update pg_aoseg.%s set eof = '%s' where segno = '%s';" % (self.seg_name, eof, segno_lst[i])
+            query += "end transaction;"
+        else: # update_and_insert
+            eofs = self.sizes
+            query = "set allow_system_table_mods='dml';"
+            query += "begin transaction;"
+            if self.file_format == 'Parquet':
+                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1)
+                for k, eof in enumerate(eofs[1:]):
+                    query += ',(%d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1)
+            else:
+                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1, -1)
+                for k, eof in enumerate(eofs[1:]):
+                    query += ',(%d, %d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1, -1)
+            query += ';'
+
+            segno_lst = [f.split('/')[-1] for f in self.files_update]
+            for i, eof in enumerate(self.sizes_update):
+                query += "update pg_aoseg.%s set eof = '%s' where segno = '%s';" % (self.seg_name, eof, segno_lst[i])
+            query += "end transaction;"
+        return self.utility_accessor.update_catalog(query)
+
+    def register(self):
+        if not self.do_not_move:
+            self._move_files_in_hdfs()
+        if (not self.do_not_move) and self.mode == 'force':
+            self._modify_metadata('update_and_insert')
         else:
-            query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (seg_name, firstsegno, eofs[0], -1, -1, -1)
-            for k, eof in enumerate(eofs[1:]):
-                query += ',(%d, %d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1, -1)
-        query += ';'
-        conn = dbconn.connect(dburl, True)
-        rows = dbconn.execSQL(conn, query)
-        conn.commit()
-        conn.close()
-    except DatabaseError, ex:
-        logger.error('Failed to execute query "%s"' % query)
-        move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, False)
-        sys.exit(1)
-
-def update_metadata_into_database(dburl, seg_name, files, eofs):
-    '''Update the catalog table in --force case'''
-    try:
-        query = "set allow_system_table_mods='dml';"
-        query += "begin transaction;"
-        segno_lst = [f.split('/')[-1] for f in files]
-        for i, eof in enumerate(eofs):
-            query += "update pg_aoseg.%s set eof = '%s' where segno = '%s';" % (seg_name, eof, segno_lst[i])
-        query += "end transaction;"
-        conn = dbconn.connect(dburl, True)
-        rows = dbconn.execSQL(conn, query)
-        conn.commit()
-        conn.close()
-    except DatabaseError, ex:
-        logger.error('Failed to execute query "%s"' % query)
-        sys.exit(1)
-
-
-def update_insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, eofs, fmt, update_files, update_eofs):
-    '''Insert and update the catalog table in --force case'''
+            if self.mode == 'force':
+                self._modify_metadata('update')
+            else:
+                self._modify_metadata('insert')
+        logger.info('Hawq Register Succeed.')
+
+
+def main(options, args):
+    def connectdb(options):
+        '''
+        Trying to connect database, return a connection object.
+        If failed to connect, raise a pg.InternalError
+        '''
+        url = dbconn.DbURL(hostname=options.host, port=options.port,
+                           dbname=options.database, username=options.user)
+        logger.info('try to connect database %s:%s %s' % (url.pghost, url.pgport, url.pgdb))
+        utility_conn = pg.connect(dbname=url.pgdb, host=url.pghost, port=url.pgport,
+                                  user=url.pguser, passwd=url.pgpass, opt='-c gp_session_role=utility')
+        conn = pg.connect(dbname=url.pgdb, host=url.pghost, port=url.pgport,
+                          user=url.pguser, passwd=url.pgpass)
+        return utility_conn, conn
+
+    # connect db
     try:
-        query = "set allow_system_table_mods='dml';"
-        query += "begin transaction;"
-        if fmt == 'Parquet':
-            query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (seg_name, firstsegno, eofs[0], -1, -1)
-            for k, eof in enumerate(eofs[1:]):
-                query += ',(%d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1)
-        else:
-            query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (seg_name, firstsegno, eofs[0], -1, -1, -1)
-            for k, eof in enumerate(eofs[1:]):
-                query += ',(%d, %d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1, -1)
-        query += ';'
-
-        segno_lst = [f.split('/')[-1] for f in update_files]
-        for i, eof in enumerate(update_eofs):
-            query += "update pg_aoseg.%s set eof = '%s' where segno = '%s';" % (seg_name, eof, segno_lst[i])
-        query += "end transaction;"
-        conn = dbconn.connect(dburl, True)
-        rows = dbconn.execSQL(conn, query)
-        conn.commit()
-        conn.close()
-    except DatabaseError, ex:
-        logger.error('Failed to execute query "%s"' % query)
-        move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, False)
-        sys.exit(1)
+        utility_conn, conn = connectdb(options)
+    except pg.InternalError:
+        logger.error('Fail to connect to database, this script can only be run when database is up.')
+        return 1
+    # register
+    ins = HawqRegister(options, args[0], utility_conn, conn)
+    ins.register()
+    conn.close()
 
 
 if __name__ == '__main__':
-
     parser = option_parser()
     options, args = parser.parse_args()
-
-    if len(args) != 1 or ((options.yml_config or options.force or options.repair) and options.filepath) or (options.force and options.repair):
+    if len(args) != 1 or (options.force and options.repair):
         parser.print_help(sys.stderr)
         sys.exit(1)
-    if local_ssh('hdfs', logger):
-        logger.error('command "hdfs" is not available.')
+    if (options.yml_config or options.force or options.repair) and options.filepath:
+        parser.print_help(sys.stderr)
         sys.exit(1)
-
-    dburl = dbconn.DbURL(hostname = options.host, port = options.port, username = options.user, dbname = options.database)
-    filepath, database, tablename = options.filepath, options.database, args[0]
-
-    second_normal_mode, second_exist_mode, force_mode, repair_mode = False, False, False, False
-    if options.yml_config: # Usage2
-        if options.force:
-            force_mode = True
-        elif options.repair:
-            repair_mode = True
-        else:
-            second_normal_mode = True
-        fileformat, files, sizes, schema, distribution_policy, file_locations, bucket_number = option_parser_yml(options.yml_config)
-        filepath = files[0][:files[0].rfind('/')] if files else ''
-        # check conflicting distributed policy
-        if distribution_policy.startswith('DISTRIBUTED BY'):
-            if len(files) % bucket_number != 0:
-                logger.error('Files to be registered must be multiple times to the bucket number of hash table.')
-                sys.exit(1)
-        if not force_mode:
-            if not create_table(dburl, tablename, schema, fileformat, distribution_policy, file_locations, bucket_number):
-                second_normal_mode, second_exist_mode = False, True
-    else:
-        fileformat = 'Parquet'
-        check_hash_type(dburl, tablename) # Usage1 only support randomly distributed table
-
-    if repair_mode:
-        # check distribution policy consistency
-        # check bucketnum, pagesize, rowgroupsize, etc
-        # check filesize smaller
-        pass
-
-    # check filepath
-    if not filepath:
-        sys.exit(0)
-
-    seg_name = get_seg_name(dburl, tablename, database, fileformat)
-    firstsegno, tabledir = get_metadata_from_database(dburl, tablename, seg_name)
-
-    if second_exist_mode:
-        if tabledir.strip('/') == filepath.strip('/'):
-            logger.error('Files to be registered in this case should not be the same with table path.')
-            sys.exit(1)
-
-    do_not_move, files_update, sizes_update = False, [], []
-    if force_mode:
-        existed_files, _ = get_files_in_hdfs(tabledir)
-        if len(files) == len(existed_files):
-            if sorted(files) != sorted(existed_files):
-                logger.error('In this case, you should include previous table files.\nOtherwise you should drop the previous table before registering --force.')
-                sys.exit(1)
-            else:
-                do_not_move, files_update, sizes_update = True, files, sizes
-                files, sizes = [], []
-        else:
-            files_old, sizes_old = [f for f in files], [sz for sz in sizes]
-            for k, f in enumerate(files_old):
-                if f in existed_files:
-                    files_update.append(files_old[k])
-                    sizes_update.append(sizes_old[k])
-                    files.remove(files_old[k])
-                    sizes.remove(sizes_old[k])
-
-    check_files_and_table_in_same_hdfs_cluster(filepath, tabledir)
-
-    if not options.yml_config:
-        files, sizes = get_files_in_hdfs(filepath)
-    print 'New file(s) to be registered:', files
-    if files_update:
-        print 'Files(s) catalog info need to be update:', files_update
-
-    # set specified eofs
-    if options.filesize:
-        if len(files) != 1:
-            logger.error('-e option is only supported with single file case.')
-            sys.exit(1)
-        sizes = [options.filesize]
-
-    if fileformat == 'Parquet':
-        check_parquet_format(files)
-    if not do_not_move:
-        move_files_in_hdfs(database, tablename, files, firstsegno, tabledir, True)
-
-    if (not do_not_move) and force_mode:
-        update_insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, sizes, fileformat, files_update, sizes_update)
-    else:
-        if force_mode:
-            update_metadata_into_database(dburl, seg_name, files_update, sizes_update)
-        else:
-            insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, sizes, fileformat)
-
-    logger.info('Hawq Register Succeed.')
+    if local_ssh('hdfs'):
+        logger.error('Command "hdfs" is not available.')
+        sys.exit(1)
+    main(options, args)


Reply via email to