Repository: incubator-hawq
Updated Branches:
  refs/heads/master ec098032e -> 2b50f7fee
HAWQ-760. Add hawq register function and tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/2b50f7fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/2b50f7fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/2b50f7fe

Branch: refs/heads/master
Commit: 2b50f7fee9a71ade3ca0e569e50562dde9a64bbc
Parents: ec09803
Author: Yancheng Luo <[email protected]>
Authored: Thu Jun 2 10:13:30 2016 +0800
Committer: Lili Ma <[email protected]>
Committed: Thu Jun 2 10:28:56 2016 +0800

----------------------------------------------------------------------
 src/backend/cdb/cdbparquetfooterserializer.c    | 129 ++++++---
 src/backend/cdb/cdbparquetstoragewrite.c        |   5 +-
 src/backend/executor/execMain.c                 |   5 +-
 src/include/cdb/cdbparquetstoragewrite.h        |   5 +
 .../testhawqregister/test-hawq-register.cpp     | 149 ++++++++++
 src/test/feature/testhawqregister/test_hawq.paq | Bin 0 -> 657 bytes
 src/test/feature/testhawqregister/test_hive.paq | Bin 0 -> 212 bytes
 src/test/feature/testhawqregister/test_not_paq  | Bin 0 -> 48 bytes
 tools/bin/hawq                                  |   3 +
 tools/bin/hawqregister                          | 272 +++++++++++++++++++
 tools/doc/hawqregister_help                     | 135 +++++++++
 11 files changed, 669 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/src/backend/cdb/cdbparquetfooterserializer.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbparquetfooterserializer.c b/src/backend/cdb/cdbparquetfooterserializer.c
index 802793f..b142600 100644
--- a/src/backend/cdb/cdbparquetfooterserializer.c
+++ b/src/backend/cdb/cdbparquetfooterserializer.c
@@ -25,6 +25,7 @@
  */
 
 #include "cdb/cdbparquetfooterserializer.h"
+#include "cdb/cdbparquetstoragewrite.h"
 #include "access/parquetmetadata_c++/MetadataInterface.h"
 #include "lib/stringinfo.h"
 #include "postgres.h"
@@ -245,7 +246,21 @@ readParquetFileMetadata(
 				break;
 			}
 			break;
+		case 5:
+			/* Skip this optional field now */
+			if (ftype == T_LIST) {
+				xfer += skipType(prot, ftype);
+			}
+			break;
+		case 6:
+			/* Skip this optional field now */
+			if (ftype == T_STRING) {
+				xfer += skipType(prot, ftype);
+			}
+			break;
 		default:
+			ereport(ERROR,
+					(errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("file metadata field not recognized with fid: %d", fid)));
 			break;
 		}
@@ -491,7 +506,27 @@ readSchemaElement_Single(
 			xfer += skipType(prot, ftype);
 		}
 		break;
+	case 7:
+		/* Skip this optional field now */
+		if (ftype == T_I32) {
+			xfer += skipType(prot, ftype);
+		}
+		break;
+	case 8:
+		/* Skip this optional field now */
+		if (ftype == T_I32) {
+			xfer += skipType(prot, ftype);
+		}
+		break;
+	case 9:
+		/* Skip this optional field now */
+		if (ftype == T_I32) {
+			xfer += skipType(prot, ftype);
+		}
+		break;
 	default:
+		ereport(ERROR,
+				(errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("file metadata: schema element field not recognized with fid: %d", fid)));
 		break;
 	}
 }
@@ -567,7 +602,15 @@ readRowGroupInfo(
 			isset_num_rows = true;
 		}
 		break;
+	case 4:
+		/* Skip this optional field now */
+		if (ftype == T_LIST) {
+			xfer += skipType(prot, ftype);
+		}
+		break;
 	default:
+		ereport(ERROR,
+				(errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("file metadata: row group field not recognized with fid: %d", fid)));
 		break;
 	}
 }
@@ -633,6 +676,8 @@ readColumnChunk(
 		}
 		break;
 	default:
+		ereport(ERROR,
+				(errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("file metadata: column chunk field not recognized with fid: %d", fid)));
 		break;
 	}
 }
@@ -773,7 +818,21 @@ readColumnMetadata(
 			xfer += skipType(prot, ftype);
 		}
 		break;
+	case 12:
+		/* Skip this optional field now */
+		if (ftype == T_STRUCT) {
+			xfer += skipType(prot, ftype);
+		}
+		break;
+	case 13:
+		/* Skip this optional field now */
+		if (ftype == T_LIST) {
+			xfer += skipType(prot, ftype);
+		}
+		break;
 	default:
+		ereport(ERROR,
+				(errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("file metadata: column metadata field not recognized with fid: %d", fid)));
 		break;
 	}
 }
@@ -864,14 +923,15 @@ uint32_t readKeyValue(
 		}
 		break;
 	default:
+		ereport(ERROR,
+				(errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("file metadata: key value field not recognized with fid: %d", fid)));
 		break;
 	}
 }
 readStructEnd(prot);
 
 if (!isset_key)
-	ereport(ERROR,
-			(errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("file metadata key value: key not set")));
+	ereport(ERROR, (errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("file metadata key value: key not set")));
 
 return xfer;
@@ -912,39 +972,43 @@ endDeserializerFooter(
 	int16_t fid;
 	int xfer = 0;
 
-	xfer += readFieldBegin(*prot, &ftype, &fid);
-	if (ftype != T_STOP) {
-		/*should remain the last field, keyvalue*/
-		if (fid != 5) {
-			ereport(ERROR,
-					(errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("incorrect file metadata format")));
-		}
-		/* process keyvalue part*/
-		if (ftype == T_LIST) {
-			uint32_t lsize;
-			TType etype;
-			xfer += readListBegin(*prot, &etype, &lsize);
-
-			for (int i = 0; i < lsize; i++) {
-				char *key = NULL;
-				char *value = NULL;
-				xfer += readKeyValue(*prot, &key, &value);
-
-				if ((key != NULL) && (strcmp(key, "hawq.schema") == 0)) {
-					int schemaLen = strlen(value);
-					parquetMetadata->hawqschemastr =
-							(char*) palloc0(schemaLen + 1);
-					strcpy(parquetMetadata->hawqschemastr, value);
+	while (true) {
+		xfer += readFieldBegin(*prot, &ftype, &fid);
+		if (ftype == T_STOP) {
+			break;
+		}
+		switch (fid) {
+		case 5:
+			if (ftype == T_LIST) {
+				uint32_t lsize;
+				TType etype;
+				xfer += readListBegin(*prot, &etype, &lsize);
+
+				for (int i = 0; i < lsize; i++) {
+					char *key = NULL;
+					char *value = NULL;
+					xfer += readKeyValue(*prot, &key, &value);
+
+					if ((key != NULL) && (strcmp(key, "hawq.schema") == 0)) {
+						int schemaLen = strlen(value);
+						parquetMetadata->hawqschemastr =
+								(char*) palloc0(schemaLen + 1);
+						strcpy(parquetMetadata->hawqschemastr, value);
+					}
 				}
 			}
-		}
-
-		/*Then should read the T_STOP*/
-		readFieldBegin(*prot, &ftype, &fid);
-		if (ftype != T_STOP) {
+			break;
+		case 6:
+			/* Skip this optional field now */
+			if (ftype == T_STRING) {
+				xfer += skipType(*prot, ftype);
+			}
+			break;
+		default:
 			ereport(ERROR,
-					(errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("incorrect file metadata format")));
+					(errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("incorrect file metadata format with fid: %d", fid)));
+			break;
 		}
 	}
@@ -1295,6 +1359,9 @@ writeEndofParquetMetadata(
 	xfer += writeString(prot, "hawq.schema", strlen("hawq.schema"));
 	/*write out value*/
 	xfer += writeFieldBegin(prot, T_STRING, 2);
+	if (parquetMetadata->hawqschemastr == NULL)
+		parquetMetadata->hawqschemastr = generateHAWQSchemaStr(parquetMetadata->pfield,
+				parquetMetadata->fieldCount);
 	xfer += writeString(prot, parquetMetadata->hawqschemastr, strlen(parquetMetadata->hawqschemastr));
 	/*write out end of key value*/
 	xfer += writeFieldStop(prot);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/src/backend/cdb/cdbparquetstoragewrite.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbparquetstoragewrite.c b/src/backend/cdb/cdbparquetstoragewrite.c
index 6b5f847..e57e875 100644
--- a/src/backend/cdb/cdbparquetstoragewrite.c
+++ b/src/backend/cdb/cdbparquetstoragewrite.c
@@ -173,7 +173,8 @@ static int appendParquetColumn_Circle(
 		int r,
 		int d);
 
-static char *generateHAWQSchemaStr(
+/* Used by cdbparquetfooterserializer.c */
+char *generateHAWQSchemaStr(
 		ParquetFileField pfields,
 		int fieldCount);
 
@@ -216,7 +217,7 @@ static bool ensureBufferCapacity(ParquetDataPage page,
    What about Array?
  */
-static char *
+char *
 generateHAWQSchemaStr(ParquetFileField pfields,
 		int fieldCount)
 {

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/src/backend/executor/execMain.c
----------------------------------------------------------------------
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 3a43cec..578b9e8 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -2383,10 +2383,13 @@ initResultRelInfo(ResultRelInfo *resultRelInfo,
 							RelationGetRelationName(resultRelationDesc))));
 			break;
 		case RELKIND_AOSEGMENTS:
-			ereport(ERROR,
+			/* Relax the constraint here to allow hawq register */
+			if (!allowSystemTableModsDML && IsSystemRelation(resultRelationDesc)) {
+				ereport(ERROR,
 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
 					 errmsg("cannot change AO segment listing relation \"%s\"",
 							RelationGetRelationName(resultRelationDesc))));
+			}
 			break;
 		case RELKIND_AOBLOCKDIR:
 			ereport(ERROR,

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/src/include/cdb/cdbparquetstoragewrite.h
----------------------------------------------------------------------
diff --git a/src/include/cdb/cdbparquetstoragewrite.h b/src/include/cdb/cdbparquetstoragewrite.h
index c0832c0..c64a105 100644
--- a/src/include/cdb/cdbparquetstoragewrite.h
+++ b/src/include/cdb/cdbparquetstoragewrite.h
@@ -235,4 +235,9 @@ void estimateColumnWidth(int *columnWidths,
 		Form_pg_attribute att,
 		bool expandEmbeddingType);
 
+/*
+ * Generate the key_value_metadata field used in FileMetaData.
+ */
+char * generateHAWQSchemaStr(ParquetFileField pfields,
+		int fieldCount);
 #endif /* CDBPARQUETSTORAGEWRITE_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/src/test/feature/testhawqregister/test-hawq-register.cpp
----------------------------------------------------------------------
diff --git a/src/test/feature/testhawqregister/test-hawq-register.cpp b/src/test/feature/testhawqregister/test-hawq-register.cpp
new file mode 100644
index 0000000..eb15d32
--- /dev/null
+++ b/src/test/feature/testhawqregister/test-hawq-register.cpp
@@ -0,0 +1,149 @@
+#include <string>
+
+#include "lib/command.h"
+#include "lib/common.h"
+#include "lib/sql-util.h"
+
+#include "gtest/gtest.h"
+
+using std::string;
+
+/* This test suite may consume more than 80 seconds. */
+class TestHawqRegister : public ::testing::Test {
+ public:
+  TestHawqRegister() {}
+  ~TestHawqRegister() {}
+};
+
+TEST_F(TestHawqRegister, TestSingleHawqFile) {
+  SQLUtility util;
+  string rootPath(util.getTestRootPath());
+  string relativePath("/testhawqregister/test_hawq.paq");
+  string filePath = rootPath + relativePath;
+
+  EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath + " /hawq_register_hawq.paq"));
+
+  util.execute("create table hawqregister(i int) with (appendonly=true, orientation=parquet);");
+  util.query("select * from hawqregister;", 0);
+
+  EXPECT_EQ(0, Command::getCommandStatus("hawq register postgres hawqregister /hawq_register_hawq.paq"));
+
+  util.query("select * from hawqregister;", 3);
+  util.execute("insert into hawqregister values(1);");
+  util.query("select * from hawqregister;", 4);
+  util.execute("drop table hawqregister;");
+}
+
+TEST_F(TestHawqRegister, TestSingleHiveFile) {
+  SQLUtility util;
+  string rootPath(util.getTestRootPath());
+  string relativePath("/testhawqregister/test_hive.paq");
+  string filePath = rootPath + relativePath;
+
+  EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath + " /hawq_register_hive.paq"));
+
+  util.execute("create table hawqregister(i int) with (appendonly=true, orientation=parquet);");
+  util.query("select * from hawqregister;", 0);
+
+  EXPECT_EQ(0, Command::getCommandStatus("hawq register postgres hawqregister /hawq_register_hive.paq"));
+
+  util.query("select * from hawqregister;", 1);
+  util.execute("insert into hawqregister values(1);");
+  util.query("select * from hawqregister;", 2);
+  util.execute("drop table hawqregister;");
+}
+
+TEST_F(TestHawqRegister, TestFiles) {
+  SQLUtility util;
+  string rootPath(util.getTestRootPath());
+  string relativePath("/testhawqregister/test_hawq.paq");
+  string filePath1 = rootPath + relativePath;
+  relativePath = "/testhawqregister/test_hive.paq";
+  string filePath2 = rootPath + relativePath;
+
+  EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -mkdir -p /hawq_register_test/t/t"));
+  EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath1 + " /hawq_register_test/hawq1.paq"));
+  EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath1 + " /hawq_register_test/hawq2.paq"));
+  EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath1 + " /hawq_register_test/t/hawq.paq"));
+
+  EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath2 + " /hawq_register_test/hive1.paq"));
+  EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath2 + " /hawq_register_test/hive2.paq"));
+  EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath2 + " /hawq_register_test/t/hive.paq"));
+
+  util.execute("create table hawqregister(i int) with (appendonly=true, orientation=parquet);");
+  util.query("select * from hawqregister;", 0);
+
+  EXPECT_EQ(0, Command::getCommandStatus("hawq register postgres hawqregister /hawq_register_test"));
+
+  util.query("select * from hawqregister;", 12);
+  util.execute("insert into hawqregister values(1);");
+  util.query("select * from hawqregister;", 13);
+  EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -rm -r /hawq_register_test"));
+  util.execute("drop table hawqregister;");
+}
+
+TEST_F(TestHawqRegister, TestHashDistributedTable) {
+  SQLUtility util;
+  string rootPath(util.getTestRootPath());
+  string relativePath("/testhawqregister/test_hawq.paq");
+  string filePath = rootPath + relativePath;
+
+  EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath + " /hawq_register_hawq.paq"));
+
+  util.execute("create table hawqregister(i int) with (appendonly=true, orientation=parquet) distributed by (i);");
+  util.query("select * from hawqregister;", 0);
+
+  EXPECT_EQ(1, Command::getCommandStatus("hawq register postgres hawqregister /hawq_register_hawq.paq"));
+  util.query("select * from hawqregister;", 0);
+
+  EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -rm /hawq_register_hawq.paq"));
+  util.execute("drop table hawqregister;");
+}
+
+TEST_F(TestHawqRegister, TestNotParquetFile) {
+  SQLUtility util;
+  string rootPath(util.getTestRootPath());
+  string relativePath("/testhawqregister/test_not_paq");
+  string filePath = rootPath + relativePath;
+
+  EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath + " /hawq_register_test_not_paq"));
+
+  util.execute("create table hawqregister(i int) with (appendonly=true, orientation=parquet);");
+  util.query("select * from hawqregister;", 0);
+
+  EXPECT_EQ(1, Command::getCommandStatus("hawq register postgres hawqregister /hawq_register_test_not_paq"));
+  util.query("select * from hawqregister;", 0);
+
+  EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -rm /hawq_register_test_not_paq"));
+  util.execute("drop table hawqregister;");
+}
+
+TEST_F(TestHawqRegister, TestNotParquetTable) {
+  SQLUtility util;
+  string rootPath(util.getTestRootPath());
+  string relativePath("/testhawqregister/test_hawq.paq");
+  string filePath = rootPath + relativePath;
+
+  EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath + " /hawq_register_hawq.paq"));
+
+  util.execute("create table hawqregister(i int);");
+  util.query("select * from hawqregister;", 0);
+
+  EXPECT_EQ(1, Command::getCommandStatus("hawq register postgres hawqregister /hawq_register_hawq.paq"));
+  util.query("select * from hawqregister;", 0);
+
+  EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -rm /hawq_register_hawq.paq"));
+  util.execute("drop table hawqregister;");
+}
+
+TEST_F(TestHawqRegister, TestFileNotExist) {
+  SQLUtility util;
+
+  util.execute("create table hawqregister(i int);");
+  util.query("select * from hawqregister;", 0);
+
+  EXPECT_EQ(1, Command::getCommandStatus("hawq register postgres hawqregister /hawq_register_file_not_exist"));
+  util.query("select * from hawqregister;", 0);
+
+  util.execute("drop table hawqregister;");
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/src/test/feature/testhawqregister/test_hawq.paq
----------------------------------------------------------------------
diff --git a/src/test/feature/testhawqregister/test_hawq.paq b/src/test/feature/testhawqregister/test_hawq.paq
new file mode 100644
index 0000000..f2adb4b
Binary files /dev/null and b/src/test/feature/testhawqregister/test_hawq.paq differ

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/src/test/feature/testhawqregister/test_hive.paq
----------------------------------------------------------------------
diff --git a/src/test/feature/testhawqregister/test_hive.paq b/src/test/feature/testhawqregister/test_hive.paq
new file mode 100644
index 0000000..a356fc7
Binary files /dev/null and b/src/test/feature/testhawqregister/test_hive.paq differ

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/src/test/feature/testhawqregister/test_not_paq
----------------------------------------------------------------------
diff --git a/src/test/feature/testhawqregister/test_not_paq b/src/test/feature/testhawqregister/test_not_paq
new file mode 100644
index 0000000..dc75c44
Binary files /dev/null and b/src/test/feature/testhawqregister/test_not_paq differ

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/tools/bin/hawq
----------------------------------------------------------------------
diff --git a/tools/bin/hawq b/tools/bin/hawq
index 08c30e4..41130e9 100755
--- a/tools/bin/hawq
+++ b/tools/bin/hawq
@@ -175,6 +175,9 @@ def main():
     elif hawq_command == "scp":
         cmd = "%s; gpscp %s" % (source_hawq_env, sub_args)
         result = local_ssh(cmd)
+    elif hawq_command == "register":
+        cmd = "%s; hawqregister %s" % (source_hawq_env, sub_args)
+        result = local_ssh(cmd)
     elif hawq_command == "version" or hawq_command == "--version":
         print_version()
     else:

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
new file mode 100755
index 0000000..876aed7
--- /dev/null
+++ b/tools/bin/hawqregister
@@ -0,0 +1,272 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+'''
+hawq register [options] database_name table_name file_or_dir_path_in_hdfs
+'''
+import os, sys, optparse, getpass, re, urlparse
+try:
+    from gppylib.commands.unix import getLocalHostname, getUserName
+    from gppylib.db import dbconn
+    from gppylib.gplog import get_default_logger, setup_tool_logging
+    from gppylib.gpparseopts import OptParser, OptChecker
+    from pygresql import pg
+    from pygresql.pgdb import DatabaseError
+    from hawqpylib.hawqlib import local_ssh, local_ssh_output
+except ImportError, e:
+    print e
+    sys.stderr.write('cannot import module, please check that you have sourced greenplum_path.sh\n')
+    sys.exit(2)
+
+# setup logging
+logger = get_default_logger()
+EXECNAME = os.path.split(__file__)[-1]
+setup_tool_logging(EXECNAME, getLocalHostname(), getUserName())
+
+
+def create_opt_parser(version):
+    parser = OptParser(option_class=OptChecker,
+                       usage='usage: %prog [options] database_name table_name file_or_dir_path_in_hdfs',
+                       version=version)
+    parser.remove_option('-h')
+    parser.add_option('-?', '--help', action='help')
+    parser.add_option('-h', '--host', help="host of the target DB")
+    parser.add_option('-p', '--port', help="port of the target DB", type='int', default=0)
+    parser.add_option('-U', '--user', help="username of the target DB")
+    return parser
+
+
+def get_seg_name(options, databasename, tablename):
+    try:
+        relname = ""
+        query = "select pg_class2.relname from pg_class as pg_class1, pg_appendonly, pg_class as pg_class2 where pg_class1.relname ='%s' and pg_class1.oid = pg_appendonly.relid and pg_appendonly.segrelid = pg_class2.oid;" % tablename
+        dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user, dbname=databasename)
+        conn = dbconn.connect(dburl, True)
+        rows = dbconn.execSQL(conn, query)
+        conn.commit()
+        if rows.rowcount == 0:
+            logger.error("table '%s' not found in db '%s'" % (tablename, databasename))
+            sys.exit(1)
+        for row in rows:
+            relname = row[0]
+        conn.close()
+
+    except DatabaseError, ex:
+        logger.error("Failed to connect to database, this script can only be run when the database is up")
+        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = %s" % (options.host, options.port, options.user, databasename, query))
+        sys.exit(1)
+
+    # check whether the target table is in parquet format
+    if relname.find("paq") == -1:
+        logger.error("table '%s' is not in parquet format" % tablename)
+        sys.exit(1)
+
+    return relname
+
+
+def check_hash_type(options, databasename, tablename):
+    try:
+        query = "select attrnums from gp_distribution_policy, pg_class where pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;" % tablename
+        dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user, dbname=databasename)
+        conn = dbconn.connect(dburl, False)
+        rows = dbconn.execSQL(conn, query)
+        conn.commit()
+        if rows.rowcount == 0:
+            logger.error("target not found in table gp_distribution_policy")
+            sys.exit(1)
+        for row in rows:
+            if row[0] is not None:
+                logger.error("Cannot register file(s) to a table which is hash distributed")
+                sys.exit(1)
+
+        conn.close()
+
+    except DatabaseError, ex:
+        logger.error("Failed to connect to database, this script can only be run when the database is up")
+        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = %s" % (options.host, options.port, options.user, databasename, query))
+        sys.exit(1)
+
+
+def get_files_in_hdfs(filepath):
+    files = []
+    sizes = []
+    hdfscmd = "hadoop fs -test -e %s" % filepath
+    result = local_ssh(hdfscmd)
+    if result != 0:
+        logger.error("Path '%s' does not exist in hdfs" % filepath)
+        sys.exit(1)
+
+    hdfscmd = "hadoop fs -ls -R %s" % filepath
+    result, out, err = local_ssh_output(hdfscmd)
+    outlines = out.splitlines()
+
+    # recursively search all the files under path 'filepath'
+    for line in outlines:
+        lineargs = line.split()
+        if len(lineargs) == 8 and lineargs[0].find("d") == -1:
+            files.append(lineargs[7])
+            sizes.append(int(lineargs[4]))
+
+    if len(files) == 0:
+        logger.error("Dir '%s' is empty" % filepath)
+        sys.exit(1)
+
+    return files, sizes
+
+
+def check_parquet_format(options, files):
+    # check whether the files are in parquet format by checking the first and last four bytes
+    for file in files:
+        hdfscmd = "hadoop fs -cat %s | head -c 4 | grep PAR1" % file
+        result1 = local_ssh(hdfscmd)
+        hdfscmd = "hadoop fs -cat %s | tail -c 4 | grep PAR1" % file
+        result2 = local_ssh(hdfscmd)
+        if result1 or result2:
+            logger.error("File %s is not in parquet format" % file)
+            sys.exit(1)
+
+
+def get_metadata_from_database(options, databasename, seg_name):
+    try:
+        query = "select segno from pg_aoseg.%s;" % seg_name
+        dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user, dbname=databasename)
+        conn = dbconn.connect(dburl, False)
+        rows = dbconn.execSQL(conn, query)
+        conn.commit()
+        conn.close()
+
+    except DatabaseError, ex:
+        logger.error("Failed to connect to database, this script can only be run when the database is up")
+        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = %s" % (options.host, options.port, options.user, databasename, query))
+        sys.exit(1)
+
+    # the next segno to use is one past the number of existing segment files
+    return rows.rowcount + 1
+
+
+def move_files_in_hdfs(options, databasename, tablename, files, firstsegno, normal):
+    # get the full path of the directory corresponding to the target table
+    try:
+        query = "select location, gp_persistent_tablespace_node.tablespace_oid, database_oid, relfilenode from pg_class, gp_persistent_relation_node, gp_persistent_tablespace_node, gp_persistent_filespace_node where relname = '%s' and pg_class.relfilenode = gp_persistent_relation_node.relfilenode_oid and gp_persistent_relation_node.tablespace_oid = gp_persistent_tablespace_node.tablespace_oid and gp_persistent_tablespace_node.filespace_oid = gp_persistent_filespace_node.filespace_oid;" % tablename
+        dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user, dbname=databasename)
+        conn = dbconn.connect(dburl, False)
+        rows = dbconn.execSQL(conn, query)
+        conn.commit()
+        conn.close()
+
+    except DatabaseError, ex:
+        logger.error("Failed to connect to database, this script can only be run when the database is up")
+        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = %s" % (options.host, options.port, options.user, databasename, query))
+        sys.exit(1)
+
+    # move file(s) in the source path into the directory corresponding to the target table
+    if normal:
+        for row in rows:
+            destdir = row[0].strip() + "/" + str(row[1]) + "/" + str(row[2]) + "/" + str(row[3]) + "/"
+
+        segno = firstsegno
+        for file in files:
+            srcfile = file
+            dstfile = destdir + str(segno)
+            segno += 1
+            if srcfile != dstfile:
+                hdfscmd = "hadoop fs -mv %s %s" % (srcfile, dstfile)
+                sys.stdout.write("hdfscmd: '%s'\n" % hdfscmd)
+                result = local_ssh(hdfscmd)
+                if result != 0:
+                    logger.error("Failed to move '%s' to '%s'" % (srcfile, dstfile))
+                    sys.exit(1)
+    else:
+        # undo mode: move the file(s) back to where they came from
+        for row in rows:
+            srcdir = row[0].strip() + "/" + str(row[1]) + "/" + str(row[2]) + "/" + str(row[3]) + "/"
+
+        segno = firstsegno
+        for file in files:
+            dstfile = file
+            srcfile = srcdir + str(segno)
+            segno += 1
+            if srcfile != dstfile:
+                hdfscmd = "hadoop fs -mv %s %s" % (srcfile, dstfile)
+                sys.stdout.write("hdfscmd: '%s'\n" % hdfscmd)
+                result = local_ssh(hdfscmd)
+                if result != 0:
+                    logger.error("Failed to move '%s' to '%s'" % (srcfile, dstfile))
+                    sys.exit(1)
+
+
+def insert_metadata_into_database(options, databasename, tablename, seg_name, firstsegno, files, eofs):
+    try:
+        query = "SET allow_system_table_mods='dml';"
+        segno = firstsegno
+        for eof in eofs:
+            query += "insert into pg_aoseg.%s values(%d, %d, %d, %d);" % (seg_name, segno, eof, -1, -1)
+            segno += 1
+
+        dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user, dbname=databasename)
+        conn = dbconn.connect(dburl, True)
+        rows = dbconn.execSQL(conn, query)
+        conn.commit()
+        conn.close()
+
+    except DatabaseError, ex:
+        logger.error("Failed to connect to database, this script can only be run when the database is up")
+        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = %s" % (options.host, options.port, options.user, databasename, query))
+        # the catalog update failed, so move the already-moved file(s) back
+        move_files_in_hdfs(options, databasename, tablename, files, firstsegno, False)
+
+        sys.exit(1)
+
+
+def main(args=None):
+    parser = create_opt_parser('%prog version $Revision: #1 $')
+    options, args = parser.parse_args(args)
+    if len(args) != 3:
+        sys.stderr.write('Incorrect number of arguments\n\n')
+        parser.print_help(sys.stderr)
+        return 1
+
+    databasename = args[0]
+    tablename = args[1]
+    filepath = args[2]
+
+    # 1. get the seg_name from the database
+    seg_name = get_seg_name(options, databasename, tablename)
+
+    # 2. check whether the target table is hash distributed; in that case simple insertion does not work
+    check_hash_type(options, databasename, tablename)
+
+    # 3. get all the files referred to by 'filepath', which could be a file or a directory containing all the files
+    files, sizes = get_files_in_hdfs(filepath)
+    print "File(s) to be registered:"
+    print files
+
+    # 4. check whether the file(s) to be registered are in parquet format
+    check_parquet_format(options, files)
+
+    # 5. get the first segment number to use from the catalog
+    firstsegno = get_metadata_from_database(options, databasename, seg_name)
+
+    # 6. move the file(s) in hdfs to the proper location
+    move_files_in_hdfs(options, databasename, tablename, files, firstsegno, True)
+
+    # 7. insert the metadata into the database
+    insert_metadata_into_database(options, databasename, tablename, seg_name, firstsegno, files, sizes)
+
+
+if __name__ == '__main__':
+    sys.exit(main())
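
For illustration: check_parquet_format() above shells out to
"hadoop fs -cat ... | head/tail -c 4 | grep PAR1". Parquet files begin and
end with the magic bytes "PAR1", so the same validation for a local file is
just a read of the first and last four bytes. A minimal sketch, assuming a
local path rather than an HDFS one:

    import os

    def is_parquet_file(path):
        """Return True if the file starts and ends with the parquet magic."""
        if os.path.getsize(path) < 8:   # too small to hold two magic markers
            return False
        with open(path, 'rb') as f:
            head = f.read(4)
            f.seek(-4, os.SEEK_END)
            tail = f.read(4)
        return head == b'PAR1' and tail == b'PAR1'
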
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/tools/doc/hawqregister_help
----------------------------------------------------------------------
diff --git a/tools/doc/hawqregister_help b/tools/doc/hawqregister_help
new file mode 100644
index 0000000..d5a5a0d
--- /dev/null
+++ b/tools/doc/hawqregister_help
@@ -0,0 +1,135 @@
+COMMAND NAME: hawq register
+
+Register parquet files generated by another system into the corresponding
+table in HAWQ.
+
+*****************************************************
+SYNOPSIS
+*****************************************************
+
+hawq register [-h hostname] [-p port] [-U username] <databasename> <tablename> <hdfspath>
+
+hawq register help
+hawq register -?
+
+hawq register --version
+
+*****************************************************
+DESCRIPTION
+*****************************************************
+
+"hawq register" is a utility that registers file(s) on HDFS into
+a table in HAWQ. It moves the file at the given path (if the path
+refers to a file), or the files under the path (if it refers to a
+directory), into the directory corresponding to the table, and
+then updates the table metadata to include the files.
+
+To use "hawq register", HAWQ must have been started.
+
+Currently "hawq register" supports parquet tables only.
+Users have to make sure that the metadata of the parquet file(s)
+and the table are consistent.
+The table to be registered into must not be hash distributed,
+i.e. it must not have been created with a "distributed by" clause.
+
+*****************************************************
+Arguments
+*****************************************************
+
+<databasename>
+
+Name of the database to be operated on.
+
+<tablename>
+
+Name of the table to be registered into.
+
+<hdfspath>
+
+The path of the file, or of the directory containing the files,
+that will be registered.
+
+*****************************************************
+OPTIONS
+*****************************************************
+
+-? (help)
+
+Displays the online help.
+
+--version
+
+Displays the version of this utility.
+
+*****************************************************
+CONNECTION OPTIONS
+*****************************************************
+
+-h hostname
+
+  Specifies the host name of the machine on which the HAWQ master
+  database server is running. If not specified, reads from the
+  environment variable $PGHOST which defaults to localhost.
+
+-p port
+
+  Specifies the TCP port on which the HAWQ master database server
+  is listening for connections. If not specified, reads from the
+  environment variable $PGPORT which defaults to 5432.
+
+-U username
+
+  The database role name to connect as. If not specified, reads
+  from the environment variable $PGUSER which defaults to the current
+  system user name.
+
+*****************************************************
+EXAMPLES
+*****************************************************
+
+Run "hawq register" to register a parquet file at the hdfs path
+'/temp/hive.paq', generated by hive, into the table 'parquet_table'
+in the database named 'postgres'.
+Assume the location of the database is 'hdfs://localhost:8020/hawq_default',
+the tablespace id is '16385', the database id is '16387', the table filenode
+id is '77160', and the last file under the filenode is numbered '7'.
+
+$ hawq register postgres parquet_table /temp/hive.paq
+
+This moves the file '/temp/hive.paq' to its new place
+'hdfs://localhost:8020/hawq_default/16385/16387/77160/8' in hdfs, then
+updates the metadata of the table 'parquet_table' in HAWQ, which is kept
+in the table 'pg_aoseg.pg_paqseg_77160'.
+
+*****************************************************
+DATA TYPES
+*****************************************************
+The data types used in HAWQ and in the parquet format are not the same,
+so there is a mapping between them, summarized as follows:
+
+Data types in HAWQ      Data types in parquet
+bool                    boolean
+int2                    int32
+int4                    int32
+date                    int32
+int8                    int64
+time                    int64
+timestamptz             int64
+timestamp               int64
+money                   int64
+float4                  float
+float8                  double
+bit                     byte_array
+varbit                  byte_array
+byte                    byte_array
+numeric                 byte_array
+name                    byte_array
+char                    byte_array
+bpchar                  byte_array
+varchar                 byte_array
+text                    byte_array
+xml                     byte_array
+timetz                  byte_array
+interval                byte_array
+macaddr                 byte_array
+inet                    byte_array
+cidr                    byte_array
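
For illustration: the registration in the EXAMPLES section above boils down
to one HDFS move plus one insert into the table's pg_aoseg relation, which
hawqregister builds in move_files_in_hdfs() and insert_metadata_into_database().
A minimal Python sketch of that net effect, assuming the illustrative oids and
paths from the example and the 212-byte test_hive.paq shipped in this commit:

    import subprocess

    # move the file to segno 8 under the table's filenode directory
    subprocess.check_call(
        ["hadoop", "fs", "-mv", "/temp/hive.paq",
         "hdfs://localhost:8020/hawq_default/16385/16387/77160/8"])

    # record the new segment file; eof is the file length in bytes (212 here),
    # and the trailing -1, -1 mirror what hawqregister inserts. The SET is
    # what the execMain.c change in this commit makes sufficient.
    catalog_sql = ("SET allow_system_table_mods='dml';"
                   "insert into pg_aoseg.pg_paqseg_77160 values(8, 212, -1, -1);")
    subprocess.check_call(["psql", "-d", "postgres", "-c", catalog_sql])
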
