Repository: incubator-hawq
Updated Branches:
  refs/heads/master 7ab73156f -> 45adcbdb3


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/45adcbdb/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index 492b151..67a188d 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -123,6 +123,25 @@ def register_yaml_dict_check(D, table_column_num, 
src_tablename):
         logger.error('Column number of table in yaml file is not equals to the 
column number of table %s.' % src_tablename)
         sys.exit(1)
 
def ispartition(yml_file):
    """Return True if the yaml config describes a partitioned table.

    Looks up the '<Format>_FileLocations' section selected by 'FileFormat'
    (Parquet or AO) and checks whether it carries a 'PartitionBy' clause.
    Exits the process on an unparsable yaml file.
    """
    import yaml
    try:
        with open(yml_file, 'r') as f:
            # NOTE(review): yaml.load can construct arbitrary Python objects;
            # yaml.safe_load is the safer drop-in if the config never needs
            # custom tags -- confirm against the rest of the tool.
            params = yaml.load(f)
    except yaml.YAMLError as e:
        # YAMLError is the documented base class: catches ParserError and
        # ComposerError too, not only ScannerError.
        print(e)
        sys.exit(1)

    # 'FileFormat' decides which FileLocations section applies.
    format_name = 'Parquet' if params['FileFormat'].lower() == 'parquet' else 'AO'
    file_locations = params.get('%s_FileLocations' % format_name)
    # Missing section or missing/empty 'PartitionBy' both mean "not partitioned".
    return bool(file_locations and file_locations.get('PartitionBy'))
 
 class FailureHandler(object):
     def __init__(self, conn):
@@ -256,6 +275,23 @@ class GpRegisterAccessor(object):
         rows = self.exec_query(query)
         return rows[0]['attrnums']
 
+    def get_partition_info(self, tablename):
+        ''' Get partition information from pg_partitions, return a 
constraint-tablename dictionary '''
+        query = "SELECT partitiontablename, partitionboundary FROM 
pg_partitions WHERE tablename = '%s'" % tablename
+        return self.exec_query(query)
+
+    def get_partitionby(self, tablename):
+        query = "SELECT partitionschemaname, partitiontablename, 
partitionname, partitiontype, parentpartitiontablename, partitionboundary FROM 
pg_partitions WHERE tablename='%s';" % tablename
+        parition_type = self.exec_query(query)[0]['partitiontype']
+        query = "SELECT columnname, partitionlevel FROM pg_partition_columns 
WHERE tablename='%s' ORDER BY position_in_partition_key;" % tablename
+        partition_columnname = self.exec_query(query)[0]['columnname']
+        partitionby = 'PARTITION BY %s (%s)' % (parition_type, 
partition_columnname)
+        return partitionby
+
+    def get_partition_num(self, tablename):
+        query = "SELECT partitionschemaname from pg_partitions WHERE 
tablename='%s';" % tablename
+        return len(self.exec_query(query))
+
     def get_bucket_number(self, tablename):
         query = "select oid from pg_class where relname = '%s';" % 
tablename.split('.')[-1].lower()
         rows = self.exec_query(query)
@@ -307,7 +343,13 @@ class HawqRegister(object):
         self.mode = self._init_mode(options.force, options.repair)
         self.srcfiles = []
         self.dstfiles = []
-        self._init()
+        self.files_same_path = []
+        self.sizes_same_path = []
+        self.segnos_same_path = []
+        self.tupcounts_same_path = []
+        self.varblockcounts_same_path = []
+        self.eofuncompresseds_same_path = []
+        self.segnos_same_path = []
 
     def _init_mode(self, force, repair):
         def table_existed():
@@ -374,7 +416,8 @@ class HawqRegister(object):
 
     def _set_yml_data(self, file_format, files, sizes, tupcounts, 
eofuncompresseds, varblockcounts, tablename, schema, distribution_policy, 
file_locations,\
                       bucket_number, partitionby, partitions_constraint, 
partitions_name, partitions_compression_level,\
-                      partitions_compression_type, partitions_checksum, 
partitions_filepaths, partitions_filesizes, encoding):
+                      partitions_compression_type, partitions_checksum, 
partitions_filepaths, partitions_filesizes, \
+                      partitions_tupcounts, partitions_eofuncompresseds, 
partitions_varblockcounts, encoding):
         self.file_format = file_format
         self.files = files
         self.sizes = sizes
@@ -394,6 +437,9 @@ class HawqRegister(object):
         self.partitions_checksum = partitions_checksum
         self.partitions_filepaths = partitions_filepaths
         self.partitions_filesizes = partitions_filesizes
+        self.partitions_tupcounts = partitions_tupcounts
+        self.partitions_eofuncompresseds = partitions_eofuncompresseds
+        self.partitions_varblockcounts = partitions_varblockcounts
         self.encoding = encoding
 
     def _option_parser_yml(self, yml_file):
@@ -414,6 +460,9 @@ class HawqRegister(object):
         partitions_checksum = []
         partitions_compression_level = []
         partitions_compression_type = []
+        partitions_tupcounts = []
+        partitions_eofuncompresseds = []
+        partitions_varblockcounts = []
         files, sizes, tupcounts, eofuncompresseds, varblockcounts = [], [], 
[], [], []
 
         if params['FileFormat'].lower() == 'parquet':
@@ -422,9 +471,6 @@ class HawqRegister(object):
             Format = 'AO'
         Format_FileLocations = '%s_FileLocations' % Format
         partitionby = params.get(Format_FileLocations).get('PartitionBy')
-        if partitionby:
-            logger.info('Partition table is not supported in current release 
of hawq register.')
-            sys.exit(0)
         if params.get(Format_FileLocations).get('Partitions') and 
len(params[Format_FileLocations]['Partitions']):
             partitions_checksum = [d['Checksum'] for d in 
params[Format_FileLocations]['Partitions']]
             partitions_compression_level = [d['CompressionLevel'] for d in 
params[Format_FileLocations]['Partitions']]
@@ -435,6 +481,9 @@ class HawqRegister(object):
                 for pfile in partitions_files:
                     partitions_filepaths.append([params['DFS_URL'] + 
item['path'] for item in pfile])
                     partitions_filesizes.append([item['size'] for item in 
pfile])
+                    partitions_tupcounts.append([item['tupcount'] if 
item.has_key('tupcount') else -1 for item in pfile])
+                    
partitions_eofuncompresseds.append([item['eofuncompressed'] if 
item.has_key('eofuncompressed') else -1 for item in pfile])
+                    partitions_varblockcounts.append([item['varblockcount'] if 
item.has_key('varblockcount') else -1 for item in pfile])
             partitions_name = [d['Name'] for d in 
params[Format_FileLocations]['Partitions']]
         if len(params[Format_FileLocations]['Files']):
             for ele in params[Format_FileLocations]['Files']:
@@ -446,7 +495,7 @@ class HawqRegister(object):
 
         encoding = params['Encoding']
         bucketNum = params['Bucketnum'] if 
params['Distribution_Policy'].startswith('DISTRIBUTED BY') else 6
-        self._set_yml_data(Format, files, sizes, tupcounts, eofuncompresseds, 
varblockcounts, params['TableName'], params['%s_Schema' % Format], 
params['Distribution_Policy'], params[Format_FileLocations], bucketNum, 
partitionby, partitions_constraint, partitions_name, 
partitions_compression_level, partitions_compression_type, partitions_checksum, 
partitions_filepaths, partitions_filesizes, encoding)
+        self._set_yml_data(Format, files, sizes, tupcounts, eofuncompresseds, 
varblockcounts, params['TableName'], params['%s_Schema' % Format], 
params['Distribution_Policy'], params[Format_FileLocations], bucketNum, 
partitionby, partitions_constraint, partitions_name, 
partitions_compression_level, partitions_compression_type, partitions_checksum, 
partitions_filepaths, partitions_filesizes, partitions_tupcounts, 
partitions_eofuncompresseds, partitions_varblockcounts, encoding)
 
 
     # check conflicting distributed policy
@@ -528,13 +577,7 @@ class HawqRegister(object):
                     self.failure_handler.rollback()
                     sys.exit(1)
 
-    def _init(self):
-        self.files_same_path = []
-        self.sizes_same_path = []
-        self.tupcounts_same_path = []
-        self.varblockcounts_same_path = []
-        self.eofuncompresseds_same_path = []
-        self.segnos_same_path = []
+    def prepare(self):
         if self.yml:
             self._option_parser_yml(options.yml_config)
             self.filepath = self.files[0][:self.files[0].rfind('/')] if 
self.files else ''
@@ -673,6 +716,27 @@ class HawqRegister(object):
         if self.file_format == 'Parquet':
             self._check_parquet_format(self.files)
 
+    def test_set_move_files_in_hdfs(self):
+        ''' Output of print shoud be:
+        self.files_update = ['1', '2', '3']
+        self.files_same_path = ['5', '6', 'a']
+        self.srcfiles=['5', '6', 'a', '1', '2', '3']
+        self.dstfiles=['5', '6', '4', '7' , '8', '9']
+        '''
+        self.firstsegno = 4
+        self.files_update = ['1', '2', '3', '5', '6', 'a']
+        self.sizes_update = [1, 2, 3, 4, 5, 6]
+        self.files_append = ['1', '2', '3']
+        self.tupcounts_update = [1, 2, 3, 4, 5, 6]
+        self.eofuncompresseds_update = [1, 2, 3, 4, 5, 6]
+        self.varblockcounts_update = [1, 2, 3, 4, 5, 6]
+        self.tabledir = ''
+        self._set_move_files_in_hdfs()
+        print self.files_update
+        print self.files_same_path
+        print self.srcfiles
+        print self.dstfiles
+
     def _check_files_and_table_in_same_hdfs_cluster(self, filepath, tabledir):
         '''Check whether all the files refered by 'filepath' and the location 
corresponding to the table are in the same hdfs cluster'''
         if not filepath:
@@ -915,6 +979,88 @@ class HawqRegister(object):
         logger.info('Hawq Register Succeed.')
 
 
class HawqRegisterPartition(HawqRegister):
    '''Register HDFS files into a partitioned table, one leaf partition at a time.'''

    def __init__(self, options, table, utility_conn, conn, failure_handler):
        HawqRegister.__init__(self, options, table, utility_conn, conn, failure_handler)

    def _get_partition_info(self):
        '''Map each partition boundary (constraint) to its leaf partition table name.'''
        dic = {}
        for ele in self.accessor.get_partition_info(self.dst_table_name.split('.')[-1]):
            dic[ele['partitionboundary']] = ele['partitiontablename']
        return dic

    def _check_partitionby(self):
        '''Abort (rollback + exit) if the yaml PartitionBy differs from the existing table's.'''
        def get_partitionby():
            return self.accessor.get_partitionby(self.dst_table_name.split('.')[-1])

        if self.partitionby != get_partitionby():
            logger.error('PartitionBy of %s is not consistent with previous partitionby.' % self.tablename)
            self.failure_handler.rollback()
            sys.exit(1)

    def _check_partition_num(self):
        '''Abort if the existing table has fewer partitions than the yaml lists.'''
        def get_partition_num():
            return self.accessor.get_partition_num(self.dst_table_name.split('.')[-1])

        if get_partition_num() < len(self.partitions_name):
            logger.error('Partition Number of %s is not consistent with previous partition number.' % self.tablename)
            self.failure_handler.rollback()
            sys.exit(1)

    def _check_duplicate_constraint(self):
        '''Abort if the yaml lists the same partition constraint twice.'''
        # Sorting makes duplicates adjacent; compare neighbouring pairs.
        ordered = sorted(self.partitions_constraint)
        for prev, cur in zip(ordered, ordered[1:]):
            if prev == cur:
                logger.error('Partition Constraint "%s" in table %s is duplicated' % (prev, self.tablename))
                self.failure_handler.rollback()
                sys.exit(1)

    def prepare(self):
        '''Validate each partition's files and build the registration SQL batch.'''
        if self.yml:
            self._option_parser_yml(options.yml_config)
        else:
            if self._is_folder(self.filepath) and self.filesize:
                logger.error('-e option is only supported with single file case.')
                sys.exit(1)
            self.file_format = 'Parquet'
            self._check_hash_type() # Usage1 only support randomly distributed table
        # First pass: per-partition file sanity checks.
        for k, pn in enumerate(self.partitions_name):
            self.tablename = pn
            self.files = self.partitions_filepaths[k]
            self.sizes = self.partitions_filesizes[k]
            if self.yml:
                self.filepath = self.files[0][:self.files[0].rfind('/')] if self.files else ''
                self._check_file_not_folder()
        if self.yml:
            self._check_database_encoding()
            if self.mode != 'repair':
                if not self._create_table() and self.mode != 'force':
                    self.mode = 'usage2_table_exist'
                    self._check_partitionby()
                    self._check_partition_num()
        partitions = self._get_partition_info()
        self.queries = "set allow_system_table_mods='dml';"
        self.queries += "begin transaction;"
        self._check_duplicate_constraint()
        # Second pass: register every partition's files inside one transaction.
        for k, pn in enumerate(self.partitions_name):
            self.constraint = self.partitions_constraint[k]
            # 'in' replaces py2-only dict.has_key (works on both py2 and py3).
            if self.constraint not in partitions:
                logger.error('Partition Constraint "%s" is not in table %s' % (self.constraint, self.tablename))
                self.failure_handler.rollback()
                sys.exit(1)
            self.tablename = partitions[self.constraint]
            self.files = self.partitions_filepaths[k]
            self.sizes = self.partitions_filesizes[k]
            self.tupcounts = self.partitions_tupcounts[k]
            self.eofuncompresseds = self.partitions_eofuncompresseds[k]
            self.varblockcounts = self.partitions_varblockcounts[k]
            self._do_check()
            self._prepare_register()
        self.queries += "end transaction;"

    def register(self):
        # Delegates to the non-partitioned registration, which executes self.queries.
        HawqRegister.register(self)
+
 def main(options, args):
     def connectdb(options):
         '''
@@ -939,10 +1085,41 @@ def main(options, args):
 
     failure_handler = FailureHandler(conn)
     # register
-    ins = HawqRegister(options, args[0], utility_conn, conn, failure_handler)
+    if options.yml_config and ispartition(options.yml_config):
+        ins = HawqRegisterPartition(options, args[0], utility_conn, conn, 
failure_handler)
+    else:
+        ins = HawqRegister(options, args[0], utility_conn, conn, 
failure_handler)
+    ins.prepare()
     ins.register()
     conn.close()
 
def test(options, args):
    '''Developer entry point: connect to the db and run the
    test_set_move_files_in_hdfs() smoke test instead of a real registration.
    Returns 1 when the database connection fails.
    '''
    def connectdb(options):
        '''
        Trying to connect database, return a connection object.
        If failed to connect, raise a pg.InternalError
        '''
        url = dbconn.DbURL(hostname=options.host, port=options.port,
                           dbname=options.database, username=options.user)
        logger.info('try to connect database %s:%s %s' % (url.pghost, url.pgport, url.pgdb))
        utility_conn = pg.connect(dbname=url.pgdb, host=url.pghost, port=url.pgport,
                                  user=url.pguser, passwd=url.pgpass, opt='-c gp_session_role=utility')
        conn = pg.connect(dbname=url.pgdb, host=url.pghost, port=url.pgport,
                          user=url.pguser, passwd=url.pgpass)
        return utility_conn, conn

    # connect db
    try:
        utility_conn, conn = connectdb(options)
    except pg.InternalError:
        logger.error('Fail to connect to database, this script can only be run when database is up.')
        return 1

    failure_handler = FailureHandler(conn)
    # Exercise the move-files bookkeeping only; nothing is actually registered.
    ins = HawqRegister(options, args[0], utility_conn, conn, failure_handler)
    ins.test_set_move_files_in_hdfs()
    # Close the connection like main() does.
    conn.close()
+
 
 if __name__ == '__main__':
     parser = option_parser()

Reply via email to