Github user xunzhang commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/904#discussion_r79295679 --- Diff: tools/bin/hawqregister --- @@ -126,182 +127,319 @@ def option_parser_yml(yml_file): return 'AO', files, sizes, params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum'] -def create_table(dburl, tablename, schema_info, fmt, distrbution_policy, file_locations, bucket_number): - try: - query = "select count(*) from pg_class where relname = '%s';" % tablename.split('.')[-1].lower() - conn = dbconn.connect(dburl, False) - rows = dbconn.execSQL(conn, query) - conn.commit() - conn.close() - for row in rows: - if row[0] != 0: - return False - except DatabaseError, ex: - logger.error('Failed to execute query "%s"' % query) - sys.exit(1) - - try: +class GpRegisterAccessor(object): + def __init__(self, conn): + self.conn = conn + rows = self.exec_query(""" + SELECT oid, datname, dat2tablespace, + pg_encoding_to_char(encoding) encoding + FROM pg_database WHERE datname=current_database()""") + self.dbid = rows[0]['oid'] + self.dbname = rows[0]['datname'] + self.spcid = rows[0]['dat2tablespace'] + self.dbencoding = rows[0]['encoding'] + self.dbversion = self.exec_query('select version()')[0]['version'] + + def exec_query(self, sql): + '''execute query and return dict result''' + return self.conn.query(sql).dictresult() + + def get_table_existed(self, tablename): + qry = """select count(*) from pg_class where relname = '%s';""" % tablename.split('.')[-1].lower() + return self.exec_query(qry)[0]['count'] == 1 + + def do_create_table(self, tablename, schema_info, fmt, distrbution_policy, file_locations, bucket_number): + if self.get_table_existed(tablename): + return False schema = ','.join([k['name'] + ' ' + k['type'] for k in schema_info]) fmt = 'ROW' if fmt == 'AO' else fmt if fmt == 'ROW': query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, 
checksum=%s, bucketnum=%s) %s;' - % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['Checksum'], bucket_number, distrbution_policy)) + % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['Checksum'], bucket_number, distrbution_policy)) else: # Parquet query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, pagesize=%s, rowgroupsize=%s, bucketnum=%s) %s;' - % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['PageSize'], file_locations['RowGroupSize'], bucket_number, distrbution_policy)) - conn = dbconn.connect(dburl, False) - rows = dbconn.execSQL(conn, query) - conn.commit() - conn.close() + % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['PageSize'], file_locations['RowGroupSize'], bucket_number, distrbution_policy)) + self.conn.query(query) return True - except DatabaseError, ex: - print DatabaseError, ex - logger.error('Failed to execute query "%s"' % query) - sys.exit(1) + def check_hash_type(self, tablename): + qry = """select attrnums from gp_distribution_policy, pg_class where pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;""" % tablename + rows = self.exec_query(qry) + if len(rows) == 0: + logger.error('Table %s not found in table gp_distribution_policy.' % tablename) --- End diff -- sure
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it to be, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---