Repository: incubator-hawq
Updated Branches:
  refs/heads/master ec098032e -> 2b50f7fee


HAWQ-760. Add hawq register function and tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/2b50f7fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/2b50f7fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/2b50f7fe

Branch: refs/heads/master
Commit: 2b50f7fee9a71ade3ca0e569e50562dde9a64bbc
Parents: ec09803
Author: Yancheng Luo <[email protected]>
Authored: Thu Jun 2 10:13:30 2016 +0800
Committer: Lili Ma <[email protected]>
Committed: Thu Jun 2 10:28:56 2016 +0800

----------------------------------------------------------------------
 src/backend/cdb/cdbparquetfooterserializer.c    | 129 ++++++---
 src/backend/cdb/cdbparquetstoragewrite.c        |   5 +-
 src/backend/executor/execMain.c                 |   5 +-
 src/include/cdb/cdbparquetstoragewrite.h        |   5 +
 .../testhawqregister/test-hawq-register.cpp     | 149 ++++++++++
 src/test/feature/testhawqregister/test_hawq.paq | Bin 0 -> 657 bytes
 src/test/feature/testhawqregister/test_hive.paq | Bin 0 -> 212 bytes
 src/test/feature/testhawqregister/test_not_paq  | Bin 0 -> 48 bytes
 tools/bin/hawq                                  |   3 +
 tools/bin/hawqregister                          | 272 +++++++++++++++++++
 tools/doc/hawqregister_help                     | 135 +++++++++
 11 files changed, 669 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/src/backend/cdb/cdbparquetfooterserializer.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbparquetfooterserializer.c 
b/src/backend/cdb/cdbparquetfooterserializer.c
index 802793f..b142600 100644
--- a/src/backend/cdb/cdbparquetfooterserializer.c
+++ b/src/backend/cdb/cdbparquetfooterserializer.c
@@ -25,6 +25,7 @@
  */
 
 #include "cdb/cdbparquetfooterserializer.h"
+#include "cdb/cdbparquetstoragewrite.h"
 #include "access/parquetmetadata_c++/MetadataInterface.h"
 #include "lib/stringinfo.h"
 #include "postgres.h"
@@ -245,7 +246,21 @@ readParquetFileMetadata(
                                break;
                        }
                        break;
+               case 5:
+                       /* Skip this optional field now */
+                       if (ftype == T_LIST) {
+                               xfer += skipType(prot, ftype);
+                       }
+                       break;
+               case 6:
+                       /* Skip this optional field now */
+                       if (ftype == T_STRING) {
+                               xfer += skipType(prot, ftype);
+                       }
+                       break;
                default:
+                       ereport(ERROR,
+                               (errcode(ERRCODE_GP_INTERNAL_ERROR), 
errmsg("file metadata field not recognized with fid: %d", fid)));
                        break;
                }
 
@@ -491,7 +506,27 @@ readSchemaElement_Single(
                                xfer += skipType(prot, ftype);
                        }
                        break;
+               case 7:
+                       /* Skip this optional field now */
+                       if (ftype == T_I32) {
+                               xfer += skipType(prot, ftype);
+                       }
+                       break;
+               case 8:
+                       /* Skip this optional field now */
+                       if (ftype == T_I32) {
+                               xfer += skipType(prot, ftype);
+                       }
+                       break;
+               case 9:
+                       /* Skip this optional field now */
+                       if (ftype == T_I32) {
+                               xfer += skipType(prot, ftype);
+                       }
+                       break;
                default:
+                       ereport(ERROR,
+                               (errcode(ERRCODE_GP_INTERNAL_ERROR), 
errmsg("file metadata: schema element field not recognizd with fid: %d", fid)));
                        break;
                }
        }
@@ -567,7 +602,15 @@ readRowGroupInfo(
                                isset_num_rows = true;
                        }
                        break;
+               case 4:
+                       /* Skip this optional field now */
+                       if (ftype == T_LIST) {
+                               xfer += skipType(prot, ftype);
+                       }
+                       break;
                default:
+                       ereport(ERROR,
+                               (errcode(ERRCODE_GP_INTERNAL_ERROR), 
errmsg("file metadata: row group field not recognized with fid: %d", fid)));
                        break;
                }
        }
@@ -633,6 +676,8 @@ readColumnChunk(
                        }
                        break;
                default:
+                       ereport(ERROR,
+                               (errcode(ERRCODE_GP_INTERNAL_ERROR), 
errmsg("file metadata: column chunk field not recognized with fid: %d", fid)));
                        break;
                }
        }
@@ -773,7 +818,21 @@ readColumnMetadata(
                                xfer += skipType(prot, ftype);
                        }
                        break;
+               case 12:
+                       /* Skip this optional field now */
+                       if (ftype == T_STRUCT) {
+                               xfer += skipType(prot, ftype);
+                       }
+                       break;
+               case 13:
+                       /* Skip this optional field now */
+                       if (ftype == T_LIST) {
+                               xfer += skipType(prot, ftype);
+                       }
+                       break;
                default:
+                       ereport(ERROR,
+                               (errcode(ERRCODE_GP_INTERNAL_ERROR), 
errmsg("file metadata: column metadata field not recognized with fid: %d", 
fid)));
                        break;
                }
        }
@@ -864,14 +923,15 @@ uint32_t readKeyValue(
                        }
                        break;
                default:
+                       ereport(ERROR,
+                               (errcode(ERRCODE_GP_INTERNAL_ERROR), 
errmsg("file metadata: key value field not recognized with fid: %d", fid)));
                        break;
                }
        }
        readStructEnd(prot);
 
        if (!isset_key)
-               ereport(ERROR,
-                               (errcode(ERRCODE_GP_INTERNAL_ERROR), 
errmsg("file metadata key value: key not set")));
+               ereport(ERROR, (errcode(ERRCODE_GP_INTERNAL_ERROR), 
errmsg("file metadata key value: key not set")));
        return xfer;
 }
 
@@ -912,39 +972,43 @@ endDeserializerFooter(
        int16_t fid;
        int xfer = 0;
 
-       xfer += readFieldBegin(*prot, &ftype, &fid);
-       if (ftype != T_STOP) {
-               /*should remain the last field, keyvalue*/
-               if (fid != 5) {
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_GP_INTERNAL_ERROR), 
errmsg("incorrect file metadata format")));
-               }
 
-               /* process keyvalue part*/
-               if (ftype == T_LIST) {
-                       uint32_t lsize;
-                       TType etype;
-                       xfer += readListBegin(*prot, &etype, &lsize);
-
-                       for (int i = 0; i < lsize; i++) {
-                               char *key = NULL;
-                               char *value = NULL;
-                               xfer += readKeyValue(*prot, &key, &value);
-
-                               if ((key != NULL) && (strcmp(key, 
"hawq.schema") == 0)) {
-                                       int schemaLen = strlen(value);
-                                       parquetMetadata->hawqschemastr =
-                                                       (char*) 
palloc0(schemaLen + 1);
-                                       strcpy(parquetMetadata->hawqschemastr, 
value);
+       while (true) {
+               xfer += readFieldBegin(*prot, &ftype, &fid);
+               if (ftype == T_STOP) {
+                       break;
+               }
+               switch (fid) {
+               case 5:
+                       if (ftype == T_LIST) {
+                               uint32_t lsize;
+                               TType etype;
+                               xfer += readListBegin(*prot, &etype, &lsize);
+
+                               for (int i = 0; i < lsize; i++) {
+                                       char *key = NULL;
+                                       char *value = NULL;
+                                       xfer += readKeyValue(*prot, &key, 
&value);
+
+                                       if ((key != NULL) && (strcmp(key, 
"hawq.schema") == 0)) {
+                                               int schemaLen = strlen(value);
+                                               parquetMetadata->hawqschemastr =
+                                                               (char*) 
palloc0(schemaLen + 1);
+                                               
strcpy(parquetMetadata->hawqschemastr, value);
+                                       }
                                }
                        }
-               }
-
-               /*Then should read the T_STOP*/
-               readFieldBegin(*prot, &ftype, &fid);
-               if (ftype != T_STOP) {
+                       break;
+               case 6:
+                       /* Skip this optional field now */
+                       if (ftype == T_STRING) {
+                               xfer += skipType(*prot, ftype);
+                       }
+                       break;
+               default:
                        ereport(ERROR,
-                                       (errcode(ERRCODE_GP_INTERNAL_ERROR), 
errmsg("incorrect file metadata format")));
+                                       (errcode(ERRCODE_GP_INTERNAL_ERROR), 
errmsg("incorrect file metadata format with fid: %d", fid)));
+                       break;
                }
        }
 
@@ -1295,6 +1359,9 @@ writeEndofParquetMetadata(
        xfer += writeString(prot, "hawq.schema", strlen("hawq.schema"));
        /*write out value*/
        xfer += writeFieldBegin(prot, T_STRING, 2);
+       if (parquetMetadata->hawqschemastr == NULL)
+               parquetMetadata->hawqschemastr = 
generateHAWQSchemaStr(parquetMetadata->pfield,
+                                                       
parquetMetadata->fieldCount);
        xfer += writeString(prot, parquetMetadata->hawqschemastr, 
strlen(parquetMetadata->hawqschemastr));
        /*write out end of key value*/
        xfer += writeFieldStop(prot);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/src/backend/cdb/cdbparquetstoragewrite.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbparquetstoragewrite.c 
b/src/backend/cdb/cdbparquetstoragewrite.c
index 6b5f847..e57e875 100644
--- a/src/backend/cdb/cdbparquetstoragewrite.c
+++ b/src/backend/cdb/cdbparquetstoragewrite.c
@@ -173,7 +173,8 @@ static int appendParquetColumn_Circle(
                int r,
                int d);
 
-static char *generateHAWQSchemaStr(
+/* Used by cdbparquetfooterserializer.c */
+char *generateHAWQSchemaStr(
                ParquetFileField pfields,
                int fieldCount);
 
@@ -216,7 +217,7 @@ static bool ensureBufferCapacity(ParquetDataPage page,
 
  What about Array?
  */
-static char *
+char *
 generateHAWQSchemaStr(ParquetFileField pfields,
                                          int fieldCount)
 {

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/src/backend/executor/execMain.c
----------------------------------------------------------------------
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 3a43cec..578b9e8 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -2383,10 +2383,13 @@ initResultRelInfo(ResultRelInfo *resultRelInfo,
                                                 
RelationGetRelationName(resultRelationDesc))));
                        break;
                case RELKIND_AOSEGMENTS:
-                       ereport(ERROR,
+                       /* Relax the constraint here to allow hawq register */
+                       if (!allowSystemTableModsDML && 
IsSystemRelation(resultRelationDesc)) {
+                               ereport(ERROR,
                                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                         errmsg("cannot change AO segment 
listing relation \"%s\"",
                                                 
RelationGetRelationName(resultRelationDesc))));
+                       }
                        break;
                case RELKIND_AOBLOCKDIR:
                        ereport(ERROR,

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/src/include/cdb/cdbparquetstoragewrite.h
----------------------------------------------------------------------
diff --git a/src/include/cdb/cdbparquetstoragewrite.h 
b/src/include/cdb/cdbparquetstoragewrite.h
index c0832c0..c64a105 100644
--- a/src/include/cdb/cdbparquetstoragewrite.h
+++ b/src/include/cdb/cdbparquetstoragewrite.h
@@ -235,4 +235,9 @@ void estimateColumnWidth(int *columnWidths,
                                        Form_pg_attribute att,
                                        bool expandEmbeddingType);
 
+/*
+ * Generate key_value_metadata field used in FileMetaData.
+ */
+char * generateHAWQSchemaStr(ParquetFileField pfields,
+                                       int fieldCount);
 #endif /* CDBPARQUETSTORAGEWRITE_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/src/test/feature/testhawqregister/test-hawq-register.cpp
----------------------------------------------------------------------
diff --git a/src/test/feature/testhawqregister/test-hawq-register.cpp 
b/src/test/feature/testhawqregister/test-hawq-register.cpp
new file mode 100644
index 0000000..eb15d32
--- /dev/null
+++ b/src/test/feature/testhawqregister/test-hawq-register.cpp
@@ -0,0 +1,149 @@
+#include <string>
+
+#include "lib/command.h"
+#include "lib/common.h"
+#include "lib/sql-util.h"
+
+#include "gtest/gtest.h"
+
+using std::string;
+
+/* This test suite may consume more than 80 seconds. */
+class TestHawqRegister : public ::testing::Test {
+ public:
+  TestHawqRegister() {}
+  ~TestHawqRegister() {}
+};
+
+TEST_F(TestHawqRegister, TestSingleHawqFile) {
+       SQLUtility util;
+       string rootPath(util.getTestRootPath());
+       string relativePath("/testhawqregister/test_hawq.paq");
+       string filePath = rootPath + relativePath;
+
+       EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath + " 
/hawq_register_hawq.paq"));
+
+       util.execute("create table hawqregister(i int) with (appendonly=true, 
orientation=parquet);");
+       util.query("select * from hawqregister;", 0);
+
+       EXPECT_EQ(0, Command::getCommandStatus("hawq register postgres 
hawqregister /hawq_register_hawq.paq"));
+
+       util.query("select * from hawqregister;", 3);
+       util.execute("insert into hawqregister values(1);");
+       util.query("select * from hawqregister;", 4);
+       util.execute("drop table hawqregister;");
+}
+
+TEST_F(TestHawqRegister, TestSingleHiveFile) {
+       SQLUtility util;
+       string rootPath(util.getTestRootPath());
+       string relativePath("/testhawqregister/test_hive.paq");
+       string filePath = rootPath + relativePath;
+
+       EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath + " 
/hawq_register_hive.paq"));
+
+       util.execute("create table hawqregister(i int) with (appendonly=true, 
orientation=parquet);");
+       util.query("select * from hawqregister;", 0);
+
+       EXPECT_EQ(0, Command::getCommandStatus("hawq register postgres 
hawqregister /hawq_register_hive.paq"));
+
+       util.query("select * from hawqregister;", 1);
+       util.execute("insert into hawqregister values(1);");
+       util.query("select * from hawqregister;", 2);
+       util.execute("drop table hawqregister;");
+}
+
+TEST_F(TestHawqRegister, TestFiles) {
+       SQLUtility util;
+       string rootPath(util.getTestRootPath());
+       string relativePath("/testhawqregister/test_hawq.paq");
+       string filePath1 = rootPath + relativePath;
+       relativePath = "/testhawqregister/test_hive.paq";
+       string filePath2 = rootPath + relativePath;
+
+       EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -mkdir -p 
/hawq_register_test/t/t"));
+       EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath1 + 
" /hawq_register_test/hawq1.paq"));
+       EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath1 + 
" /hawq_register_test/hawq2.paq"));
+       EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath1 + 
" /hawq_register_test/t/hawq.paq"));
+
+       EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath2 + 
" /hawq_register_test/hive1.paq"));
+       EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath2 + 
" /hawq_register_test/hive2.paq"));
+       EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath2 + 
" /hawq_register_test/t/hive.paq"));
+
+       util.execute("create table hawqregister(i int) with (appendonly=true, 
orientation=parquet);");
+       util.query("select * from hawqregister;", 0);
+
+       EXPECT_EQ(0, Command::getCommandStatus("hawq register postgres 
hawqregister /hawq_register_test"));
+
+       util.query("select * from hawqregister;", 12);
+       util.execute("insert into hawqregister values(1);");
+       util.query("select * from hawqregister;", 13);
+       EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -rm -r 
/hawq_register_test"));
+       util.execute("drop table hawqregister;");
+}
+
+TEST_F(TestHawqRegister, TestHashDistributedTable) {
+       SQLUtility util;
+       string rootPath(util.getTestRootPath());
+       string relativePath("/testhawqregister/test_hawq.paq");
+       string filePath = rootPath + relativePath;
+
+       EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath + " 
/hawq_register_hawq.paq"));
+
+       util.execute("create table hawqregister(i int) with (appendonly=true, 
orientation=parquet) distributed by (i);");
+       util.query("select * from hawqregister;", 0);
+
+       EXPECT_EQ(1, Command::getCommandStatus("hawq register postgres 
hawqregister /hawq_register_hawq.paq"));
+       util.query("select * from hawqregister;", 0);
+
+       EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -rm 
/hawq_register_hawq.paq"));
+       util.execute("drop table hawqregister;");
+}
+
+TEST_F(TestHawqRegister, TestNotParquetFile) {
+       SQLUtility util;
+       string rootPath(util.getTestRootPath());
+       string relativePath("/testhawqregister/test_not_paq");
+       string filePath = rootPath + relativePath;
+
+       EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath + " 
/hawq_register_test_not_paq"));
+
+       util.execute("create table hawqregister(i int) with (appendonly=true, 
orientation=parquet);");
+       util.query("select * from hawqregister;", 0);
+
+       EXPECT_EQ(1, Command::getCommandStatus("hawq register postgres 
hawqregister /hawq_register_test_not_paq"));
+       util.query("select * from hawqregister;", 0);
+
+       EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -rm 
/hawq_register_test_not_paq"));
+       util.execute("drop table hawqregister;");
+}
+
+TEST_F(TestHawqRegister, TestNotParquetTable) {
+       SQLUtility util;
+       string rootPath(util.getTestRootPath());
+       string relativePath("/testhawqregister/test_hawq.paq");
+       string filePath = rootPath + relativePath;
+
+       EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath + " 
/hawq_register_hawq.paq"));
+
+       util.execute("create table hawqregister(i int);");
+       util.query("select * from hawqregister;", 0);
+
+       EXPECT_EQ(1, Command::getCommandStatus("hawq register postgres 
hawqregister /hawq_register_hawq.paq"));
+       util.query("select * from hawqregister;", 0);
+
+       EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -rm 
/hawq_register_hawq.paq"));
+       util.execute("drop table hawqregister;");
+}
+
+TEST_F(TestHawqRegister, TestFileNotExist) {
+       SQLUtility util;
+
+       util.execute("create table hawqregister(i int);");
+       util.query("select * from hawqregister;", 0);
+
+       EXPECT_EQ(1, Command::getCommandStatus("hawq register postgres 
hawqregister /hawq_register_file_not_exist"));
+       util.query("select * from hawqregister;", 0);
+
+       util.execute("drop table hawqregister;");
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/src/test/feature/testhawqregister/test_hawq.paq
----------------------------------------------------------------------
diff --git a/src/test/feature/testhawqregister/test_hawq.paq 
b/src/test/feature/testhawqregister/test_hawq.paq
new file mode 100644
index 0000000..f2adb4b
Binary files /dev/null and b/src/test/feature/testhawqregister/test_hawq.paq 
differ

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/src/test/feature/testhawqregister/test_hive.paq
----------------------------------------------------------------------
diff --git a/src/test/feature/testhawqregister/test_hive.paq 
b/src/test/feature/testhawqregister/test_hive.paq
new file mode 100644
index 0000000..a356fc7
Binary files /dev/null and b/src/test/feature/testhawqregister/test_hive.paq 
differ

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/src/test/feature/testhawqregister/test_not_paq
----------------------------------------------------------------------
diff --git a/src/test/feature/testhawqregister/test_not_paq 
b/src/test/feature/testhawqregister/test_not_paq
new file mode 100644
index 0000000..dc75c44
Binary files /dev/null and b/src/test/feature/testhawqregister/test_not_paq 
differ

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/tools/bin/hawq
----------------------------------------------------------------------
diff --git a/tools/bin/hawq b/tools/bin/hawq
index 08c30e4..41130e9 100755
--- a/tools/bin/hawq
+++ b/tools/bin/hawq
@@ -175,6 +175,9 @@ def main():
     elif hawq_command == "scp":
         cmd = "%s; gpscp %s" % (source_hawq_env, sub_args)
         result = local_ssh(cmd)
+    elif hawq_command == "register":
+        cmd = "%s; hawqregister %s" % (source_hawq_env, sub_args)
+        result = local_ssh(cmd)
     elif hawq_command == "version" or hawq_command == "--version":
         print_version()
     else:

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
new file mode 100755
index 0000000..876aed7
--- /dev/null
+++ b/tools/bin/hawqregister
@@ -0,0 +1,272 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+'''
+hawq register [options] database_name table_name file_or_dir_path_in_hdfs
+'''
+import os, sys, optparse, getpass, re, urlparse
+try:
+    from gppylib.commands.unix import getLocalHostname, getUserName
+    from gppylib.db import dbconn
+    from gppylib.gplog import get_default_logger, setup_tool_logging
+    from gppylib.gpparseopts import OptParser, OptChecker
+    from pygresql import pg
+    from pygresql.pgdb import DatabaseError
+    from hawqpylib.hawqlib import local_ssh, local_ssh_output
+except ImportError, e:
+    print e
+    sys.stderr.write('cannot import module, please check that you have source 
greenplum_path.sh\n')
+    sys.exit(2)
+
+# setup logging
+logger = get_default_logger()
+EXECNAME = os.path.split(__file__)[-1]
+setup_tool_logging(EXECNAME,getLocalHostname(),getUserName())
+
+def create_opt_parser(version):
+    parser = OptParser(option_class=OptChecker,
+                       usage='usage: %prog [options] database_name table_name 
file_or_dir_path_in_hdfs',
+                       version=version)
+    parser.remove_option('-h')
+    parser.add_option('-?', '--help', action='help')
+    parser.add_option('-h', '--host', help="host of the target DB")
+    parser.add_option('-p', '--port', help="port of the target DB", 
type='int', default=0)
+    parser.add_option('-U', '--user', help="username of the target DB")
+    return parser
+
+
+def get_seg_name(options, databasename, tablename):
+    try:
+        relfilenode = 0
+        relname = ""
+        query = "select pg_class2.relname from pg_class as pg_class1, 
pg_appendonly, pg_class as pg_class2 where pg_class1.relname ='%s' and  
pg_class1.oid = pg_appendonly.relid and pg_appendonly.segrelid = 
pg_class2.oid;" % tablename
+        dburl = dbconn.DbURL(hostname=options.host, port=options.port, 
username=options.user, dbname=databasename)
+        conn = dbconn.connect(dburl, True)
+        rows = dbconn.execSQL(conn, query)
+       conn.commit()
+        if rows.rowcount == 0:
+            logger.error("table '%s' not found in db '%s'" % (tablename, 
databasename));
+            sys.exit(1)
+        for row in rows:
+            relname = row[0]
+        conn.close()
+
+    except DatabaseError, ex:
+        logger.error("Failed to connect to database, this script can only be 
run when the database is up")
+        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = 
%s" % (options.host, options.port, options.user, databasename, query))
+        sys.exit(1)
+
+    # check whether the target table is parquet format
+    if relname.find("paq") == -1:
+        logger.error("table '%s' is not parquet format" % tablename)
+        sys.exit(1)
+
+    return relname
+
+
+def check_hash_type(options, databasename, tablename):
+    try:
+        query = "select attrnums from gp_distribution_policy, pg_class where 
pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;" % 
tablename
+        dburl = dbconn.DbURL(hostname=options.host, port=options.port, 
username=options.user, dbname=databasename)
+        conn = dbconn.connect(dburl, False)
+        rows = dbconn.execSQL(conn, query)
+       conn.commit()
+        if rows.rowcount == 0:
+            logger.error("target not found in table gp_distribution_policy")
+            sys.exit(1)
+        for row in rows:
+            if row[0] != None:
+                logger.error("Cannot register file(s) to a table which is 
hash-typed")
+                sys.exit(1)
+
+        conn.close()
+
+    except DatabaseError, ex:
+        logger.error("Failed to connect to database, this script can only be 
run when the database is up")
+        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = 
%s" % (options.host, options.port, options.user, databasename, query))
+        sys.exit(1)
+
+
+def get_files_in_hdfs(filepath):
+    files = []
+    sizes = []
+    hdfscmd = "hadoop fs -test -e %s" % filepath
+    result = local_ssh(hdfscmd)
+    if result != 0:
+        logger.error("Path '%s' does not exist in hdfs" % filepath)
+        sys.exit(1)
+
+    hdfscmd = "hadoop fs -ls -R %s" % filepath
+    result, out, err = local_ssh_output(hdfscmd)
+    outlines = out.splitlines()
+
+    # recursively search all the files under path 'filepath'
+    i = 0
+    for line in outlines:
+        lineargs = line.split()
+        if len(lineargs) == 8 and lineargs[0].find ("d") == -1:
+            files.append(lineargs[7])
+            sizes.append(int(lineargs[4]))
+
+    if len(files) == 0:
+        logger.error("Dir '%s' is empty" % filepath)
+        sys.exit(1)
+
+    return files, sizes
+
+
+def check_parquet_format(options, files):
+    # check whether the files are parquet format by checking the first and 
last four bytes
+    for file in files:
+        hdfscmd = "hadoop fs -cat %s | head -c 4 | grep PAR1" % file
+        result1 = local_ssh(hdfscmd)
+        hdfscmd = "hadoop fs -cat %s | tail -c 4 | grep PAR1" % file
+        result2 = local_ssh(hdfscmd)
+        if result1 or result2:
+            logger.error("File %s is not parquet format" % file)
+            sys.exit(1)
+
+
+def get_metadata_from_database(options, databasename, seg_name):
+    try:
+        query = "select segno from pg_aoseg.%s;" % seg_name
+        dburl = dbconn.DbURL(hostname=options.host, port=options.port, 
username=options.user, dbname=databasename)
+        conn = dbconn.connect(dburl, False)
+        rows = dbconn.execSQL(conn, query)
+       conn.commit()
+        conn.close()
+
+    except DatabaseError, ex:
+        logger.error("Failed to connect to database, this script can only be 
run when the database is up")
+        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = 
%s" % (options.host, options.port, options.user, databasename, query))
+        sys.exit(1)
+
+    return rows.rowcount + 1
+
+
+def move_files_in_hdfs(options, databasename, tablename, files, firstsegno, 
normal):
+    # get the full path of corresponding file for target table
+    try:
+        query = "select location, 
gp_persistent_tablespace_node.tablespace_oid, database_oid, relfilenode from 
pg_class, gp_persistent_relation_node, gp_persistent_tablespace_node, 
gp_persistent_filespace_node where relname = '%s' and pg_class.relfilenode = 
gp_persistent_relation_node.relfilenode_oid and 
gp_persistent_relation_node.tablespace_oid = 
gp_persistent_tablespace_node.tablespace_oid and 
gp_persistent_filespace_node.filespace_oid = 
gp_persistent_filespace_node.filespace_oid;" % tablename
+        dburl = dbconn.DbURL(hostname=options.host, port=options.port, 
username=options.user, dbname=databasename)
+        conn = dbconn.connect(dburl, False)
+        rows = dbconn.execSQL(conn, query)
+       conn.commit()
+        conn.close()
+
+    except DatabaseError, ex:
+        logger.error("Failed to connect to database, this script can only be 
run when the database is up")
+        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = 
%s" % (options.host, options.port, options.user, databasename, query))
+        sys.exit(1)
+
+
+    # move file(s) in src path into the folder corresponding to the target table
+    if (normal == True):
+        for row in rows:
+            destdir = row[0].strip() + "/" + str(row[1]) + "/" + str(row[2]) + 
"/" + str(row[3]) + "/"
+
+        segno = firstsegno
+        for file in files:
+            srcfile = file
+            dstfile = destdir + str(segno)
+            segno += 1
+            if srcfile != dstfile:
+                hdfscmd = "hadoop fs -mv %s %s" % (srcfile, dstfile)
+                sys.stdout.write("hafscmd: '%s'\n" % hdfscmd)
+                result = local_ssh(hdfscmd)
+                if result != 0:
+                    logger.error("Fail to move '%s' to '%s'" % (srcfile, 
dstfile))
+                    sys.exit(1)
+    else:
+        for row in rows:
+            srcdir = row[0].strip() + "/" + str(row[1]) + "/" + str(row[2]) + 
"/" + str(row[3]) + "/"
+
+        segno = firstsegno
+        for file in files:
+            dstfile = file
+            srcfile = srcdir + str(segno)
+            segno += 1
+            if srcfile != dstfile:
+                hdfscmd = "hadoop fs -mv %s %s" % (srcfile, dstfile)
+                sys.stdout.write("hafscmd: '%s'\n" % hdfscmd)
+                result = local_ssh(hdfscmd)
+                if result != 0:
+                    logger.error("Fail to move '%s' to '%s'" % (srcfile, 
dstfile))
+                    sys.exit(1)
+
+
+def insert_metadata_into_database(options, databasename, tablename, seg_name, 
firstsegno, eofs):
+    try:
+        query = "SET allow_system_table_mods='dml';"
+        segno = firstsegno
+        for eof in eofs:
+            query += "insert into pg_aoseg.%s values(%d, %d, %d, %d);" % 
(seg_name, segno, eof, -1, -1)
+            segno += 1
+
+        dburl = dbconn.DbURL(hostname=options.host, port=options.port, 
username=options.user, dbname=databasename)
+        conn = dbconn.connect(dburl, True)
+        rows = dbconn.execSQL(conn, query)
+       conn.commit()
+        conn.close()
+
+    except DatabaseError, ex:
+        logger.error("Failed to connect to database, this script can only be 
run when the database is up")
+        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = 
%s" % (options.host, options.port, options.user, databasename, query))
+       move_files_in_hdfs(options, databasename, tablename, files, firstsegno, 
False)
+
+        sys.exit(1)
+
+
+def main(args=None):
+    parser = create_opt_parser('%prog version $Revision: #1 $')
+    options, args = parser.parse_args(args)
+    if len(args) != 3:
+        sys.stderr.write('Incorrect number of arguments\n\n')
+        parser.print_help(sys.stderr)
+        return 1
+
+    databasename = args[0]
+    tablename = args[1]
+    filepath = args[2]
+
+    # 1. get the seg_name from database
+    seg_name = get_seg_name(options, databasename, tablename)
+
+    # 2. check whether target table is hash-typed, in that case simple 
insertion does not work
+    result = check_hash_type(options, databasename, tablename)
+
+    # 3. get all the files refered by 'filepath', which could be a file or a 
directory containing all the files
+    files, sizes = get_files_in_hdfs(filepath)
+    print "File(s) to be registered:"
+    print files
+
+    # 4. check whether the file to be registered is parquet format
+    check_parquet_format(options, files)
+
+    # 5. get the metadata to be inserted from hdfs
+    firstsegno = get_metadata_from_database(options, databasename, seg_name)
+
+    # 6. move the file in hdfs to proper location
+    move_files_in_hdfs(options, databasename, tablename, files, firstsegno, 
True)
+
+    # 7. insert the metadata into database
+    insert_metadata_into_database(options, databasename, tablename, seg_name, 
firstsegno, sizes)
+
+if __name__ == '__main__':
+    sys.exit(main())

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/tools/doc/hawqregister_help
----------------------------------------------------------------------
diff --git a/tools/doc/hawqregister_help b/tools/doc/hawqregister_help
new file mode 100644
index 0000000..d5a5a0d
--- /dev/null
+++ b/tools/doc/hawqregister_help
@@ -0,0 +1,135 @@
+COMMAND NAME: hawq register
+
+Register parquet files generated by other systems into the corresponding
+table in HAWQ
+
+*****************************************************
+SYNOPSIS
+*****************************************************
+
+hawq register [-h hostname] [-p port] [-U username] <databasename> <tablename> 
<hdfspath>
+
+hawq register help
+hawq register -?
+
+hawq register --version
+
+*****************************************************
+DESCRIPTION
+*****************************************************
+
+"hawq register" is a utility to register file(s) on HDFS into
+a table in HAWQ. It moves the file at the path (if the path
+refers to a file) or the files under the path (if the path refers to a
+directory) into the directory corresponding to the table,
+and then updates the table metadata to include the files.
+
+To use "hawq register", HAWQ must have been started.
+
+Currently "hawq register" supports parquet tables only.
+Users have to make sure that the metadata of the parquet file(s)
+and the table are consistent.
+The table to be registered into must not be hash-distributed, i.e. it
+must not have been created with a "DISTRIBUTED BY" clause.
+
+*****************************************************
+Arguments
+*****************************************************
+
+<databasename>
+
+Name of the database to be operated on.
+
+<tablename>
+
+Name of the table to be registered into.
+
+<hdfspath>
+
+The path of the file or the directory containing the files
+that will be registered.
+
+*****************************************************
+OPTIONS
+*****************************************************
+
+-? (help)
+
+Displays the online help.
+
+--version
+
+Displays the version of this utility.
+
+*****************************************************
+CONNECTION OPTIONS
+*****************************************************
+
+-h hostname
+
+  Specifies the host name of the machine on which the HAWQ master
+  database server is running. If not specified, reads from the
+  environment variable $PGHOST which defaults to localhost.
+
+-p port
+
+  Specifies the TCP port on which the HAWQ master database server
+  is listening for connections. If not specified, reads from the
+  environment variable $PGPORT which defaults to 5432.
+
+-U username
+
+  The database role name to connect as. If not specified, reads
+  from the environment variable $PGUSER which defaults to the current
+  system user name.
+
+*****************************************************
+EXAMPLES
+*****************************************************
+
+Run "hawq register" to register a parquet file with path '/temp/hive.paq'
+in hdfs generated by hive into table 'parquet_table' in HAWQ, which is in the
+database named 'postgres'.
+Assume the location of the database is 'hdfs://localhost:8020/hawq_default',
+tablespace id is '16385', database id is '16387', table filenode id is '77160',
+last file under the filenode numbered '7'.
+
+$ hawq register postgres parquet_table /temp/hive.paq
+
+This will move the file '/temp/hive.paq' into the corresponding new place
+'hdfs://localhost:8020/hawq_default/16385/16387/77160/8' in hdfs, then
+update the meta data of the table 'parquet_table' in HAWQ which is in the
+table 'pg_aoseg.pg_paqseg_77160'.
+
+*****************************************************
+DATA TYPES
+*****************************************************
+The data types used in HAWQ and in the parquet format are not the same, so
+there is a mapping between them, summarized as follows:
+
+Data types in HAWQ              Data types in parquet
+bool                            boolean
+int2                            int32
+int4                            int32
+date                            int32
+int8                            int64
+time                            int64
+timestamptz                     int64
+timestamp                       int64
+money                           int64
+float4                          float
+float8                          double
+bit                             byte_array
+varbit                          byte_array
+byte                            byte_array
+numeric                         byte_array
+name                            byte_array
+char                            byte_array
+bpchar                          byte_array
+varchar                         byte_array
+text                            byte_array
+xml                             byte_array
+timetz                          byte_array
+interval                        byte_array
+macaddr                         byte_array
+inet                            byte_array
+cidr                            byte_array

Reply via email to