PHOENIX-2419 Fix flapping Pig tests

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e9d44986
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e9d44986
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e9d44986

Branch: refs/heads/txn
Commit: e9d449865449ce0ad5406b3680d264db77d991ef
Parents: 7afa91a
Author: James Taylor <[email protected]>
Authored: Sun Nov 15 18:25:15 2015 -0800
Committer: James Taylor <[email protected]>
Committed: Mon Nov 16 09:02:16 2015 -0800

----------------------------------------------------------------------
 .../java/org/apache/phoenix/pig/BasePigIT.java  |  83 ++++
 .../phoenix/pig/PhoenixHBaseLoaderIT.java       | 423 ++++++++-----------
 .../phoenix/pig/PhoenixHBaseStorerIT.java       |  43 +-
 .../phoenix/pig/udf/ReserveNSequenceTestIT.java |  62 +--
 4 files changed, 288 insertions(+), 323 deletions(-)
----------------------------------------------------------------------
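In short: the flapping came from how these ITs managed their fixtures. Each
test class stood up its own Connection/PigServer (partly in static
@BeforeClass state shared across test methods) and cleaned up tables ad hoc
in try/finally blocks, which likely left state behind and made results
order-sensitive. The commit extracts a common BasePigIT that opens a fresh
PigServer and Connection in @Before and closes them in @After, so every test
method runs against clean fixtures.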


http://git-wip-us.apache.org/repos/asf/phoenix/blob/e9d44986/phoenix-pig/src/it/java/org/apache/phoenix/pig/BasePigIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/BasePigIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/BasePigIT.java
new file mode 100644
index 0000000..94ccc25
--- /dev/null
+++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/BasePigIT.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig;
+
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.apache.phoenix.util.TestUtil.LOCALHOST;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.end2end.Shadower;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.TupleFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import com.google.common.collect.Maps;
+
+public class BasePigIT extends BaseHBaseManagedTimeIT {
+    protected TupleFactory tupleFactory;
+    protected String zkQuorum;
+    protected Connection conn;
+    protected Configuration conf;
+    protected PigServer pigServer;
+
+    @BeforeClass
+    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+        props.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+        // Must update config before starting server
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+    
+    @Before
+    public void setUp() throws Exception {
+        conf = getTestClusterConfig();
+        conf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+        pigServer = new PigServer(ExecType.LOCAL, conf);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        conn = DriverManager.getConnection(getUrl(), props);
+        zkQuorum = LOCALHOST + JDBC_PROTOCOL_SEPARATOR + getZKClientPort(conf);
+        tupleFactory = TupleFactory.getInstance();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if(conn != null) {
+            conn.close();
+        }
+        if (pigServer != null) {
+            pigServer.shutdown();
+        }
+    }
+
+
+}

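For orientation, a minimal sketch of a test built on the new base class;
ExamplePigIT and EXAMPLE_T are hypothetical names, not part of this commit.
BasePigIT hands each test a fresh conn, pigServer, zkQuorum, and
tupleFactory, and tears them down afterwards:

    package org.apache.phoenix.pig;

    import static org.junit.Assert.assertEquals;

    import java.util.Iterator;

    import org.apache.pig.data.Tuple;
    import org.junit.Test;

    public class ExamplePigIT extends BasePigIT {
        @Test
        public void testRowCount() throws Exception {
            // conn is opened by BasePigIT.setUp() for this test alone
            conn.createStatement().execute(
                    "CREATE TABLE EXAMPLE_T (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR)");
            conn.createStatement().execute("UPSERT INTO EXAMPLE_T VALUES(1, 'a')");
            conn.commit();
            // pigServer and zkQuorum also come from the base class
            pigServer.registerQuery(String.format(
                    "A = load 'hbase://table/EXAMPLE_T' using %s('%s');",
                    PhoenixHBaseLoader.class.getName(), zkQuorum));
            Iterator<Tuple> iterator = pigServer.openIterator("A");
            int recordsRead = 0;
            while (iterator.hasNext()) {
                iterator.next();
                recordsRead++;
            }
            assertEquals(1, recordsRead);
        }
    }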
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e9d44986/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
index 2f33b5f..606282a 100644
--- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
+++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
@@ -19,47 +19,32 @@
  */
 package org.apache.phoenix.pig;
 
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
-import static org.apache.phoenix.util.TestUtil.LOCALHOST;
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
-import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
-import org.apache.pig.ExecType;
-import org.apache.pig.PigServer;
 import org.apache.pig.builtin.mock.Storage;
 import org.apache.pig.builtin.mock.Storage.Data;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.base.Preconditions;
-
 /**
  * 
  * Test class to run all the integration tests against a virtual map reduce cluster.
  */
-public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT {
-    
+public class PhoenixHBaseLoaderIT extends BasePigIT {
+
     private static final Log LOG = LogFactory.getLog(PhoenixHBaseLoaderIT.class);
     private static final String SCHEMA_NAME = "T";
     private static final String TABLE_NAME = "A";
@@ -67,17 +52,6 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT {
     private static final String TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, TABLE_NAME);
     private static final String CASE_SENSITIVE_TABLE_NAME = SchemaUtil.getEscapedArgument("a");
     private static final String CASE_SENSITIVE_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME,CASE_SENSITIVE_TABLE_NAME);
-    private String zkQuorum;
-    private Connection conn;
-    private PigServer pigServer;
-
-    @Before
-    public void setUp() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        conn = DriverManager.getConnection(getUrl(), props);
-        zkQuorum = LOCALHOST + JDBC_PROTOCOL_SEPARATOR + getZKClientPort(getTestClusterConfig());
-        pigServer = new PigServer(ExecType.LOCAL, getTestClusterConfig());
-    }
 
     /**
      * Validates the schema returned for a table with Pig data types.
@@ -94,7 +68,7 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT {
         pigServer.registerQuery(String.format(
                "A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE,
                 zkQuorum));
-        
+
         final Schema schema = pigServer.dumpSchema("A");
         List<FieldSchema> fields = schema.getFields();
         assertEquals(4, fields.size());
@@ -114,18 +88,18 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT {
      */
     @Test
     public void testSchemaForTableWithSpecificColumns() throws Exception {
-        
+
         //create the table
         final String TABLE = "TABLE2";
         final String ddl = "CREATE TABLE " + TABLE
                + "  (ID INTEGER NOT NULL PRIMARY KEY,NAME VARCHAR, AGE INTEGER) ";
         conn.createStatement().execute(ddl);
-        
+
         final String selectColumns = "ID,NAME";
         pigServer.registerQuery(String.format(
                "A = load 'hbase://table/%s/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');",
                 TABLE, selectColumns, zkQuorum));
-        
+
         Schema schema = pigServer.dumpSchema("A");
         List<FieldSchema> fields = schema.getFields();
         assertEquals(2, fields.size());
@@ -134,29 +108,29 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT {
         assertTrue(fields.get(1).alias.equalsIgnoreCase("NAME"));
         assertTrue(fields.get(1).type == DataType.CHARARRAY);
     }
-    
+
     /**
      * Validates the schema returned when a SQL SELECT query is given as part of LOAD .
      * @throws Exception
      */
     @Test
     public void testSchemaForQuery() throws Exception {
-        
-       //create the table.
+
+        //create the table.
         final String TABLE = "TABLE3";
         String ddl = String.format("CREATE TABLE " + TABLE +
-                 "  (A_STRING VARCHAR NOT NULL, A_DECIMAL DECIMAL NOT NULL, CF1.A_INTEGER INTEGER, CF2.A_DOUBLE DOUBLE"
+                "  (A_STRING VARCHAR NOT NULL, A_DECIMAL DECIMAL NOT NULL, CF1.A_INTEGER INTEGER, CF2.A_DOUBLE DOUBLE"
                + "  CONSTRAINT pk PRIMARY KEY (A_STRING, A_DECIMAL))\n", TABLE);
         conn.createStatement().execute(ddl);
 
-        
-        
+
+
         //sql query for LOAD
         final String sqlQuery = "SELECT A_STRING,CF1.A_INTEGER,CF2.A_DOUBLE FROM " + TABLE;
         pigServer.registerQuery(String.format(
                "A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');",
                 sqlQuery, zkQuorum));
-        
+
         //assert the schema.
         Schema schema = pigServer.dumpSchema("A");
         List<FieldSchema> fields = schema.getFields();
@@ -168,14 +142,14 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT {
         assertTrue(fields.get(2).alias.equalsIgnoreCase("a_double"));
         assertTrue(fields.get(2).type == DataType.DOUBLE);
     }
-    
+
     /**
      * Validates the schema when it is given as part of LOAD..AS
      * @throws Exception
      */
     @Test
     public void testSchemaForTableWithAlias() throws Exception {
-        
+
         //create the table.
         final String TABLE = "S.TABLE4";
         String ddl = "CREATE TABLE  " + TABLE
@@ -185,13 +159,13 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT {
 
         //select query given as part of LOAD.
         final String sqlQuery = "SELECT A_STRING,A_DECIMAL,CF1.A_INTEGER,CF2.A_DOUBLE FROM " + TABLE;
-        
+
         LOG.info(String.format("Generated SQL Query [%s]",sqlQuery));
-        
+
     pigServer.registerQuery(String.format(
                "raw = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s') AS (a:chararray,b:bigdecimal,c:int,d:double);",
                 sqlQuery, zkQuorum));
-        
+
         //test the schema.
         Schema schema = pigServer.dumpSchema("raw");
         List<FieldSchema> fields = schema.getFields();
@@ -205,19 +179,19 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT {
         assertTrue(fields.get(3).alias.equalsIgnoreCase("d"));
         assertTrue(fields.get(3).type == DataType.DOUBLE);
     }
-    
+
     /**
      * @throws Exception
      */
     @Test
     public void testDataForTable() throws Exception {
-        
-         //create the table
-         String ddl = "CREATE TABLE  " + CASE_SENSITIVE_TABLE_FULL_NAME 
+
+        //create the table
+        String ddl = "CREATE TABLE  " + CASE_SENSITIVE_TABLE_FULL_NAME 
                + "  (ID  INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER) ";
-                
+
         conn.createStatement().execute(ddl);
-        
+
         //prepare data with 10 rows having age 25 and the other 30.
         final String dml = "UPSERT INTO " + CASE_SENSITIVE_TABLE_FULL_NAME + " VALUES(?,?,?)";
         PreparedStatement stmt = conn.prepareStatement(dml);
@@ -229,13 +203,13 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT {
             stmt.execute();    
         }
         conn.commit();
-         
+
         //load data and filter rows whose age is > 25
         pigServer.registerQuery(String.format(
                "A = load 'hbase://table/%s' using "  + PhoenixHBaseLoader.class.getName() + "('%s');", CASE_SENSITIVE_TABLE_FULL_NAME,
                 zkQuorum));
         pigServer.registerQuery("B = FILTER A BY AGE > 25;");
-        
+
         final Iterator<Tuple> iterator = pigServer.openIterator("B");
         int recordsRead = 0;
         while (iterator.hasNext()) {
@@ -245,19 +219,19 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT {
         }
         assertEquals(rows/2, recordsRead);
     }
-    
+
     /**
      * @throws Exception
      */
     @Test
     public void testDataForSQLQuery() throws Exception {
-        
-         //create the table
-         String ddl = "CREATE TABLE  " + TABLE_FULL_NAME 
+
+        //create the table
+        String ddl = "CREATE TABLE  " + TABLE_FULL_NAME 
                + "  (ID  INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER) ";
-                
+
         conn.createStatement().execute(ddl);
-        
+
         //prepare data with 10 rows having age 25 and the other 30.
         final String dml = "UPSERT INTO " + TABLE_FULL_NAME + " VALUES(?,?,?)";
         PreparedStatement stmt = conn.prepareStatement(dml);
@@ -269,14 +243,14 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT {
             stmt.execute();    
         }
         conn.commit();
-        
+
         //sql query
         final String sqlQuery = " SELECT ID,NAME,AGE FROM " + TABLE_FULL_NAME + " WHERE AGE > 25";
         //load data and filter rows whose age is > 25
         pigServer.registerQuery(String.format(
                "A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery,
                 zkQuorum));
-        
+
         final Iterator<Tuple> iterator = pigServer.openIterator("A");
         int recordsRead = 0;
         while (iterator.hasNext()) {
@@ -285,21 +259,21 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT {
         }
         assertEquals(rows/2, recordsRead);
     }
-    
+
     /**
      * 
      * @throws Exception
      */
     @Test
     public void testForNonPKSQLQuery() throws Exception {
-        
-         //create the table
+
+        //create the table
         final String TABLE = "TABLE5";
         String ddl = "CREATE TABLE  " + TABLE
                + " ( ID VARCHAR PRIMARY KEY, FOO VARCHAR, BAR INTEGER, BAZ UNSIGNED_INT)";
-                
+
         conn.createStatement().execute(ddl);
-        
+
         //upsert data.
         final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?,?) ";
         PreparedStatement stmt = conn.prepareStatement(dml);
@@ -313,16 +287,16 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT {
         stmt.setInt(3,-2);
         stmt.setInt(4,2);
         stmt.execute();
-        
+
         conn.commit();
-        
+
         //sql query
         final String sqlQuery = String.format(" SELECT FOO, BAZ FROM %s WHERE BAR = -1 " , TABLE);
-      
+
         pigServer.registerQuery(String.format(
                "A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery,
                 zkQuorum));
-        
+
         final Iterator<Tuple> iterator = pigServer.openIterator("A");
         int recordsRead = 0;
         while (iterator.hasNext()) {
@@ -332,7 +306,7 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT {
             recordsRead++;
         }
         assertEquals(1, recordsRead);
-        
+
         //test the schema. Test for PHOENIX-1123
         Schema schema = pigServer.dumpSchema("A");
         List<FieldSchema> fields = schema.getFields();
@@ -342,20 +316,20 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT {
         assertTrue(fields.get(1).alias.equalsIgnoreCase("BAZ"));
         assertTrue(fields.get(1).type == DataType.INTEGER);
     }
-    
+
     /**
      * @throws Exception
      */
     @Test
     public void testGroupingOfDataForTable() throws Exception {
-        
-         //create the table
+
+        //create the table
         final String TABLE = "TABLE6";
         String ddl = "CREATE TABLE  " + TABLE
                + "  (ID  INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER, SAL INTEGER) ";
-                
+
         conn.createStatement().execute(ddl);
-        
+
         //prepare data with 10 rows having age 25 and the other 30.
         final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?,?)";
         PreparedStatement stmt = conn.prepareStatement(dml);
@@ -371,135 +345,123 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT {
                 stmt.setInt(3, 30);
                 stmt.setInt(4, 10 * 3 * k++);
             }
-            
+
             stmt.execute();    
         }
         conn.commit();
-        
+
         //prepare the mock storage with expected output
         final Data data = Storage.resetData(pigServer);
         List<Tuple> expectedList = new ArrayList<Tuple>();
         expectedList.add(Storage.tuple(0,180));
         expectedList.add(Storage.tuple(0,270));
-        
-         //load data and filter rows whose age is > 25
+
+        //load data and filter rows whose age is > 25
         pigServer.setBatchOn();
         pigServer.registerQuery(String.format(
                "A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE,
                 zkQuorum));
-        
+
         pigServer.registerQuery("B = GROUP A BY AGE;");
         pigServer.registerQuery("C = FOREACH B GENERATE MIN(A.SAL),MAX(A.SAL);");
         pigServer.registerQuery("STORE C INTO 'out' using mock.Storage();");
         pigServer.executeBatch();
-        
+
         List<Tuple> actualList = data.get("out");
         assertEquals(expectedList, actualList);
     }
-        
+
     @Test
     public void testTimestampForSQLQuery() throws Exception {
-        try {
-            //create the table
-            String ddl = "CREATE TABLE TIMESTAMP_T (MYKEY VARCHAR,DATE_STP TIMESTAMP CONSTRAINT PK PRIMARY KEY (MYKEY)) ";
-            conn.createStatement().execute(ddl);
-
-            final String dml = "UPSERT INTO TIMESTAMP_T VALUES('foo',TO_TIMESTAMP('2006-04-12 00:00:00'))";
-            conn.createStatement().execute(dml);
-            conn.commit();
-
-            //sql query
-            final String sqlQuery = " SELECT mykey, year(DATE_STP) FROM TIMESTAMP_T ";
-            pigServer.registerQuery(String.format(
+        //create the table
+        String ddl = "CREATE TABLE TIMESTAMP_T (MYKEY VARCHAR,DATE_STP TIMESTAMP CONSTRAINT PK PRIMARY KEY (MYKEY)) ";
+        conn.createStatement().execute(ddl);
+
+        final String dml = "UPSERT INTO TIMESTAMP_T VALUES('foo',TO_TIMESTAMP('2006-04-12 00:00:00'))";
+        conn.createStatement().execute(dml);
+        conn.commit();
+
+        //sql query
+        final String sqlQuery = " SELECT mykey, year(DATE_STP) FROM TIMESTAMP_T ";
+        pigServer.registerQuery(String.format(
                "A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery,
                 zkQuorum));
 
-            final Iterator<Tuple> iterator = pigServer.openIterator("A");
-            while (iterator.hasNext()) {
-                Tuple tuple = iterator.next();
-                assertEquals("foo", tuple.get(0));
-                assertEquals(2006, tuple.get(1));
-            }
-        } finally {
-            dropTable("TIMESTAMP_T");
+        final Iterator<Tuple> iterator = pigServer.openIterator("A");
+        while (iterator.hasNext()) {
+            Tuple tuple = iterator.next();
+            assertEquals("foo", tuple.get(0));
+            assertEquals(2006, tuple.get(1));
         }
     }
-    
+
     @Test
     public void testDateForSQLQuery() throws Exception {
-        try {
-            //create the table
-            String ddl = "CREATE TABLE DATE_T (MYKEY VARCHAR,DATE_STP Date CONSTRAINT PK PRIMARY KEY (MYKEY)) ";
-            conn.createStatement().execute(ddl);
-
-            final String dml = "UPSERT INTO DATE_T VALUES('foo',TO_DATE('2004-03-10 10:00:00'))";
-            conn.createStatement().execute(dml);
-            conn.commit();
-
-            //sql query
-            final String sqlQuery = " SELECT mykey, hour(DATE_STP) FROM DATE_T ";
-            pigServer.registerQuery(String.format(
+        //create the table
+        String ddl = "CREATE TABLE DATE_T (MYKEY VARCHAR,DATE_STP Date CONSTRAINT PK PRIMARY KEY (MYKEY)) ";
+        conn.createStatement().execute(ddl);
+
+        final String dml = "UPSERT INTO DATE_T VALUES('foo',TO_DATE('2004-03-10 10:00:00'))";
+        conn.createStatement().execute(dml);
+        conn.commit();
+
+        //sql query
+        final String sqlQuery = " SELECT mykey, hour(DATE_STP) FROM DATE_T ";
+        pigServer.registerQuery(String.format(
                "A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery,
                 zkQuorum));
 
-            final Iterator<Tuple> iterator = pigServer.openIterator("A");
-            while (iterator.hasNext()) {
-                Tuple tuple = iterator.next();
-                assertEquals("foo", tuple.get(0));
-                assertEquals(10, tuple.get(1));
-            }
-        } finally {
-            dropTable("DATE_T");
+        final Iterator<Tuple> iterator = pigServer.openIterator("A");
+        while (iterator.hasNext()) {
+            Tuple tuple = iterator.next();
+            assertEquals("foo", tuple.get(0));
+            assertEquals(10, tuple.get(1));
         }
     }
 
     @Test
     public void testTimeForSQLQuery() throws Exception {
-        try {
-            //create the table
-            String ddl = "CREATE TABLE TIME_T (MYKEY VARCHAR,DATE_STP TIME CONSTRAINT PK PRIMARY KEY (MYKEY)) ";
-            conn.createStatement().execute(ddl);
-
-            final String dml = "UPSERT INTO TIME_T VALUES('foo',TO_TIME('2008-05-16 00:30:00'))";
-            conn.createStatement().execute(dml);
-            conn.commit();
-
-            //sql query
-            final String sqlQuery = " SELECT mykey, minute(DATE_STP) FROM TIME_T ";
-            pigServer.registerQuery(String.format(
+        //create the table
+        String ddl = "CREATE TABLE TIME_T (MYKEY VARCHAR,DATE_STP TIME CONSTRAINT PK PRIMARY KEY (MYKEY)) ";
+        conn.createStatement().execute(ddl);
+
+        final String dml = "UPSERT INTO TIME_T VALUES('foo',TO_TIME('2008-05-16 00:30:00'))";
+        conn.createStatement().execute(dml);
+        conn.commit();
+
+        //sql query
+        final String sqlQuery = " SELECT mykey, minute(DATE_STP) FROM TIME_T ";
+        pigServer.registerQuery(String.format(
                "A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery,
                 zkQuorum));
 
-            final Iterator<Tuple> iterator = pigServer.openIterator("A");
-            while (iterator.hasNext()) {
-                Tuple tuple = iterator.next();
-                assertEquals("foo", tuple.get(0));
-                assertEquals(30, tuple.get(1));
-            }
-        } finally {
-            dropTable("TIME_T");
+        final Iterator<Tuple> iterator = pigServer.openIterator("A");
+        while (iterator.hasNext()) {
+            Tuple tuple = iterator.next();
+            assertEquals("foo", tuple.get(0));
+            assertEquals(30, tuple.get(1));
         }
     }
-    
+
     /**
      * Tests both  {@link PhoenixHBaseLoader} and {@link PhoenixHBaseStorage} 
      * @throws Exception
      */
     @Test
     public void testLoadAndStore() throws Exception {
-        
-         //create the tables
+
+        //create the tables
         final String TABLE = "TABLE7";
         final String sourceTableddl = "CREATE TABLE  " + TABLE
                + "  (ID  INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER, SAL INTEGER) ";
-         
+
         final String targetTable = "AGGREGATE";
         final String targetTableddl = "CREATE TABLE " + targetTable
-                 +  "(AGE INTEGER NOT NULL PRIMARY KEY , MIN_SAL INTEGER , MAX_SAL INTEGER) ";
-                 
+                +  "(AGE INTEGER NOT NULL PRIMARY KEY , MIN_SAL INTEGER , MAX_SAL INTEGER) ";
+
         conn.createStatement().execute(sourceTableddl);
         conn.createStatement().execute(targetTableddl);
-        
+
         //prepare data with 10 rows having age 25 and the other 30.
         final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?,?)";
         PreparedStatement stmt = conn.prepareStatement(dml);
@@ -515,25 +477,25 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT {
                 stmt.setInt(3, 30);
                 stmt.setInt(4, 10 * 3 * k++);
             }
-            
+
             stmt.execute();    
         }
         conn.commit();
-        
-            
-         //load data and filter rows whose age is > 25
+
+
+        //load data and filter rows whose age is > 25
         pigServer.setBatchOn();
         pigServer.registerQuery(String.format(
                "A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE,
                 zkQuorum));
-        
+
         pigServer.registerQuery("B = GROUP A BY AGE;");
         pigServer.registerQuery("C = FOREACH B GENERATE group as AGE,MIN(A.SAL),MAX(A.SAL);");
         pigServer.registerQuery("STORE C INTO 'hbase://" + targetTable 
                 + "' using " + PhoenixHBaseStorage.class.getName() + "('"
-                 + zkQuorum + "', '-batchSize 1000');");
+                + zkQuorum + "', '-batchSize 1000');");
         pigServer.executeBatch();
-        
+
         //validate the data with what is stored.
         final String selectQuery = "SELECT AGE , MIN_SAL ,MAX_SAL FROM " + targetTable + " ORDER BY AGE";
         final ResultSet rs = conn.createStatement().executeQuery(selectQuery);
@@ -546,25 +508,25 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT {
         assertEquals(0, rs.getInt("MIN_SAL"));
         assertEquals(270, rs.getInt("MAX_SAL"));
     }
-       
-       /**
+
+    /**
      * Test for Sequence
      * @throws Exception
      */
     @Test
     public void testDataForSQLQueryWithSequences() throws Exception {
-        
-         //create the table
+
+        //create the table
         final String TABLE = "TABLE8";
         String ddl = "CREATE TABLE " + TABLE
                + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER) ";
-                
+
         conn.createStatement().execute(ddl);
-        
+
         String sequenceDdl = "CREATE SEQUENCE my_sequence";
-                
+
         conn.createStatement().execute(sequenceDdl);
-           
+
         //prepare data with 10 rows having age 25 and the other 30.
         final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?)";
         PreparedStatement stmt = conn.prepareStatement(dml);
@@ -576,14 +538,14 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT {
             stmt.execute();
         }
         conn.commit();
-        
+
         //sql query load data and filter rows whose age is > 25
         final String sqlQuery = " SELECT NEXT VALUE FOR my_sequence AS my_seq,ID,NAME,AGE FROM " + TABLE + " WHERE AGE > 25";
         pigServer.registerQuery(String.format(
                "A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery,
                 zkQuorum));
-        
-        
+
+
         Iterator<Tuple> iterator = pigServer.openIterator("A");
         int recordsRead = 0;
         while (iterator.hasNext()) {
@@ -592,17 +554,17 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT {
         }
         assertEquals(rows/2, recordsRead);
     }
-       
-        @Test
+
+    @Test
     public void testDataForSQLQueryWithFunctions() throws Exception {
-        
-         //create the table
-         final String TABLE = "TABLE9";
-         String ddl = "CREATE TABLE " + TABLE
+
+        //create the table
+        final String TABLE = "TABLE9";
+        String ddl = "CREATE TABLE " + TABLE
                 + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR) ";
-                
+
         conn.createStatement().execute(ddl);
-        
+
         final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?)";
         PreparedStatement stmt = conn.prepareStatement(dml);
         int rows = 20;
@@ -612,15 +574,15 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT {
             stmt.execute();
         }
         conn.commit();
-        
+
         //sql query
         final String sqlQuery = " SELECT UPPER(NAME) AS n FROM " + TABLE + " ORDER BY ID" ;
 
         pigServer.registerQuery(String.format(
                "A = load 'hbase://query/%s' using "  + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery,
                 zkQuorum));
-        
-        
+
+
         Iterator<Tuple> iterator = pigServer.openIterator("A");
         int i = 0;
         while (iterator.hasNext()) {
@@ -629,53 +591,49 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT {
             assertEquals("A" + i, name);
             i++;
         }
-    
+
     }
-        @Test
-        public void testDataFromIndexTable() throws Exception {
-        try {
-            //create the table
-            String ddl = "CREATE TABLE " + TABLE_NAME
-                    + " (ID INTEGER NOT NULL, NAME VARCHAR NOT NULL, EMPLID INTEGER CONSTRAINT pk PRIMARY KEY (ID, NAME)) IMMUTABLE_ROWS=true";
-                   
-            conn.createStatement().execute(ddl);
-           
-            //create a index table
-            String indexDdl = " CREATE INDEX " + INDEX_NAME + " ON " + TABLE_NAME + " (EMPLID) INCLUDE (NAME) ";
-            conn.createStatement().execute(indexDdl);
-           
-            //upsert the data.
-            final String dml = "UPSERT INTO " + TABLE_NAME + " VALUES(?,?,?)";
-            PreparedStatement stmt = conn.prepareStatement(dml);
-            int rows = 20;
-            for(int i = 0 ; i < rows; i++) {
-                stmt.setInt(1, i);
-                stmt.setString(2, "a"+i);
-                stmt.setInt(3, i * 5);
-                stmt.execute();
-            }
-            conn.commit();
-            pigServer.registerQuery("A = load 'hbase://query/SELECT NAME , EMPLID FROM A WHERE EMPLID = 25 ' using " + PhoenixHBaseLoader.class.getName() + "('"+zkQuorum + "')  ;");
-            Iterator<Tuple> iterator = pigServer.openIterator("A");
-            while (iterator.hasNext()) {
-                Tuple tuple = iterator.next();
-                assertEquals("a5", tuple.get(0));
-                assertEquals(25, tuple.get(1));
-            }
-        } finally {
-          dropTable(TABLE_NAME);
-          dropTable(INDEX_NAME);
+
+    @Test
+    public void testDataFromIndexTable() throws Exception {
+        //create the table
+        String ddl = "CREATE TABLE " + TABLE_NAME
+                + " (ID INTEGER NOT NULL, NAME VARCHAR NOT NULL, EMPLID INTEGER CONSTRAINT pk PRIMARY KEY (ID, NAME)) IMMUTABLE_ROWS=true";
+
+        conn.createStatement().execute(ddl);
+
+        //create a index table
+        String indexDdl = " CREATE INDEX " + INDEX_NAME + " ON " + TABLE_NAME + " (EMPLID) INCLUDE (NAME) ";
+        conn.createStatement().execute(indexDdl);
+
+        //upsert the data.
+        final String dml = "UPSERT INTO " + TABLE_NAME + " VALUES(?,?,?)";
+        PreparedStatement stmt = conn.prepareStatement(dml);
+        int rows = 20;
+        for(int i = 0 ; i < rows; i++) {
+            stmt.setInt(1, i);
+            stmt.setString(2, "a"+i);
+            stmt.setInt(3, i * 5);
+            stmt.execute();
+        }
+        conn.commit();
+        pigServer.registerQuery("A = load 'hbase://query/SELECT NAME , EMPLID FROM A WHERE EMPLID = 25 ' using " + PhoenixHBaseLoader.class.getName() + "('"+zkQuorum + "')  ;");
+        Iterator<Tuple> iterator = pigServer.openIterator("A");
+        while (iterator.hasNext()) {
+            Tuple tuple = iterator.next();
+            assertEquals("a5", tuple.get(0));
+            assertEquals(25, tuple.get(1));
         }
     }
-        
-       @Test 
-       public void testLoadOfSaltTable() throws Exception {
-           final String TABLE = "TABLE11";
+
+    @Test 
+    public void testLoadOfSaltTable() throws Exception {
+        final String TABLE = "TABLE11";
         final String sourceTableddl = "CREATE TABLE  " + TABLE
                + "  (ID  INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER, SAL INTEGER) SALT_BUCKETS=2  ";
-         
+
         conn.createStatement().execute(sourceTableddl);
-        
+
         //prepare data with 10 rows having age 25 and the other 30.
         final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?,?)";
         PreparedStatement stmt = conn.prepareStatement(dml);
@@ -691,40 +649,27 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT {
                 stmt.setInt(3, 30);
                 stmt.setInt(4, 10 * 3 * k++);
             }
-            
+
             stmt.execute();    
         }
         conn.commit();
-            
+
         final Data data = Storage.resetData(pigServer);
         List<Tuple> expectedList = new ArrayList<Tuple>();
         expectedList.add(Storage.tuple(25,10));
         expectedList.add(Storage.tuple(30,10));
-        
+
         pigServer.setBatchOn();
         pigServer.registerQuery(String.format(
                "A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE,
                 zkQuorum));
-        
+
         pigServer.registerQuery("B = GROUP A BY AGE;");
         pigServer.registerQuery("C = FOREACH B GENERATE group,COUNT(A);");
         pigServer.registerQuery("STORE C INTO 'out' using mock.Storage();");
         pigServer.executeBatch();
-        
+
         List<Tuple> actualList = data.get("out");
         assertEquals(expectedList.size(), actualList.size());
-       }
-    
-    @After
-    public void tearDown() throws Exception {
-        if(conn != null) {
-            conn.close();
-        }
-        pigServer.shutdown();
-    }
-
-    private void dropTable(String tableFullName) throws SQLException {
-      Preconditions.checkNotNull(conn);
-      conn.createStatement().execute(String.format("DROP TABLE IF EXISTS %s",tableFullName));
     }
-}
\ No newline at end of file
+}

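The shape of the change above: PhoenixHBaseLoaderIT drops its own
@Before/@After and its private dropTable helper, the per-test try/finally
cleanup blocks are unwound, and the class inherits its fixtures from
BasePigIT; presumably table cleanup between tests is left to the
BaseHBaseManagedTimeIT machinery the base class builds on. The tests
exercise both loader forms, 'hbase://table/<name>' for a whole table and
'hbase://query/<select>' for ad hoc SQL.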
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e9d44986/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java
index 9106cdd..9e9434c 100644
--- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java
+++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java
@@ -19,69 +19,28 @@
  */
 package org.apache.phoenix.pig;
 
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
-import static org.apache.phoenix.util.TestUtil.LOCALHOST;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.Statement;
 import java.util.Collection;
 
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
 import org.apache.phoenix.util.SchemaUtil;
-import org.apache.pig.ExecType;
-import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
 import org.apache.pig.builtin.mock.Storage;
 import org.apache.pig.builtin.mock.Storage.Data;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.joda.time.DateTime;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
 
 
-public class PhoenixHBaseStorerIT extends BaseHBaseManagedTimeIT {
-
-    private static TupleFactory tupleFactory;
-    private static Connection conn;
-    private static PigServer pigServer;
-    private static String zkQuorum;
-    
-    @BeforeClass
-    public static void setUpBeforeClass() throws Exception {
-        conn = DriverManager.getConnection(getUrl());
-        zkQuorum = LOCALHOST + JDBC_PROTOCOL_SEPARATOR + getZKClientPort(getTestClusterConfig());
-        // Pig variables
-        tupleFactory = TupleFactory.getInstance();
-    }
-    
-    @Before
-    public void setUp() throws Exception {
-        pigServer = new PigServer(ExecType.LOCAL, getTestClusterConfig());
-    }
-    
-    @After
-    public void tearDown() throws Exception {
-         pigServer.shutdown();
-    }
-
-    @AfterClass
-    public static void tearDownAfterClass() throws Exception {
-        conn.close();
-    }
-
+public class PhoenixHBaseStorerIT extends BasePigIT {
     /**
      * Basic test - writes data to a Phoenix table and compares the data written
      * to expected

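PhoenixHBaseStorerIT previously kept its Connection, PigServer,
TupleFactory, and zkQuorum in static fields built in @BeforeClass and torn
down in @AfterClass, so state was shared across test methods; extending
BasePigIT replaces all of that with the per-test fixtures shown earlier.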
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e9d44986/phoenix-pig/src/it/java/org/apache/phoenix/pig/udf/ReserveNSequenceTestIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/udf/ReserveNSequenceTestIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/udf/ReserveNSequenceTestIT.java
index cea1a8a..30ce132 100644
--- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/udf/ReserveNSequenceTestIT.java
+++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/udf/ReserveNSequenceTestIT.java
@@ -17,8 +17,6 @@
  */
 package org.apache.phoenix.pig.udf;
 
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
-import static org.apache.phoenix.util.TestUtil.LOCALHOST;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -29,17 +27,12 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Properties;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.pig.BasePigIT;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.util.UDFContext;
 import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -47,37 +40,33 @@ import org.junit.rules.ExpectedException;
 /**
  * Test class to run all the Pig Sequence UDF integration tests against a virtual map reduce cluster.
  */
-public class ReserveNSequenceTestIT extends BaseHBaseManagedTimeIT {
+public class ReserveNSequenceTestIT extends BasePigIT {
 
     private static final String CREATE_SEQUENCE_SYNTAX = "CREATE SEQUENCE %s START WITH %s INCREMENT BY %s MINVALUE %s MAXVALUE %s CACHE %s";
     private static final String SEQUENCE_NAME = "my_schema.my_sequence";
     private static final long MAX_VALUE = 10;
 
-    private static TupleFactory TF;
-    private static Connection globalConn;
-    private static String zkQuorum;
-    private static Configuration conf;
     private static UDFContext udfContext;
 
     @Rule
     public ExpectedException thrown = ExpectedException.none();
 
-    @BeforeClass
-    public static void setUpBeforeClass() throws Exception {
-        conf = getTestClusterConfig();
-        zkQuorum = LOCALHOST + JDBC_PROTOCOL_SEPARATOR + getZKClientPort(getTestClusterConfig());
-        conf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
-        globalConn = DriverManager.getConnection(getUrl());
-        // Pig variables
-        TF = TupleFactory.getInstance();
-    }
-
+    @Override
     @Before
-    public void setUp() throws SQLException {
-        createSequence(globalConn);
+    public void setUp() throws Exception {
+        super.setUp();
+        createSequence(conn);
         createUdfContext();
     }
 
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        udfContext.reset();
+        dropSequence(conn);
+        super.tearDown();
+    }
+
     @Test
     public void testReserve() throws Exception {
         doTest(new UDFTestProperties(1));
@@ -161,7 +150,7 @@ public class ReserveNSequenceTestIT extends BaseHBaseManagedTimeIT {
             doTest(tenantConn, props);
 
             // validate global sequence value is still set to 1
-            assertEquals(1L, getNextSequenceValue(globalConn));
+            assertEquals(1L, getNextSequenceValue(conn));
         } finally {
             dropSequence(tenantConn);
         }
@@ -174,27 +163,27 @@ public class ReserveNSequenceTestIT extends BaseHBaseManagedTimeIT {
      */
     @Test
     public void testMultipleTuples() throws Exception {
-        Tuple tuple = TF.newTuple(2);
+        Tuple tuple = tupleFactory.newTuple(2);
         tuple.set(0, 2L);
         tuple.set(1, SEQUENCE_NAME);
 
-        final String tentantId = globalConn.getClientInfo(PhoenixRuntime.TENANT_ID_ATTRIB);
+        final String tentantId = conn.getClientInfo(PhoenixRuntime.TENANT_ID_ATTRIB);
         ReserveNSequence udf = new ReserveNSequence(zkQuorum, tentantId);
 
         for (int i = 0; i < 2; i++) {
             udf.exec(tuple);
         }
-        long nextValue = getNextSequenceValue(globalConn);
+        long nextValue = getNextSequenceValue(conn);
         assertEquals(5L, nextValue);
     }
     
     private void doTest(UDFTestProperties props) throws Exception {
-        doTest(globalConn, props);
+        doTest(conn, props);
     }
 
     private void doTest(Connection conn, UDFTestProperties props) throws Exception {
         setCurrentValue(conn, props.getCurrentValue());
-        Tuple tuple = TF.newTuple(3);
+        Tuple tuple = tupleFactory.newTuple(3);
         tuple.set(0, props.getNumToReserve());
         tuple.set(1, props.getSequenceName());
         tuple.set(2, zkQuorum);
@@ -251,17 +240,6 @@ public class ReserveNSequenceTestIT extends BaseHBaseManagedTimeIT {
         conn.commit();
     }
 
-    @After
-    public void tearDown() throws Exception {
-        udfContext.reset();
-        dropSequence(globalConn);
-    }
-
-    @AfterClass
-    public static void tearDownAfterClass() throws Exception {
-        globalConn.close();
-    }
-
     /**
      * Static class to define properties for the test
      */

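ReserveNSequenceTestIT follows the same pattern: the static globalConn,
TupleFactory, and Configuration give way to the inherited per-test conn,
tupleFactory, and conf, and its @Before/@After now override and chain to the
BasePigIT versions (super.setUp() first, super.tearDown() last) so sequence
creation and the UDF-context reset happen inside a fresh fixture on every
test.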