Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 ac62506fb -> 10d1b64e2


PHOENIX-2419 Fix flapping Pig tests


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/10d1b64e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/10d1b64e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/10d1b64e

Branch: refs/heads/4.x-HBase-0.98
Commit: 10d1b64e2dd05db50211948fa62fa95f7b4e7613
Parents: ac62506
Author: James Taylor <[email protected]>
Authored: Sun Nov 15 23:28:09 2015 -0800
Committer: James Taylor <[email protected]>
Committed: Sun Nov 15 23:28:09 2015 -0800

----------------------------------------------------------------------
 .../phoenix/pig/PhoenixHBaseLoaderIT.java       | 391 +++++++++----------
 1 file changed, 183 insertions(+), 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/10d1b64e/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java 
b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
index bc614bf..accf453 100644
--- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
+++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue;
 
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -40,14 +39,12 @@ import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.junit.Test;
 
-import com.google.common.base.Preconditions;
-
 /**
  * 
  * Test class to run all the integration tests against a virtual map reduce 
cluster.
  */
 public class PhoenixHBaseLoaderIT extends BasePigIT {
-    
+
     private static final Log LOG = 
LogFactory.getLog(PhoenixHBaseLoaderIT.class);
     private static final String SCHEMA_NAME = "T";
     private static final String TABLE_NAME = "A";
@@ -71,7 +68,7 @@ public class PhoenixHBaseLoaderIT extends BasePigIT {
         pigServer.registerQuery(String.format(
                 "A = load 'hbase://table/%s' using " + 
PhoenixHBaseLoader.class.getName() + "('%s');", TABLE,
                 zkQuorum));
-        
+
         final Schema schema = pigServer.dumpSchema("A");
         List<FieldSchema> fields = schema.getFields();
         assertEquals(4, fields.size());
@@ -91,18 +88,18 @@ public class PhoenixHBaseLoaderIT extends BasePigIT {
      */
     @Test
     public void testSchemaForTableWithSpecificColumns() throws Exception {
-        
+
         //create the table
         final String TABLE = "TABLE2";
         final String ddl = "CREATE TABLE " + TABLE
                 + "  (ID INTEGER NOT NULL PRIMARY KEY,NAME VARCHAR, AGE 
INTEGER) ";
         conn.createStatement().execute(ddl);
-        
+
         final String selectColumns = "ID,NAME";
         pigServer.registerQuery(String.format(
                 "A = load 'hbase://table/%s/%s' using " + 
PhoenixHBaseLoader.class.getName() + "('%s');",
                 TABLE, selectColumns, zkQuorum));
-        
+
         Schema schema = pigServer.dumpSchema("A");
         List<FieldSchema> fields = schema.getFields();
         assertEquals(2, fields.size());
@@ -111,29 +108,29 @@ public class PhoenixHBaseLoaderIT extends BasePigIT {
         assertTrue(fields.get(1).alias.equalsIgnoreCase("NAME"));
         assertTrue(fields.get(1).type == DataType.CHARARRAY);
     }
-    
+
     /**
      * Validates the schema returned when a SQL SELECT query is given as part 
of LOAD .
      * @throws Exception
      */
     @Test
     public void testSchemaForQuery() throws Exception {
-        
-       //create the table.
+
+        //create the table.
         final String TABLE = "TABLE3";
         String ddl = String.format("CREATE TABLE " + TABLE +
-                 "  (A_STRING VARCHAR NOT NULL, A_DECIMAL DECIMAL NOT NULL, 
CF1.A_INTEGER INTEGER, CF2.A_DOUBLE DOUBLE"
+                "  (A_STRING VARCHAR NOT NULL, A_DECIMAL DECIMAL NOT NULL, 
CF1.A_INTEGER INTEGER, CF2.A_DOUBLE DOUBLE"
                 + "  CONSTRAINT pk PRIMARY KEY (A_STRING, A_DECIMAL))\n", 
TABLE);
         conn.createStatement().execute(ddl);
 
-        
-        
+
+
         //sql query for LOAD
         final String sqlQuery = "SELECT A_STRING,CF1.A_INTEGER,CF2.A_DOUBLE 
FROM " + TABLE;
         pigServer.registerQuery(String.format(
                 "A = load 'hbase://query/%s' using " + 
PhoenixHBaseLoader.class.getName() + "('%s');",
                 sqlQuery, zkQuorum));
-        
+
         //assert the schema.
         Schema schema = pigServer.dumpSchema("A");
         List<FieldSchema> fields = schema.getFields();
@@ -145,14 +142,14 @@ public class PhoenixHBaseLoaderIT extends BasePigIT {
         assertTrue(fields.get(2).alias.equalsIgnoreCase("a_double"));
         assertTrue(fields.get(2).type == DataType.DOUBLE);
     }
-    
+
     /**
      * Validates the schema when it is given as part of LOAD..AS
      * @throws Exception
      */
     @Test
     public void testSchemaForTableWithAlias() throws Exception {
-        
+
         //create the table.
         final String TABLE = "S.TABLE4";
         String ddl = "CREATE TABLE  " + TABLE
@@ -162,13 +159,13 @@ public class PhoenixHBaseLoaderIT extends BasePigIT {
 
         //select query given as part of LOAD.
         final String sqlQuery = "SELECT 
A_STRING,A_DECIMAL,CF1.A_INTEGER,CF2.A_DOUBLE FROM " + TABLE;
-        
+
         LOG.info(String.format("Generated SQL Query [%s]",sqlQuery));
-        
+
         pigServer.registerQuery(String.format(
                 "raw = load 'hbase://query/%s' using " + 
PhoenixHBaseLoader.class.getName() + "('%s') AS 
(a:chararray,b:bigdecimal,c:int,d:double);",
                 sqlQuery, zkQuorum));
-        
+
         //test the schema.
         Schema schema = pigServer.dumpSchema("raw");
         List<FieldSchema> fields = schema.getFields();
@@ -182,19 +179,19 @@ public class PhoenixHBaseLoaderIT extends BasePigIT {
         assertTrue(fields.get(3).alias.equalsIgnoreCase("d"));
         assertTrue(fields.get(3).type == DataType.DOUBLE);
     }
-    
+
     /**
      * @throws Exception
      */
     @Test
     public void testDataForTable() throws Exception {
-        
-         //create the table
-         String ddl = "CREATE TABLE  " + CASE_SENSITIVE_TABLE_FULL_NAME 
+
+        //create the table
+        String ddl = "CREATE TABLE  " + CASE_SENSITIVE_TABLE_FULL_NAME 
                 + "  (ID  INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE 
INTEGER) ";
-                
+
         conn.createStatement().execute(ddl);
-        
+
         //prepare data with 10 rows having age 25 and the other 30.
         final String dml = "UPSERT INTO " + CASE_SENSITIVE_TABLE_FULL_NAME + " 
VALUES(?,?,?)";
         PreparedStatement stmt = conn.prepareStatement(dml);
@@ -206,13 +203,13 @@ public class PhoenixHBaseLoaderIT extends BasePigIT {
             stmt.execute();    
         }
         conn.commit();
-         
+
         //load data and filter rows whose age is > 25
         pigServer.registerQuery(String.format(
                 "A = load 'hbase://table/%s' using "  + 
PhoenixHBaseLoader.class.getName() + "('%s');", CASE_SENSITIVE_TABLE_FULL_NAME,
                 zkQuorum));
         pigServer.registerQuery("B = FILTER A BY AGE > 25;");
-        
+
         final Iterator<Tuple> iterator = pigServer.openIterator("B");
         int recordsRead = 0;
         while (iterator.hasNext()) {
@@ -222,19 +219,19 @@ public class PhoenixHBaseLoaderIT extends BasePigIT {
         }
         assertEquals(rows/2, recordsRead);
     }
-    
+
     /**
      * @throws Exception
      */
     @Test
     public void testDataForSQLQuery() throws Exception {
-        
-         //create the table
-         String ddl = "CREATE TABLE  " + TABLE_FULL_NAME 
+
+        //create the table
+        String ddl = "CREATE TABLE  " + TABLE_FULL_NAME 
                 + "  (ID  INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE 
INTEGER) ";
-                
+
         conn.createStatement().execute(ddl);
-        
+
         //prepare data with 10 rows having age 25 and the other 30.
         final String dml = "UPSERT INTO " + TABLE_FULL_NAME + " VALUES(?,?,?)";
         PreparedStatement stmt = conn.prepareStatement(dml);
@@ -246,14 +243,14 @@ public class PhoenixHBaseLoaderIT extends BasePigIT {
             stmt.execute();    
         }
         conn.commit();
-        
+
         //sql query
         final String sqlQuery = " SELECT ID,NAME,AGE FROM " + TABLE_FULL_NAME 
+ " WHERE AGE > 25";
         //load data and filter rows whose age is > 25
         pigServer.registerQuery(String.format(
                 "A = load 'hbase://query/%s' using 
org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery,
                 zkQuorum));
-        
+
         final Iterator<Tuple> iterator = pigServer.openIterator("A");
         int recordsRead = 0;
         while (iterator.hasNext()) {
@@ -262,21 +259,21 @@ public class PhoenixHBaseLoaderIT extends BasePigIT {
         }
         assertEquals(rows/2, recordsRead);
     }
-    
+
     /**
      * 
      * @throws Exception
      */
     @Test
     public void testForNonPKSQLQuery() throws Exception {
-        
-         //create the table
+
+        //create the table
         final String TABLE = "TABLE5";
         String ddl = "CREATE TABLE  " + TABLE
                 + " ( ID VARCHAR PRIMARY KEY, FOO VARCHAR, BAR INTEGER, BAZ 
UNSIGNED_INT)";
-                
+
         conn.createStatement().execute(ddl);
-        
+
         //upsert data.
         final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?,?) ";
         PreparedStatement stmt = conn.prepareStatement(dml);
@@ -285,22 +282,22 @@ public class PhoenixHBaseLoaderIT extends BasePigIT {
         stmt.setInt(3,-1);
         stmt.setInt(4,1);
         stmt.execute();
-       
+
         stmt.setString(1, "b");
         stmt.setString(2, "b");
         stmt.setInt(3,-2);
         stmt.setInt(4,2);
         stmt.execute();
-        
+
         conn.commit();
-        
+
         //sql query
         final String sqlQuery = String.format(" SELECT FOO, BAZ FROM %s WHERE 
BAR = -1 " , TABLE);
-      
+
         pigServer.registerQuery(String.format(
                 "A = load 'hbase://query/%s' using " + 
PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery,
                 zkQuorum));
-        
+
         final Iterator<Tuple> iterator = pigServer.openIterator("A");
         int recordsRead = 0;
         while (iterator.hasNext()) {
@@ -310,7 +307,7 @@ public class PhoenixHBaseLoaderIT extends BasePigIT {
             recordsRead++;
         }
         assertEquals(1, recordsRead);
-        
+
         //test the schema. Test for PHOENIX-1123
         Schema schema = pigServer.dumpSchema("A");
         List<FieldSchema> fields = schema.getFields();
@@ -320,20 +317,20 @@ public class PhoenixHBaseLoaderIT extends BasePigIT {
         assertTrue(fields.get(1).alias.equalsIgnoreCase("BAZ"));
         assertTrue(fields.get(1).type == DataType.INTEGER);
     }
-    
+
     /**
      * @throws Exception
      */
     @Test
     public void testGroupingOfDataForTable() throws Exception {
-        
-         //create the table
+
+        //create the table
         final String TABLE = "TABLE6";
         String ddl = "CREATE TABLE  " + TABLE
                 + "  (ID  INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE 
INTEGER, SAL INTEGER) ";
-                
+
         conn.createStatement().execute(ddl);
-        
+
         //prepare data with 10 rows having age 25 and the other 30.
         final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?,?)";
         PreparedStatement stmt = conn.prepareStatement(dml);
@@ -349,135 +346,123 @@ public class PhoenixHBaseLoaderIT extends BasePigIT {
                 stmt.setInt(3, 30);
                 stmt.setInt(4, 10 * 3 * k++);
             }
-            
+
             stmt.execute();    
         }
         conn.commit();
-        
+
         //prepare the mock storage with expected output
         final Data data = Storage.resetData(pigServer);
         List<Tuple> expectedList = new ArrayList<Tuple>();
         expectedList.add(Storage.tuple(0,180));
         expectedList.add(Storage.tuple(0,270));
-        
-         //load data and filter rows whose age is > 25
+
+        //load data and filter rows whose age is > 25
         pigServer.setBatchOn();
         pigServer.registerQuery(String.format(
                 "A = load 'hbase://table/%s' using " + 
PhoenixHBaseLoader.class.getName() + "('%s');", TABLE,
                 zkQuorum));
-        
+
         pigServer.registerQuery("B = GROUP A BY AGE;");
         pigServer.registerQuery("C = FOREACH B GENERATE 
MIN(A.SAL),MAX(A.SAL);");
         pigServer.registerQuery("STORE C INTO 'out' using mock.Storage();");
         pigServer.executeBatch();
-        
+
         List<Tuple> actualList = data.get("out");
         assertEquals(expectedList, actualList);
     }
-        
+
     @Test
     public void testTimestampForSQLQuery() throws Exception {
-        try {
-            //create the table
-            String ddl = "CREATE TABLE TIMESTAMP_T (MYKEY VARCHAR,DATE_STP 
TIMESTAMP CONSTRAINT PK PRIMARY KEY (MYKEY)) ";
-            conn.createStatement().execute(ddl);
-
-            final String dml = "UPSERT INTO TIMESTAMP_T 
VALUES('foo',TO_TIMESTAMP('2006-04-12 00:00:00'))";
-            conn.createStatement().execute(dml);
-            conn.commit();
-
-            //sql query
-            final String sqlQuery = " SELECT mykey, year(DATE_STP) FROM 
TIMESTAMP_T ";
-            pigServer.registerQuery(String.format(
+        //create the table
+        String ddl = "CREATE TABLE TIMESTAMP_T (MYKEY VARCHAR,DATE_STP 
TIMESTAMP CONSTRAINT PK PRIMARY KEY (MYKEY)) ";
+        conn.createStatement().execute(ddl);
+
+        final String dml = "UPSERT INTO TIMESTAMP_T 
VALUES('foo',TO_TIMESTAMP('2006-04-12 00:00:00'))";
+        conn.createStatement().execute(dml);
+        conn.commit();
+
+        //sql query
+        final String sqlQuery = " SELECT mykey, year(DATE_STP) FROM 
TIMESTAMP_T ";
+        pigServer.registerQuery(String.format(
                 "A = load 'hbase://query/%s' using 
org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery,
                 zkQuorum));
 
-            final Iterator<Tuple> iterator = pigServer.openIterator("A");
-            while (iterator.hasNext()) {
-                Tuple tuple = iterator.next();
-                assertEquals("foo", tuple.get(0));
-                assertEquals(2006, tuple.get(1));
-            }
-        } finally {
-            dropTable("TIMESTAMP_T");
+        final Iterator<Tuple> iterator = pigServer.openIterator("A");
+        while (iterator.hasNext()) {
+            Tuple tuple = iterator.next();
+            assertEquals("foo", tuple.get(0));
+            assertEquals(2006, tuple.get(1));
         }
     }
-    
+
     @Test
     public void testDateForSQLQuery() throws Exception {
-        try {
-            //create the table
-            String ddl = "CREATE TABLE DATE_T (MYKEY VARCHAR,DATE_STP Date 
CONSTRAINT PK PRIMARY KEY (MYKEY)) ";
-            conn.createStatement().execute(ddl);
-
-            final String dml = "UPSERT INTO DATE_T 
VALUES('foo',TO_DATE('2004-03-10 10:00:00'))";
-            conn.createStatement().execute(dml);
-            conn.commit();
-
-            //sql query
-            final String sqlQuery = " SELECT mykey, hour(DATE_STP) FROM DATE_T 
";
-            pigServer.registerQuery(String.format(
+        //create the table
+        String ddl = "CREATE TABLE DATE_T (MYKEY VARCHAR,DATE_STP Date 
CONSTRAINT PK PRIMARY KEY (MYKEY)) ";
+        conn.createStatement().execute(ddl);
+
+        final String dml = "UPSERT INTO DATE_T 
VALUES('foo',TO_DATE('2004-03-10 10:00:00'))";
+        conn.createStatement().execute(dml);
+        conn.commit();
+
+        //sql query
+        final String sqlQuery = " SELECT mykey, hour(DATE_STP) FROM DATE_T ";
+        pigServer.registerQuery(String.format(
                 "A = load 'hbase://query/%s' using 
org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery,
                 zkQuorum));
 
-            final Iterator<Tuple> iterator = pigServer.openIterator("A");
-            while (iterator.hasNext()) {
-                Tuple tuple = iterator.next();
-                assertEquals("foo", tuple.get(0));
-                assertEquals(10, tuple.get(1));
-            }
-        } finally {
-            dropTable("DATE_T");
+        final Iterator<Tuple> iterator = pigServer.openIterator("A");
+        while (iterator.hasNext()) {
+            Tuple tuple = iterator.next();
+            assertEquals("foo", tuple.get(0));
+            assertEquals(10, tuple.get(1));
         }
     }
 
     @Test
     public void testTimeForSQLQuery() throws Exception {
-        try {
-            //create the table
-            String ddl = "CREATE TABLE TIME_T (MYKEY VARCHAR,DATE_STP TIME 
CONSTRAINT PK PRIMARY KEY (MYKEY)) ";
-            conn.createStatement().execute(ddl);
-
-            final String dml = "UPSERT INTO TIME_T 
VALUES('foo',TO_TIME('2008-05-16 00:30:00'))";
-            conn.createStatement().execute(dml);
-            conn.commit();
-
-            //sql query
-            final String sqlQuery = " SELECT mykey, minute(DATE_STP) FROM 
TIME_T ";
-            pigServer.registerQuery(String.format(
+        //create the table
+        String ddl = "CREATE TABLE TIME_T (MYKEY VARCHAR,DATE_STP TIME 
CONSTRAINT PK PRIMARY KEY (MYKEY)) ";
+        conn.createStatement().execute(ddl);
+
+        final String dml = "UPSERT INTO TIME_T 
VALUES('foo',TO_TIME('2008-05-16 00:30:00'))";
+        conn.createStatement().execute(dml);
+        conn.commit();
+
+        //sql query
+        final String sqlQuery = " SELECT mykey, minute(DATE_STP) FROM TIME_T ";
+        pigServer.registerQuery(String.format(
                 "A = load 'hbase://query/%s' using 
org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery,
                 zkQuorum));
 
-            final Iterator<Tuple> iterator = pigServer.openIterator("A");
-            while (iterator.hasNext()) {
-                Tuple tuple = iterator.next();
-                assertEquals("foo", tuple.get(0));
-                assertEquals(30, tuple.get(1));
-            }
-        } finally {
-            dropTable("TIME_T");
+        final Iterator<Tuple> iterator = pigServer.openIterator("A");
+        while (iterator.hasNext()) {
+            Tuple tuple = iterator.next();
+            assertEquals("foo", tuple.get(0));
+            assertEquals(30, tuple.get(1));
         }
     }
-    
+
     /**
      * Tests both  {@link PhoenixHBaseLoader} and {@link PhoenixHBaseStorage} 
      * @throws Exception
      */
     @Test
     public void testLoadAndStore() throws Exception {
-        
-         //create the tables
+
+        //create the tables
         final String TABLE = "TABLE7";
         final String sourceTableddl = "CREATE TABLE  " + TABLE
                 + "  (ID  INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE 
INTEGER, SAL INTEGER) ";
-         
+
         final String targetTable = "AGGREGATE";
         final String targetTableddl = "CREATE TABLE " + targetTable
-                 +  "(AGE INTEGER NOT NULL PRIMARY KEY , MIN_SAL INTEGER , 
MAX_SAL INTEGER) ";
-                 
+                +  "(AGE INTEGER NOT NULL PRIMARY KEY , MIN_SAL INTEGER , 
MAX_SAL INTEGER) ";
+
         conn.createStatement().execute(sourceTableddl);
         conn.createStatement().execute(targetTableddl);
-        
+
         //prepare data with 10 rows having age 25 and the other 30.
         final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?,?)";
         PreparedStatement stmt = conn.prepareStatement(dml);
@@ -493,25 +478,25 @@ public class PhoenixHBaseLoaderIT extends BasePigIT {
                 stmt.setInt(3, 30);
                 stmt.setInt(4, 10 * 3 * k++);
             }
-            
+
             stmt.execute();    
         }
         conn.commit();
-        
-            
-         //load data and filter rows whose age is > 25
+
+
+        //load data and filter rows whose age is > 25
         pigServer.setBatchOn();
         pigServer.registerQuery(String.format(
                 "A = load 'hbase://table/%s' using " + 
PhoenixHBaseLoader.class.getName() + "('%s');", TABLE,
                 zkQuorum));
-        
+
         pigServer.registerQuery("B = GROUP A BY AGE;");
         pigServer.registerQuery("C = FOREACH B GENERATE group as 
AGE,MIN(A.SAL),MAX(A.SAL);");
         pigServer.registerQuery("STORE C INTO 'hbase://" + targetTable 
                 + "' using " + PhoenixHBaseStorage.class.getName() + "('"
-                 + zkQuorum + "', '-batchSize 1000');");
+                + zkQuorum + "', '-batchSize 1000');");
         pigServer.executeBatch();
-        
+
         //validate the data with what is stored.
         final String selectQuery = "SELECT AGE , MIN_SAL ,MAX_SAL FROM " + 
targetTable + " ORDER BY AGE";
         final ResultSet rs = conn.createStatement().executeQuery(selectQuery);
@@ -524,25 +509,25 @@ public class PhoenixHBaseLoaderIT extends BasePigIT {
         assertEquals(0, rs.getInt("MIN_SAL"));
         assertEquals(270, rs.getInt("MAX_SAL"));
     }
-       
-       /**
+
+    /**
      * Test for Sequence
      * @throws Exception
      */
     @Test
     public void testDataForSQLQueryWithSequences() throws Exception {
-        
-         //create the table
+
+        //create the table
         final String TABLE = "TABLE8";
         String ddl = "CREATE TABLE " + TABLE
                 + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE 
INTEGER) ";
-                
+
         conn.createStatement().execute(ddl);
-        
+
         String sequenceDdl = "CREATE SEQUENCE my_sequence";
-                
+
         conn.createStatement().execute(sequenceDdl);
-           
+
         //prepare data with 10 rows having age 25 and the other 30.
         final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?)";
         PreparedStatement stmt = conn.prepareStatement(dml);
@@ -554,14 +539,14 @@ public class PhoenixHBaseLoaderIT extends BasePigIT {
             stmt.execute();
         }
         conn.commit();
-        
+
         //sql query load data and filter rows whose age is > 25
         final String sqlQuery = " SELECT NEXT VALUE FOR my_sequence AS 
my_seq,ID,NAME,AGE FROM " + TABLE + " WHERE AGE > 25";
         pigServer.registerQuery(String.format(
                 "A = load 'hbase://query/%s' using " + 
PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery,
                 zkQuorum));
-        
-        
+
+
         Iterator<Tuple> iterator = pigServer.openIterator("A");
         int recordsRead = 0;
         while (iterator.hasNext()) {
@@ -570,17 +555,17 @@ public class PhoenixHBaseLoaderIT extends BasePigIT {
         }
         assertEquals(rows/2, recordsRead);
     }
-       
-        @Test
+
+    @Test
     public void testDataForSQLQueryWithFunctions() throws Exception {
-        
-         //create the table
-         final String TABLE = "TABLE9";
-         String ddl = "CREATE TABLE " + TABLE
+
+        //create the table
+        final String TABLE = "TABLE9";
+        String ddl = "CREATE TABLE " + TABLE
                 + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR) ";
-                
+
         conn.createStatement().execute(ddl);
-        
+
         final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?)";
         PreparedStatement stmt = conn.prepareStatement(dml);
         int rows = 20;
@@ -590,15 +575,15 @@ public class PhoenixHBaseLoaderIT extends BasePigIT {
             stmt.execute();
         }
         conn.commit();
-        
+
         //sql query
         final String sqlQuery = " SELECT UPPER(NAME) AS n FROM " + TABLE + " 
ORDER BY ID" ;
 
         pigServer.registerQuery(String.format(
                 "A = load 'hbase://query/%s' using "  + 
PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery,
                 zkQuorum));
-        
-        
+
+
         Iterator<Tuple> iterator = pigServer.openIterator("A");
         int i = 0;
         while (iterator.hasNext()) {
@@ -607,54 +592,49 @@ public class PhoenixHBaseLoaderIT extends BasePigIT {
             assertEquals("A" + i, name);
             i++;
         }
-    
+
     }
-        
-        @Test
-        public void testDataFromIndexTable() throws Exception {
-        try {
-            //create the table
-            String ddl = "CREATE TABLE " + TABLE_NAME
-                    + " (ID INTEGER NOT NULL, NAME VARCHAR NOT NULL, EMPLID 
INTEGER CONSTRAINT pk PRIMARY KEY (ID, NAME)) IMMUTABLE_ROWS=true";
-                   
-            conn.createStatement().execute(ddl);
-           
-            //create a index table
-            String indexDdl = " CREATE INDEX " + INDEX_NAME + " ON " + 
TABLE_NAME + " (EMPLID) INCLUDE (NAME) ";
-            conn.createStatement().execute(indexDdl);
-           
-            //upsert the data.
-            final String dml = "UPSERT INTO " + TABLE_NAME + " VALUES(?,?,?)";
-            PreparedStatement stmt = conn.prepareStatement(dml);
-            int rows = 20;
-            for(int i = 0 ; i < rows; i++) {
-                stmt.setInt(1, i);
-                stmt.setString(2, "a"+i);
-                stmt.setInt(3, i * 5);
-                stmt.execute();
-            }
-            conn.commit();
-            pigServer.registerQuery("A = load 'hbase://query/SELECT NAME , 
EMPLID FROM A WHERE EMPLID = 25 ' using " + PhoenixHBaseLoader.class.getName() 
+ "('"+zkQuorum + "')  ;");
-            Iterator<Tuple> iterator = pigServer.openIterator("A");
-            while (iterator.hasNext()) {
-                Tuple tuple = iterator.next();
-                assertEquals("a5", tuple.get(0));
-                assertEquals(25, tuple.get(1));
-            }
-        } finally {
-          dropTable(TABLE_NAME);
-          dropTable(INDEX_NAME);
+
+    @Test
+    public void testDataFromIndexTable() throws Exception {
+        //create the table
+        String ddl = "CREATE TABLE " + TABLE_NAME
+                + " (ID INTEGER NOT NULL, NAME VARCHAR NOT NULL, EMPLID 
INTEGER CONSTRAINT pk PRIMARY KEY (ID, NAME)) IMMUTABLE_ROWS=true";
+
+        conn.createStatement().execute(ddl);
+
+        //create a index table
+        String indexDdl = " CREATE INDEX " + INDEX_NAME + " ON " + TABLE_NAME 
+ " (EMPLID) INCLUDE (NAME) ";
+        conn.createStatement().execute(indexDdl);
+
+        //upsert the data.
+        final String dml = "UPSERT INTO " + TABLE_NAME + " VALUES(?,?,?)";
+        PreparedStatement stmt = conn.prepareStatement(dml);
+        int rows = 20;
+        for(int i = 0 ; i < rows; i++) {
+            stmt.setInt(1, i);
+            stmt.setString(2, "a"+i);
+            stmt.setInt(3, i * 5);
+            stmt.execute();
+        }
+        conn.commit();
+        pigServer.registerQuery("A = load 'hbase://query/SELECT NAME , EMPLID 
FROM A WHERE EMPLID = 25 ' using " + PhoenixHBaseLoader.class.getName() + 
"('"+zkQuorum + "')  ;");
+        Iterator<Tuple> iterator = pigServer.openIterator("A");
+        while (iterator.hasNext()) {
+            Tuple tuple = iterator.next();
+            assertEquals("a5", tuple.get(0));
+            assertEquals(25, tuple.get(1));
         }
     }
-        
-       @Test 
-       public void testLoadOfSaltTable() throws Exception {
-           final String TABLE = "TABLE11";
+
+    @Test 
+    public void testLoadOfSaltTable() throws Exception {
+        final String TABLE = "TABLE11";
         final String sourceTableddl = "CREATE TABLE  " + TABLE
                 + "  (ID  INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE 
INTEGER, SAL INTEGER) SALT_BUCKETS=2  ";
-         
+
         conn.createStatement().execute(sourceTableddl);
-        
+
         //prepare data with 10 rows having age 25 and the other 30.
         final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?,?)";
         PreparedStatement stmt = conn.prepareStatement(dml);
@@ -670,32 +650,27 @@ public class PhoenixHBaseLoaderIT extends BasePigIT {
                 stmt.setInt(3, 30);
                 stmt.setInt(4, 10 * 3 * k++);
             }
-            
+
             stmt.execute();    
         }
         conn.commit();
-            
+
         final Data data = Storage.resetData(pigServer);
         List<Tuple> expectedList = new ArrayList<Tuple>();
         expectedList.add(Storage.tuple(25,10));
         expectedList.add(Storage.tuple(30,10));
-        
+
         pigServer.setBatchOn();
         pigServer.registerQuery(String.format(
                 "A = load 'hbase://table/%s' using " + 
PhoenixHBaseLoader.class.getName() + "('%s');", TABLE,
                 zkQuorum));
-        
+
         pigServer.registerQuery("B = GROUP A BY AGE;");
         pigServer.registerQuery("C = FOREACH B GENERATE group,COUNT(A);");
         pigServer.registerQuery("STORE C INTO 'out' using mock.Storage();");
         pigServer.executeBatch();
-        
+
         List<Tuple> actualList = data.get("out");
         assertEquals(expectedList.size(), actualList.size());
-       }
-    
-    private void dropTable(String tableFullName) throws SQLException {
-      Preconditions.checkNotNull(conn);
-      conn.createStatement().execute(String.format("DROP TABLE IF EXISTS 
%s",tableFullName));
     }
 }
\ No newline at end of file

Reply via email to