This is an automated email from the ASF dual-hosted git repository.

huor pushed a commit to branch huor_cloud
in repository https://gitbox.apache.org/repos/asf/hawq.git

commit e696bbd0718435c3785f7d66c684f5863de37d90
Author: Ruilong Huo <[email protected]>
AuthorDate: Mon Mar 18 15:43:11 2019 +0800

    HAWQ-1689. Add test for cloud environment support
---
 src/test/feature/cloudtest/sql/normal/all.sql      | 101 +++
 .../feature/cloudtest/sql/normal/before_normal.sql |  41 ++
 .../cloudtest/sql/parallel/before_parallel.sql     |  99 +++
 .../feature/cloudtest/sql/parallel/parallel.sql    |  73 +++
 src/test/feature/cloudtest/test_cloud.cpp          | 271 ++++++++
 src/test/feature/cloudtest/test_cloud.h            | 143 +++++
 src/test/feature/cloudtest/test_cloud.xml          |  40 ++
 src/test/feature/lib/parse_out.cpp                 | 167 +++++
 src/test/feature/lib/parse_out.h                   |  85 +++
 src/test/feature/lib/psql.cpp                      | 203 +++++-
 src/test/feature/lib/psql.h                        |  45 +-
 src/test/feature/lib/sql_util.cpp                  | 673 +++++++++++++++++---
 src/test/feature/lib/sql_util.h                    | 301 ++++++++-
 src/test/feature/lib/sql_util_parallel.cpp         | 705 +++++++++++++++++++++
 src/test/feature/lib/sql_util_parallel.h           | 133 ++++
 src/test/feature/lib/sqlfile-parsebase.cpp         |  89 +++
 src/test/feature/lib/sqlfile-parsebase.h           |  60 ++
 17 files changed, 3132 insertions(+), 97 deletions(-)

diff --git a/src/test/feature/cloudtest/sql/normal/all.sql 
b/src/test/feature/cloudtest/sql/normal/all.sql
new file mode 100644
index 0000000..ed31c43
--- /dev/null
+++ b/src/test/feature/cloudtest/sql/normal/all.sql
@@ -0,0 +1,101 @@
+-- Cloud smoke test: runs one statement of each major kind (DDL, DML, COPY,
+-- external tables, user-defined functions/operators/aggregates, cursors,
+-- transactions and maintenance commands). TestCloud.Normal runs
+-- before_normal.sql first and then scans this script's output for
+-- ": ERROR:", "FATAL" and "PANIC".
+
+create table a(i int);
+
+create language plpythonu;
+
+CREATE OR REPLACE FUNCTION f4() RETURNS TEXT AS $$ plpy.execute("select * from a order by i") $$ LANGUAGE plpythonu VOLATILE;
+
+select * from f4();
+
+-- Helper functions behind the custom "<#" text operator.
+CREATE OR REPLACE FUNCTION normalize_si(text) RETURNS text AS $$ BEGIN RETURN substring($1, 9, 2) || substring($1, 7, 2) || substring($1, 5, 2) || substring($1, 1, 4); END; $$LANGUAGE 'plpgsql' IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION si_lt(text, text) RETURNS boolean AS $$ BEGIN RETURN normalize_si($1) < normalize_si($2); END; $$ LANGUAGE 'plpgsql' IMMUTABLE;
+
+CREATE OPERATOR <# ( PROCEDURE=si_lt,LEFTARG=text, RIGHTARG=text);
+
+-- Registered below as btree support function 1, so it must return a negative,
+-- zero or positive value for every input pair. A plpgsql body that reaches
+-- its end without RETURN raises a run-time error, hence the explicit
+-- "greater" and "equal" results.
+CREATE OR REPLACE FUNCTION si_same(text, text) RETURNS int AS $$ BEGIN IF normalize_si($1) < normalize_si($2) THEN RETURN -1; ELSIF normalize_si($1) > normalize_si($2) THEN RETURN 1; END IF; RETURN 0; END; $$ LANGUAGE 'plpgsql' IMMUTABLE;
+
+CREATE OPERATOR CLASS sva_special_ops FOR TYPE text USING btree AS OPERATOR 1 <#, FUNCTION 1 si_same(text, text);
+
+CREATE RESOURCE QUEUE myqueue WITH (PARENT='pg_root', ACTIVE_STATEMENTS=20, MEMORY_LIMIT_CLUSTER=50%, CORE_LIMIT_CLUSTER=50%);
+
+CREATE TABLESPACE mytblspace FILESPACE dfs_system;
+
+CREATE TABLE foo(i int) TABLESPACE mytblspace;
+
+insert into foo(i) values(1234);
+
+COPY a TO '/tmp/a.txt';
+
+COPY a FROM '/tmp/a.txt';
+
+COPY a TO STDOUT WITH DELIMITER '|';
+
+CREATE EXTERNAL TABLE ext_t ( N_NATIONKEY INTEGER ,N_NAME CHAR(25), N_REGIONKEY  INTEGER ,N_COMMENT    VARCHAR(152))location ('gpfdist://localhost:7070/nation_error50.tbl')FORMAT 'text' (delimiter '|')SEGMENT REJECT LIMIT 51;
+
+--select * from ext_t order by N_NATIONKEY;
+
+CREATE WRITABLE EXTERNAL TABLE ext_t2 (i int) LOCATION ('gpfdist://localhost:7070/ranger2.out') FORMAT 'TEXT' ( DELIMITER '|' NULL ' ');
+
+--insert into ext_t2(i) values(234);
+
+create schema sa;
+
+create temp table ta(i int);
+
+create view av as select * from a order by i;
+
+create table aa as select * from a order by i;
+
+create table sa.t(a int, b int);
+
+CREATE SEQUENCE myseq START 1;
+
+insert into a values(1);
+
+insert into a values(1);
+
+insert into a VALUES (nextval('myseq'));
+
+select * from pg_database, a order by oid, i limit 1;
+
+select generate_series(1,3);
+
+select * from av;
+
+SELECT setval('myseq', 1);
+
+SELECT * INTO aaa FROM a WHERE i > 0 order by i;
+
+PREPARE fooplan (int) AS INSERT INTO a VALUES($1);EXECUTE fooplan(1);DEALLOCATE fooplan;
+
+explain select * from a;
+
+CREATE FUNCTION scube_accum(numeric, numeric) RETURNS numeric AS 'select $1 + $2 * $2 * $2' LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT;
+
+CREATE AGGREGATE scube(numeric) ( SFUNC = scube_accum, STYPE = numeric, INITCOND = 0 );
+
+ALTER AGGREGATE scube(numeric) RENAME TO scube2;
+
+CREATE TYPE mytype AS (f1 int, f2 int);
+
+CREATE FUNCTION getfoo() RETURNS SETOF mytype AS $$ SELECT i, i FROM a order by i $$ LANGUAGE SQL;
+
+select getfoo();
+
+begin; DECLARE mycursor CURSOR FOR SELECT * FROM a order by i; FETCH FORWARD 2 FROM mycursor; commit;
+
+BEGIN; INSERT INTO a VALUES (1); SAVEPOINT my_savepoint; INSERT INTO a VALUES (1); RELEASE SAVEPOINT my_savepoint; COMMIT;
+
+\d
+
+analyze a;
+
+analyze;
+
+vacuum aa;
+
+vacuum analyze;
+
+truncate aa;
+
+-- Must stay last: every earlier statement refers to the column as "i".
+alter table a rename column i to j;
\ No newline at end of file
diff --git a/src/test/feature/cloudtest/sql/normal/before_normal.sql 
b/src/test/feature/cloudtest/sql/normal/before_normal.sql
new file mode 100644
index 0000000..f234f82
--- /dev/null
+++ b/src/test/feature/cloudtest/sql/normal/before_normal.sql
@@ -0,0 +1,41 @@
+-- Cleanup for TestCloud.Normal: drops the objects that all.sql creates so the
+-- test can be rerun. The Normal test runs this script without scanning its
+-- output for errors.
+DROP FUNCTION f4();
+
+DROP LANGUAGE plpythonu CASCADE;
+
+DROP OPERATOR CLASS sva_special_ops USING btree;
+
+DROP OPERATOR <# (text, text) CASCADE;
+
+DROP FUNCTION si_same(text, text);
+
+DROP FUNCTION si_lt(text, text);
+
+DROP FUNCTION normalize_si(text);
+
+DROP RESOURCE QUEUE myqueue;
+
+DROP TABLE foo;
+
+DROP TABLESPACE mytblspace;
+
+DROP EXTERNAL TABLE ext_t;
+
+DROP EXTERNAL TABLE ext_t2;
+
+DROP AGGREGATE scube2(numeric);
+
+DROP FUNCTION scube_accum(numeric, numeric);
+
+DROP TYPE mytype CASCADE;
+
+DROP SEQUENCE myseq;
+
+DROP VIEW av;
+
+DROP TABLE aaa;
+
+DROP TABLE aa;
+
+DROP TABLE a;
+
+DROP SCHEMA sa CASCADE;
\ No newline at end of file
diff --git a/src/test/feature/cloudtest/sql/parallel/before_parallel.sql 
b/src/test/feature/cloudtest/sql/parallel/before_parallel.sql
new file mode 100644
index 0000000..f81c1e5
--- /dev/null
+++ b/src/test/feature/cloudtest/sql/parallel/before_parallel.sql
@@ -0,0 +1,99 @@
+-- Drop objects left over from a previous run to get a clean environment.
+DROP OPERATOR CLASS sva_special_ops USING btree;
+
+DROP OPERATOR <# (text, text) CASCADE;
+
+DROP FUNCTION si_same(text, text);
+
+DROP FUNCTION si_lt(text, text);
+
+DROP FUNCTION normalize_si(text);
+
+DROP RESOURCE QUEUE myqueue;
+
+DROP TABLE foo;
+
+DROP TABLESPACE mytblspace;
+
+DROP EXTERNAL TABLE ext_t;
+
+DROP EXTERNAL TABLE ext_t2;
+
+DROP FUNCTION f4();
+
+DROP AGGREGATE scube2(numeric);
+
+DROP FUNCTION scube_accum(numeric, numeric);
+
+DROP TYPE mytype CASCADE;
+
+DROP LANGUAGE plpythonu;
+
+TRUNCATE aa;
+
+DROP SEQUENCE myseq;
+
+DROP VIEW av;
+
+--drop table aaa;
+
+DROP TABLE aa;
+
+DROP TABLE a;
+
+DROP SCHEMA sa CASCADE;
+
+-- Create the objects that parallel.sql works on.
+-- Statements are in dependency order: the "<#" operator and the functions it
+-- is built from are created before the operator class that references them.
+
+create table a(i int);
+
+CREATE OR REPLACE FUNCTION normalize_si(text) RETURNS text AS $$ BEGIN RETURN substring($1, 9, 2) || substring($1, 7, 2) || substring($1, 5, 2) || substring($1, 1, 4); END; $$LANGUAGE 'plpgsql' IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION si_lt(text, text) RETURNS boolean AS $$ BEGIN RETURN normalize_si($1) < normalize_si($2); END; $$ LANGUAGE 'plpgsql' IMMUTABLE;
+
+CREATE OPERATOR <# ( PROCEDURE=si_lt,LEFTARG=text, RIGHTARG=text);
+
+-- Registered below as btree support function 1, so it must return a negative,
+-- zero or positive value for every input pair. A plpgsql body that reaches
+-- its end without RETURN raises a run-time error, hence the explicit
+-- "greater" and "equal" results.
+CREATE OR REPLACE FUNCTION si_same(text, text) RETURNS int AS $$ BEGIN IF normalize_si($1) < normalize_si($2) THEN RETURN -1; ELSIF normalize_si($1) > normalize_si($2) THEN RETURN 1; END IF; RETURN 0; END; $$ LANGUAGE 'plpgsql' IMMUTABLE;
+
+CREATE OPERATOR CLASS sva_special_ops FOR TYPE text USING btree AS OPERATOR 1 <#, FUNCTION 1 si_same(text, text);
+
+CREATE RESOURCE QUEUE myqueue WITH (PARENT='pg_root', ACTIVE_STATEMENTS=20, MEMORY_LIMIT_CLUSTER=50%, CORE_LIMIT_CLUSTER=50%);
+
+CREATE TABLESPACE mytblspace FILESPACE dfs_system;
+
+create language plpythonu;
+
+CREATE TABLE foo(i int) TABLESPACE mytblspace;
+
+CREATE EXTERNAL TABLE ext_t ( N_NATIONKEY INTEGER ,N_NAME CHAR(25), N_REGIONKEY  INTEGER ,N_COMMENT    VARCHAR(152))location ('gpfdist://localhost:7070/nation_error50.tbl')FORMAT 'text' (delimiter '|')SEGMENT REJECT LIMIT 51;
+
+CREATE WRITABLE EXTERNAL TABLE ext_t2 (i int) LOCATION ('gpfdist://localhost:7070/ranger2.out') FORMAT 'TEXT' ( DELIMITER '|' NULL ' ');
+
+CREATE OR REPLACE FUNCTION f4() RETURNS TEXT AS $$ plpy.execute("select * from a order by i") $$ LANGUAGE plpythonu VOLATILE;
+
+create schema sa;
+
+create temp table ta(i int);
+
+create view av as select * from a order by i;
+
+create table aa as select * from a order by i;
+
+create table sa.t(a int, b int);
+
+CREATE SEQUENCE myseq START 1;
+
+PREPARE fooplan (int) AS INSERT INTO a VALUES($1);EXECUTE fooplan(1);DEALLOCATE fooplan;
+
+CREATE FUNCTION scube_accum(numeric, numeric) RETURNS numeric AS 'select $1 + $2 * $2 * $2' LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT;
+
+CREATE AGGREGATE scube(numeric) ( SFUNC = scube_accum, STYPE = numeric, INITCOND = 0 );
+
+ALTER AGGREGATE scube(numeric) RENAME TO scube2;
+
+CREATE TYPE mytype AS (f1 int, f2 int);
+
+CREATE FUNCTION getfoo() RETURNS SETOF mytype AS $$ SELECT i, i FROM a order by i $$ LANGUAGE SQL;
+
+--alter table a rename column i to j;
+
diff --git a/src/test/feature/cloudtest/sql/parallel/parallel.sql 
b/src/test/feature/cloudtest/sql/parallel/parallel.sql
new file mode 100644
index 0000000..de12e14
--- /dev/null
+++ b/src/test/feature/cloudtest/sql/parallel/parallel.sql
@@ -0,0 +1,73 @@
+-- Workload for TestCloud.Parallel, which passes this file to
+-- runParallelSQLFile with PROCESS_NUM processes and then scans every
+-- per-process output file for ": ERROR:", "FATAL" and "PANIC".
+-- The objects used here are created by before_parallel.sql.
+-- The "--set session role=..." lines are disabled role switches kept as
+-- markers between steps.
+
+--set session role=usertest21;
+insert into foo(i) values(1234);
+
+--set session role=usertest24;
+--COPY (select * from a) to '/tmp/a.txt';
+--COPY a FROM '/tmp/a.txt';
+
+--set session role=usertest25;
+COPY a TO STDOUT WITH DELIMITER '|';
+
+--set session role=usertest27;
+--select * from ext_t order by N_NATIONKEY;
+
+
+--set session role=usertest29;
+--insert into ext_t2(i) values(234);
+
+--set session role=usertest38;
+insert into a values(1);
+
+--set session role=usertest39;
+insert into a values(1);
+
+--set session role=usertest4;
+select * from f4();
+
+--set session role=usertest40;
+insert into a VALUES (nextval('myseq'));
+
+--set session role=usertest41;
+select * from pg_database, a order by oid, i limit 1;
+
+--set session role=usertest42;
+select generate_series(1,3);
+
+--set session role=usertest43;
+select * from av;
+
+--set session role=usertest44;
+SELECT setval('myseq', 1);
+
+--set session role=usertest45;
+--SELECT * INTO aaa FROM a WHERE i > 0 order by i;
+
+--set session role=usertest46;
+PREPARE fooplan (int) AS INSERT INTO a VALUES($1);EXECUTE fooplan(1);DEALLOCATE fooplan;
+
+--set session role=usertest47;
+explain select * from a;
+
+--set session role=usertest55;
+select getfoo();
+
+--set session role=usertest57;
+begin; DECLARE mycursor CURSOR FOR SELECT * FROM a order by i; FETCH FORWARD 2 FROM mycursor; commit;
+
+--set session role=usertest58;
+BEGIN; INSERT INTO a VALUES (1); SAVEPOINT my_savepoint; INSERT INTO a VALUES (1); RELEASE SAVEPOINT my_savepoint; COMMIT;
+
+--set session role=usertest59;
+\d
+
+--set session role=usertest60;
+analyze a;
+
+--set session role=usertest61;
+--analyze;
+
+--set session role=usertest62;
+vacuum aa;
+
+--set session role=usertest63;
+--vacuum analyze;
diff --git a/src/test/feature/cloudtest/test_cloud.cpp 
b/src/test/feature/cloudtest/test_cloud.cpp
new file mode 100644
index 0000000..9730238
--- /dev/null
+++ b/src/test/feature/cloudtest/test_cloud.cpp
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <fstream>
+
+#include "lib/command.h"
+#include "lib/sql_util_parallel.h"
+#include "lib/xml_parser.h"
+#include "test_cloud.h"
+
+#define CREATE_ROLE "CREATE ROLE"
+#define ALTER_ROLE "ALTER ROLE"
+#define DROP_ROLE "DROP ROLE"
+
+using hawq::test::Command;
+
+// Walks through the role life cycle: create, grant login, grant and revoke
+// CREATEROLE, reset password, drop. The last part creates and drops a role
+// through the second configured host (getHost2()) and checks the effect when
+// logging in through the first host.
+TEST_F(TestCloud, Permissions) {
+  ASSERT_TRUE(testLogin(getUserName(), HAWQ_PASSWORD))
+      << "Connect Error: Can't login as " + getUserName();
+
+  // Remove roles left over from an earlier run.
+  execSql("drop role if exists lava;");
+  execSql("drop role if exists user1;");
+  execSql("drop role if exists user2;");
+  execSql(getHost2(), "drop role if exists user2;");
+
+  // A freshly created role must not be able to log in until LOGIN is granted.
+  std::string result;
+  result = execSql("create role lava password \'lava00000000\';");
+  const std::string lava_pwd("lava00000000");
+  ASSERT_TRUE(checkOut(result, CREATE_ROLE))
+      << "create role lava failure: " + result;
+  EXPECT_FALSE(testLogin("lava", lava_pwd))
+      << "Error: lava can login before grant login permission";
+
+  result = execSql("alter role lava login;");
+  ASSERT_TRUE(checkOut(result, ALTER_ROLE))
+      << "grant lava login permission failure: " + result;
+  ASSERT_TRUE(testLogin("lava", lava_pwd))
+      << "Error: lava can't login after grant login permission";
+
+  // Promote lava so that it can manage other roles.
+  result = execSql("alter role lava Superuser;");
+  ASSERT_TRUE(checkOut(result, ALTER_ROLE))
+      << "grant lava Superuser permission failure: " + result;
+  result = execSql("alter role lava Createrole;");
+  ASSERT_TRUE(checkOut(result, ALTER_ROLE))
+      << "grant lava Createrole permission failure: " + result;
+  result = execSql("lava", lava_pwd,
+                   "create role user1 login password \'user100000000\';");
+  ASSERT_TRUE((checkOut(result, CREATE_ROLE)))
+      << "lava create user1 failure: " + result;
+  ASSERT_TRUE(testLogin("user1", "user100000000")) << "login as user1 failure";
+
+  // After a password reset only the new password may work.
+  result = execSql("lava", lava_pwd,
+                   "alter role user1 password \'newPwd00000000\';");
+  const std::string user_newPwd = "newPwd00000000";
+  ASSERT_TRUE(checkOut(result, ALTER_ROLE))
+      << "lava reset user1's password failure";
+  EXPECT_FALSE(testLogin("user1", "user100000000"))
+      << "user1 can still login with the old password after reset password";
+  EXPECT_TRUE(testLogin("user1", user_newPwd))
+      << "user1 can't login with the new password after reset password";
+
+  // CREATEROLE gates role creation: refused before the grant, allowed after.
+  result = execSql("user1", user_newPwd,
+                   "create role user2 password \'user200000000\'");
+  EXPECT_FALSE(checkOut(result, CREATE_ROLE))
+      << "user1 can create user2 before get createrole permissions: " + result;
+  result = execSql("lava", lava_pwd, "alter role user1 createrole;");
+  ASSERT_TRUE(checkOut(result, ALTER_ROLE))
+      << "lava grant user1 createrole permissions failure: " + result;
+  result = execSql("user1", user_newPwd,
+                   "create role user2 password \'user200000000\'");
+  ASSERT_TRUE(checkOut(result, CREATE_ROLE))
+      << "user1 can't create user2 after get createrole permissions: " + result;
+
+  // Revoking CREATEROLE must also take away the right to drop roles.
+  result = execSql("lava", lava_pwd, "alter role user1 nocreaterole;");
+  ASSERT_TRUE(checkOut(result, ALTER_ROLE))
+      << "lava can't revoke createrole from user1:" << result;
+  result = execSql("user1", user_newPwd, "drop role user2;");
+  ASSERT_FALSE(checkOut(result, DROP_ROLE))
+      << "user can drop user2 after revoked createrole permissions: " + result;
+
+  result = execSql("lava", lava_pwd, "drop role user2;");
+  ASSERT_TRUE(checkOut(result, DROP_ROLE)) << "lava can't drop user2";
+
+  // Create user2 through the second host, then alter it and log in through
+  // the first host.
+  result = execSql(getHost2(), "create role user2 password \'user200000000\';");
+  ASSERT_TRUE(checkOut(result, CREATE_ROLE))
+      << "lavaIntern can't create user2 in cluster2: " + result;
+
+  result = execSql("lava", lava_pwd, "alter role user2 login;");
+  ASSERT_TRUE(checkOut(result, ALTER_ROLE))
+      << "lava can't grant login permission in cluster1 to user2: " + result;
+
+  ASSERT_TRUE(testLogin("user2", "user200000000"))
+      << "login as user2 in cluster1 failure";
+  // The role is dropped through the second host; the login through the first
+  // host must then fail.
+  result = execSql(getHost2(), "drop role user2;");
+  ASSERT_TRUE(checkOut(result, DROP_ROLE))
+      << "drop user2 failure in cluster2: " + result;
+  ASSERT_FALSE(testLogin("user2", "user200000000"))
+      << "user2 can login in cluster1 after drop user2 in cluster2";
+}
+
+// Runs the cleanup script and then the full statement script on one
+// connection; only the second script's output is scanned for errors.
+TEST_F(TestCloud, Normal) {
+  // Make sure the output directory exists before psql writes into it.
+  Command::getCommandStatus("mkdir -p " + getTestRootPath() +
+                            "/cloudtest/out/normal");
+  configConnection();
+  // Cleanup output is written but not checked.
+  execSQLfile("cloudtest/sql/normal/before_normal.sql",
+              "cloudtest/out/normal/before_normal.out");
+  execSQLfileandCheck("cloudtest/sql/normal/all.sql",
+                      "cloudtest/out/normal/all.out");
+}
+
+// Number of processes handed to runParallelSQLFile for each round.
+#define PROCESS_NUM 10
+// Number of rounds; each round writes into its own "round<i>" directory.
+#define ITERATION_NUM 1
+// Prepares the schema with before_parallel.sql, runs parallel.sql with
+// PROCESS_NUM processes per round, then checks every per-process output file
+// for error markers.
+TEST_F(TestCloud, Parallel) {
+  Command::getCommandStatus("mkdir -p " + getTestRootPath() +
+                            "/cloudtest/out/parallel");
+  // Drop the output of earlier runs so stale files are never checked.
+  Command::getCommandStatus("rm -rf " + getTestRootPath() +
+                            "/cloudtest/out/parallel/round*");
+  configConnection();
+  execSQLfile("cloudtest/sql/parallel/before_parallel.sql",
+              "cloudtest/out/parallel/before_parallel.out");
+  for (int i = 0; i < ITERATION_NUM; i++) {
+    std::string outpath =
+        "cloudtest/out/parallel/round" + std::to_string(i) + "/";
+    Command::getCommandStatus("mkdir -p " + getTestRootPath() + "/" + outpath);
+    runParallelSQLFile("cloudtest/sql/parallel/parallel.sql",
+                       outpath + "parallel.out", PROCESS_NUM);
+  }
+
+  // The files checked here are named "parallel_process_<j>.out", one per
+  // process index.
+  for (int i = 0; i < ITERATION_NUM; ++i) {
+    for (int j = 0; j < PROCESS_NUM; ++j) {
+      const std::string outputFile = "cloudtest/out/parallel/round" +
+                                     std::to_string(i) + "/" +
+                                     "parallel_process_" + std::to_string(j) +
+                                     ".out";
+      const bool passed = hawq::test::SQLUtility::checkPatternInFile(
+          outputFile, ": ERROR:|FATAL|PANIC");
+      EXPECT_TRUE(passed) << outputFile << " find : ERROR:|FATAL|PANIC";
+    }
+  }
+}
+
+// Captures the test root path and loads connection settings from
+// cloudtest/test_cloud.xml.
+// NOTE(review): the result of setConfigXml is ignored here, so a missing or
+// unparsable file leaves the connection members empty - confirm this is
+// intended.
+TestCloud::TestCloud()
+    : testRootPath(hawq::test::SQLUtility::getTestRootPath()), usexml(false) {
+  setConfigXml("cloudtest/test_cloud.xml");
+}
+
+// Loads host, port, database and user settings from an XML file.
+// @xmlfile path of the XML file, relative to the test root path
+// @return true when the file exists and all settings were read; false
+//         otherwise, in which case the current settings are left untouched
+bool TestCloud::setConfigXml(const std::string &xmlfile) {
+  if (xmlfile.empty()) {
+    std::cout << "xml file should not be empty when you call setConfigXml"
+              << std::endl;
+    return false;
+  }
+
+  const std::string xmlabsfile = testRootPath + "/" + xmlfile;
+  if (!std::ifstream(xmlabsfile)) {
+    std::cout << xmlabsfile << " doesn't exist" << std::endl;
+    return false;
+  }
+
+  try {
+    // The unique_ptr keeps ownership until it goes out of scope, so the parser
+    // is destroyed on both the success and the exception path. Calling
+    // release() here would give up ownership without deleting the object.
+    std::unique_ptr<hawq::test::XmlConfig> xmlconf(
+        new hawq::test::XmlConfig(xmlabsfile));
+    xmlconf->parse();
+
+    // Read everything into locals first so that an exception thrown half-way
+    // cannot leave the members partially updated.
+    const std::string newHost = xmlconf->getString("cloud_host");
+    const std::string newHost2 = xmlconf->getString("cloud_host2");
+    const std::string newPort = xmlconf->getString("cloud_port", "5432");
+    const std::string newDatabase =
+        xmlconf->getString("cloud_database", "postgres");
+    const std::string newUser = xmlconf->getString("cloud_user");
+
+    hostName = newHost;
+    cluster2host = newHost2;
+    port = newPort;
+    databaseName = newDatabase;
+    userName = newUser;
+    usexml = true;
+    return true;
+  } catch (...) {
+    std::cout << "Parse Xml file " << xmlabsfile << " error" << std::endl;
+    return false;
+  }
+}
+
+// (Re)creates the PSQL connection object from the configured database, host,
+// port and user, authenticating with HAWQ_PASSWORD. Any previous connection
+// object held by conn is replaced.
+void TestCloud::configConnection() {
+  conn.reset(new hawq::test::PSQL(databaseName, hostName, port, userName,
+                                  HAWQ_PASSWORD));
+}
+
+// Returns the test root path captured when the fixture was constructed.
+std::string TestCloud::getTestRootPath() const { return this->testRootPath; }
+
+// Runs a SQL file through the current connection and writes the output to a
+// file. Both paths are relative to the test root path. configConnection()
+// must have been called first, because conn is dereferenced unconditionally.
+// Only the status reported by runSQLFile is checked; the output file content
+// is not inspected here.
+void TestCloud::execSQLfile(const std::string &sqlFile,
+                            const std::string &outFile) {
+  std::string sqlFileAbsPath = testRootPath + "/" + sqlFile;
+  std::string outFileAbsPath = testRootPath + "/" + outFile;
+  conn->setOutputFile(outFileAbsPath);
+  EXPECT_EQ(0, conn->runSQLFile(sqlFileAbsPath, "").getLastStatus());
+  // Stop redirecting output so later statements are not appended to outFile.
+  conn->resetOutput();
+}
+
+// Runs a SQL file like execSQLfile and then checks the output file against
+// the pattern ": ERROR:|FATAL|PANIC". The check is expected to return true
+// for a passing run.
+// NOTE(review): outFile is passed to checkPatternInFile as a relative path;
+// presumably the helper resolves it against the test root - confirm.
+void TestCloud::execSQLfileandCheck(const std::string &sqlFile,
+                                    const std::string &outFile) {
+  execSQLfile(sqlFile, outFile);
+  bool result = hawq::test::SQLUtility::checkPatternInFile(
+      outFile, ": ERROR:|FATAL|PANIC");
+  EXPECT_TRUE(result) << outFile << " find ERROR|FATAL|PANIC";
+}
+
+// Tries to open a psql session with the given credentials.
+// @host     server host name
+// @portStr  server port
+// @database database to connect to
+// @user     role name
+// @pwd      password, handed to psql through the PGPASSWORD variable
+// @return   true when the psql invocation produced no captured output, which
+//           is what this test treats as a successful login
+bool TestCloud::testLogin(const std::string &host, const std::string &portStr,
+                          const std::string &database, const std::string &user,
+                          const std::string &pwd) const {
+  // "-c '\q'" makes psql connect, run the quit meta-command and exit. The
+  // short option is spelled out instead of the abbreviated "--c": prefix
+  // matching of long options is not guaranteed by every getopt_long and
+  // becomes ambiguous as soon as another long option starts with "c".
+  const std::string loginCmd = "psql -h " + host + " -p " + portStr + " -d " +
+                               database + " -U " + user + " -c '\\q'";
+  const std::string result = Command::getCommandOutput(
+      "export PGPASSWORD=\'" + pwd + "\' && " + loginCmd);
+  return result.empty();
+}
+
+// Login check against the configured host, port and database for an explicit
+// user and password.
+bool TestCloud::testLogin(const std::string &user,
+                          const std::string &pwd) const {
+  return testLogin(hostName, port, databaseName, user, pwd);
+}
+
+// Login check against the configured host, port and database as the
+// configured user, with an explicit password.
+bool TestCloud::testLogin(const std::string &pwd) const {
+  return testLogin(hostName, port, databaseName, userName, pwd);
+}
+
+// Runs sql on the given host as the configured user (password HAWQ_PASSWORD),
+// using the configured port and database.
+std::string TestCloud::execSql(const std::string &host,
+                               const std::string &sql) {
+  return execSql(host, port, databaseName, userName, HAWQ_PASSWORD, sql);
+}
+
+// Runs sql on the configured host as the configured user (password
+// HAWQ_PASSWORD).
+std::string TestCloud::execSql(const std::string &sql) {
+  return execSql(userName, HAWQ_PASSWORD, sql);
+}
+
+// Runs sql on the configured host, port and database as the given user.
+std::string TestCloud::execSql(const std::string &user, const std::string &pwd,
+                               const std::string &sql) {
+  return execSql(hostName, port, databaseName, user, pwd, sql);
+}
+
+// Pipes a SQL string into psql and returns whatever getCommandOutput captures.
+// The statement is embedded in a double-quoted shell string, so callers must
+// not pass text containing unescaped double quotes.
+// @host     server host name
+// @portStr  server port
+// @database database to connect to
+// @user     role that runs the statement
+// @pwd      password, exported as PGPASSWORD for the psql process
+// @sql      statement text
+// @return   captured output of the command
+std::string TestCloud::execSql(const std::string &host,
+                               const std::string &portStr,
+                               const std::string &database,
+                               const std::string &user, const std::string &pwd,
+                               const std::string &sql) {
+  // Shell prefix that supplies the password to psql.
+  const std::string exportPwd = "export PGPASSWORD=\'" + pwd + "\' && ";
+  // psql invocation that reads the statement from stdin.
+  const std::string psqlCall =
+      "psql -h " + host + " -p " + portStr + " -d " + database + " -U " + user;
+  return Command::getCommandOutput(exportPwd + "echo \"" + sql + "\" | " +
+                                   psqlCall);
+}
+
+// Decides whether command output counts as a success.
+// @out     captured command output
+// @pattern text that must appear in the output
+// @return  false when the output contains "ERROR:"; otherwise true exactly
+//          when pattern occurs in the output
+bool TestCloud::checkOut(const std::string &out, const std::string &pattern) {
+  // Any reported error disqualifies the output, whatever else it contains.
+  if (out.find("ERROR:") != std::string::npos) {
+    return false;
+  }
+  return out.find(pattern) != std::string::npos;
+}
+
+// Runs a SQL file in parallel by delegating to SQLUtility::runParallelSQLFile
+// on a utility object built from the configured database, host, port and
+// user.
+// @sqlFile    SQL file to run
+// @outputFile output file name passed through to the utility
+// @processnum number of processes
+// @ansFile    expected-output file passed through to the utility
+void TestCloud::runParallelSQLFile(const std::string sqlFile,
+                                   const std::string outputFile, int processnum,
+                                   const std::string ansFile) {
+  hawq::test::SQLUtility sqlUtil(this->databaseName, this->hostName, this->port,
+                                 this->userName);
+  sqlUtil.runParallelSQLFile(sqlFile, outputFile, processnum, ansFile);
+}
diff --git a/src/test/feature/cloudtest/test_cloud.h 
b/src/test/feature/cloudtest/test_cloud.h
new file mode 100644
index 0000000..0e05ea3
--- /dev/null
+++ b/src/test/feature/cloudtest/test_cloud.h
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TEST_CLOUD_H
+#define TEST_CLOUD_H
+
+#include <string>
+
+#include "gtest/gtest.h"
+#include "lib/psql.h"
+
+// gtest fixture for the cloud environment tests. Holds the connection
+// settings read from an XML file and offers helpers to log in, run single
+// statements through psql, and run SQL files.
+class TestCloud : public ::testing::Test {
+ public:
+  TestCloud();
+  ~TestCloud() {}
+
+  // Creates the connection object from the members hostName, port,
+  // databaseName and userName.
+  void configConnection();
+
+  // Reads the connection settings from an XML file.
+  // @xmlfile path of the XML file, relative to the test root path
+  // @return whether the settings were loaded
+  bool setConfigXml(const std::string &xmlfile);
+
+  // @return test root path
+  std::string getTestRootPath() const;
+
+  // Tests a login using the members hostName, port and databaseName.
+  // @user user name
+  // @pwd  login password
+  // @return login status
+  bool testLogin(const std::string &user, const std::string &pwd) const;
+
+  // Tests a login using the members hostName, port, databaseName and
+  // userName.
+  // @pwd login password
+  // @return login status
+  bool testLogin(const std::string &pwd) const;
+
+  // Tests a login with every connection parameter given explicitly.
+  // @host host name
+  // @portStr port to connect to
+  // @database database name to connect to
+  // @user user name
+  // @pwd login password
+  // @return login status
+  bool testLogin(const std::string &host, const std::string &portStr,
+                 const std::string &database, const std::string &user,
+                 const std::string &pwd) const;
+
+  // Executes a SQL file and checks the output file for error markers.
+  // @sqlfile SQL file path, relative to the test root path
+  // @outfile output file path, relative to the test root path
+  void execSQLfileandCheck(const std::string &sqlfile,
+                           const std::string &outfile);
+
+  // Executes a SQL file.
+  // @sqlfile SQL file path, relative to the test root path
+  // @outfile output file path, relative to the test root path
+  void execSQLfile(const std::string &sqlfile, const std::string &outfile);
+
+  // Executes a statement on the given host; the other connection parameters
+  // come from the members.
+  // @host host name
+  // @sql statement text
+  // @return output of the statement
+  std::string execSql(const std::string &host, const std::string &sql);
+
+  // Executes a statement using the member connection parameters.
+  // @sql statement text
+  // @return output of the statement
+  std::string execSql(const std::string &sql);
+
+  // Executes a statement as the given user.
+  // @user user that executes the statement
+  // @pwd user's password
+  // @sql statement text
+  // @return output of the statement
+  std::string execSql(const std::string &user, const std::string &pwd,
+                      const std::string &sql);
+
+  // Executes a statement with every connection parameter given explicitly.
+  // @host host name
+  // @portStr port
+  // @database database name
+  // @user user that executes the statement
+  // @pwd user's password
+  // @sql statement text
+  // @return output of the statement
+  std::string execSql(const std::string &host, const std::string &portStr,
+                      const std::string &database, const std::string &user,
+                      const std::string &pwd, const std::string &sql);
+  // Checks statement output against a pattern.
+  // @out output of a statement
+  // @pattern text the output must contain
+  // @return false if the output contains "ERROR:", otherwise whether the
+  //         pattern was found
+  bool checkOut(const std::string &out, const std::string &pattern = "");
+
+  // Runs a SQL file in parallel.
+  // @sqlFile SQL file to run
+  // @outputFile output file of the run
+  // @processnum number of processes
+  // @ansFile expected-output file to compare the output against
+  void runParallelSQLFile(const std::string sqlFile,
+                          const std::string outputFile, int processnum,
+                          const std::string ansFile = "");
+
+  // @return host name of the second cluster
+  std::string getHost2() const { return this->cluster2host; }
+
+  // @return port
+  std::string getPort() const { return this->port; }
+
+  // @return user name
+  std::string getUserName() const { return this->userName; }
+
+ private:
+  // Connection used by execSQLfile; created by configConnection().
+  std::unique_ptr<hawq::test::PSQL> conn;
+  std::string testRootPath;
+
+  // Connection settings loaded by setConfigXml().
+  std::string hostName;
+  std::string cluster2host;
+  std::string port;
+  std::string databaseName;
+  std::string userName;
+  // Set to true once setConfigXml() has loaded the settings.
+  bool usexml;
+};
+
+#endif
diff --git a/src/test/feature/cloudtest/test_cloud.xml 
b/src/test/feature/cloudtest/test_cloud.xml
new file mode 100644
index 0000000..f886ea8
--- /dev/null
+++ b/src/test/feature/cloudtest/test_cloud.xml
@@ -0,0 +1,40 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<configuration>
+       <property>
+                <name>cloud_host</name>
+                <value>HOST1</value>
+        </property>
+        <property>
+                <name>cloud_host2</name>
+                <value>HOST2</value>
+        </property>
+        <property>
+                <name>cloud_port</name>
+                <value>5432</value>
+        </property>
+
+        <property>
+                <name>cloud_database</name>
+                <value>postgres</value>
+        </property>
+        <property>
+                <name>cloud_user</name>
+                <value>oushu</value>
+        </property>
+</configuration>
diff --git a/src/test/feature/lib/parse_out.cpp 
b/src/test/feature/lib/parse_out.cpp
new file mode 100644
index 0000000..5a2299b
--- /dev/null
+++ b/src/test/feature/lib/parse_out.cpp
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "parse_out.h"
+
+namespace hawq {
+namespace test {
+using namespace std;
+
+const std::string Sqlout_parser::ignore_start_tag = "--ignore_start";
+const std::string Sqlout_parser::ignore_end_tag = "--ignore_end";
+const std::string Sqlout_parser::oneline_sql_start_tag = "--oneline_sql_start";
+const std::string Sqlout_parser::oneline_sql_end_tag = "--oneline_sql_end";
+const std::string Sqlout_parser::sql_start_tag = "--sql_start";
+const std::string Sqlout_parser::sql_end_tag = "--sql_end";
+const std::string Sqlout_parser::out_start_tag = " --out_start";
+
+// Parses a tagged psql output file into (sql text, time) pairs.
+//
+// Statements that were recorded inside an ignore section are dropped. The
+// time is the value read from the statement's "Time: " line, or -1.0 when no
+// such line was seen. output_sqloutinfo is replaced only when parsing
+// succeeds; on failure it is left untouched and false is returned.
+bool Sqlout_parser::parse_sql_time(
+    const string &input_sqloutfile,
+    vector<pair<string, double>> &output_sqloutinfo) {
+  vector<SqlOut> parsed;
+  if (!__parse_sql_time(input_sqloutfile, parsed)) return false;
+
+  vector<pair<string, double>> kept;
+  for (SqlOut &entry : parsed) {
+    if (entry.is_ignore) continue;
+    kept.emplace_back(std::move(entry.sql), entry.time);
+  }
+  output_sqloutinfo.swap(kept);
+  return true;
+}
+
+// Loads outfile into a vector of strings with __trans_file_to_vector and runs
+// the tag parser over it. Returns false as soon as either step fails.
+bool Sqlout_parser::__parse_sql_time(const string &outfile,
+                                     vector<SqlOut> &ouf) {
+  vector<string> lines;
+  return !(__trans_file_to_vector(outfile, lines) == false) &&
+         __parse_sql_time_imple(lines, ouf);
+}
+
+// Splits the file into alternating "normal" and "ignored" sections and parses
+// the statements of each one.
+//
+// module_start_index .. module_end_index is the half-open range of lines that
+// still has to be handed to __parse_ignore. An ignore section runs from an
+// ignore_start_tag line to the next ignore_end_tag line; statements found in
+// it are recorded with is_ignore == true. Returns false when a section fails
+// to parse or an ignore section is never closed.
+bool Sqlout_parser::__parse_sql_time_imple(const vector<string> &inf,
+                                           vector<SqlOut> &ouf) {
+  // Nothing to parse. Without this guard the loop below would read inf[0]
+  // on an empty vector.
+  if (inf.empty()) return true;
+
+  size_t module_start_index = 0, module_end_index = 0;
+  for (;;) {
+    if (__is_tag(inf[module_end_index], ignore_start_tag) == true) {
+      // Flush the normal section that precedes the ignore tag.
+      if (__parse_ignore(inf, ouf, module_start_index, module_end_index,
+                         false) == false)
+        return false;
+      module_start_index = module_end_index + 1;
+      // Advance to the matching end tag; hitting the end of file first is an
+      // error.
+      while (__is_tag(inf[module_end_index], ignore_end_tag) == false) {
+        if (++module_end_index == inf.size())
+          return __not_found_tag_error(module_start_index, ignore_end_tag);
+      }
+      if (__parse_ignore(inf, ouf, module_start_index, module_end_index,
+                         true) == false)
+        return false;
+    } else if (++module_end_index == inf.size()) {
+      // End of file: whatever is left is a normal section.
+      return __parse_ignore(inf, ouf, module_start_index, module_end_index,
+                            false);
+    }
+  }
+}
+
+// Parses every statement in the half-open line range [start_index, end_index)
+// and tags each with the given ignore flag. start_index is advanced by
+// __parse_sqlout; the loop ends once it reaches end_index or a statement
+// fails to parse.
+bool Sqlout_parser::__parse_ignore(const vector<string> &inf,
+                                   vector<SqlOut> &ouf, size_t &start_index,
+                                   const size_t end_index, bool ignore) {
+  for (; start_index != end_index;) {
+    const bool parsed =
+        __parse_sqlout(inf, ouf, start_index, end_index, ignore);
+    if (!parsed) return false;
+  }
+  return true;
+}
+
+// Scans [start_index, end_index) for statement start tags. Each line is
+// consumed in turn; when it is a oneline or multi-line start tag, the
+// statement that follows is parsed with the matching end tag. Lines that are
+// not start tags are skipped.
+bool Sqlout_parser::__parse_sqlout(const vector<string> &inf,
+                                   vector<SqlOut> &ouf, size_t &start_index,
+                                   const size_t end_index, bool ignore) {
+  while (start_index != end_index) {
+    const string &line = inf[start_index++];
+
+    // Pick the end tag that pairs with the start tag found on this line.
+    const string *closing_tag = nullptr;
+    if (__is_tag(line, oneline_sql_start_tag) == true) {
+      closing_tag = &oneline_sql_end_tag;
+    } else if (__is_tag(line, sql_start_tag) == true) {
+      closing_tag = &sql_end_tag;
+    }
+    if (closing_tag == nullptr) continue;
+
+    if (!__parse_sqlout_imple(inf, ouf, start_index, end_index, *closing_tag,
+                              ignore))
+      return false;
+  }
+  return true;
+}
+
+// Parses one statement whose start tag has just been consumed.
+//
+// On entry start_index is the line right after the start tag. The SQL text
+// runs up to out_start_tag: for the oneline form the tag must be on that
+// first line, otherwise it may appear several lines later and the lines are
+// joined with '\n'. Every following line up to and including the end_tag
+// line is offered to __parse_time. On success a SqlOut is appended to ouf and
+// start_index is left on the end_tag line. Returns false (through
+// __not_found_tag_error) when out_start_tag or end_tag is missing before
+// end_index.
+bool Sqlout_parser::__parse_sqlout_imple(const vector<string> &inf,
+                                         vector<SqlOut> &ouf,
+                                         size_t &start_index,
+                                         const size_t end_index,
+                                         const string &end_tag, bool ignore) {
+  // The start tag was the last line of the section, so there is no statement
+  // body. Reading inf[start_index] here would touch a line outside the
+  // section (or past the end of the vector) and the scans below would step
+  // over end_index.
+  if (start_index == end_index)
+    return __not_found_tag_error(start_index, out_start_tag);
+
+  string sql;
+  size_t sql_start_index = start_index;
+  size_t out_pos;
+  if (end_tag == oneline_sql_end_tag) {
+    if ((out_pos = inf[start_index].find(out_start_tag)) == string::npos)
+      return __not_found_tag_error(start_index, out_start_tag);
+    sql = inf[start_index].substr(0, out_pos);
+  } else {
+    if ((out_pos = inf[start_index].find(out_start_tag)) != string::npos) {
+      sql = inf[start_index].substr(0, out_pos);  // if sql is empty or oneline
+    } else {
+      sql = inf[start_index];
+      for (++start_index; start_index != end_index; ++start_index) {
+        const string &line = inf[start_index];
+        if ((out_pos = line.find(out_start_tag)) != string::npos) {
+          sql.append("\n").append(line.substr(0, out_pos));
+          break;
+        } else {
+          sql.append("\n").append(line);
+        }
+      }
+    }
+
+    // The scan above ran off the section without seeing out_start_tag.
+    if (start_index == end_index)
+      return __not_found_tag_error(start_index, out_start_tag);
+  }
+  double outtime = -1.0;  // stays -1.0 when no "Time: " line is found
+  for (++start_index; start_index != end_index; ++start_index) {
+    const string &line = inf[start_index];
+    __parse_time(line, outtime);
+    if (__sub_equal(line, end_tag) == true) {
+      ouf.emplace_back(ignore, outtime, std::move(sql));
+      return true;
+    }
+  }
+  return __not_found_tag_error(sql_start_index, end_tag);
+}
+
+// Extracts the number that follows time_tag ("Time: ") at the start of line.
+//
+// The number ends at the next space or at the end of the line. Returns true
+// and stores the value in outtime when the line starts with the tag and the
+// text after it parses as a double; otherwise returns false and leaves
+// outtime unchanged.
+bool Sqlout_parser::__parse_time(const string &line, double &outtime) {
+  if (!__sub_equal(line, 0, time_tag)) return false;
+
+  const size_t start_pos = time_tag.size();
+  const size_t end_pos = line.find(' ', start_pos);
+  const size_t length =
+      (end_pos == string::npos) ? string::npos : end_pos - start_pos;
+  try {
+    outtime = stod(line.substr(start_pos, length));
+  } catch (...) {
+    // stod throws when the text is not a number or is out of range. Such a
+    // line is treated as "not a timing line" instead of aborting the parse.
+    return false;
+  }
+  return true;
+}
+
+}  // namespace test
+}  // namespace hawq
\ No newline at end of file
diff --git a/src/test/feature/lib/parse_out.h b/src/test/feature/lib/parse_out.h
new file mode 100644
index 0000000..ef681a1
--- /dev/null
+++ b/src/test/feature/lib/parse_out.h
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef HAWQ_SRC_TEST_FEATURE_PARSE_OUT_H_
+#define HAWQ_SRC_TEST_FEATURE_PARSE_OUT_H_
+
+#include "lib/sqlfile-parsebase.h"
+
+namespace hawq {
+namespace test {
+
+struct SqlOut {  // one statement extracted from a tagged psql output file
+  SqlOut(bool is_ignore, double time, std::string &&sql)
+      : is_ignore(is_ignore), time(time), sql(std::move(sql)) {}
+
+  bool is_ignore;  // true when the statement was found inside an ignore section
+  double time;  // value of the statement's "Time: " line; the parser passes -1.0 when none was seen
+  std::string sql;  // statement text, taken from the lines before the out_start tag
+};
+
+// Parser for psql output files whose statements are delimited by marker
+// comments (--sql_start / --sql_end, --oneline_sql_start / --oneline_sql_end,
+// --out_start, --ignore_start / --ignore_end). It extracts each statement's
+// text together with the value of its "Time: " line.
+class Sqlout_parser : public SqlFileParseBase {
+ public:
+  Sqlout_parser() : SqlFileParseBase("parse_out") {}
+
+  // Parses input_sqloutfile and stores one (sql, time) pair per statement
+  // that is not inside an ignore section. Returns false on a parse error, in
+  // which case output_sqloutinfo is left unchanged.
+  bool parse_sql_time(
+      const std::string &input_sqloutfile,
+      std::vector<std::pair<std::string, double>> &output_sqloutinfo);
+
+  // Wraps oneSql in the multi-line statement markers.
+  static inline std::string reportSql(const std::string &oneSql) {
+    return sql_start_tag + "\n" + oneSql + out_start_tag + "\n" + sql_end_tag;
+  }
+
+  // Same as reportSql, additionally enclosed in the ignore markers.
+  // Takes the statement by const reference, like reportSql, to avoid a copy.
+  static inline std::string reportIgnoreSql(const std::string &oneSql) {
+    return ignore_start_tag + "\n" + reportSql(oneSql) + "\n" + ignore_end_tag;
+  }
+
+ private:
+  // Reads outfile and parses all of its statements into ouf.
+  bool __parse_sql_time(const std::string &outfile, std::vector<SqlOut> &ouf);
+
+  // Splits the lines into normal and ignored sections and parses each one.
+  bool __parse_sql_time_imple(const std::vector<std::string> &inf,
+                              std::vector<SqlOut> &ouf);
+
+  // Parses every statement in [start_index, end_index) with the given
+  // ignore flag; start_index is advanced as lines are consumed.
+  bool __parse_ignore(const std::vector<std::string> &inf,
+                      std::vector<SqlOut> &ouf, size_t &start_index,
+                      const size_t end_index, bool ignore);
+
+  // Scans [start_index, end_index) for statement start tags and parses the
+  // statement that follows each one.
+  bool __parse_sqlout(const std::vector<std::string> &inf,
+                      std::vector<SqlOut> &ouf, size_t &start_index,
+                      const size_t end_index, bool ignore);
+
+  // Parses a single statement that is terminated by end_tag.
+  bool __parse_sqlout_imple(const std::vector<std::string> &inf,
+                            std::vector<SqlOut> &ouf, size_t &start_index,
+                            const size_t end_index, const std::string &end_tag,
+                            bool ignore);
+
+  // Reads the number following time_tag at the start of line into outtime.
+  bool __parse_time(const std::string &line, double &outtime);
+
+ private:
+  // Prefix of the timing line in the output file.
+  const std::string time_tag = "Time: ";
+
+ private:
+  // Marker strings; defined in parse_out.cpp.
+  static const std::string ignore_start_tag;
+  static const std::string ignore_end_tag;
+  static const std::string oneline_sql_start_tag;
+  static const std::string oneline_sql_end_tag;
+  static const std::string sql_start_tag;
+  static const std::string sql_end_tag;
+  static const std::string out_start_tag;
+};
+}  // namespace test
+}  // namespace hawq
+#endif
\ No newline at end of file
diff --git a/src/test/feature/lib/psql.cpp b/src/test/feature/lib/psql.cpp
index 0e71d56..386864e 100644
--- a/src/test/feature/lib/psql.cpp
+++ b/src/test/feature/lib/psql.cpp
@@ -17,8 +17,14 @@
  */
 
 #include <unistd.h>
+#include <stdio.h>
+#include <string.h>
 #include "psql.h"
+
+//
+#include <iostream>
 #include "command.h"
+#include "../../../interfaces/libpq/libpq-int.h"
 
 using std::string;
 
@@ -49,6 +55,11 @@ const std::vector<string>& PSQLQueryResult::getFields() 
const {
   return this->_fields;
 }
 
+const std::vector<int>& PSQLQueryResult::getFieldSizes() const {
+  return this->_fieldsizes;
+}
+
+
 const std::vector<string>& PSQLQueryResult::getRow(int ri) const {
   return this->getRows()[ri];
 }
@@ -82,11 +93,18 @@ void PSQLQueryResult::savePGResult(const PGresult* res) {
   int nfields = PQnfields(res);
   for (int i = 0; i < nfields; i++) {
     this->_fields.push_back(PQfname(res, i));
+    this->_fieldsizes.push_back(PQfsize(res,i));
   }
   for (int i = 0; i < PQntuples(res); i++) {
     std::vector<string> row;
     for (int j = 0; j < nfields; j++) {
       row.push_back(PQgetvalue(res, i, j));
+      int size = row[j].size();
+      int size2 = _fieldsizes[j];
+      if (size > size2)
+      {
+         _fieldsizes[j] = size;
+      }
     }
     this->_rows.push_back(row);
   }
@@ -96,6 +114,7 @@ void PSQLQueryResult::reset() {
   this->_errmsg.clear();
   this->_rows.clear();
   this->_fields.clear();
+  this->_fieldsizes.clear();
 }
 
 PSQL& PSQL::runSQLCommand(const string& sql_cmd) {
@@ -112,6 +131,21 @@ PSQL& PSQL::runSQLFile(const string& sql_file, bool 
printTupleOnly) {
   return *this;
 }
 
+// Runs sql_file through psql with extra command-line options and records the
+// exit status of the command in _last_status.
+PSQL& PSQL::runSQLFile(const string& sql_file, const string &psqlOptions,
+                       bool printTupleOnly) {
+  const string command =
+      this->_getPSQLFileCommand(sql_file, psqlOptions, printTupleOnly);
+  this->_last_status = hawq::test::Command::getCommandStatus(command);
+  return *this;
+}
+
+
+// Opens a connection with the current settings to check that the server is
+// reachable, then closes it again.
+//
+// Returns true when the connection was established. On failure the libpq
+// error text is stored in _result and false is returned.
+bool PSQL::testConnection() {
+  PGconn* conn = PQconnectdb(this->getConnectionString().c_str());
+  const bool connected = (PQstatus(conn) == CONNECTION_OK);
+  if (!connected) {
+    this->_result.setErrorMessage(PQerrorMessage(conn));
+  }
+  // The PGconn must be released on both paths, including after a failed
+  // attempt; PQfinish accepts a NULL pointer.
+  PQfinish(conn);
+  return connected;
+}
 const PSQLQueryResult& PSQL::getQueryResult(const string& sql) {
   PGconn* conn = NULL;
   PGresult* res = NULL;
@@ -142,11 +176,19 @@ done:
   return this->_result;
 }
 
+string PSQL::getHost(){
+       return this->_host;
+}
 PSQL& PSQL::setHost(const string& host) {
   this->_host = host;
   return *this;
 }
 
+PSQL& PSQL::setDatabase(const std::string& db){
+  this->_dbname = db;
+  return *this;
+}
+
 PSQL& PSQL::setPort(const string& port) {
   this->_port = port;
   return *this;
@@ -220,12 +262,23 @@ const string PSQL::_getPSQLFileCommand(const string& 
file, bool printTupleOnly)
   }
 }
 
+// Builds the psql command line that executes a SQL file with extra options.
+//
+// The result is the base command, then psqlOptions, then "-a -f <file>"
+// (echo all input) or "-a -A -t -f <file>" (unaligned, tuples only) when
+// printTupleOnly is set.
+const string PSQL::_getPSQLFileCommand(const string& file,
+                                       const string& psqlOptions,
+                                       bool printTupleOnly) const {
+  string command = this->_getPSQLBaseCommand();
+  // Always put a separator in front of the caller's options so they cannot
+  // run into the last token of the base command; extra blanks are harmless.
+  command.append(" ").append(psqlOptions);
+  // Both variants end in "-f " so the file name is a separate argument.
+  command.append(printTupleOnly ? " -a -A -t -f " : " -a -f ");
+  return command.append(file);
+}
+
 bool PSQL::checkDiff(const string& expect_file,
                      const string& result_file,
                      bool save_diff,
                      const string& global_init_file,
                      const string& init_file) {
-  string diff_file = result_file + ".diff";
+  string diff_file = getDifFilePath(result_file);
   string command;
   command.append("gpdiff.pl -du ")
       .append(PSQL_BASIC_DIFF_OPTS);
@@ -256,5 +309,153 @@ bool PSQL::checkDiff(const string& expect_file,
   }
 }
 
+std::string PSQL::getDifFilePath(const std::string&result_file){
+  return result_file + ".diff";
+}
+
+
+
+// Remembers the connection string; no connection is opened until connectDb().
+LibPQSQL::LibPQSQL(const std::string &connstring)
+    : libpqconn(NULL), connstr(connstring)
+{
+}
+
+// Closes the connection if one is still open.
+LibPQSQL::~LibPQSQL()
+{
+    if (libpqconn) {
+        PQfinish(libpqconn);
+        libpqconn = NULL;
+    }
+}
+
+// Opens a connection using the connection string given to the constructor.
+//
+// Returns true on success. On failure the libpq error text is stored in
+// _result, the half-open connection is released and false is returned.
+bool LibPQSQL::connectDb()
+{
+    // Close a connection left over from an earlier call; overwriting the
+    // pointer would leak it.
+    disconnectDb();
+
+    libpqconn = PQconnectdb(this->connstr.c_str());
+    if (PQstatus(libpqconn) == CONNECTION_OK)
+        return true;
+
+    this->_result.setErrorMessage(PQerrorMessage(libpqconn));
+    // PQfinish accepts NULL, so no extra check is needed here.
+    PQfinish(libpqconn);
+    libpqconn = NULL;
+    return false;
+}
+
+
+// Runs sql on the open connection and stores the outcome in _result.
+//
+// For every result that is not a tuple set and whose command tag is not
+// "SET", the command tag is written to outputfile (when one is given).
+// Tuple and command results are saved in _result; error results store the
+// server's error message there instead.
+//
+// Returns false when there is no connection, when libpq could not produce a
+// result object, or when the result has an unexpected status. Note that a
+// query rejected by the server still returns true; callers detect that case
+// through the error stored in _result.
+bool LibPQSQL::execQuery(const string& sql,FILE* outputfile)
+{
+    if (!libpqconn)
+        return false;
+
+    PGresult* res = PQexec(libpqconn, sql.c_str());
+    if (res == NULL)
+    {
+        // PQexec returns NULL when it cannot even build a result (for example
+        // out of memory); the reason is available on the connection.
+        this->_result.reset();
+        this->_result.setErrorMessage(PQerrorMessage(libpqconn));
+        return false;
+    }
+
+    // Use the public accessors instead of reaching into the PGresult struct.
+    const ExecStatusType resultStatus = PQresultStatus(res);
+    const char* cmdStatus = PQcmdStatus(res);
+
+    if (outputfile != NULL && resultStatus != PGRES_TUPLES_OK &&
+        strcmp(cmdStatus, "SET") != 0)
+        fprintf(outputfile, "%s\n", cmdStatus);
+
+    bool status = true;
+    switch (resultStatus)
+    {
+        case PGRES_TUPLES_OK:
+        case PGRES_COMMAND_OK:
+        {
+            this->_result.reset();
+            this->_result.savePGResult(res);
+            break;
+        }
+        case PGRES_BAD_RESPONSE:
+        case PGRES_NONFATAL_ERROR:
+        case PGRES_FATAL_ERROR:
+        {
+            this->_result.reset();
+            // Never NULL: yields an empty string when no message is attached.
+            this->_result.setErrorMessage(PQresultErrorMessage(res));
+            break;
+        }
+        default:
+        {
+            this->_result.setErrorMessage(PQerrorMessage(libpqconn));
+            status = false;
+            break;
+        }
+    }
+    PQclear(res);
+    return status;
+}
+
+// Returns the number of rows held by the last stored result.
+int32_t LibPQSQL::getRownum()
+{
+    // Bind by reference: copying the whole result set just to read its size
+    // is wasted work.
+    const std::vector<std::vector<std::string> >& rowdata = this->_result.getRows();
+    return static_cast<int32_t>(rowdata.size());
+}
+
+
+// Writes the last stored result to outputfile as a text table.
+//
+// If the result holds an error, only the error message is written. Otherwise
+// the output is a header line with the column names, a line of dashes, one
+// line per row and a row-count footer. Every cell is left-aligned and padded
+// to the width recorded for its column; columns are separated by '|'.
+// Nothing is written when the result has no columns or outputfile is NULL.
+void LibPQSQL::printQueryResult(FILE* outputfile)
+{
+    if (outputfile == NULL)
+        return;
+    if(this->_result.isError()){
+        fprintf(outputfile,"%s",this->_result.getErrorMessage().c_str());
+        return ;
+    }
+    // References avoid copying the complete result set.
+    const std::vector<std::string>& fields = this->_result.getFields();
+    if (fields.empty())
+        return;
+
+    const std::vector<std::vector<std::string> >& rowdata = this->_result.getRows();
+    const std::vector<int>& fieldsize = this->_result.getFieldSizes();
+
+    // Header line. Counters are size_t/int: the former 16-bit counters wrapped
+    // around, and never terminated, once a bound exceeded 65535.
+    int totalsize = 0;
+    for (size_t i = 0; i < fields.size(); i++)
+    {
+        totalsize += fieldsize[i] + 3;
+        // "%-*s" takes the width as an argument, which gives the same padding
+        // as a format string assembled with the width embedded in it.
+        fprintf(outputfile, i == 0 ? " %-*s " : "| %-*s ",
+                fieldsize[i], fields[i].c_str());
+    }
+    fprintf(outputfile, "\n");
+
+    // Separator line, as wide as all padded columns together.
+    for (int k = 0; k < totalsize; k++)
+    {
+        fprintf(outputfile, "%s", "-");
+    }
+    fprintf(outputfile, "\n");
+
+    // Data rows.
+    for (size_t i = 0; i < rowdata.size(); i++)
+    {
+        for (size_t j = 0; j < rowdata[i].size(); j++)
+        {
+            fprintf(outputfile, j == 0 ? " %-*s " : "| %-*s ",
+                    fieldsize[j], rowdata[i][j].c_str());
+        }
+        fprintf(outputfile, "\n");
+    }
+
+    // Footer; %zu is the portable conversion for size_t.
+    if (rowdata.empty())
+        fprintf(outputfile, "(0 row)\n");
+    else
+        fprintf(outputfile, "(%zu rows)\n", rowdata.size());
+}
+// Closes the connection. Returns true when a connection was open and has
+// been closed, false when there was nothing to close.
+bool LibPQSQL::disconnectDb()
+{
+    if (libpqconn == NULL)
+        return false;
+
+    PQfinish(libpqconn);
+    libpqconn = NULL;
+    return true;
+}
+
 } // namespace test
 } // namespace hawq
diff --git a/src/test/feature/lib/psql.h b/src/test/feature/lib/psql.h
index 215a43b..3e3ed23 100644
--- a/src/test/feature/lib/psql.h
+++ b/src/test/feature/lib/psql.h
@@ -21,7 +21,7 @@
 
 #include <string>
 #include <vector>
-
+#include <unistd.h>
 #include "command.h"
 #include "libpq-fe.h"
 
@@ -39,6 +39,7 @@ class PSQLQueryResult {
 
   const std::vector<std::vector<std::string> >& getRows() const;
   const std::vector<std::string>& getFields() const;
+  const std::vector<int>& getFieldSizes() const;
 
   const std::vector<std::string>& getRow(int ri) const;
   const std::string& getData(int ri, int ci) const;
@@ -53,10 +54,10 @@ class PSQLQueryResult {
  private:
   PSQLQueryResult(const PSQLQueryResult&);
   const PSQLQueryResult& operator=(const PSQLQueryResult&);
-
   std::string _errmsg;
   std::vector<std::vector<std::string> > _rows;
   std::vector<std::string> _fields;
+  std::vector<int> _fieldsizes;
 };
 
 class PSQL {
@@ -65,22 +66,27 @@ class PSQL {
        const std::string& port = "5432", const std::string& user = "gpadmin",
        const std::string& password = "")
       : _dbname(db), _host(host), _port(port),
-      _user(user), _password(password) {}
+      _user(user), _password(password) { _last_status = 0; }
   virtual ~PSQL(){};
 
   PSQL& runSQLCommand(const std::string& sql_cmd);
   PSQL& runSQLFile(const std::string& sql_file, bool printTupleOnly = false);
+  PSQL& runSQLFile(const std::string& sql_file,
+                   const std::string& psqlOptions, bool printTupleOnly = 
false);
   const PSQLQueryResult& getQueryResult(const std::string& sql);
 
   PSQL& setHost(const std::string& host);
+  PSQL& setDatabase(const std::string& db);
   PSQL& setPort(const std::string& port);
   PSQL& setUser(const std::string& username);
   PSQL& setPassword(const std::string& password);
   PSQL& setOutputFile(const std::string& out);
   std::string getConnectionString() const;
+  bool testConnection();
 
   int getLastStatus() const;
   const std::string& getLastResult() const;
+  std::string getHost();
 
   static bool checkDiff(const std::string& expect_file,
                         const std::string& result_file,
@@ -88,6 +94,8 @@ class PSQL {
                         const std::string& global_init_file = "",
                         const std::string& init_file = "");
 
+  static std::string getDifFilePath(const std::string& result_file);
+
   void resetOutput();
 
  private:
@@ -98,6 +106,12 @@ class PSQL {
   const std::string _getPSQLQueryCommand(const std::string& query) const;
   const std::string _getPSQLFileCommand(const std::string& file, bool 
printTupleOnly = false) const;
 
+  // Generate psql execute command
+  // @param sqlFile The given sqlFile which is relative path to test root dir
+  // @param psqlOptions The psql options such as -v TABLENAME="test"
+  // @param printTupleOnly whether only print or not
+  const std::string _getPSQLFileCommand(const std::string& file, const 
std::string& psqlOptions, bool printTupleOnly = false) const;
+
   std::string _dbname;
   std::string _host;
   std::string _port;
@@ -110,6 +124,31 @@ class PSQL {
   std::string _last_result;
 };
 
+
+// Thin wrapper around one libpq connection that keeps the result of the last
+// executed statement.
+class LibPQSQL {
+ public:
+    // Stores the connection string; call connectDb() to actually connect.
+    LibPQSQL(const std::string &connstring);
+    // Closes the connection if it is still open.
+    ~LibPQSQL();
+
+    // Connects to the database named by the stored connection string.
+    bool connectDb();
+
+    // Closes the connection; false when none was open.
+    bool disconnectDb();
+
+    // Runs sql and stores the outcome in _result. When outputfile is given,
+    // the command tag of non-query statements is written to it.
+    bool execQuery(const std::string &sql,FILE* outputfile = NULL);
+
+    // Writes the stored result (or its error message) to outputfile.
+    void printQueryResult(FILE* outputfile);
+
+    // Number of rows in the stored result.
+    int32_t getRownum();
+
+ private:
+    PGconn* libpqconn;        // NULL while not connected
+    std::string connstr;      // libpq connection string
+    PSQLQueryResult _result;  // outcome of the last statement
+};
 } // namespace test
 } // namespace hawq
 
diff --git a/src/test/feature/lib/sql_util.cpp 
b/src/test/feature/lib/sql_util.cpp
index 07071f4..1dbef13 100644
--- a/src/test/feature/lib/sql_util.cpp
+++ b/src/test/feature/lib/sql_util.cpp
@@ -21,8 +21,11 @@
 #include <unistd.h>
 
 #include <fstream>
+#include <iomanip>
+#include <regex>
 #include <vector>
 
+#include "hawq_config.h"
 #include "sql_util.h"
 #include "string_util.h"
 
@@ -41,83 +44,153 @@ namespace test {
 SQLUtility::SQLUtility(SQLUtilityMode mode)
     : testRootPath(getTestRootPath()),
       test_info(::testing::UnitTest::GetInstance()->current_test_info()) {
-  auto getConnection = [&] () {
+  auto getConnection = [&]() {
     string user = HAWQ_USER;
-    if(user.empty()) {
+    if (user.empty()) {
       struct passwd *pw;
       uid_t uid = geteuid();
       pw = getpwuid(uid);
       user.assign(pw->pw_name);
     }
-    conn.reset(new hawq::test::PSQL(HAWQ_DB, HAWQ_HOST, HAWQ_PORT, user, 
HAWQ_PASSWORD));
+    conn.reset(new hawq::test::PSQL(HAWQ_DB, HAWQ_HOST, HAWQ_PORT, user,
+                                    HAWQ_PASSWORD));
   };
   getConnection();
 
-  if (mode == MODE_SCHEMA) {
+  if (mode == MODE_SCHEMA || mode == MODE_SCHEMA_NODROP ||
+      mode == MODE_MADLIB) {
+    schemaName = string(test_info->test_case_name()) + "_" + test_info->name();
+    databaseName = HAWQ_DB;
+    sql_util_mode = mode;
+    if (mode == MODE_SCHEMA || mode == MODE_MADLIB) {
+      exec("DROP SCHEMA IF EXISTS " + schemaName + " CASCADE");
+      exec("CREATE SCHEMA " + schemaName);
+    } else {
+      execIgnore("CREATE SCHEMA " + schemaName);
+    }
+  } else if (mode == MODE_MADLIB) {
     schemaName = string(test_info->test_case_name()) + "_" + test_info->name();
     databaseName = HAWQ_DB;
     exec("DROP SCHEMA IF EXISTS " + schemaName + " CASCADE");
     exec("CREATE SCHEMA " + schemaName);
-    sql_util_mode = MODE_SCHEMA;
+
   } else {
     schemaName = HAWQ_DEFAULT_SCHEMA;
-    databaseName = "db_" + string(test_info->test_case_name()) + "_" + 
test_info->name();
-    std::transform(databaseName.begin(), databaseName.end(), 
databaseName.begin(), ::tolower);
+    databaseName =
+        "db_" + string(test_info->test_case_name()) + "_" + test_info->name();
+    std::transform(databaseName.begin(), databaseName.end(),
+                   databaseName.begin(), ::tolower);
     exec("DROP DATABASE IF EXISTS " + databaseName);
     exec("CREATE DATABASE " + databaseName);
     sql_util_mode = MODE_DATABASE;
   }
 }
 
+// Connects to an explicitly named database and server instead of the
+// compiled-in defaults. Nothing is created: the default schema is used and
+// the mode is MODE_SCHEMA_NODROP, so the destructor does not drop a schema
+// or database.
+SQLUtility::SQLUtility(const std::string &db, const std::string &host,
+                       const std::string &port, const std::string &user)
+    : testRootPath(getTestRootPath()),
+      test_info(::testing::UnitTest::GetInstance()->current_test_info()) {
+  sql_util_mode = MODE_SCHEMA_NODROP;
+  databaseName = db;
+  schemaName = HAWQ_DEFAULT_SCHEMA;
+  conn.reset(new hawq::test::PSQL(db, host, port, user, HAWQ_PASSWORD));
+}
+
 SQLUtility::~SQLUtility() {
   if (!test_info->result()->Failed()) {
-
-       
//--------------------------------------------------------------------------
-       // This is a temporary work around to sleep a short time window in 
order to
-       // wait for the quit of query dispatcher processes. Because each query
-       // dispatcher has one resource heart-beat thread to be joined before the
-       // exit, in worst case, that thread will sleep 100ms and consequently 
check
-       // the switch variable to complete the exiting logic. This may causes 
the
-       // error reporting that the database is still accessed by other users, 
when
-       // user drops database once finished using database.
-       //
-       // When we have that exit logic improved, we can remove this logic.
-       
//--------------------------------------------------------------------------
+    
//--------------------------------------------------------------------------
+    // This is a temporary work around to sleep a short time window in order to
+    // wait for the quit of query dispatcher processes. Because each query
+    // dispatcher has one resource heart-beat thread to be joined before the
+    // exit, in worst case, that thread will sleep 100ms and consequently check
+    // the switch variable to complete the exiting logic. This may causes the
+    // error reporting that the database is still accessed by other users, when
+    // user drops database once finished using database.
+    //
+    // When we have that exit logic improved, we can remove this logic.
+    
//--------------------------------------------------------------------------
 
     usleep(200000);
-    if (schemaName != HAWQ_DEFAULT_SCHEMA) {
+    if (schemaName != HAWQ_DEFAULT_SCHEMA &&
+        sql_util_mode != MODE_SCHEMA_NODROP) {
       exec("DROP SCHEMA " + schemaName + " CASCADE");
     }
 
-    if (sql_util_mode ==  MODE_DATABASE) {
+    if (sql_util_mode == MODE_DATABASE) {
       exec("DROP DATABASE " + databaseName);
     }
   }
 }
 
-std::string SQLUtility::getDbName() {
-    return databaseName;
+std::string SQLUtility::getStrCurTime() {
+  time_t secs = time(NULL);
+  struct tm *tm_cur = localtime(&secs);
+  std::string str_time = std::to_string(tm_cur->tm_year + 1900) + "-" +
+                         std::to_string(tm_cur->tm_mon + 1) + "-" +
+                         std::to_string(tm_cur->tm_mday) + " " +
+                         std::to_string(tm_cur->tm_hour) + ":" +
+                         std::to_string(tm_cur->tm_min) + ":" +
+                         std::to_string(tm_cur->tm_sec);
+  return str_time;
 }
 
-std::string SQLUtility::getSchemaName() {
-    return schemaName;
+std::ostream &SQLUtility::time_log() {
+  std::cout.flags(std::ios::left);
+  std::cout << std::setw(18) << SQLUtility::getStrCurTime() << " => ";
+  return std::cout;
 }
 
+std::string SQLUtility::getHost() { return conn->getHost(); }
+
+std::string SQLUtility::getDbName() { return databaseName; }
+
+std::string SQLUtility::getSchemaName() { return schemaName; }
+
+// Makes name the schema used for subsequent statements.
+//
+// With isSwitch set, only the stored name changes. Otherwise the schema is
+// also (re)created: dropped and created afresh, or, in MODE_SCHEMA_NODROP,
+// created with any error from an already existing schema ignored. The
+// default schema must not be passed in that case.
+void SQLUtility::setSchemaName(const std::string &name, bool isSwitch) {
+  schemaName = name;
+  if (isSwitch) return;
+
+  assert(name != HAWQ_DEFAULT_SCHEMA);
+  if (sql_util_mode == MODE_SCHEMA_NODROP) {
+    execIgnore("CREATE SCHEMA " + schemaName);
+    return;
+  }
+  exec("DROP SCHEMA IF EXISTS " + schemaName + " CASCADE");
+  exec("CREATE SCHEMA " + schemaName);
+}
+
+std::string SQLUtility::getVersion() {
+  return getQueryResult("select version();");
+}
+
+// Set Host of connection value
+void SQLUtility::setHost(const std::string &host) { conn->setHost(host); }
+
+void SQLUtility::setDatabase(const std::string &db) { conn->setDatabase(db); }
+
 void SQLUtility::exec(const string &sql) {
   EXPECT_EQ(0, (conn->runSQLCommand(sql)).getLastStatus())
       << conn->getLastResult();
 }
 
+void SQLUtility::execIgnore(const string &sql) { conn->runSQLCommand(sql); }
+
 string SQLUtility::execute(const string &sql, bool check) {
   conn->runSQLCommand("SET SEARCH_PATH=" + schemaName + ";" + sql);
   EXPECT_NE(conn.get(), nullptr);
   if (check) {
-    EXPECT_EQ(0, conn->getLastStatus()) << conn->getLastResult();
+    EXPECT_EQ(0, conn->getLastStatus()) << sql << " " << conn->getLastResult();
     return "";
   }
   return conn.get()->getLastResult();
 }
 
+// Runs sql with the search path set to the current schema and reports
+// whether psql exited with status 0. No test expectation is raised for a
+// failing statement.
+bool SQLUtility::executeSql(const std::string &sql) {
+  // Check the connection before it is used; testing it after the call, as
+  // before, could never catch a missing connection.
+  EXPECT_NE(conn.get(), nullptr);
+  if (!conn) return false;
+  conn->runSQLCommand("SET SEARCH_PATH=" + schemaName + ";" + sql);
+  return conn->getLastStatus() == 0;
+}
+
 void SQLUtility::executeExpectErrorMsgStartWith(const std::string &sql,
                                                 const std::string &errmsg) {
   std::string errout = execute(sql, false);
@@ -147,13 +220,51 @@ void SQLUtility::query(const string &sql, const string 
&expectStr) {
   EXPECT_EQ(expectStr, resultStr);
 }
 
-void SQLUtility::execSQLFile(const string &sqlFile,
-                             const string &ansFile,
-                             const string &initFile,
-                             bool usingDefaultSchema,
+// Compares a produced output file with its expected answer file.
+//
+// initFile, when not empty, is a path relative to the test root naming an
+// extra init file for the diff tool; it must exist and split into a
+// non-empty base name (fp receives the split path). The global init file
+// under lib/ is always passed as well. When the files match, the generated
+// SQL file newSqlFile (if any) is deleted and true is returned. Any problem
+// raises a non-fatal test failure and returns false; for a mismatch the
+// failure message carries the path of the diff file.
+bool SQLUtility::checkAnsFile(const string &ansFileAbsPath,
+                              const string &outFileAbsPath,
+                              const string &initFile, const string &newSqlFile,
+                              FilePath &fp) {
+  // Resolve and validate the optional per-test init file.
+  string initFileAbsPath;
+  if (!initFile.empty()) {
+    initFileAbsPath = testRootPath + "/" + initFile;
+    if (!std::ifstream(initFileAbsPath)) {
+      EXPECT_TRUE(false) << initFileAbsPath << " doesn't exist";
+      return false;
+    }
+    fp = splitFilePath(initFileAbsPath);
+    // double check to avoid empty fileBaseName
+    if (fp.fileBaseName.empty()) {
+      EXPECT_TRUE(false) << initFileAbsPath << " is invalid";
+      return false;
+    }
+  }
+
+  const string globalInitFileAbsPath = testRootPath + "/lib/global_init_file";
+
+  const bool is_sql_ans_diff =
+      conn->checkDiff(ansFileAbsPath, outFileAbsPath, true,
+                      globalInitFileAbsPath, initFileAbsPath);
+  if (is_sql_ans_diff) {
+    EXPECT_FALSE(is_sql_ans_diff) << conn->getDifFilePath(outFileAbsPath);
+    return false;
+  }
+
+  // No diff: the generated sql file is no longer needed.
+  if (!newSqlFile.empty() && remove(newSqlFile.c_str())) {
+    EXPECT_TRUE(false) << "Error deleting file " << newSqlFile;
+    return false;
+  }
+  return true;
+}
+
+void SQLUtility::execSQLFile(const string &sqlFile, const string &ansFile,
+                             const string &initFile, bool usingDefaultSchema,
                              bool printTupleOnly) {
   FilePath fp;
-
   // do precheck for sqlFile & ansFile
   if (hawq::test::startsWith(sqlFile, "/") ||
       hawq::test::startsWith(ansFile, "/"))
@@ -177,42 +288,74 @@ void SQLUtility::execSQLFile(const string &sqlFile,
   conn->resetOutput();
 
   // initFile if any
-  string initFileAbsPath;
-  if (!initFile.empty()) {
-    initFileAbsPath = testRootPath + "/" + initFile;
-    if (!std::ifstream(initFileAbsPath))
-      ASSERT_TRUE(false) << initFileAbsPath << " doesn't exist";
-    fp = splitFilePath(initFileAbsPath);
-    // double check to avoid empty fileBaseName
-    if (fp.fileBaseName.empty())
-      ASSERT_TRUE(false) << initFileAbsPath << " is invalid";
-  } else {
-    initFileAbsPath = "";
+  checkAnsFile(ansFileAbsPath, outFileAbsPath, initFile, newSqlFile, fp);
+}
+
+void SQLUtility::execSQLFileandCheck(const string &sqlFile,
+                                     const string &outFile,
+                                     const std::string &pattern,
+                                     const std::string &sqlOptions) {
+  // do precheck for sqlFile
+  if (hawq::test::startsWith(sqlFile, "/")) {
+    ASSERT_TRUE(false)
+        << "For sqlFile, relative path to feature test root dir is needed";
   }
+  string sqlFileAbsPath = testRootPath + "/" + sqlFile;
+  FilePath fp = splitFilePath(sqlFileAbsPath);
+  string outFileAbsPath = testRootPath + "/" + outFile;
 
-  string globalInitFileAbsPath;
-  globalInitFileAbsPath = testRootPath + "/lib/global_init_file";
+  if (fp.fileBaseName.empty()) ASSERT_TRUE(false) << sqlFile << " is invalid";
 
-  bool is_sql_ans_diff = conn->checkDiff(ansFileAbsPath, outFileAbsPath, true, 
globalInitFileAbsPath, initFileAbsPath);
-  EXPECT_FALSE(is_sql_ans_diff);
-  if (is_sql_ans_diff == false) {
-    // no diff, continue to delete the generated sql file
-    if (remove(newSqlFile.c_str()))
-      ASSERT_TRUE(false) << "Error deleting file " << newSqlFile;
-  } else {
-    EXPECT_FALSE(true);
+  // generate new sql file with set search_path added at the begining
+  const string newSqlFile = generateSQLFile(sqlFile, false);
+  conn->setOutputFile(outFileAbsPath);
+
+  EXPECT_EQ(0, conn->runSQLFile(newSqlFile, sqlOptions).getLastStatus());
+  conn->resetOutput();
+  bool result = this->checkPatternInFile(outFile, pattern);
+  ASSERT_TRUE(result) << outFile << " find " << pattern;
+}
+
+// Scan an output file, line by line, for a case-insensitive regular
+// expression.
+// @param outFile output file, relative to the test root dir
+// @param pattern regular expression searched for in every line; an empty
+// pattern means there is nothing to look for
+// @return true when no line matches; false when a line matches or when
+// outFile is given as an absolute path
+bool SQLUtility::checkPatternInFile(const std::string &outFile,
+                                    const std::string &pattern) {
+  if (hawq::test::startsWith(outFile, "/")) {
+    std::cout << "ERROR : outfile format is error ";
+    return false;
   }
+  // An empty regex matches every line, which would flag any non-empty file.
+  if (pattern.empty()) return true;
+
+  const string outFileAbsPath = getTestRootPath() + "/" + outFile;
+  std::ifstream fin(outFileAbsPath);
+  // Compile the pattern once instead of once per line.
+  const std::regex re(pattern, std::regex_constants::icase);
+  std::string line;
+  while (getline(fin, line)) {
+    if (std::regex_search(line, re)) return false;
+  }
+  // fin is closed by its destructor.
+  return true;
+}
+
+// Run a sql file given by absolute path; output goes to
+// /tmp/<file base name>.out.
+// @param sqlFile absolute path of the sql file
+// @return true when the run's last status is 0; false otherwise, or when the
+// path has no base name
+bool SQLUtility::execAbsSQLFile(const string &sqlFile) {
+  const FilePath parts = splitFilePath(sqlFile);
+  if (parts.fileBaseName.empty()) return false;
+
+  // Redirect the connection's output before running the file.
+  string outFileAbsPath = "/tmp/" + parts.fileBaseName + ".out";
+  conn->setOutputFile(outFileAbsPath);
+  return conn->runSQLFile(sqlFile).getLastStatus() == 0;
 }
 
 bool SQLUtility::execSQLFile(const string &sqlFile) {
   // do precheck for sqlFile
-  if (hawq::test::startsWith(sqlFile, "/"))
-    return false;
+  if (hawq::test::startsWith(sqlFile, "/")) return false;
 
   // double check to avoid empty fileBaseName
   FilePath fp = splitFilePath(sqlFile);
-  if (fp.fileBaseName.empty())
-    return false;
+  if (fp.fileBaseName.empty()) return false;
   // outFile is located in the same folder with ansFile
   string outFileAbsPath = "/tmp/" + fp.fileBaseName + ".out";
 
@@ -224,9 +367,290 @@ bool SQLUtility::execSQLFile(const string &sqlFile) {
   return conn->runSQLFile(newSqlFile).getLastStatus() == 0 ? true : false;
 }
 
-const string SQLUtility::generateSQLFile(const string &sqlFile, bool 
usingDefaultSchema) {
+// Count the lines in an output file that contain "ERROR" (case-insensitive),
+// using `grep -i ERROR <file> | wc -l`.
+// @param outputFile absolute path, or a path relative to the test root dir
+// @return the number of matching lines, or -1 when the command output cannot
+// be parsed as an integer
+int32_t SQLUtility::getErrorNumber(const string &outputFile) {
+  // Absolute paths are used verbatim; anything else lives under the test root.
+  const string outputAbspath = hawq::test::startsWith(outputFile, "/")
+                                   ? outputFile
+                                   : testRootPath + "/" + outputFile;
+  const string countText =
+      Command::getCommandOutput("grep -i ERROR " + outputAbspath + " | wc -l");
+  try {
+    return std::stoi(countText);
+  } catch (...) {
+    return -1;
+  }
+}
+
+// Prepare a run of sqlFile: validate the path, generate the sql file that is
+// actually executed, and resolve the output path.
+// @param sqlFile sql file, relative to the test root dir
+// @param outputFile output file, relative to the test root dir
+// @param sqlFilePrefix forwarded to generateSQLFile
+// @param newSqlFile [out] path returned by generateSQLFile
+// @param outputAbsPath [out] testRootPath + "/" + outputFile
+// @return false when sqlFile is absolute or has no base name, true otherwise
+bool SQLUtility::__beforeExecSpecificSQLFile(const string &sqlFile,
+                                             const string &outputFile,
+                                             const string &sqlFilePrefix,
+                                             string &newSqlFile,
+                                             string &outputAbsPath) {
+  // Only relative paths with a non-empty base name are accepted.
+  if (hawq::test::startsWith(sqlFile, "/") ||
+      splitFilePath(sqlFile).fileBaseName.empty()) {
+    return false;
+  }
+
+  // usingDefaultSchema is always false on this path.
+  newSqlFile = generateSQLFile(sqlFile, false, sqlFilePrefix);
+  outputAbsPath = testRootPath + "/" + outputFile;
+  return true;
+}
+
+// Run a sql file with extra psql options and report its exit status.
+// @param sqlFile sql file, relative to the test root dir
+// @param outputFile output file, relative to the test root dir
+// @param psqlOptions options forwarded to runSQLFile
+// @param sqlFilePrefix forwarded to generateSQLFile
+// @return true when preparation succeeded and the run's last status is 0
+bool SQLUtility::execSpecificSQLFile(const string &sqlFile,
+                                     const string &outputFile,
+                                     const string &psqlOptions,
+                                     const string &sqlFilePrefix) {
+  string generatedSql, outputPath;
+  const bool prepared = __beforeExecSpecificSQLFile(
+      sqlFile, outputFile, sqlFilePrefix, generatedSql, outputPath);
+  if (!prepared) return false;
+
+  conn->setOutputFile(outputPath);
+  const bool succeeded =
+      conn->runSQLFile(generatedSql, psqlOptions).getLastStatus() == 0;
+  // Restore the connection's output before returning.
+  conn->resetOutput();
+  return succeeded;
+}
+
+// Prepare a run of sqlFile with appendString added to the generated file, and
+// point the connection's output at the requested output file.
+// @param sqlFile sql file, relative to the test root dir
+// @param outputFile output file, relative to the test root dir
+// @param sqlFilePrefix forwarded to generateSQLFile
+// @param appendString forwarded to generateSQLFile
+// @param newSqlFile [out] path returned by generateSQLFile
+// @return false when sqlFile is absolute or has no base name, true otherwise
+bool SQLUtility::__beforeBoolExecAppendSQLFile(const string &sqlFile,
+                                               const string &outputFile,
+                                               const string &sqlFilePrefix,
+                                               const string &appendString,
+                                               string &newSqlFile) {
+  // Only relative paths with a non-empty base name are accepted.
+  if (hawq::test::startsWith(sqlFile, "/") ||
+      splitFilePath(sqlFile).fileBaseName.empty()) {
+    return false;
+  }
+
+  // usingDefaultSchema is always false on this path.
+  newSqlFile = generateSQLFile(sqlFile, false, sqlFilePrefix, appendString);
+
+  // The caller is expected to call resetOutput() once the run is done.
+  std::string outputTarget = testRootPath + "/" + outputFile;
+  conn->setOutputFile(outputTarget);
+  return true;
+}
+
+// Run a sql file with appendString added to the generated file and report its
+// exit status.
+// @param sqlFile sql file, relative to the test root dir
+// @param outputFile output file, relative to the test root dir
+// @param psqlOptions options forwarded to runSQLFile
+// @param sqlFilePrefix forwarded to generateSQLFile
+// @param appendString text added to the generated sql file
+// @return true when preparation succeeded and the run's last status is 0
+bool SQLUtility::execAppendSQLFile(const string &sqlFile,
+                                   const string &outputFile,
+                                   const string &psqlOptions,
+                                   const string &sqlFilePrefix,
+                                   const string &appendString) {
+  std::string generatedSql;
+  const bool prepared = __beforeBoolExecAppendSQLFile(
+      sqlFile, outputFile, sqlFilePrefix, appendString, generatedSql);
+  if (!prepared) return false;
+
+  const bool succeeded =
+      conn->runSQLFile(generatedSql, psqlOptions).getLastStatus() == 0;
+  // The helper redirected the output; restore it before returning.
+  conn->resetOutput();
+  return succeeded;
+}
+
+// Validate the inputs of a sql-file run and compute the paths it needs.
+// A failed check raises a fatal gtest failure and returns early, leaving the
+// remaining output parameters untouched.
+// @param fp [out] split form of the absolute ans file path
+// @param ansFileAbsPath [out] testRootPath + "/" + ansFile
+// @param newSqlFile [out] path returned by generateSQLFile
+// @param outputAbsPath [out] testRootPath + "/" + outputFile; assigned last
+void SQLUtility::__beforeRunSqlfile(
+    const string &sqlFile, const string &outputFile, const string &ansFile,
+    const string &sqlFilePrefix, const string &appendString,
+    bool usingDefaultSchema, FilePath &fp, std::string &ansFileAbsPath,
+    std::string &newSqlFile, std::string &outputAbsPath) {
+  // Both files must be given relative to the feature test root dir.
+  const bool hasAbsolutePath = hawq::test::startsWith(sqlFile, "/") ||
+                               hawq::test::startsWith(ansFile, "/");
+  if (hasAbsolutePath) {
+    ASSERT_TRUE(false) << "For sqlFile and ansFile, relative path to feature "
+                          "test root dir is needed";
+  }
+
+  ansFileAbsPath = testRootPath + "/" + ansFile;
+  if (!std::ifstream(ansFileAbsPath)) {
+    ASSERT_TRUE(false) << ansFileAbsPath << " doesn't exist";
+  }
+
+  fp = splitFilePath(ansFileAbsPath);
+  // Reject an ans file path that has no base name.
+  if (fp.fileBaseName.empty()) {
+    ASSERT_TRUE(false) << ansFileAbsPath << " is invalid";
+  }
+
+  newSqlFile = generateSQLFile(sqlFile, usingDefaultSchema, sqlFilePrefix,
+                               appendString);
+  outputAbsPath = testRootPath + "/" + outputFile;
+}
+
+// Compare the output of a finished run with the ans file, optionally sorting
+// the output first.
+// @param initFile init file forwarded to checkAnsFile
+// @param sortoutfile when true, the output is run through `sort` into
+// <output dir>/<output base name>_sort.out and that file is compared instead
+// @param fp scratch FilePath; overwritten on the sorted path
+// @return the result of checkAnsFile, or false when the output path is invalid
+bool SQLUtility::__afterRunSqlfile(const std::string &initFile,
+                                   bool sortoutfile, FilePath &fp,
+                                   std::string &ansFileAbsPath,
+                                   std::string &outputAbsPath,
+                                   std::string &newSqlFile) {
+  if (!sortoutfile) {
+    return checkAnsFile(ansFileAbsPath, outputAbsPath, initFile, newSqlFile,
+                        fp);
+  }
+
+  hawq::test::Command comm;
+  fp = splitFilePath(outputAbsPath);
+  if (fp.fileBaseName.empty()) {
+    EXPECT_TRUE(false) << outputAbsPath << " is invalid";
+    return false;
+  }
+
+  const std::string sortedPath = fp.path + "/" + fp.fileBaseName + "_sort.out";
+  std::string command = "sort " + outputAbsPath + " > " + sortedPath;
+  int result = comm.getCommandStatus(command);
+  EXPECT_TRUE(result != -1);
+  // An empty newSqlFile is passed on this path, so checkAnsFile deletes
+  // nothing.
+  return checkAnsFile(ansFileAbsPath, sortedPath, initFile, "", fp);
+}
+
+// Run a sql file with appendString added to the generated file, then diff the
+// output against the ans file.
+// @param sqlFile sql file, relative to the test root dir
+// @param outputFile output file, relative to the test root dir
+// @param ansFile expected-answer file, relative to the test root dir
+// @param psqlOptions options forwarded to runSQLFile
+// @param sqlFilePrefix forwarded to generateSQLFile
+// @param appendString text added to the generated sql file
+// @param initFile init file forwarded to the ans-file check
+// @param sortoutfile when true the output is sorted before it is compared
+void SQLUtility::execAppendSQLFile(
+    const string &sqlFile, const string &outputFile, const string &ansFile,
+    const string &psqlOptions, const string &sqlFilePrefix,
+    const string &appendString, const string &initFile,
+    bool usingDefaultSchema, bool printTupleOnly, bool sortoutfile) {
+  FilePath fp;
+  std::string ansFileAbsPath, newSqlFile, outputAbsPath;
+  __beforeRunSqlfile(sqlFile, outputFile, ansFile, sqlFilePrefix, appendString,
+                     usingDefaultSchema, fp, ansFileAbsPath, newSqlFile,
+                     outputAbsPath);
+  // A fatal assertion inside the helper only returns from the helper.
+  // outputAbsPath is the last value it assigns, so an empty string means a
+  // precheck failed and there is nothing valid to run or compare.
+  if (outputAbsPath.empty()) return;
+
+  conn->setOutputFile(outputAbsPath);
+  EXPECT_EQ(0, conn->runSQLFile(newSqlFile, psqlOptions, printTupleOnly)
+                   .getLastStatus());
+  conn->resetOutput();
+  __afterRunSqlfile(initFile, sortoutfile, fp, ansFileAbsPath, outputAbsPath,
+                    newSqlFile);
+}
+
+// Run one generated sql file concurrently in processnum forked children.
+// Child i writes its output to
+// <test root>/<first '.'-separated part of outputFile>_process_<i>.out.
+// A child that fails is reported as a test failure in the parent.
+// @param sqlFile sql file, relative to the test root dir
+// @param outputFile name the per-process output file names are derived from
+// @param processnum number of child processes to start
+// @param ansFile only checked for being a relative path
+// @param usingDefaultSchema forwarded to generateSQLFile
+void SQLUtility::runParallelSQLFile(const string sqlFile,
+                                    const string outputFile, int processnum,
+                                    const string ansFile,
+                                    bool usingDefaultSchema) {
+  // do precheck for sqlFile & ansFile
+  if (hawq::test::startsWith(sqlFile, "/") ||
+      hawq::test::startsWith(ansFile, "/")) {
+    ASSERT_TRUE(false) << "For sqlFile and ansFile, relative path to feature "
+                          "test root dir is needed";
+  }
+  // generate new sql file with set search_path added at the beginning
+  const string newSqlFile = generateSQLFile(sqlFile, usingDefaultSchema);
+  std::vector<pid_t> childprocess;
+  bool forkFailed = false;
+  for (int i = 0; i < processnum; i++) {
+    pid_t childpid = fork();
+    if (childpid == -1) {
+      // Stop forking, but still reap the children already started.
+      forkFailed = true;
+      break;
+    }
+    if (childpid == 0) {
+      auto filename = hawq::test::split(outputFile, '.');
+      string outFileAbsPath = testRootPath + "/" + filename[0] + "_process_" +
+                              std::to_string(i) + ".out";
+      conn->setOutputFile(outFileAbsPath);
+      const bool succeeded = conn->runSQLFile(newSqlFile).getLastStatus() == 0;
+      EXPECT_TRUE(succeeded);
+      // gtest failures recorded in the child are invisible to the parent, so
+      // the outcome is reported through the exit status.
+      exit(succeeded ? 0 : 1);
+    }
+    childprocess.push_back(childpid);
+  }
+  for (size_t i = 0; i < childprocess.size(); i++) {
+    int status = 0;
+    if (waitpid(childprocess[i], &status, 0) != childprocess[i]) {
+      // Give up on this child instead of retrying forever.
+      EXPECT_TRUE(false) << "Failed to wait for child process "
+                         << childprocess[i];
+      continue;
+    }
+    EXPECT_TRUE(WIFEXITED(status) && WEXITSTATUS(status) == 0)
+        << "Child process " << childprocess[i] << " failed";
+  }
+  ASSERT_FALSE(forkFailed) << "Fork child process error";
+}
+
+// Run one generated sql file concurrently in processnum forked children,
+// child i using options[i] as its psql options.
+// Child i writes its output to
+// <test root>/<first '.'-separated part of outputFile>_process_<i>.out.
+// A child that fails is reported as a test failure in the parent.
+// @param sqlFile sql file, relative to the test root dir
+// @param outputFile name the per-process output file names are derived from
+// @param processnum number of child processes to start
+// @param options per-process psql options; needs at least processnum entries
+// @param ansFile only checked for being a relative path
+// @param usingDefaultSchema forwarded to generateSQLFile
+void SQLUtility::runParallelSQLFile(const string sqlFile,
+                                    const string outputFile, int processnum,
+                                    std::vector<std::string> &options,
+                                    const string ansFile,
+                                    bool usingDefaultSchema) {
+  // do precheck for sqlFile & ansFile
+  if (hawq::test::startsWith(sqlFile, "/") ||
+      hawq::test::startsWith(ansFile, "/")) {
+    ASSERT_TRUE(false) << "For sqlFile and ansFile, relative path to feature "
+                          "test root dir is needed";
+  }
+  // Every child reads options[i]; refuse to start when entries are missing.
+  if (processnum > 0 && options.size() < static_cast<size_t>(processnum)) {
+    ASSERT_TRUE(false) << "The number of options is less than the number of "
+                          "processes";
+  }
+  // generate new sql file with set search_path added at the beginning
+  const string newSqlFile = generateSQLFile(sqlFile, usingDefaultSchema);
+  std::vector<pid_t> childprocess;
+  bool forkFailed = false;
+  for (int i = 0; i < processnum; i++) {
+    pid_t childpid = fork();
+    if (childpid == -1) {
+      // Stop forking, but still reap the children already started.
+      forkFailed = true;
+      break;
+    }
+    if (childpid == 0) {
+      auto filename = hawq::test::split(outputFile, '.');
+      string outFileAbsPath = testRootPath + "/" + filename[0] + "_process_" +
+                              std::to_string(i) + ".out";
+      conn->setOutputFile(outFileAbsPath);
+      const bool succeeded =
+          conn->runSQLFile(newSqlFile, options[i]).getLastStatus() == 0;
+      EXPECT_TRUE(succeeded);
+      // gtest failures recorded in the child are invisible to the parent, so
+      // the outcome is reported through the exit status.
+      exit(succeeded ? 0 : 1);
+    }
+    childprocess.push_back(childpid);
+  }
+  for (size_t i = 0; i < childprocess.size(); i++) {
+    int status = 0;
+    if (waitpid(childprocess[i], &status, 0) != childprocess[i]) {
+      // Give up on this child instead of retrying forever.
+      EXPECT_TRUE(false) << "Failed to wait for child process "
+                         << childprocess[i];
+      continue;
+    }
+    EXPECT_TRUE(WIFEXITED(status) && WEXITSTATUS(status) == 0)
+        << "Child process " << childprocess[i] << " failed";
+  }
+  ASSERT_FALSE(forkFailed) << "Fork child process error";
+}
+
+// Validate the inputs of a sql-file run that is diffed against an ans file,
+// and compute the paths it needs. A failed check raises a fatal gtest failure
+// and returns early, leaving the remaining output parameters untouched.
+// @param fp [out] split form of the absolute ans file path
+// @param ansFileAbsPath [out] testRootPath + "/" + ansFile
+// @param newSqlFile [out] path returned by generateSQLFile
+// @param outputAbsPath [out] testRootPath + "/" + outputFile; assigned last
+void SQLUtility::__beforeExecSpecificSQLFile(
+    const string &sqlFile, const string &outputFile, const string &ansFile,
+    const string &sqlFilePrefix, bool usingDefaultSchema, FilePath &fp,
+    string &ansFileAbsPath, string &newSqlFile, string &outputAbsPath) {
+  // Both files must be given relative to the feature test root dir.
+  const bool hasAbsolutePath = hawq::test::startsWith(sqlFile, "/") ||
+                               hawq::test::startsWith(ansFile, "/");
+  if (hasAbsolutePath) {
+    ASSERT_TRUE(false) << "For sqlFile and ansFile, relative path to feature "
+                          "test root dir is needed";
+  }
+
+  ansFileAbsPath = testRootPath + "/" + ansFile;
+  if (!std::ifstream(ansFileAbsPath)) {
+    ASSERT_TRUE(false) << ansFileAbsPath << " doesn't exist";
+  }
+
+  fp = splitFilePath(ansFileAbsPath);
+  // Reject an ans file path that has no base name.
+  if (fp.fileBaseName.empty()) {
+    ASSERT_TRUE(false) << ansFileAbsPath << " is invalid";
+  }
+
+  newSqlFile = generateSQLFile(sqlFile, usingDefaultSchema, sqlFilePrefix);
+  outputAbsPath = testRootPath + "/" + outputFile;
+}
+
+// Run a sql file with extra psql options, then diff the output against the
+// ans file.
+// @param sqlFile sql file, relative to the test root dir
+// @param outputFile output file, relative to the test root dir
+// @param ansFile expected-answer file, relative to the test root dir
+// @param psqlOptions options forwarded to runSQLFile
+// @param sqlFilePrefix forwarded to generateSQLFile
+// @param initFile init file forwarded to checkAnsFile
+void SQLUtility::execSpecificSQLFile(
+    const string &sqlFile, const string &outputFile, const string &ansFile,
+    const string &psqlOptions, const string &sqlFilePrefix,
+    const string &initFile, bool usingDefaultSchema, bool printTupleOnly) {
+  FilePath fp;
+  std::string ansFileAbsPath, newSqlFile, outputAbsPath;
+
+  __beforeExecSpecificSQLFile(sqlFile, outputFile, ansFile, sqlFilePrefix,
+                              usingDefaultSchema, fp, ansFileAbsPath,
+                              newSqlFile, outputAbsPath);
+  // A fatal assertion inside the helper only returns from the helper.
+  // outputAbsPath is the last value it assigns, so an empty string means a
+  // precheck failed and there is nothing valid to run or compare.
+  if (outputAbsPath.empty()) return;
+
+  conn->setOutputFile(outputAbsPath);
+  EXPECT_EQ(0, conn->runSQLFile(newSqlFile, psqlOptions, printTupleOnly)
+                   .getLastStatus());
+  conn->resetOutput();
+
+  checkAnsFile(ansFileAbsPath, outputAbsPath, initFile, newSqlFile, fp);
+}
+
+const string SQLUtility::generateSQLFile(const string &sqlFile,
+                                         bool usingDefaultSchema,
+                                         const string &sqlFileSurfix,
+                                         const string &appendString) {
   const string originSqlFile = testRootPath + "/" + sqlFile;
-  const string newSqlFile = "/tmp/" + string(test_info->test_case_name()) + 
"_" + test_info->name() + ".sql";
+  const string newSqlFile = "/tmp/" + string(test_info->test_case_name()) +
+                            "_" + test_info->name() + sqlFileSurfix + ".sql";
   std::fstream in;
   in.open(originSqlFile, std::ios::in);
   if (!in.is_open()) {
@@ -238,13 +662,18 @@ const string SQLUtility::generateSQLFile(const string 
&sqlFile, bool usingDefaul
     EXPECT_TRUE(false) << "Error opening file " << newSqlFile;
   }
   out << "-- start_ignore" << std::endl;
-  if (!usingDefaultSchema) {
-      out << "SET SEARCH_PATH=" + schemaName + ";" << std::endl;
+  if (sql_util_mode == MODE_MADLIB) {
+    out << "SET SEARCH_PATH=" << schemaName << ",madlib;" << std::endl;
+  } else if (!usingDefaultSchema) {
+    out << "SET SEARCH_PATH=" + schemaName + ";" << std::endl;
   }
-  if (sql_util_mode ==  MODE_DATABASE) {
+  if (sql_util_mode == MODE_DATABASE) {
     out << "\\c " << databaseName << std::endl;
   }
   out << "-- end_ignore" << std::endl;
+
+  if (appendString.size() > 0) out << appendString << "\n";
+
   string line;
   while (getline(in, line)) {
     out << line << std::endl;
@@ -260,9 +689,60 @@ const hawq::test::PSQLQueryResult 
&SQLUtility::executeQuery(const string &sql) {
   return result;
 }
 
+// Return the hawq_dfs_url GUC with a trailing '/'.
+// @return the URL ending in '/', or an empty string when the GUC value is
+// empty
+std::string SQLUtility::getHawqDfsURL() {
+  std::string url = this->getGUCValue("hawq_dfs_url");
+  // Guard the empty case: indexing url[size() - 1] on an empty string is out
+  // of bounds.
+  if (!url.empty() && url.back() != '/') {
+    url += "/";
+  }
+  return url;
+}
+
+// Report whether `uname` prints a line containing "Darwin".
+// @return true when the shell pipeline counts at least one such line
+bool SQLUtility::isDarwin() {
+  const std::string matchCount =
+      Command::getCommandOutput("uname | grep Darwin | wc -l");
+  return atoi(matchCount.c_str()) != 0;
+}
+
+// Build "<host>:<port>" for the magma service from the first entry returned
+// by HawqConfig::getSlaves and the hawq_magma_port_segment GUC.
+// @return the URL, or an empty string when no segment host is listed
+std::string SQLUtility::getHawqMagmaURL() {
+  hawq::test::HawqConfig config;
+  std::vector<std::string> segmentHosts;
+  config.getSlaves(segmentHosts);
+
+  if (segmentHosts.empty()) return "";
+
+  // Use the magma service on the first listed segment host.
+  return segmentHosts[0] + ":" + this->getGUCValue("hawq_magma_port_segment");
+}
+
+// Return the part of the hawq_dfs_url GUC that precedes the first '/'.
+// @return that prefix, or the whole value when it contains no '/'
+std::string SQLUtility::getHawqDfsHost() {
+  const string dfsUrl = this->getGUCValue("hawq_dfs_url");
+  // find() yields npos when there is no '/', and substr(0, npos) keeps it all.
+  return dfsUrl.substr(0, dfsUrl.find("/"));
+}
+
+// Return the path part of the hawq_dfs_url GUC: everything from the first
+// '/' on, with a trailing '/'.
+// @return the path ending in '/'; "/" when the value contains no '/'
+std::string SQLUtility::getHdfsPath() {
+  string url = this->getGUCValue("hawq_dfs_url");
+  std::size_t found = url.find("/");
+  // substr(npos) would throw std::out_of_range, so treat a value without
+  // any '/' as having an empty path.
+  std::string result = (found == std::string::npos) ? "" : url.substr(found);
+  if (result.empty() || result.back() != '/') {
+    result += "/";
+  }
+  return result;
+}
+
+// Return the hawq_dfs_url GUC up to and including its first '/'.
+// @return that prefix; empty when the value contains no '/', because
+// npos + 1 wraps around to 0
+std::string SQLUtility::getHdfsNamenode() {
+  const string dfsUrl = this->getGUCValue("hawq_dfs_url");
+  const std::size_t slashPos = dfsUrl.find("/");
+  return dfsUrl.substr(0, slashPos + 1);
+}
+
 const hawq::test::PSQL *SQLUtility::getPSQL() const { return conn.get(); }
 
-string SQLUtility::getTestRootPath() const {
+string SQLUtility::getTestRootPath() {
   string result;
 #ifdef __linux__
   char pathbuf[PATH_MAX];
@@ -294,19 +774,22 @@ std::string SQLUtility::getGUCValue(const std::string 
&guc) {
   string sql = "show " + guc;
   const hawq::test::PSQLQueryResult &result = executeQuery(sql);
   EXPECT_EQ(result.rowCount(), 1);
-  std::vector<std::string> row = result.getRows()[0];
-  return row[0];
+  if (result.rowCount() == 1) {
+    std::vector<std::string> row = result.getRows()[0];
+    return row[0];
+  }
+  return "";
 }
 
-std::string SQLUtility::getQueryResult(const std::string &query) {
+std::string SQLUtility::getQueryResult(const std::string &query, bool check) {
   const hawq::test::PSQLQueryResult &result = executeQuery(query);
-  EXPECT_LE(result.rowCount(), 1);
   std::string value;
-  if (result.rowCount() == 1)
-  {
+  if (check) EXPECT_LE(result.rowCount(), 1);
+  if (result.rowCount() == 1) {
     value = result.getRows()[0][0];
+  } else {
+    value = "";
   }
-
   return value;
 }
 
@@ -321,7 +804,7 @@ std::string SQLUtility::getQueryResultSetString(const 
std::string &query) {
   return resultStr;
 }
 
-FilePath SQLUtility::splitFilePath(const string &filePath) const {
+FilePath SQLUtility::splitFilePath(const string &filePath) {
   FilePath fp;
   size_t found1 = filePath.find_last_of("/");
   size_t found2 = filePath.find_last_of(".");
@@ -331,5 +814,51 @@ FilePath SQLUtility::splitFilePath(const string &filePath) 
const {
   return fp;
 }
 
-} // namespace test
-} // namespace hawq
+// Run several sql files concurrently, one forked child per file, and diff
+// each output against its ans file. The output of sqlfiles[i] is written next
+// to ansfiles[i] as <ans base name>.out. A child that fails is reported as a
+// test failure in the parent.
+// @param sqlfiles sql files, relative to the test root dir
+// @param ansfiles ans files, relative to the test root dir; same length as
+// sqlfiles, and every file must exist
+void SQLUtility::runParallelSQLFile_sqllist(
+    std::vector<std::string> &sqlfiles, std::vector<std::string> &ansfiles) {
+  int len = sqlfiles.size();
+  int anslen = ansfiles.size();
+  int i;
+  FilePath fp;
+  std::string sqlFileAbsPath, ansFileAbsPath, outFileAbsPath;
+  // do precheck for sqlFile & ansFile
+  if (len != anslen) {
+    ASSERT_TRUE(false)
+        << "The number of sqlfile is not equal to the number of ansfile";
+  }
+  for (i = 0; i < len; i++) {
+    if (hawq::test::startsWith(sqlfiles[i], "/") ||
+        hawq::test::startsWith(ansfiles[i], "/")) {
+      ASSERT_TRUE(false) << "For sqlFile and ansFile, relative path to feature "
+                            "test root dir is needed";
+    }
+    ansFileAbsPath = testRootPath + "/" + ansfiles[i];
+    if (!std::ifstream(ansFileAbsPath)) {
+      ASSERT_TRUE(false) << ansFileAbsPath << " doesn't exist";
+    }
+  }
+  std::vector<pid_t> childprocess;
+  bool forkFailed = false;
+  for (i = 0; i < len; i++) {
+    pid_t childpid = fork();
+    if (childpid == -1) {
+      // Stop forking, but still reap the children already started.
+      forkFailed = true;
+      break;
+    }
+    if (childpid == 0) {
+      sqlFileAbsPath = testRootPath + "/" + sqlfiles[i];
+      ansFileAbsPath = testRootPath + "/" + ansfiles[i];
+      fp = splitFilePath(ansFileAbsPath);
+      outFileAbsPath = fp.path + "/" + fp.fileBaseName + ".out";
+      conn->setOutputFile(outFileAbsPath);
+      const bool ran = conn->runSQLFile(sqlFileAbsPath).getLastStatus() == 0;
+      EXPECT_TRUE(ran);
+      conn->resetOutput();
+      const bool matched =
+          checkAnsFile(ansFileAbsPath, outFileAbsPath, "", "", fp);
+      // gtest failures recorded in the child are invisible to the parent, so
+      // the outcome is reported through the exit status.
+      exit(ran && matched ? 0 : 1);
+    }
+    childprocess.push_back(childpid);
+  }
+  for (size_t k = 0; k < childprocess.size(); k++) {
+    int status = 0;
+    if (waitpid(childprocess[k], &status, 0) != childprocess[k]) {
+      // Give up on this child instead of retrying forever.
+      EXPECT_TRUE(false) << "Failed to wait for child process "
+                         << childprocess[k];
+      continue;
+    }
+    EXPECT_TRUE(WIFEXITED(status) && WEXITSTATUS(status) == 0)
+        << "Child process " << childprocess[k] << " failed";
+  }
+  ASSERT_FALSE(forkFailed) << "Fork child process error";
+}
+
+}  // namespace test
+}  // namespace hawq
diff --git a/src/test/feature/lib/sql_util.h b/src/test/feature/lib/sql_util.h
index 3bdce10..748ad31 100644
--- a/src/test/feature/lib/sql_util.h
+++ b/src/test/feature/lib/sql_util.h
@@ -28,14 +28,25 @@
 #define HAWQ_DB (getenv("PGDATABASE") ? getenv("PGDATABASE") : "postgres")
 #define HAWQ_HOST (getenv("PGHOST") ? getenv("PGHOST") : "localhost")
 #define HAWQ_PORT (getenv("PGPORT") ? getenv("PGPORT") : "5432")
-#define HAWQ_USER (getenv("PGUSER") ? getenv("PGUSER") : "")
+// Prefer $PGUSER, then $USER; fall back to "" so the macro never evaluates to
+// a NULL pointer when neither variable is set.
+#define HAWQ_USER          \
+  (getenv("PGUSER")        \
+       ? getenv("PGUSER")  \
+       : (getenv("USER") ? getenv("USER") : ""))
 #define HAWQ_PASSWORD (getenv("PGPASSWORD") ? getenv("PGPASSWORD") : "")
 #define HAWQ_DEFAULT_SCHEMA ("public")
 #define RANGER_HOST (getenv("RANGERHOST") ? getenv("RANGERHOST") : "localhost")
+#define KUBENET_MASTER \
+  (getenv("KUBENET_MASTER") ? getenv("KUBENET_MASTER") : "localhost")
+#define HIVE_HOST (getenv("HIVEHOST") ? getenv("HIVEHOST") : "localhost")
+#define HIVE_PORT (getenv("HIVEPORT") ? getenv("HIVEPORT") : "9083")
+#define TIME_LOG() hawq::test::SQLUtility::time_log()
+// Evaluate `expression` once, record a non-fatal gtest failure carrying
+// `errorMessage` when it is false, and in that case `return false` from the
+// enclosing function. Only usable inside functions whose return type accepts
+// `false`. The expansion is a bare `{ ... }` block that declares a local
+// named `ret`.
+#define EXPECT_TRUE_RET_IF_FALSE(expression, errorMessage) \
+  {                                                        \
+    bool ret;                                              \
+    EXPECT_TRUE(ret = (expression)) << errorMessage;       \
+    if (!ret) return false;                                \
+  }
 
 namespace hawq {
 namespace test {
- 
+
 struct FilePath {
   std::string path;
   std::string fileBaseName;
@@ -43,15 +54,44 @@ struct FilePath {
 };
 
 enum SQLUtilityMode {
-    MODE_SCHEMA,
-    MODE_DATABASE,
-    MODE_MAX_NUM
+  MODE_SCHEMA,
+  MODE_DATABASE,
+  MODE_MAX_NUM,
+  MODE_MADLIB,
+  MODE_SCHEMA_NODROP
 };
 
+// Run CODES and store the elapsed wall-clock time, in milliseconds, in
+// UINT32_COST_TIME. START_TIMEB and END_TIMEB name the two `timeb` variables
+// the macro declares in the enclosing scope, so each name can be used only
+// once per scope. The former timezone term subtracted END_TIMEB.timezone from
+// itself and was always zero, so it has been dropped.
+#define COST_TIME(START_TIMEB, END_TIMEB, UINT32_COST_TIME, CODES) \
+  timeb START_TIMEB, END_TIMEB;                                    \
+  ftime(&START_TIMEB);                                             \
+  CODES                                                            \
+  ftime(&END_TIMEB);                                               \
+  UINT32_COST_TIME =                                               \
+      (uint32_t)((END_TIMEB.time - START_TIMEB.time) * 1000 +      \
+                 (END_TIMEB.millitm - START_TIMEB.millitm));
+
 class SQLUtility {
  public:
   SQLUtility(SQLUtilityMode mode = MODE_SCHEMA);
+
+  SQLUtility(const std::string &db, const std::string &host = "localhost",
+             const std::string &port = "5432",
+             const std::string &user = "gpadmin");
+
   ~SQLUtility();
+  // Get the local system time
+  // @return string of local system time
+  static std::string getStrCurTime();
+
+  // print log with current local time
+  // @return std::cout
+  static std::ostream &time_log();
+
+  // Get the test Host
+  // @return string of the test Host
+  std::string getHost();
 
   // Get the test database name
   // @return string of the test database name
@@ -60,13 +100,31 @@ class SQLUtility {
   // Get the test schema name
   // @return string of the test schema name
   std::string getSchemaName();
-  
+
+  // Get the test database version
+  // @return string of test database version
+  std::string getVersion();
+
+  // Set a new test SchemaName
+  void setSchemaName(const std::string &name, bool isSwitch = false);
+
+  // test the connection is ok or not
+  bool testConnection() { return conn->testConnection(); }
+
+  // return test info of gtest
+  const ::testing::TestInfo *const getTestInfo() { return this->test_info; }
+
   // Execute sql command
   // @param sql The given sql command
   // @param check true(default) if expected correctly executing, false 
otherwise
   // @return error or notice message if check is false, else return empty
   std::string execute(const std::string &sql, bool check = true);
 
+  // Execute sql command
+  // @param sql The given sql command
+  // @return true if executed sql successfully, else return false
+  bool executeSql(const std::string &sql);
+
   // Execute sql command and ignore the behavior of its running status
   // @param sql The given sql command
   // @return void
@@ -87,34 +145,167 @@ class SQLUtility {
   // Execute sql file and diff with ans file
   // @param sqlFile The given sqlFile which is relative path to test root dir
   // @param ansFile The given ansFile which is relative path to test root dir
-  // @param initFile The given initFile (used by gpdiff.pl) which is relative 
path to test root dir
+  // @param initFile The given initFile (used by gpdiff.pl) which is relative
+  // path to test root dir
   // @return void
   void execSQLFile(const std::string &sqlFile, const std::string &ansFile,
-                 const std::string &initFile = "", bool usingDefaultSchema = 
false,
-                 bool printTupleOnly = false);
+                   const std::string &initFile = "",
+                   bool usingDefaultSchema = false,
+                   bool printTupleOnly = false);
 
   // Execute sql file and check its return status
   // @param sqlFile The given sqlFile which is relative path to test root dir
   // @return true if the sql file is executed successfully, false otherwise
   bool execSQLFile(const std::string &sqlFile);
 
+  // Execute sql file and check its return status
+  // @param sqlFile The given sqlFile which is absolute path
+  // @return true if the sql file is executed successfully, false otherwise
+  bool execAbsSQLFile(const std::string &sqlFile);
+
+  // Execute sql file and check its output file
+  // @param sqlFile The given sqlFile which is relative path to test root dir
+  // @param outFile The given outFile which is relative path to test root dir
+  // @param pattern provides the keywords such as ERROR to check in output file
+  // @return void
+  void execSQLFileandCheck(const std::string &sqlFile,
+                           const std::string &outFile,
+                           const std::string &pattern = "",
+                           const std::string &sqloptions = "");
+
+  // Execute sql file and diff with ans file
+  // @param sqlFile The given sqlFile which is relative path to test root dir
+  // @param outputFile The given output file which is relative path to test 
root
+  // dir
+  // @param ansFile The given ansFile which is relative path to test root dir
+  // @param psqlOptions The psql options such as -v TABLENAME="test"
+  // @param initFile The given initFile (used by gpdiff.pl) which is relative
+  // path to test root dir
+  // @return void
+  void execSpecificSQLFile(
+      const std::string &sqlFile, const std::string &outputFile,
+      const std::string &ansFile, const std::string &psqlOptions,
+      const std::string &sqlFilePrefix, const std::string &initFile = "",
+      bool usingDefaultSchema = false, bool printTupleOnly = false);
+
+  bool execSpecificSQLFile(uint32_t &costTime, const std::string &sqlFile,
+                           const std::string &outputFile,
+                           const std::string &ansFile,
+                           const std::string &psqlOptions,
+                           const std::string &sqlFilePrefix,
+                           const std::string &initFile = "",
+                           bool usingDefaultSchema = false,
+                           bool printTupleOnly = false);
+
+  // Execute sql file and check its return status
+  // @param sqlFile The given sqlFile which is relative path to test root dir
+  // @param outputFile The given output file which is relative path to test 
root
+  // dir
+  // @param psqlOptions The psql options such as -v TABLENAME="test"
+  // @return true if the sql file is executed successfully, false otherwise
+  bool execSpecificSQLFile(const std::string &sqlFile,
+                           const std::string &outputFile,
+                           const std::string &psqlOptions,
+                           const std::string &sqlFilePrefix);
+
+  bool execSpecificSQLFile(uint32_t &costTime, const std::string &sqlFile,
+                           const std::string &outputFile,
+                           const std::string &psqlOptions,
+                           const std::string &sqlFilePrefix);
+
+  // Execute sql file with append string in the head and check its return 
status
+  // @param sqlFile The given sqlFile which is relative path to test root dir
+  // @param outputFile The given output file which is relative path to test 
root
+  // dir
+  // @param psqlOptions The psql options such as -v TABLENAME="test"
+  // @param appendString The append string such as \timing  explain analyze
+  // @return true if the sql file is executed successfully, false otherwise
+  bool execAppendSQLFile(const std::string &sqlFile,
+                         const std::string &outputFile,
+                         const std::string &psqlOptions,
+                         const std::string &sqlFilePrefix,
+                         const std::string &appendString);
+
+  bool execAppendSQLFile(uint32_t &costTime, const std::string &sqlFile,
+                         const std::string &outputFile,
+                         const std::string &psqlOptions,
+                         const std::string &sqlFilePrefix,
+                         const std::string &appendString);
+  // Execute sql file and diff with ans file
+  // @param sqlFile The given sqlFile which is relative path to test root dir
+  // @param outputFile The given output file which is relative path to test 
root
+  // dir
+  // @param ansFile The given ansFile which is relative path to test root dir
+  // @param psqlOptions The psql options such as -v TABLENAME="test"
+  // @param appendString The append string such as \timing  explain analyze
+  // @param initFile The given initFile (used by gpdiff.pl) which is relative
+  // path to test root dir
+  // @param sortoutfile new executor do not support orderby sometimes check
+  // outfile need to sort it before
+  // @return void
+  void execAppendSQLFile(
+      const std::string &sqlFile, const std::string &outputFile,
+      const std::string &ansFile, const std::string &psqlOptions,
+      const std::string &sqlFilePrefix, const std::string &appendString,
+      const std::string &initFile = "", bool usingDefaultSchema = false,
+      bool printTupleOnly = false, bool sortoutfile = false);
+
+  bool execAppendSQLFile(uint32_t &costTime, const std::string &sqlFile,
+                         const std::string &outputFile,
+                         const std::string &ansFile,
+                         const std::string &psqlOptions,
+                         const std::string &sqlFilePrefix,
+                         const std::string &appendString,
+                         const std::string &initFile = "",
+                         bool usingDefaultSchema = false,
+                         bool printTupleOnly = false, bool sortoutfile = 
false);
+
+  void runParallelSQLFile(const std::string sqlFile,
+                          const std::string outputFile, int processnum,
+                          const std::string ansFile = "",
+                          bool usingDefaultSchema = false);
+
+  void runParallelSQLFile(const std::string sqlFile,
+                          const std::string outputFile, int processnum,
+                          std::vector<std::string> &options,
+                          const std::string ansFile = "",
+                          bool usingDefaultSchema = false);
+
+  // Return the number of error in the output file
+  // @param filename or matched string such as path/*.out
+  int32_t getErrorNumber(const std::string &outputFile);
+
+  // check error in the file
+  // @param outFile The given outFile which is relative path to test root dir
+  // @param pattern provides the keywords such as ERROR to check in output file
+  // @return true if no line of the file matches pattern, false otherwise
+  static bool checkPatternInFile(const std::string &outFile,
+                                 const std::string &pattern);
+
   // Get PSQL connection: do not suggest to use
   // @return PSQL raw pointer
   const hawq::test::PSQL *getPSQL() const;
 
   // Get test root dir abs path
   // @return path string
-  std::string getTestRootPath() const;
+  static std::string getTestRootPath();
 
   // Set GUC value
   void setGUCValue(const std::string &guc, const std::string &value);
+
+  // Set Host of connection value
+  void setHost(const std::string &host);
+
+  // Set Database of connection value
+  void setDatabase(const std::string &db);
+
   // Get GUC value
   std::string getGUCValue(const std::string &guc);
 
   // execute given query and return query result
   // @param query the given query
   // @return the query result
-  std::string getQueryResult(const std::string &query);
+  std::string getQueryResult(const std::string &query, bool check = true);
 
   std::string getQueryResultSetString(const std::string &query);
 
@@ -122,26 +313,94 @@ class SQLUtility {
   // @param sql the given sql command
   // @param errmsg the expected sql error message
   // @return void
-  void executeExpectErrorMsgStartWith(const std::string &sql, const 
std::string &errmsg);
+  void executeExpectErrorMsgStartWith(const std::string &sql,
+                                      const std::string &errmsg);
 
   const hawq::test::PSQLQueryResult &executeQuery(const std::string &sql);
 
+  // return hdfs path : hawq_default
+  std::string getHdfsPath();
+
+  // return hdfs namenode and port: localhost:8020
+  std::string getHdfsNamenode();
+
+  // return hdfs dfs url: localhost:8020/hawq_default
+  std::string getHawqDfsURL();
+
+  // return hdfs dfs host: localhost:8020
+  std::string getHawqDfsHost();
+
+  // return magma service url: hostname:50001
+  std::string getHawqMagmaURL();
+  void runParallelSQLFile_sqllist(std::vector<std::string> &sqlfile,
+                                  std::vector<std::string> &ansfile);
+  // return true if OS is Darwin
+  bool isDarwin();
+
  private:
   std::unique_ptr<hawq::test::PSQL> getConnection();
-  const std::string generateSQLFile(const std::string &sqlFile, bool 
usingDefaultSchema);
-  FilePath splitFilePath(const std::string &filePath) const;
+  const std::string generateSQLFile(const std::string &sqlFile,
+                                    bool usingDefaultSchema,
+                                    const std::string &sqlFileSurfix = "",
+                                    const std::string &appendString = "");
+
+  bool __beforeBoolExecAppendSQLFile(const std::string &sqlFile,
+                                     const std::string &outputFile,
+                                     const std::string &sqlFilePrefix,
+                                     const std::string &appendString,
+                                     std::string &newSqlFile);
+
+  void __beforeRunSqlfile(const std::string &sqlFile,
+                          const std::string &outputFile,
+                          const std::string &ansFile,
+                          const std::string &sqlFilePrefix,
+                          const std::string &appendString,
+                          bool usingDefaultSchema, FilePath &fp,
+                          std::string &ansFileAbsPath, std::string &newSqlFile,
+                          std::string &outputAbsPath);
+
+  bool __afterRunSqlfile(const std::string &initFile, bool sortoutfile,
+                         FilePath &fp, std::string &ansFileAbsPath,
+                         std::string &outputAbsPath, std::string &newSqlFile);
+
+  bool __beforeExecSpecificSQLFile(const std::string &sqlFile,
+                                   const std::string &outputFile,
+                                   const std::string &sqlFilePrefix,
+                                   std::string &newSqlFile,
+                                   std::string &outputAbsPath);
+
+  void __beforeExecSpecificSQLFile(
+      const std::string &sqlFile, const std::string &outputFile,
+      const std::string &ansFile, const std::string &sqlFilePrefix,
+      bool usingDefaultSchema, FilePath &fp, std::string &ansFileAbsPath,
+      std::string &newSqlFile, std::string &outputAbsPath);
+
   void exec(const std::string &sql);
 
+  void execIgnore(const std::string &sql);
+
+ public:
+  static FilePath splitFilePath(const std::string &filePath);
+  bool checkAnsFile(const std::string &ansFileAbsPath,
+                    const std::string &outFileAbsPath,
+                    const std::string &initFile, const std::string &newSqlFile,
+                    FilePath &fp);
+
+  std::string getConnectionString() { return conn->getConnectionString(); }
+
+ protected:
+  std::string testRootPath;
+  const ::testing::TestInfo *const test_info;
+
  private:
-  std::string schemaName;
-  std::string databaseName;
   SQLUtilityMode sql_util_mode;
   std::unique_ptr<hawq::test::PSQL> conn;
-  std::string testRootPath;
-  const ::testing::TestInfo *const test_info;
-}; // class SQLUtility
+  std::string databaseName;
+  std::string schemaName;
+
+};  // class SQLUtility
 
-} // namespace test
-} // namespace hawq
+}  // namespace test
+}  // namespace hawq
 
 #endif  // SRC_TEST_FEATURE_LIB_SQL_UTIL_H_
diff --git a/src/test/feature/lib/sql_util_parallel.cpp 
b/src/test/feature/lib/sql_util_parallel.cpp
new file mode 100644
index 0000000..1a5665b
--- /dev/null
+++ b/src/test/feature/lib/sql_util_parallel.cpp
@@ -0,0 +1,705 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <thread>
+#include <fstream>
+#include <stdio.h>
+#include "sql_util_parallel.h"
+#include "string_util.h"
+
+
+
+using std::string;
+
+namespace hawq {
+namespace test {
+// Builds the per-session message slot for session `tid`: no pending command,
+// empty SQL text, not running, no wait dependencies, and `submit` preset to
+// 1 (commit).
+ThreadMessage::ThreadMessage(uint16_t tid)
+{
+    thread_id = tid;
+    isrunning = false;
+    command = NULLCOMMAND;
+    messageinfo = "";
+    submit = 1;  // commit
+    waittids.clear();
+    waitfortids.clear();
+}
+
+// Empties both wait lists explicitly before the members are destroyed.
+ThreadMessage::~ThreadMessage()
+{
+    waitfortids.clear();
+    waittids.clear();
+}
+
+// Starts with no output stream and `exclusive` switched off, and seeds the
+// C library rand() generator from the current time.
+SQLUtilityParallel::SQLUtilityParallel() {
+    exclusive = false;
+    outputstream = NULL;
+    const unsigned seed = (unsigned)time(NULL);
+    srand(seed);
+}
+
+// No explicit teardown is performed here.
+SQLUtilityParallel::~SQLUtilityParallel() {}
+
+// Reads the "total:<n>" header from a parallel control file.
+// Lines starting with '#' are skipped. The first other line must start with
+// "total" once passed through hawq::test::lower(), and must split on ':' into
+// exactly two fields; the trimmed second field is parsed as the session count.
+// @param in the control file stream, read from its current position
+// @return the parsed session count, or 0 when the header is missing or
+//         cannot be parsed
+int32_t SQLUtilityParallel::__getTotalSession(std::fstream &in) {
+    string header;
+
+    // Advance to the first non-comment line; running out of input means
+    // there is no header at all.
+    do {
+        if (!getline(in, header))
+            return 0;
+    } while (hawq::test::startsWith(header, "#"));
+
+    if (!hawq::test::startsWith(hawq::test::lower(header), "total"))
+    {
+        EXPECT_TRUE(false) << "The file format is error, "
+                "it should begin with total:number of sessions ";
+        return 0;
+    }
+
+    auto fields = hawq::test::split(header, ':');
+    if (fields.size() != 2)
+        return 0;
+
+    try
+    {
+        return std::stoi(hawq::test::trim(fields[1]));
+    }
+    catch(...)
+    {
+        // Anything that is not a number counts as "no sessions".
+        return 0;
+    }
+}
+
+
+// Parses one control-file line of the form
+//     <session>:<sql>
+//     <session>:<blocker>[:<blocker>...]:<sql>
+// The line is split on ':'; the first field is the session id, the last field
+// is the SQL text, and any fields in between name the sessions this statement
+// is expected to be blocked by.
+//
+// @param line          the raw line from the control file
+// @param totalsession  highest valid session id (0 is also accepted)
+// @param sessionid     [out] session id parsed from the first field
+// @param waittid       [out] set to 1 when the line names at least one
+//                      blocker; left untouched otherwise
+// @param message       [out] trimmed SQL text (last field)
+// @return false when the line has fewer than two fields or when the session
+//         id or a blocker id is outside [0, totalsession]; true otherwise.
+//         A non-numeric id makes std::stoi throw; the exception propagates
+//         to the caller.
+//
+// In non-exclusive mode the blocker relationships are recorded in
+// threadmessage (waittids on the blocker, waitfortids on the blocked
+// session). All ids are parsed and range-checked BEFORE thread_mutex is
+// taken, so a malformed id can neither index threadmessage out of bounds nor
+// leave the mutex locked by throwing while it is held.
+bool SQLUtilityParallel::__processLine(const string &line, const int32_t totalsession,
+                                       int32_t &sessionid, int32_t &waittid, string &message)
+{
+    auto fields = hawq::test::split(line, ':');
+    if (fields.size() < 2)
+        return false;
+
+    sessionid = std::stoi(hawq::test::trim(fields[0]));
+    if (sessionid < 0 || sessionid > totalsession)
+        return false;
+
+    message = hawq::test::trim(fields[fields.size() - 1]);
+
+    // Plain "<session>:<sql>" line: nothing to record.
+    if (fields.size() < 3)
+        return true;
+
+    waittid = 1;
+
+    // Exclusive mode only needs to know that the statement is blocked; the
+    // blocker ids are not looked at.
+    if (this->exclusive)
+        return true;
+
+    // Parse and validate every blocker id up front, without holding the lock.
+    std::vector<int> blockers;
+    for (size_t i = 1; i + 1 < fields.size(); i++) {
+        int blocker = std::stoi(hawq::test::trim(fields[i]));
+        if (blocker < 0 || blocker > totalsession)
+            return false;
+        blockers.push_back(blocker);
+    }
+
+    thread_mutex.lock();
+    for (size_t i = 0; i < blockers.size(); i++) {
+        threadmessage[blockers[i]]->waittids.push_back(sessionid);
+        threadmessage[sessionid]->waitfortids.push_back(blockers[i]);
+    }
+    thread_mutex.unlock();
+    return true;
+}
+
+
+// Drives a parallel control file in "exclusive" mode.
+//
+// One worker thread (thread_execute_exclusive) is started per session listed
+// in the file's "total:<n>" header. The main thread reads the file line by
+// line, hands each statement to the addressed session through
+// threadmessage[session], and waits until that session sets
+// threadmessage[0]->command away from NULLCOMMAND before reading the next
+// line. A line starting with '$' switches on `commit`, which makes Step 5
+// end odd-numbered sessions with "commit;" instead of "rollback;".
+//
+// Per-session `submit` values used here: 1 commit, -1 rollback, -2 vacuum,
+// 0 already finished by Step 5.
+//
+// @param sqlFile    absolute path of the control file
+// @param outputFile absolute path of the output file to (re)create
+// @return true when every line was dispatched and every worker finished
+//         cleanly, false otherwise
+//
+// NOTE(review): the error exits taken after the workers were started reach
+// `clear` without joining them; destroying a joinable std::thread calls
+// std::terminate, and the workers may still be using threadmessage.
+// NOTE(review): the busy-wait loops read `command` without holding
+// thread_mutex.
+bool SQLUtilityParallel::runParallelControlFile_exclusive(const string &sqlFile, const string &outputFile)
+{
+    string line;
+    bool result = true;
+    int32_t totalsession = 0 , i = 0 , commitnum = 0 ,blocknum = 0,commit=0;
+    std::vector<std::unique_ptr<std::thread>> querythread;
+
+    // Step 1: Open input and output file
+    std::fstream in;
+    in.open(sqlFile, std::ios::in);
+    outputstream = fopen(outputFile.c_str(), "w");
+    if (!in.is_open() || !outputstream) {
+        EXPECT_TRUE(false) << "Error opening input/output file ";
+        result = false;
+        goto clear;
+    }
+    // Step 2: clear env and get total session number
+    threadmessage.clear();
+    totalsession = __getTotalSession(in);
+    if (totalsession <= 0 )
+    {
+        result = false;
+        goto clear;
+    }
+    try
+    {
+        // Step 3: Initialize thread message, query thread, and connection pool
+        // Slot 0 belongs to the main thread; NEXTLINE means "ready for the next line".
+        std::unique_ptr<ThreadMessage> thrm(new ThreadMessage(0));
+        thrm->command = NEXTLINE;
+        threadmessage.push_back(std::move(thrm));
+        string schemamessage = "SET SEARCH_PATH = " + this->getSchemaName() + ";" ;
+        string connstr = this->getConnectionString();
+        //begin
+        for(i = 1 ; i <= totalsession; i++ ){
+            std::unique_ptr<ThreadMessage> thrm(new ThreadMessage(i));
+            threadmessage.push_back(std::move(thrm));
+        }
+        for(i = 1 ; i <= totalsession; i++ ) {
+            std::unique_ptr<std::thread> thr(new std::thread
+                    (&SQLUtilityParallel::thread_execute_exclusive, this,  i, schemamessage, connstr , totalsession));
+            querythread.push_back(std::move(thr));
+        }
+        // Step 4: Read every line of file and pass to sub thread
+        while (getline(in, line))
+        {
+            if (hawq::test::startsWith(line, "#"))
+                continue;
+            if (hawq::test::startsWith(line, "$"))
+            {
+                commit=1;
+                continue;
+            }
+            int32_t sessionid = 0 , waittid = 0;
+            string message = "";
+            result = __processLine(line, totalsession, sessionid, waittid, message);
+            if (!result)
+                goto clear;
+            if (sessionid == 0)
+            {
+                // Session 0 statements run on the main connection.
+                this->execute(message, true);
+            }
+            else
+            {
+                // The target session must be idle before it gets new work.
+                if ( threadmessage[sessionid]->command != NULLCOMMAND )
+                {
+                    EXPECT_TRUE(false) << "The file format is error or query handling error\n";
+                    result = false;
+                    goto clear;
+                }
+                MessageCommandType cmd = (waittid == 0) ? SQLSTRING : BLOCKEDSQL;
+                if(cmd == SQLSTRING){
+                    blocknum++;
+                }
+                //lock
+                thread_mutex.lock();
+                threadmessage[sessionid]->command = cmd;
+                threadmessage[sessionid]->messageinfo = message;
+                threadmessage[0]->command = NULLCOMMAND;
+                if(threadmessage[sessionid]->messageinfo.find("vacuum") != string::npos)
+                {
+                    threadmessage[sessionid]->submit = -2; //vacuum
+                }
+                else if(threadmessage[sessionid]->messageinfo.find("drop table") != string::npos){
+                    threadmessage[sessionid]->submit = -1; //rollback
+                }
+                else{
+                    int32_t n = random() % 2;
+                    if(n == 0)
+                        threadmessage[sessionid]->submit = 1; //commit
+                    else
+                        threadmessage[sessionid]->submit = -1; //rollback
+                }
+                thread_mutex.unlock();
+            }
+
+            // Wait for the worker to hand control back (NEXTLINE or BADSTATUS).
+            while (threadmessage[0]->command == NULLCOMMAND  ) {}
+            if (threadmessage[0]->command == BADSTATUS)
+                break;
+        }
+
+        // Step 5: Set commit/rollback command
+        // Cycle through sessions 1..totalsession until each one has been
+        // counted in both commitnum and blocknum.
+        int32_t j = 0;
+        while(commitnum < totalsession || blocknum < totalsession){
+            j = (j+1) % totalsession ;
+            if( j == 0 )
+            {
+                j = totalsession;
+            }
+            if(threadmessage[j]->command == BLOCKOVER){
+                // A blocked statement has returned: ask the session to print its result.
+                thread_mutex.lock();
+                threadmessage[j]->command = PRINTRESULT;
+                thread_mutex.unlock();
+                while(threadmessage[j]->command == PRINTRESULT){}
+                blocknum++;
+            }
+            if(threadmessage[j]->submit == -2 && threadmessage[j]->command == NULLCOMMAND)
+            {
+                // Vacuum sessions get no commit/rollback statement.
+                commitnum ++;
+                thread_mutex.lock();
+                threadmessage[j]->submit = 0;
+                thread_mutex.unlock();
+            }
+            else if(threadmessage[j]->submit != 0 && threadmessage[j]->command == NULLCOMMAND){
+                commitnum ++;
+                thread_mutex.lock();
+                threadmessage[j]->command = SQLSTRING;
+                if(commit&&j%2==1)
+                    threadmessage[j]->messageinfo = "commit;";
+                else
+                    threadmessage[j]->messageinfo = "rollback;";
+                threadmessage[0]->command = NULLCOMMAND;
+                thread_mutex.unlock();
+                while (threadmessage[0]->command == NULLCOMMAND  ) {}
+                thread_mutex.lock();
+                threadmessage[j]->submit = 0;
+                thread_mutex.unlock();
+            }
+        }
+
+        // Step 6: Setup exit signal to every sub-thread
+        thread_mutex.lock();
+        for(i = 1 ; i <= totalsession; i++ )
+        {
+            threadmessage[i]->command = EXITTHREAD;
+            threadmessage[i]->messageinfo = "";
+        }
+        thread_mutex.unlock();
+        for(i = 0 ; i < totalsession; i++ ){
+            querythread[i]->join();
+        }
+        // Step 7: Check status of every thread
+        for(i = 0 ; i <= totalsession; i++ )
+        {
+            if (threadmessage[i]->isrunning ||
+                threadmessage[i]->messageinfo.size() > 0 ||
+                threadmessage[i]->command == BADSTATUS)
+            {
+                EXPECT_TRUE(false) << "There is not finished or error job";
+                result = false;
+                goto clear;
+            }
+        }
+    }
+
+    catch(...)
+    {
+        EXPECT_TRUE(false) << "There is exception \n";
+        result = false;
+        goto clear;
+    }
+    clear:
+    threadmessage.clear();
+    in.close();
+    // fopen() may have failed in Step 1; fclose() must not be given NULL.
+    if (outputstream) {
+        fclose(outputstream);
+        outputstream = NULL;
+    }
+    return result;
+}
+
+// Worker loop for one session in exclusive mode.
+// Opens its own LibPQSQL connection from `connstr`, runs `schemapath` on it,
+// then repeatedly reads threadmessage[tid] (command + SQL text) under
+// thread_mutex and acts on the command:
+//   EXITTHREAD  - disconnect and return
+//   BLOCKEDSQL  - log "<tid>:blocked:<sql>", hand control back to the main
+//                 thread (NEXTLINE), run the SQL, then mark BLOCKOVER
+//   PRINTRESULT - log "<tid>:back:<sql>", print the query result, go idle
+//   SQLSTRING   - log "<tid>:<sql>", run the SQL while holding thread_mutex,
+//                 print the result, go idle and signal NEXTLINE
+//   NULLCOMMAND / BLOCKOVER - nothing to do, poll again
+//   NEXTLINE / BADSTATUS    - treated as an error
+// On any error both threadmessage[0] and threadmessage[tid] are set to
+// BADSTATUS before the connection is closed.
+// `totalsession` is not used in the body.
+// NOTE(review): libpqconn.release() gives up ownership without deleting the
+// LibPQSQL object; reset() may have been intended - confirm against
+// LibPQSQL's destructor.
+void SQLUtilityParallel::thread_execute_exclusive(uint16_t tid, const 
std::string &schemapath, const std::string &connstr , int32_t totalsession)
+{
+    std::unique_ptr<LibPQSQL> libpqconn;
+    libpqconn.reset(new LibPQSQL(connstr));
+    if (!libpqconn->connectDb())
+        goto errorclear;
+    if (!libpqconn->execQuery(schemapath,outputstream))
+        goto errorclear;
+    do
+    {
+        // Take a snapshot of this session's slot under the lock.
+        thread_mutex.lock();
+        MessageCommandType cmd = threadmessage[tid]->command;
+        string message = threadmessage[tid]->messageinfo;
+        thread_mutex.unlock();
+
+        switch (cmd)
+        {
+            case EXITTHREAD:
+            {
+                libpqconn->disconnectDb();
+                libpqconn.release();
+                return;
+            }
+            case BLOCKEDSQL:
+            {
+                thread_mutex.lock();
+                std::cout<<"Tid: "<<tid<<" Blocked 
"<<threadmessage[tid]->messageinfo<<std::endl;
+                threadmessage[tid]->isrunning = true;
+                fprintf(outputstream, "%d:blocked:%s\n", tid, message.c_str());
+                // Let the main thread continue with the next line while this
+                // statement is executing.
+                threadmessage[0]->command = NEXTLINE;
+                thread_mutex.unlock();
+
+                //wait for result from database
+                bool result = libpqconn->execQuery(message,outputstream);
+                if (!result) {
+                    goto errorclear;
+                }
+
+                thread_mutex.lock();
+                threadmessage[tid]->isrunning = false;
+                threadmessage[tid]->command = BLOCKOVER;
+                std::cout<<"Tid: "<<tid<<" Block over "<<std::endl;
+                thread_mutex.unlock();
+                break;
+            }
+            case BLOCKOVER :
+                break;
+            case PRINTRESULT:
+            {
+                thread_mutex.lock();
+                std::cout<<"Tid: "<<tid<<" Print "<<std::endl;
+                fprintf(outputstream, "%d:back:%s\n", tid, message.c_str());
+                libpqconn->printQueryResult(outputstream);
+                threadmessage[tid]->command = NULLCOMMAND;
+                threadmessage[tid]->messageinfo = "";
+                thread_mutex.unlock();
+                break;
+            }
+            case SQLSTRING:
+            {
+                // thread_mutex stays locked for the whole case, including the
+                // query execution.
+                thread_mutex.lock();
+                std::cout<<"Tid: "<<tid<<" SQLstring 
"<<threadmessage[tid]->messageinfo<<std::endl;
+                if (threadmessage[tid]->isrunning)
+                    goto errorclearlock;
+                fprintf(outputstream, "%d:%s\n", tid, message.c_str());
+                //submit sql
+                if (message.compare("commit;") == 0 ||
+                    message.compare("rollback;") == 0 ||
+                    threadmessage[tid]->messageinfo.find("vacuum") != 
string::npos ||
+                    threadmessage[tid]->messageinfo.find("VACUUM") != 
string::npos)
+                {
+                    // Every session recorded in waittids must still be
+                    // running at this point; otherwise it is an error.
+                    for (uint16_t i = 0; i < 
threadmessage[tid]->waittids.size(); i++)
+                    {
+                        if 
(!threadmessage[threadmessage[tid]->waittids[i]]->isrunning)
+                        {
+                            goto errorclearlock;
+                        }
+                    }
+                    bool result = libpqconn->execQuery(message,outputstream);
+                    if (!result)
+                        goto errorclearlock;
+                }
+                else
+                {
+                    bool result = libpqconn->execQuery(message,outputstream);
+                    if (!result)
+                        goto errorclearlock;
+                }
+                libpqconn->printQueryResult(outputstream);
+                threadmessage[tid]->command = NULLCOMMAND;
+                threadmessage[tid]->messageinfo = "";
+                threadmessage[0]->command = NEXTLINE;
+                thread_mutex.unlock();
+                break;
+            }
+            case NULLCOMMAND:
+                break;
+            case NEXTLINE:
+            case BADSTATUS:
+            {
+                goto errorclear;
+            }
+        }
+    } while (true);
+
+    // errorclearlock: reached with thread_mutex held; errorclear: without.
+    errorclearlock:
+    thread_mutex.unlock();
+    errorclear:
+    thread_mutex.lock();
+    threadmessage[0]->command = BADSTATUS;
+    threadmessage[tid]->command = BADSTATUS;
+    thread_mutex.unlock();
+    libpqconn->disconnectDb();
+    libpqconn.release();
+}
+
+// Drives a parallel control file in the default (non-exclusive) mode.
+//
+// One worker thread (thread_execute) is started per session listed in the
+// file's "total:<n>" header. The main thread reads the file line by line,
+// hands each statement to the addressed session through
+// threadmessage[session], and waits until that session sets
+// threadmessage[0]->command away from NULLCOMMAND before reading the next
+// line. Session 0 statements run on the main connection.
+//
+// @param sqlFile    absolute path of the control file
+// @param outputFile absolute path of the output file to (re)create
+// @return true when every line was dispatched and every worker finished
+//         cleanly, false otherwise
+//
+// NOTE(review): the error exits taken after the workers were started reach
+// `clear` without joining them; destroying a joinable std::thread calls
+// std::terminate, and the workers may still be using threadmessage.
+// NOTE(review): the busy-wait loop reads `command` without holding
+// thread_mutex.
+bool SQLUtilityParallel::runParallelControlFile(const string &sqlFile, const string &outputFile)
+{
+    string line;
+    bool result = true;
+    int32_t totalsession = 0, i = 0;
+    std::vector<std::unique_ptr<std::thread>> querythread;
+    // Step 1: Open input and output file
+    std::fstream in;
+    in.open(sqlFile, std::ios::in);
+    outputstream = fopen(outputFile.c_str(), "w");
+    if (!in.is_open() || !outputstream) {
+        EXPECT_TRUE(false) << "Error opening input/output file ";
+        result = false;
+        goto clear;
+    }
+    // Step 2: clear env and get total session number
+    threadmessage.clear();
+    totalsession = __getTotalSession(in);
+    if (totalsession <= 0 )
+    {
+        result = false;
+        goto clear;
+    }
+    try
+    {
+        // Step 3: Initialize thread message, query thread, and connection pool
+        // Slot 0 belongs to the main thread; NEXTLINE means "ready for the next line".
+        std::unique_ptr<ThreadMessage> thrm(new ThreadMessage(0));
+        thrm->command = NEXTLINE;
+        threadmessage.push_back(std::move(thrm));
+        string schemamessage = "SET SEARCH_PATH = " + this->getSchemaName() + ";" ;
+        string connstr = this->getConnectionString();
+        //begin
+        for(i = 1 ; i <= totalsession; i++ ){
+            std::unique_ptr<ThreadMessage> thrm(new ThreadMessage(i));
+            threadmessage.push_back(std::move(thrm));
+        }
+        for(i = 1 ; i <= totalsession; i++ ) {
+            std::unique_ptr<std::thread> thr(new std::thread
+                    (&SQLUtilityParallel::thread_execute, this,  i, schemamessage, connstr));
+            querythread.push_back(std::move(thr));
+        }
+
+        // Step 4: Read every line of file and pass to sub thread
+        while (getline(in, line))
+        {
+            if (hawq::test::startsWith(line, "#"))
+                continue;
+            int32_t sessionid = 0 , waittid = 0;
+            string message = "";
+
+            result = __processLine(line, totalsession, sessionid, waittid, message);
+            if (!result)
+                goto clear;
+
+            if (sessionid == 0)
+            {
+                this->execute(message, true);
+            }
+            else
+            {
+                // The target session must be idle before it gets new work.
+                if ( threadmessage[sessionid]->command != NULLCOMMAND )
+                {
+                    EXPECT_TRUE(false) << "The file format is error or query handling error\n";
+                    result = false;
+                    goto clear;
+                }
+                MessageCommandType cmd = (waittid == 0) ? SQLSTRING : BLOCKEDSQL;
+                //lock
+                thread_mutex.lock();
+                threadmessage[sessionid]->command = cmd;
+                threadmessage[sessionid]->messageinfo = message;
+                threadmessage[0]->command = NULLCOMMAND;
+                thread_mutex.unlock();
+            }
+            // Wait for the worker to hand control back (NEXTLINE or BADSTATUS).
+            while (threadmessage[0]->command == NULLCOMMAND  ) {}
+            if (threadmessage[0]->command == BADSTATUS)
+                break;
+        }
+
+        // Step 5: Setup exit signal to every sub-thread
+        thread_mutex.lock();
+        for(i = 1 ; i <= totalsession; i++ )
+        {
+            threadmessage[i]->command = EXITTHREAD;
+            threadmessage[i]->messageinfo = "";
+        }
+        thread_mutex.unlock();
+        for(i = 0 ; i < totalsession; i++ ){
+            querythread[i]->join();
+        }
+        // Step 6: Check status of every thread
+        for(i = 0 ; i <= totalsession; i++ )
+        {
+            if (threadmessage[i]->isrunning ||
+                threadmessage[i]->messageinfo.size() > 0 ||
+                threadmessage[i]->command == BADSTATUS)
+            {
+                EXPECT_TRUE(false) << "There is not finished or error job";
+                result = false;
+                goto clear;
+            }
+        }
+    }
+
+    catch(...)
+    {
+        EXPECT_TRUE(false) << "There is exception \n";
+        result = false;
+        goto clear;
+    }
+    clear:
+    threadmessage.clear();
+    in.close();
+    // fopen() may have failed in Step 1; fclose() must not be given NULL.
+    if (outputstream) {
+        fclose(outputstream);
+        outputstream = NULL;
+    }
+    return result;
+}
+
+// Worker loop for one session in the default (non-exclusive) mode.
+// Opens its own LibPQSQL connection from `connstr`, runs `schemapath` on it,
+// then repeatedly reads threadmessage[tid] (command + SQL text) under
+// thread_mutex and acts on the command:
+//   EXITTHREAD  - disconnect and return
+//   BLOCKEDSQL  - log "<tid>:blocked:<sql>", hand control back to the main
+//                 thread (NEXTLINE), run the SQL, then mark BLOCKOVER
+//   PRINTRESULT - log "<tid>:back:<sql>", print the query result, go idle
+//   SQLSTRING   - log "<tid>:<sql>" and run the SQL; for "commit;",
+//                 "rollback;" or a statement containing "vacuum", also
+//                 release every session in waittids (see below); finally
+//                 print the result, go idle and signal NEXTLINE
+//   NULLCOMMAND / BLOCKOVER - nothing to do, poll again
+//   NEXTLINE / BADSTATUS    - treated as an error
+// On any error both threadmessage[0] and threadmessage[tid] are set to
+// BADSTATUS before the connection is closed.
+//
+// @param tid        session id, also the index into threadmessage
+// @param schemapath statement executed right after connecting
+// @param connstr    connection string passed to LibPQSQL
+//
+// NOTE(review): libpqconn.release() gives up ownership without deleting the
+// LibPQSQL object; reset() may have been intended - confirm against
+// LibPQSQL's destructor.
+// NOTE(review): the busy-wait loops read `command` without holding
+// thread_mutex.
+void SQLUtilityParallel::thread_execute(uint16_t tid, const std::string &schemapath, const std::string &connstr)
+{
+    std::unique_ptr<LibPQSQL> libpqconn;
+    libpqconn.reset(new LibPQSQL(connstr));
+    if (!libpqconn->connectDb())
+        goto errorclear;
+    if (!libpqconn->execQuery(schemapath,outputstream))
+        goto errorclear;
+    do
+    {
+        // Take a snapshot of this session's slot under the lock.
+        thread_mutex.lock();
+        MessageCommandType cmd = threadmessage[tid]->command;
+        string message = threadmessage[tid]->messageinfo;
+        thread_mutex.unlock();
+
+        switch (cmd)
+        {
+            case EXITTHREAD:
+            {
+                libpqconn->disconnectDb();
+                libpqconn.release();
+                return;
+            }
+            case BLOCKEDSQL:
+            {
+                thread_mutex.lock();
+                threadmessage[tid]->isrunning = true;
+                std::cout<<"BLOCK Tid: "<<tid<<" "<<threadmessage[tid]->messageinfo<<std::endl;
+                fprintf(outputstream, "%d:blocked:%s\n", tid, message.c_str());
+                // Let the main thread continue with the next line while this
+                // statement is executing.
+                threadmessage[0]->command = NEXTLINE;
+                thread_mutex.unlock();
+                bool result = libpqconn->execQuery(message,outputstream);
+                if (!result) {
+                    std::cout<<"Error"<<std::endl;
+                    goto errorclear;
+                }
+
+                thread_mutex.lock();
+                threadmessage[tid]->isrunning = false;
+                threadmessage[tid]->command = BLOCKOVER;
+                std::cout<<"BLOCKOVER Tid: "<<tid<<" "<<threadmessage[tid]->messageinfo<<std::endl;
+                thread_mutex.unlock();
+                break;
+            }
+            case BLOCKOVER :
+                break;
+            case PRINTRESULT:
+            {
+                thread_mutex.lock();
+                std::cout<<"PRINT Tid: "<<tid<<" "<<threadmessage[tid]->messageinfo<<std::endl;
+                fprintf(outputstream, "%d:back:%s\n", tid, message.c_str());
+                libpqconn->printQueryResult(outputstream);
+                threadmessage[tid]->command = NULLCOMMAND;
+                threadmessage[tid]->messageinfo = "";
+                thread_mutex.unlock();
+                break;
+            }
+            case SQLSTRING:
+            {
+                thread_mutex.lock();
+                std::cout<<"SQL Tid: "<<tid<<" "<<threadmessage[tid]->messageinfo<<std::endl;
+                if (threadmessage[tid]->isrunning)
+                    goto errorclearlock;
+                fprintf(outputstream, "%d:%s\n", tid, message.c_str());
+
+                if (message.compare("commit;") == 0 ||
+                    message.compare("rollback;") == 0 || threadmessage[tid]->messageinfo.find("vacuum") != string::npos) {
+                    // Every session recorded in waittids must still be
+                    // running at this point; otherwise it is an error.
+                    for (uint16_t i = 0; i < threadmessage[tid]->waittids.size(); i++) {
+                        if (!threadmessage[threadmessage[tid]->waittids[i]]->isrunning) {
+                            goto errorclearlock;
+                        }
+                    }
+
+                    bool result = libpqconn->execQuery(message,outputstream);
+                    if (!result)
+                        goto errorclearlock;
+                    thread_mutex.unlock();
+
+                    // Release the sessions that were waiting on this one, one
+                    // at a time, in the order they were recorded.
+                    while (threadmessage[tid]->waittids.size() > 0) {
+                        thread_mutex.lock();
+                        int waitpid = threadmessage[tid]->waittids[0];
+
+                        // Remove this session from the waiter's blocker list.
+                        for (std::vector<int>::iterator iter = threadmessage[waitpid]->waitfortids.begin();
+                             iter != threadmessage[waitpid]->waitfortids.end(); iter++) {
+                            if (*iter == tid) {
+                                threadmessage[waitpid]->waitfortids.erase(iter);
+                                break;
+                            }
+                        }
+
+                        // Decide while the lock is still held whether this was
+                        // the waiter's last blocker. The mutex is then released
+                        // exactly once: unlocking a mutex that is not held is
+                        // undefined behaviour.
+                        bool lastblocker = threadmessage[waitpid]->waitfortids.size() == 0;
+                        thread_mutex.unlock();
+
+                        if (lastblocker) {
+                            // Wait for the blocked statement to return, then
+                            // ask the waiter to print its result.
+                            while (threadmessage[waitpid]->command != BLOCKOVER) {}
+                            thread_mutex.lock();
+                            threadmessage[waitpid]->command = PRINTRESULT;
+                            thread_mutex.unlock();
+                        }
+
+                        while (threadmessage[waitpid]->command == PRINTRESULT) {}
+                        thread_mutex.lock();
+                        threadmessage[tid]->waittids.erase(threadmessage[tid]->waittids.begin());
+                        thread_mutex.unlock();
+                    }
+
+                    // Re-acquire for the common epilogue below.
+                    thread_mutex.lock();
+                }
+                else
+                {
+                    bool result = libpqconn->execQuery(message,outputstream);
+                    if (!result)
+                        goto errorclearlock;
+                }
+                libpqconn->printQueryResult(outputstream);
+                threadmessage[tid]->command = NULLCOMMAND;
+                threadmessage[tid]->messageinfo = "";
+                threadmessage[0]->command = NEXTLINE;
+                thread_mutex.unlock();
+                break;
+            }
+            case NULLCOMMAND:
+                break;
+            case NEXTLINE:
+            case BADSTATUS:
+            {
+                goto errorclear;
+            }
+        }
+    } while (true);
+
+    // errorclearlock: reached with thread_mutex held; errorclear: without.
+    errorclearlock:
+    thread_mutex.unlock();
+    errorclear:
+    thread_mutex.lock();
+    threadmessage[0]->command = BADSTATUS;
+    threadmessage[tid]->command = BADSTATUS;
+    thread_mutex.unlock();
+    libpqconn->disconnectDb();
+    libpqconn.release();
+}
+
+
+// Runs a parallel control file and, in non-exclusive mode, compares the
+// produced output with the expected answer file.
+// @param sqlFile   control file, relative to the test root path
+// @param ansFile   expected-answer file, relative to the test root path
+// @param exclusive selects runParallelControlFile_exclusive over
+//                  runParallelControlFile; the answer check is skipped
+//                  when it is set
+// @param initFile  passed through to checkAnsFile
+// The output is written next to the answer file as <ansFile base name>.out.
+void SQLUtilityParallel::execParallelControlSQLFile(const string &sqlFile,
+                                                    const string &ansFile,
+                                                    bool exclusive,
+                                                    const string &initFile)
+{
+    const string ansFileAbsPath = testRootPath + "/" + ansFile;
+    const string sqlFileAbsPath = testRootPath + "/" + sqlFile;
+
+    // Both input files have to be readable before anything is executed.
+    if (!std::ifstream(ansFileAbsPath)) {
+        ASSERT_TRUE(false) << ansFileAbsPath << " doesn't exist";
+    }
+    if (!std::ifstream(sqlFileAbsPath)) {
+        ASSERT_TRUE(false) << sqlFileAbsPath << " doesn't exist";
+    }
+
+    // Guard against an answer path that yields no base name.
+    FilePath fp = splitFilePath(ansFileAbsPath);
+    if (fp.fileBaseName.empty()) {
+        ASSERT_TRUE(false) << ansFileAbsPath << " is invalid";
+    }
+
+    // outFile is located in the same folder with ansFile
+    const string outFileAbsPath = fp.path + "/" + fp.fileBaseName + ".out";
+
+    this->exclusive = exclusive ;
+    const bool succeeded = this->exclusive
+        ? this->runParallelControlFile_exclusive(sqlFileAbsPath, outFileAbsPath)
+        : this->runParallelControlFile(sqlFileAbsPath, outFileAbsPath);
+
+    if (!succeeded) {
+        ASSERT_TRUE(false) << " Execute query is failed";
+    }
+
+    // Only the non-exclusive run is compared against the answer file.
+    if (!this->exclusive) {
+        checkAnsFile(ansFileAbsPath, outFileAbsPath, initFile, "", fp);
+    }
+}
+    } // namespace test
+} // namespace hawq
diff --git a/src/test/feature/lib/sql_util_parallel.h 
b/src/test/feature/lib/sql_util_parallel.h
new file mode 100644
index 0000000..46b9e9c
--- /dev/null
+++ b/src/test/feature/lib/sql_util_parallel.h
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ *
+ * This utility controls the order in which statements from different
+ * sessions are executed. The control file format looks like this:
+     # This is basic transaction test demo    // '#' starts a comment line
+     Total:2                                  // total number of sessions
+     0:DROP TABLE IF EXISTS TEST;             // 0 marks a preparation statement
+     0:CREATE TABLE TEST(A INT);
+     1:BEGIN;
+     2:BEGIN;
+     1:SELECT * FROM  TEST;
+     2:INSERT INTO TEST VALUES(1);
+     2:COMMIT;
+     1:SELECT * FROM  TEST;
+     1:COMMIT;
+
+* There is another example for blocked test
+     # for blocked session. in below example,
+     # session 1 will lock whole table, session 2 will be blocked until 
session 1 commit
+     Total:2                                  // indicate how sessions in total
+     0:DROP TABLE IF EXISTS TEST;             // 0 indicate it is for prepare
+     0:CREATE TABLE TEST(A INT);
+     1:BEGIN;
+     2:BEGIN;
+     1:TRUNCATE TEST;
+     2:1:SELECT * FROM TEST;               //Here it indicates session 2 is 
blocked by session 1
+     1:COMMIT;                             //Here session 2 will not be 
blocked and output session 2 result.
+     2:COMMIT;
+ *
+ */
+#ifndef HAWQ_SRC_TEST_FEATURE_LIB_SQL_UTIL_PARALL_H_
+#define HAWQ_SRC_TEST_FEATURE_LIB_SQL_UTIL_PARALL_H_
+
+#include "sql_util.h"
+#include <string>
+#include <mutex>
+
+namespace hawq {
+namespace test {
+ 
+// Command codes carried in ThreadMessage::command.
+// BADSTATUS is the value written to the thread messages on the error-cleanup
+// path of the per-thread execution code.
+// NOTE(review): the meaning of the remaining values is inferred from their
+// names only -- confirm against sql_util_parallel.cpp before relying on it.
+enum MessageCommandType {
+    NULLCOMMAND,
+    NEXTLINE,
+    BLOCKEDSQL,
+    SQLSTRING,
+    EXITTHREAD,
+    PRINTRESULT,
+    BADSTATUS,
+    BLOCKOVER
+};
+
+// Per-thread message record. SQLUtilityParallel keeps one of these per entry
+// in its threadmessage vector and reads/writes the fields directly (all
+// members are public).
+class ThreadMessage{
+public:
+    // @param tid thread id for this record (defaults to 0)
+    ThreadMessage(uint16_t tid = 0 );
+    ~ThreadMessage();
+    int thread_id;
+    int vacuum ;
+    // NOTE(review): presumably the SQL text or result text for the thread --
+    // confirm against the implementation.
+    std::string messageinfo;
+    // Current command for the thread; see MessageCommandType.
+    MessageCommandType command;
+    bool isrunning;
+    int32_t submit; // 1: commit ; 0: submit ; -1: rollback; -2 vacuum;
+    std::vector<int> waittids;   //threadids which are blocked by this thread
+    std::vector<int> waitfortids; //threadids which this thread waiting for
+};
+
+// SQLUtility extension that drives several sessions from one control file
+// (format described at the top of this header) and can diff the produced
+// output against an answer file.
+class SQLUtilityParallel : public SQLUtility {
+public:
+    SQLUtilityParallel();
+    ~SQLUtilityParallel();
+    bool exclusive; // whether test access_exclusive lock;
+
+  // Execute sql file and diff with ans file.
+  // @param sqlFile The given sqlFile which is relative path to test root dir. 
This file control how the SQL is execute
+  // @param ansFile The given ansFile which is relative path to test root dir
+  // @param exclusive whether we can predict the order of transactions
+  // @param initFile optional init file forwarded to the answer-file check
+  // @return void
+  void execParallelControlSQLFile(const std::string &sqlFile, const 
std::string &ansFile,bool exclusive,
+                                         const std::string &initFile = "");
+  //int32_t getWaitNumber(const std::string &schemapath, const std::string 
&connstr);
+
+  // Run sql file and generate output file
+  // @return true on success; execParallelControlSQLFile fails the test when
+  //         either variant returns false
+    bool runParallelControlFile(const std::string &sqlFile, const std::string 
&outputFile);
+    bool runParallelControlFile_exclusive(const std::string &sqlFile, const 
std::string &outputFile);
+private:
+
+  // Handling SQL command for every thread
+  // @param tid the thread id
+  // @param schemapath it is used that every case should have its own schema
+  // @param connstr the connection string
+  void thread_execute(uint16_t tid, const std::string &schemapath, const 
std::string &connstr);
+  // @param totalsession the number of totalsession
+  void thread_execute_exclusive(uint16_t tid, const std::string &schemapath, 
const std::string &connstr, const int32_t totalsession);
+
+  // Get total session number from input sql file
+  int32_t __getTotalSession(std::fstream &in);
+
+  //Process line to get session id, wait tid and running sql
+  // @param line  input line
+  // @param totalsession the total session for this sql file
+  // @param(output) sessionid current execute thread id
+  // @param(output) waittid the thread which blocks current thread. 0 if it is 
not blocked.
+  // @param(output) message the sql will be executed
+  // @return whether it is executed successfully or not
+  bool __processLine(const std::string &line, const int32_t totalsession,
+                     int32_t &sessionid, int32_t &waittid, std::string 
&message);
+
+private:
+  // One message record per thread, indexed by thread id; index 0 is also
+  // written on the per-thread error path.
+  std::vector<std::unique_ptr<ThreadMessage>> threadmessage;
+  // Locked around updates to threadmessage entries.
+  std::mutex thread_mutex;
+  // NOTE(review): presumably the stream the .out file is written to -- confirm.
+  FILE* outputstream;
+}; // class SQLUtilityParallel
+
+} // namespace test
+} // namespace hawq
+
+#endif  // HAWQ_SRC_TEST_FEATURE_LIB_SQL_UTIL_PARALL_H_
diff --git a/src/test/feature/lib/sqlfile-parsebase.cpp 
b/src/test/feature/lib/sqlfile-parsebase.cpp
new file mode 100644
index 0000000..90c70fe
--- /dev/null
+++ b/src/test/feature/lib/sqlfile-parsebase.cpp
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+
+#include "sqlfile-parsebase.h"
+
+// Returns a copy of str with leading and trailing whitespace (as classified
+// by isspace) removed. An empty or all-whitespace input yields an empty string.
+std::string SqlFileParseBase::trim(std::string str) {
+  // isspace() is undefined for negative arguments other than EOF. Plain char
+  // may be signed, so bytes >= 0x80 (e.g. UTF-8 text in a SQL file) must be
+  // converted to unsigned char before classification.
+  const auto notSpace = [](unsigned char c) { return isspace(c) == 0; };
+
+  // Drop the leading run of whitespace.
+  str.erase(str.begin(), std::find_if(str.begin(), str.end(), notSpace));
+  // Drop the trailing run: base() of the last non-space reverse iterator
+  // points one past that character (or to begin() when nothing is left).
+  str.erase(std::find_if(str.rbegin(), str.rend(), notSpace).base(), str.end());
+  return str;
+}
+
+// Returns the index of the first non-whitespace character in line, or
+// line.size() when the line is empty or consists only of whitespace.
+size_t SqlFileParseBase::__find_is_not_space(const std::string &line) {
+  // The predicate takes unsigned char so that isspace() never receives a
+  // negative value (undefined behaviour when plain char is signed).
+  const auto firstNonSpace = std::find_if_not(
+      line.cbegin(), line.cend(),
+      [](unsigned char c) { return isspace(c) != 0; });
+  return static_cast<size_t>(firstNonSpace - line.cbegin());
+}
+
+// True when target, starting at index pos, begins with tag.
+// Throws std::out_of_range when pos > target.size().
+bool SqlFileParseBase::__sub_equal(const std::string &target, size_t pos,
+                                   const std::string &tag) {
+  // compare() clamps the window to the available characters, so a too-short
+  // remainder compares unequal rather than reading past the end.
+  return target.compare(pos, tag.size(), tag) == 0;
+}
+
+// Iterator flavour: true when the text of target from pos onward begins with
+// tag. pos must be an iterator into target.
+bool SqlFileParseBase::__sub_equal(const std::string &target,
+                                   std::string::const_iterator pos,
+                                   const std::string &tag) {
+  const size_t offset = pos - target.cbegin();
+  return target.compare(offset, tag.size(), tag) == 0;
+}
+
+// True when target begins with tag (an empty tag always matches).
+bool SqlFileParseBase::__sub_equal(const std::string &target,
+                                   const std::string &tag) {
+  return target.compare(0, tag.size(), tag) == 0;
+}
+
+// Reports that filename could not be opened; returns what __error returns.
+bool SqlFileParseBase::__open_error(const std::string &filename) {
+  const std::string message = filename + " open error";
+  return __error(message);
+}
+
+// Reports that tag was not found at line number index; returns what __error
+// returns.
+bool SqlFileParseBase::__not_found_tag_error(const size_t index,
+                                             const std::string &tag) {
+  std::string message("line ");
+  message += std::to_string(index);
+  message += " not found ";
+  message += tag;
+  return SqlFileParseBase::__error(message);
+}
+
+// Reads infile line by line into v_inf, replacing its previous contents.
+// Returns the result of __open_error (and leaves v_inf untouched) when the
+// file cannot be opened; returns true otherwise.
+bool SqlFileParseBase::__trans_file_to_vector(const std::string &infile,
+                                              std::vector<std::string> &v_inf) {
+  std::ifstream input(infile);
+  if (!input) {
+    return __open_error(infile);
+  }
+
+  // Collect into a local vector first so v_inf is only replaced at the end.
+  std::vector<std::string> lines;
+  std::string current;
+  while (getline(input, current)) {
+    lines.push_back(std::move(current));
+  }
+
+  v_inf.swap(lines);
+  input.close();
+  return true;
+}
+
+// True when the first non-whitespace text on line begins with tag.
+bool SqlFileParseBase::__is_tag(const std::string &line,
+                                const std::string &tag) {
+  const size_t firstNonSpace = __find_is_not_space(line);
+  return __sub_equal(line, firstNonSpace, tag);
+}
+
+bool SqlFileParseBase::__error(const std::string &error_message) {
+  std::cerr << parseName_ << ": error: " << error_message << std::endl;
+  return false;
+}
\ No newline at end of file
diff --git a/src/test/feature/lib/sqlfile-parsebase.h 
b/src/test/feature/lib/sqlfile-parsebase.h
new file mode 100644
index 0000000..4434422
--- /dev/null
+++ b/src/test/feature/lib/sqlfile-parsebase.h
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef HAWQ_SRC_TEST_FEATURE_SQLFILE_PARSEBASE_OUT_H_
+#define HAWQ_SRC_TEST_FEATURE_SQLFILE_PARSEBASE_OUT_H_
+
+#include <fstream>
+#include <iostream>
+#include <string>
+#include <vector>
+
+// Shared helpers for parsers that read SQL/answer files line by line:
+// whitespace trimming, prefix matching, file loading and error reporting.
+class SqlFileParseBase {
+ public:
+  // @param parseName label prefixed to every message printed by __error
+  SqlFileParseBase(const std::string &parseName) : parseName_(parseName) {}
+
+  // Returns str with leading and trailing whitespace removed.
+  static std::string trim(std::string str);
+
+ protected:
+  // Index of the first non-whitespace character in line (line.size() if none).
+  size_t __find_is_not_space(const std::string &line);
+
+  // True when target, starting at index pos, begins with tag.
+  bool __sub_equal(const std::string &target, size_t pos,
+                   const std::string &tag);
+
+  // Same as above, with the start given as an iterator into target.
+  bool __sub_equal(const std::string &target, std::string::const_iterator pos,
+                   const std::string &tag);
+
+  // True when target begins with tag.
+  bool __sub_equal(const std::string &target, const std::string &tag);
+
+  // Prints "<parseName_>: error: <error_message>" to std::cerr; returns false.
+  bool __error(const std::string &error_message);
+
+  // Reports "<filename> open error" through __error.
+  bool __open_error(const std::string &filename);
+
+  // Reports "line <index> not found <tag>" through __error.
+  bool __not_found_tag_error(const size_t index, const std::string &tag);
+
+  // Loads every line of infile into v_inf; returns true when the file could
+  // be opened, otherwise the result of __open_error.
+  bool __trans_file_to_vector(const std::string &infile,
+                              std::vector<std::string> &v_inf);
+
+  // True when the first non-whitespace text on line begins with tag.
+  bool __is_tag(const std::string &line, const std::string &tag);
+
+  // True when the line's first non-whitespace text begins with "--".
+  bool __is_comment(const std::string &line) { return __is_tag(line, "--"); }
+
+ private:
+  // Label used as the prefix of error messages.
+  const std::string parseName_;
+};
+
+#endif
\ No newline at end of file

Reply via email to