Github user xunzhang commented on a diff in the pull request:
https://github.com/apache/incubator-hawq/pull/846#discussion_r75244483
--- Diff: tools/bin/hawqregister ---
@@ -40,186 +39,195 @@ EXECNAME = os.path.split(__file__)[-1]
setup_tool_logging(EXECNAME,getLocalHostname(),getUserName())
-def create_opt_parser(version):
+def option_parser():
parser = OptParser(option_class=OptChecker,
- usage='usage: %prog [options] database_name
table_name file_or_dir_path_in_hdfs',
- version=version)
+ usage='usage: %prog [options] table_name',
+ version='%prog version $Revision: #1 $')
parser.remove_option('-h')
parser.add_option('-?', '--help', action='help')
- parser.add_option('-h', '--host', help="host of the target DB")
- parser.add_option('-p', '--port', help="port of the target DB",
type='int', default=0)
- parser.add_option('-U', '--user', help="username of the target DB")
- return parser
-
-
-def check_hadoop_command():
- hdfscmd = "hadoop"
- result = local_ssh(hdfscmd);
- if result != 0:
- logger.error("command 'hadoop' is not available, please set
environment variable $PATH to fix this")
+ parser.add_option('-h', '--host', help='host of the target DB')
+ parser.add_option('-p', '--port', help='port of the target DB',
type='int', default=0)
+ parser.add_option('-U', '--user', help='username of the target DB')
+ parser.add_option('-d', '--database', default = 'postgres', dest =
'database', help='database name')
+ parser.add_option('-f', '--filepath', dest = 'filepath', help='file
name in HDFS')
+ parser.add_option('-c', '--config', dest = 'yml_config', default = '',
help='configuration file in YAML format')
+ return parser.parse_args()
+
+
+def option_parser_yml(yml_file):
+ import yaml
+ with open(yml_file, 'r') as f:
+ params = yaml.load(f)
+ if params['FileFormat'] == 'Parquet':
+ offset =
params['Parquet_FileLocations']['Files'][0]['path'].rfind('/')
+ filepath = params['DFS_URL'] +
params['Parquet_FileLocations']['Files'][0]['path'][:offset] if
len(params['Parquet_FileLocations']['Files']) != 1 else params['DFS_URL'] +
params['Parquet_FileLocations']['Files'][0]['path']
+ return 'Parquet', filepath, params['Parquet_Schema'],
params['Distribution_Policy']
+ offset = params['AO_FileLocations']['Files'][0]['path'].rfind('/')
+ filepath = params['DFS_URL'] +
params['AO_FileLocations']['Files'][0]['path'][:offset] if
len(params['AO_FileLocations']['Files']) != 1 else params['DFS_URL'] +
params['AO_FileLocations']['Files'][0]['path']
+ return 'AO', filepath, params['AO_Schema'],
params['Distribution_Policy']
+
+
+def create_table(dburl, tablename, schema_info, fmt, distrbution_policy):
+ try:
+ schema = ','.join([k['name'] + ' ' + k['type'] for k in
schema_info])
+ fmt = 'ROW' if fmt == 'AO' else fmt
+ query = 'create table %s(%s) with (appendonly=true,
orientation=%s) %s;' % (tablename, schema, fmt, distrbution_policy)
+ conn = dbconn.connect(dburl, False)
+ rows = dbconn.execSQL(conn, query)
+ conn.commit()
+ except DatabaseError, ex:
+ logger.error('Failed to execute query ""%s"' % query)
sys.exit(1)
-def get_seg_name(options, databasename, tablename):
+def get_seg_name(dburl, tablename, database, fmt):
try:
- relfilenode = 0
- relname = ""
- query = ("select pg_class2.relname from pg_class as pg_class1,
pg_appendonly, pg_class as pg_class2 where pg_class1.relname ='%s' "
- "and pg_class1.oid = pg_appendonly.relid and
pg_appendonly.segrelid = pg_class2.oid;") % tablename
- dburl = dbconn.DbURL(hostname=options.host, port=options.port,
username=options.user, dbname=databasename)
+ relname = ''
+ tablename = tablename.split('.')[-1]
+ query = ("select pg_class2.relname from pg_class as pg_class1,
pg_appendonly, pg_class as pg_class2 "
+ "where pg_class1.relname ='%s' and pg_class1.oid =
pg_appendonly.relid and pg_appendonly.segrelid = pg_class2.oid;") % tablename
conn = dbconn.connect(dburl, True)
rows = dbconn.execSQL(conn, query)
- conn.commit()
- if rows.rowcount == 0:
- logger.error("table '%s' not found in db '%s'" % (tablename,
databasename));
+ conn.commit()
+ if not rows.rowcount:
+ logger.error('table "%s" not found in db "%s"' % (tablename,
database))
sys.exit(1)
for row in rows:
relname = row[0]
conn.close()
-
except DatabaseError, ex:
- logger.error("Failed to connect to database, this script can only
be run when the database is up")
- logger.error("host = %s, port = %d, user = %s, dbname = %s, query
= %s" % (options.host, options.port, options.user, databasename, query))
- sys.exit(1)
-
- # check whether the target table is parquet format
- if relname.find("paq") == -1:
- logger.error("table '%s' is not parquet format" % tablename)
+ logger.error('Failed to run query "%s" with dbname "%s"' % (query,
database))
sys.exit(1)
+ if fmt == 'Parquet':
+ if relname.find("paq") == -1:
+ logger.error("table '%s' is not parquet format" % tablename)
+ sys.exit(1)
return relname
-def check_hash_type(options, databasename, tablename):
+def check_hash_type(dburl, tablename):
+ '''Check whether target table is hash-typed, in that case simple
insertion does not work'''
try:
query = "select attrnums from gp_distribution_policy, pg_class
where pg_class.relname = '%s' and pg_class.oid =
gp_distribution_policy.localoid;" % tablename
- dburl = dbconn.DbURL(hostname=options.host, port=options.port,
username=options.user, dbname=databasename)
conn = dbconn.connect(dburl, False)
rows = dbconn.execSQL(conn, query)
- conn.commit()
- if rows.rowcount == 0:
- logger.error("target not found in table
gp_distribution_policy")
+ conn.commit()
+ if not rows.rowcount:
+ logger.error('Target not found in table
gp_distribution_policy.')
sys.exit(1)
for row in rows:
- if row[0] != None:
- logger.error("Cannot register file(s) to a table which is
hash-typed")
+ if row[0]:
+ logger.error('Cannot register file(s) to a table which is
hash-typed.')
sys.exit(1)
-
conn.close()
-
except DatabaseError, ex:
- logger.error("Failed to connect to database, this script can only
be run when the database is up")
- logger.error("host = %s, port = %d, user = %s, dbname = %s, query
= %s" % (options.host, options.port, options.user, databasename, query))
+ logger.error('Failed to execute query "%s"' % query)
sys.exit(1)
-def get_metadata_from_database(options, databasename, tablename, seg_name):
+def get_metadata_from_database(dburl, tablename, seg_name):
+ '''Get the metadata to be inserted from hdfs'''
try:
- query = "select segno from pg_aoseg.%s;" % seg_name
- dburl = dbconn.DbURL(hostname=options.host, port=options.port,
username=options.user, dbname=databasename)
+ query = 'select segno from pg_aoseg.%s;' % seg_name
conn = dbconn.connect(dburl, False)
rows = dbconn.execSQL(conn, query)
- conn.commit()
+ conn.commit()
conn.close()
-
except DatabaseError, ex:
- logger.error("Failed to connect to database, this script can only
be run when the database is up")
- logger.error("host = %s, port = %d, user = %s, dbname = %s, query
= %s" % (options.host, options.port, options.user, databasename, query))
+ logger.error('Failed to execute query "%s"' % query)
sys.exit(1)
firstsegno = rows.rowcount + 1
- # get the full path of correspoding file for target table
try:
+ # get the full path of correspoding file for target table
query = ("select location,
gp_persistent_tablespace_node.tablespace_oid, database_oid, relfilenode from
pg_class, gp_persistent_relation_node, "
- "gp_persistent_tablespace_node,
gp_persistent_filespace_node where relname = '%s' and pg_class.relfilenode = "
- "gp_persistent_relation_node.relfilenode_oid and
gp_persistent_relation_node.tablespace_oid =
gp_persistent_tablespace_node.tablespace_oid "
- "and gp_persistent_filespace_node.filespace_oid =
gp_persistent_filespace_node.filespace_oid;") % tablename
- dburl = dbconn.DbURL(hostname=options.host, port=options.port,
username=options.user, dbname=databasename)
+ "gp_persistent_tablespace_node, gp_persistent_filespace_node
where relname = '%s' and pg_class.relfilenode = "
+ "gp_persistent_relation_node.relfilenode_oid and
gp_persistent_relation_node.tablespace_oid =
gp_persistent_tablespace_node.tablespace_oid "
+ "and gp_persistent_filespace_node.filespace_oid =
gp_persistent_filespace_node.filespace_oid;") % tablename.split('.')[-1]
conn = dbconn.connect(dburl, False)
rows = dbconn.execSQL(conn, query)
- conn.commit()
+ conn.commit()
conn.close()
-
except DatabaseError, ex:
- logger.error("Failed to connect to database, this script can only
be run when the database is up")
- logger.error("host = %s, port = %d, user = %s, dbname = %s, query
= %s" % (options.host, options.port, options.user, databasename, query))
+ logger.error('Failed to execute query "%s"' % query)
sys.exit(1)
-
for row in rows:
tabledir = row[0].strip() + "/" + str(row[1]) + "/" + str(row[2])
+ "/" + str(row[3]) + "/"
-
+ #tabledir = '/'.join([row[0], str(row[1]), str(row[2]),
str(row[3]), ''])
--- End diff --
I will remove L157 and keep L158.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---