Repository: phoenix Updated Branches: refs/heads/4.x-HBase-0.98 ac62506fb -> 10d1b64e2
PHOENIX-2419 Fix flapping Pig tests Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/10d1b64e Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/10d1b64e Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/10d1b64e Branch: refs/heads/4.x-HBase-0.98 Commit: 10d1b64e2dd05db50211948fa62fa95f7b4e7613 Parents: ac62506 Author: James Taylor <[email protected]> Authored: Sun Nov 15 23:28:09 2015 -0800 Committer: James Taylor <[email protected]> Committed: Sun Nov 15 23:28:09 2015 -0800 ---------------------------------------------------------------------- .../phoenix/pig/PhoenixHBaseLoaderIT.java | 391 +++++++++---------- 1 file changed, 183 insertions(+), 208 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/10d1b64e/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java index bc614bf..accf453 100644 --- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java +++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue; import java.sql.PreparedStatement; import java.sql.ResultSet; -import java.sql.SQLException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -40,14 +39,12 @@ import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; import org.junit.Test; -import com.google.common.base.Preconditions; - /** * * Test class to run all the integration tests against a virtual map reduce cluster. */ public class PhoenixHBaseLoaderIT extends BasePigIT { - + private static final Log LOG = LogFactory.getLog(PhoenixHBaseLoaderIT.class); private static final String SCHEMA_NAME = "T"; private static final String TABLE_NAME = "A"; @@ -71,7 +68,7 @@ public class PhoenixHBaseLoaderIT extends BasePigIT { pigServer.registerQuery(String.format( "A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE, zkQuorum)); - + final Schema schema = pigServer.dumpSchema("A"); List<FieldSchema> fields = schema.getFields(); assertEquals(4, fields.size()); @@ -91,18 +88,18 @@ public class PhoenixHBaseLoaderIT extends BasePigIT { */ @Test public void testSchemaForTableWithSpecificColumns() throws Exception { - + //create the table final String TABLE = "TABLE2"; final String ddl = "CREATE TABLE " + TABLE + " (ID INTEGER NOT NULL PRIMARY KEY,NAME VARCHAR, AGE INTEGER) "; conn.createStatement().execute(ddl); - + final String selectColumns = "ID,NAME"; pigServer.registerQuery(String.format( "A = load 'hbase://table/%s/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE, selectColumns, zkQuorum)); - + Schema schema = pigServer.dumpSchema("A"); List<FieldSchema> fields = schema.getFields(); assertEquals(2, fields.size()); @@ -111,29 +108,29 @@ public class PhoenixHBaseLoaderIT extends BasePigIT { assertTrue(fields.get(1).alias.equalsIgnoreCase("NAME")); assertTrue(fields.get(1).type == DataType.CHARARRAY); } - + /** * Validates the schema returned when a SQL SELECT query is given as part of LOAD . * @throws Exception */ @Test public void testSchemaForQuery() throws Exception { - - //create the table. + + //create the table. final String TABLE = "TABLE3"; String ddl = String.format("CREATE TABLE " + TABLE + - " (A_STRING VARCHAR NOT NULL, A_DECIMAL DECIMAL NOT NULL, CF1.A_INTEGER INTEGER, CF2.A_DOUBLE DOUBLE" + " (A_STRING VARCHAR NOT NULL, A_DECIMAL DECIMAL NOT NULL, CF1.A_INTEGER INTEGER, CF2.A_DOUBLE DOUBLE" + " CONSTRAINT pk PRIMARY KEY (A_STRING, A_DECIMAL))\n", TABLE); conn.createStatement().execute(ddl); - - + + //sql query for LOAD final String sqlQuery = "SELECT A_STRING,CF1.A_INTEGER,CF2.A_DOUBLE FROM " + TABLE; pigServer.registerQuery(String.format( "A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery, zkQuorum)); - + //assert the schema. Schema schema = pigServer.dumpSchema("A"); List<FieldSchema> fields = schema.getFields(); @@ -145,14 +142,14 @@ public class PhoenixHBaseLoaderIT extends BasePigIT { assertTrue(fields.get(2).alias.equalsIgnoreCase("a_double")); assertTrue(fields.get(2).type == DataType.DOUBLE); } - + /** * Validates the schema when it is given as part of LOAD..AS * @throws Exception */ @Test public void testSchemaForTableWithAlias() throws Exception { - + //create the table. final String TABLE = "S.TABLE4"; String ddl = "CREATE TABLE " + TABLE @@ -162,13 +159,13 @@ public class PhoenixHBaseLoaderIT extends BasePigIT { //select query given as part of LOAD. final String sqlQuery = "SELECT A_STRING,A_DECIMAL,CF1.A_INTEGER,CF2.A_DOUBLE FROM " + TABLE; - + LOG.info(String.format("Generated SQL Query [%s]",sqlQuery)); - + pigServer.registerQuery(String.format( "raw = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s') AS (a:chararray,b:bigdecimal,c:int,d:double);", sqlQuery, zkQuorum)); - + //test the schema. Schema schema = pigServer.dumpSchema("raw"); List<FieldSchema> fields = schema.getFields(); @@ -182,19 +179,19 @@ public class PhoenixHBaseLoaderIT extends BasePigIT { assertTrue(fields.get(3).alias.equalsIgnoreCase("d")); assertTrue(fields.get(3).type == DataType.DOUBLE); } - + /** * @throws Exception */ @Test public void testDataForTable() throws Exception { - - //create the table - String ddl = "CREATE TABLE " + CASE_SENSITIVE_TABLE_FULL_NAME + + //create the table + String ddl = "CREATE TABLE " + CASE_SENSITIVE_TABLE_FULL_NAME + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER) "; - + conn.createStatement().execute(ddl); - + //prepare data with 10 rows having age 25 and the other 30. final String dml = "UPSERT INTO " + CASE_SENSITIVE_TABLE_FULL_NAME + " VALUES(?,?,?)"; PreparedStatement stmt = conn.prepareStatement(dml); @@ -206,13 +203,13 @@ public class PhoenixHBaseLoaderIT extends BasePigIT { stmt.execute(); } conn.commit(); - + //load data and filter rows whose age is > 25 pigServer.registerQuery(String.format( "A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", CASE_SENSITIVE_TABLE_FULL_NAME, zkQuorum)); pigServer.registerQuery("B = FILTER A BY AGE > 25;"); - + final Iterator<Tuple> iterator = pigServer.openIterator("B"); int recordsRead = 0; while (iterator.hasNext()) { @@ -222,19 +219,19 @@ public class PhoenixHBaseLoaderIT extends BasePigIT { } assertEquals(rows/2, recordsRead); } - + /** * @throws Exception */ @Test public void testDataForSQLQuery() throws Exception { - - //create the table - String ddl = "CREATE TABLE " + TABLE_FULL_NAME + + //create the table + String ddl = "CREATE TABLE " + TABLE_FULL_NAME + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER) "; - + conn.createStatement().execute(ddl); - + //prepare data with 10 rows having age 25 and the other 30. final String dml = "UPSERT INTO " + TABLE_FULL_NAME + " VALUES(?,?,?)"; PreparedStatement stmt = conn.prepareStatement(dml); @@ -246,14 +243,14 @@ public class PhoenixHBaseLoaderIT extends BasePigIT { stmt.execute(); } conn.commit(); - + //sql query final String sqlQuery = " SELECT ID,NAME,AGE FROM " + TABLE_FULL_NAME + " WHERE AGE > 25"; //load data and filter rows whose age is > 25 pigServer.registerQuery(String.format( "A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery, zkQuorum)); - + final Iterator<Tuple> iterator = pigServer.openIterator("A"); int recordsRead = 0; while (iterator.hasNext()) { @@ -262,21 +259,21 @@ public class PhoenixHBaseLoaderIT extends BasePigIT { } assertEquals(rows/2, recordsRead); } - + /** * * @throws Exception */ @Test public void testForNonPKSQLQuery() throws Exception { - - //create the table + + //create the table final String TABLE = "TABLE5"; String ddl = "CREATE TABLE " + TABLE + " ( ID VARCHAR PRIMARY KEY, FOO VARCHAR, BAR INTEGER, BAZ UNSIGNED_INT)"; - + conn.createStatement().execute(ddl); - + //upsert data. final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?,?) "; PreparedStatement stmt = conn.prepareStatement(dml); @@ -285,22 +282,22 @@ public class PhoenixHBaseLoaderIT extends BasePigIT { stmt.setInt(3,-1); stmt.setInt(4,1); stmt.execute(); - + stmt.setString(1, "b"); stmt.setString(2, "b"); stmt.setInt(3,-2); stmt.setInt(4,2); stmt.execute(); - + conn.commit(); - + //sql query final String sqlQuery = String.format(" SELECT FOO, BAZ FROM %s WHERE BAR = -1 " , TABLE); - + pigServer.registerQuery(String.format( "A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery, zkQuorum)); - + final Iterator<Tuple> iterator = pigServer.openIterator("A"); int recordsRead = 0; while (iterator.hasNext()) { @@ -310,7 +307,7 @@ public class PhoenixHBaseLoaderIT extends BasePigIT { recordsRead++; } assertEquals(1, recordsRead); - + //test the schema. Test for PHOENIX-1123 Schema schema = pigServer.dumpSchema("A"); List<FieldSchema> fields = schema.getFields(); @@ -320,20 +317,20 @@ public class PhoenixHBaseLoaderIT extends BasePigIT { assertTrue(fields.get(1).alias.equalsIgnoreCase("BAZ")); assertTrue(fields.get(1).type == DataType.INTEGER); } - + /** * @throws Exception */ @Test public void testGroupingOfDataForTable() throws Exception { - - //create the table + + //create the table final String TABLE = "TABLE6"; String ddl = "CREATE TABLE " + TABLE + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER, SAL INTEGER) "; - + conn.createStatement().execute(ddl); - + //prepare data with 10 rows having age 25 and the other 30. final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?,?)"; PreparedStatement stmt = conn.prepareStatement(dml); @@ -349,135 +346,123 @@ public class PhoenixHBaseLoaderIT extends BasePigIT { stmt.setInt(3, 30); stmt.setInt(4, 10 * 3 * k++); } - + stmt.execute(); } conn.commit(); - + //prepare the mock storage with expected output final Data data = Storage.resetData(pigServer); List<Tuple> expectedList = new ArrayList<Tuple>(); expectedList.add(Storage.tuple(0,180)); expectedList.add(Storage.tuple(0,270)); - - //load data and filter rows whose age is > 25 + + //load data and filter rows whose age is > 25 pigServer.setBatchOn(); pigServer.registerQuery(String.format( "A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE, zkQuorum)); - + pigServer.registerQuery("B = GROUP A BY AGE;"); pigServer.registerQuery("C = FOREACH B GENERATE MIN(A.SAL),MAX(A.SAL);"); pigServer.registerQuery("STORE C INTO 'out' using mock.Storage();"); pigServer.executeBatch(); - + List<Tuple> actualList = data.get("out"); assertEquals(expectedList, actualList); } - + @Test public void testTimestampForSQLQuery() throws Exception { - try { - //create the table - String ddl = "CREATE TABLE TIMESTAMP_T (MYKEY VARCHAR,DATE_STP TIMESTAMP CONSTRAINT PK PRIMARY KEY (MYKEY)) "; - conn.createStatement().execute(ddl); - - final String dml = "UPSERT INTO TIMESTAMP_T VALUES('foo',TO_TIMESTAMP('2006-04-12 00:00:00'))"; - conn.createStatement().execute(dml); - conn.commit(); - - //sql query - final String sqlQuery = " SELECT mykey, year(DATE_STP) FROM TIMESTAMP_T "; - pigServer.registerQuery(String.format( + //create the table + String ddl = "CREATE TABLE TIMESTAMP_T (MYKEY VARCHAR,DATE_STP TIMESTAMP CONSTRAINT PK PRIMARY KEY (MYKEY)) "; + conn.createStatement().execute(ddl); + + final String dml = "UPSERT INTO TIMESTAMP_T VALUES('foo',TO_TIMESTAMP('2006-04-12 00:00:00'))"; + conn.createStatement().execute(dml); + conn.commit(); + + //sql query + final String sqlQuery = " SELECT mykey, year(DATE_STP) FROM TIMESTAMP_T "; + pigServer.registerQuery(String.format( "A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery, zkQuorum)); - final Iterator<Tuple> iterator = pigServer.openIterator("A"); - while (iterator.hasNext()) { - Tuple tuple = iterator.next(); - assertEquals("foo", tuple.get(0)); - assertEquals(2006, tuple.get(1)); - } - } finally { - dropTable("TIMESTAMP_T"); + final Iterator<Tuple> iterator = pigServer.openIterator("A"); + while (iterator.hasNext()) { + Tuple tuple = iterator.next(); + assertEquals("foo", tuple.get(0)); + assertEquals(2006, tuple.get(1)); } } - + @Test public void testDateForSQLQuery() throws Exception { - try { - //create the table - String ddl = "CREATE TABLE DATE_T (MYKEY VARCHAR,DATE_STP Date CONSTRAINT PK PRIMARY KEY (MYKEY)) "; - conn.createStatement().execute(ddl); - - final String dml = "UPSERT INTO DATE_T VALUES('foo',TO_DATE('2004-03-10 10:00:00'))"; - conn.createStatement().execute(dml); - conn.commit(); - - //sql query - final String sqlQuery = " SELECT mykey, hour(DATE_STP) FROM DATE_T "; - pigServer.registerQuery(String.format( + //create the table + String ddl = "CREATE TABLE DATE_T (MYKEY VARCHAR,DATE_STP Date CONSTRAINT PK PRIMARY KEY (MYKEY)) "; + conn.createStatement().execute(ddl); + + final String dml = "UPSERT INTO DATE_T VALUES('foo',TO_DATE('2004-03-10 10:00:00'))"; + conn.createStatement().execute(dml); + conn.commit(); + + //sql query + final String sqlQuery = " SELECT mykey, hour(DATE_STP) FROM DATE_T "; + pigServer.registerQuery(String.format( "A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery, zkQuorum)); - final Iterator<Tuple> iterator = pigServer.openIterator("A"); - while (iterator.hasNext()) { - Tuple tuple = iterator.next(); - assertEquals("foo", tuple.get(0)); - assertEquals(10, tuple.get(1)); - } - } finally { - dropTable("DATE_T"); + final Iterator<Tuple> iterator = pigServer.openIterator("A"); + while (iterator.hasNext()) { + Tuple tuple = iterator.next(); + assertEquals("foo", tuple.get(0)); + assertEquals(10, tuple.get(1)); } } @Test public void testTimeForSQLQuery() throws Exception { - try { - //create the table - String ddl = "CREATE TABLE TIME_T (MYKEY VARCHAR,DATE_STP TIME CONSTRAINT PK PRIMARY KEY (MYKEY)) "; - conn.createStatement().execute(ddl); - - final String dml = "UPSERT INTO TIME_T VALUES('foo',TO_TIME('2008-05-16 00:30:00'))"; - conn.createStatement().execute(dml); - conn.commit(); - - //sql query - final String sqlQuery = " SELECT mykey, minute(DATE_STP) FROM TIME_T "; - pigServer.registerQuery(String.format( + //create the table + String ddl = "CREATE TABLE TIME_T (MYKEY VARCHAR,DATE_STP TIME CONSTRAINT PK PRIMARY KEY (MYKEY)) "; + conn.createStatement().execute(ddl); + + final String dml = "UPSERT INTO TIME_T VALUES('foo',TO_TIME('2008-05-16 00:30:00'))"; + conn.createStatement().execute(dml); + conn.commit(); + + //sql query + final String sqlQuery = " SELECT mykey, minute(DATE_STP) FROM TIME_T "; + pigServer.registerQuery(String.format( "A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery, zkQuorum)); - final Iterator<Tuple> iterator = pigServer.openIterator("A"); - while (iterator.hasNext()) { - Tuple tuple = iterator.next(); - assertEquals("foo", tuple.get(0)); - assertEquals(30, tuple.get(1)); - } - } finally { - dropTable("TIME_T"); + final Iterator<Tuple> iterator = pigServer.openIterator("A"); + while (iterator.hasNext()) { + Tuple tuple = iterator.next(); + assertEquals("foo", tuple.get(0)); + assertEquals(30, tuple.get(1)); } } - + /** * Tests both {@link PhoenixHBaseLoader} and {@link PhoenixHBaseStorage} * @throws Exception */ @Test public void testLoadAndStore() throws Exception { - - //create the tables + + //create the tables final String TABLE = "TABLE7"; final String sourceTableddl = "CREATE TABLE " + TABLE + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER, SAL INTEGER) "; - + final String targetTable = "AGGREGATE"; final String targetTableddl = "CREATE TABLE " + targetTable - + "(AGE INTEGER NOT NULL PRIMARY KEY , MIN_SAL INTEGER , MAX_SAL INTEGER) "; - + + "(AGE INTEGER NOT NULL PRIMARY KEY , MIN_SAL INTEGER , MAX_SAL INTEGER) "; + conn.createStatement().execute(sourceTableddl); conn.createStatement().execute(targetTableddl); - + //prepare data with 10 rows having age 25 and the other 30. final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?,?)"; PreparedStatement stmt = conn.prepareStatement(dml); @@ -493,25 +478,25 @@ public class PhoenixHBaseLoaderIT extends BasePigIT { stmt.setInt(3, 30); stmt.setInt(4, 10 * 3 * k++); } - + stmt.execute(); } conn.commit(); - - - //load data and filter rows whose age is > 25 + + + //load data and filter rows whose age is > 25 pigServer.setBatchOn(); pigServer.registerQuery(String.format( "A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE, zkQuorum)); - + pigServer.registerQuery("B = GROUP A BY AGE;"); pigServer.registerQuery("C = FOREACH B GENERATE group as AGE,MIN(A.SAL),MAX(A.SAL);"); pigServer.registerQuery("STORE C INTO 'hbase://" + targetTable + "' using " + PhoenixHBaseStorage.class.getName() + "('" - + zkQuorum + "', '-batchSize 1000');"); + + zkQuorum + "', '-batchSize 1000');"); pigServer.executeBatch(); - + //validate the data with what is stored. final String selectQuery = "SELECT AGE , MIN_SAL ,MAX_SAL FROM " + targetTable + " ORDER BY AGE"; final ResultSet rs = conn.createStatement().executeQuery(selectQuery); @@ -524,25 +509,25 @@ public class PhoenixHBaseLoaderIT extends BasePigIT { assertEquals(0, rs.getInt("MIN_SAL")); assertEquals(270, rs.getInt("MAX_SAL")); } - - /** + + /** * Test for Sequence * @throws Exception */ @Test public void testDataForSQLQueryWithSequences() throws Exception { - - //create the table + + //create the table final String TABLE = "TABLE8"; String ddl = "CREATE TABLE " + TABLE + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER) "; - + conn.createStatement().execute(ddl); - + String sequenceDdl = "CREATE SEQUENCE my_sequence"; - + conn.createStatement().execute(sequenceDdl); - + //prepare data with 10 rows having age 25 and the other 30. final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?)"; PreparedStatement stmt = conn.prepareStatement(dml); @@ -554,14 +539,14 @@ public class PhoenixHBaseLoaderIT extends BasePigIT { stmt.execute(); } conn.commit(); - + //sql query load data and filter rows whose age is > 25 final String sqlQuery = " SELECT NEXT VALUE FOR my_sequence AS my_seq,ID,NAME,AGE FROM " + TABLE + " WHERE AGE > 25"; pigServer.registerQuery(String.format( "A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery, zkQuorum)); - - + + Iterator<Tuple> iterator = pigServer.openIterator("A"); int recordsRead = 0; while (iterator.hasNext()) { @@ -570,17 +555,17 @@ public class PhoenixHBaseLoaderIT extends BasePigIT { } assertEquals(rows/2, recordsRead); } - - @Test + + @Test public void testDataForSQLQueryWithFunctions() throws Exception { - - //create the table - final String TABLE = "TABLE9"; - String ddl = "CREATE TABLE " + TABLE + + //create the table + final String TABLE = "TABLE9"; + String ddl = "CREATE TABLE " + TABLE + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR) "; - + conn.createStatement().execute(ddl); - + final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?)"; PreparedStatement stmt = conn.prepareStatement(dml); int rows = 20; @@ -590,15 +575,15 @@ public class PhoenixHBaseLoaderIT extends BasePigIT { stmt.execute(); } conn.commit(); - + //sql query final String sqlQuery = " SELECT UPPER(NAME) AS n FROM " + TABLE + " ORDER BY ID" ; pigServer.registerQuery(String.format( "A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery, zkQuorum)); - - + + Iterator<Tuple> iterator = pigServer.openIterator("A"); int i = 0; while (iterator.hasNext()) { @@ -607,54 +592,49 @@ public class PhoenixHBaseLoaderIT extends BasePigIT { assertEquals("A" + i, name); i++; } - + } - - @Test - public void testDataFromIndexTable() throws Exception { - try { - //create the table - String ddl = "CREATE TABLE " + TABLE_NAME - + " (ID INTEGER NOT NULL, NAME VARCHAR NOT NULL, EMPLID INTEGER CONSTRAINT pk PRIMARY KEY (ID, NAME)) IMMUTABLE_ROWS=true"; - - conn.createStatement().execute(ddl); - - //create a index table - String indexDdl = " CREATE INDEX " + INDEX_NAME + " ON " + TABLE_NAME + " (EMPLID) INCLUDE (NAME) "; - conn.createStatement().execute(indexDdl); - - //upsert the data. - final String dml = "UPSERT INTO " + TABLE_NAME + " VALUES(?,?,?)"; - PreparedStatement stmt = conn.prepareStatement(dml); - int rows = 20; - for(int i = 0 ; i < rows; i++) { - stmt.setInt(1, i); - stmt.setString(2, "a"+i); - stmt.setInt(3, i * 5); - stmt.execute(); - } - conn.commit(); - pigServer.registerQuery("A = load 'hbase://query/SELECT NAME , EMPLID FROM A WHERE EMPLID = 25 ' using " + PhoenixHBaseLoader.class.getName() + "('"+zkQuorum + "') ;"); - Iterator<Tuple> iterator = pigServer.openIterator("A"); - while (iterator.hasNext()) { - Tuple tuple = iterator.next(); - assertEquals("a5", tuple.get(0)); - assertEquals(25, tuple.get(1)); - } - } finally { - dropTable(TABLE_NAME); - dropTable(INDEX_NAME); + + @Test + public void testDataFromIndexTable() throws Exception { + //create the table + String ddl = "CREATE TABLE " + TABLE_NAME + + " (ID INTEGER NOT NULL, NAME VARCHAR NOT NULL, EMPLID INTEGER CONSTRAINT pk PRIMARY KEY (ID, NAME)) IMMUTABLE_ROWS=true"; + + conn.createStatement().execute(ddl); + + //create a index table + String indexDdl = " CREATE INDEX " + INDEX_NAME + " ON " + TABLE_NAME + " (EMPLID) INCLUDE (NAME) "; + conn.createStatement().execute(indexDdl); + + //upsert the data. + final String dml = "UPSERT INTO " + TABLE_NAME + " VALUES(?,?,?)"; + PreparedStatement stmt = conn.prepareStatement(dml); + int rows = 20; + for(int i = 0 ; i < rows; i++) { + stmt.setInt(1, i); + stmt.setString(2, "a"+i); + stmt.setInt(3, i * 5); + stmt.execute(); + } + conn.commit(); + pigServer.registerQuery("A = load 'hbase://query/SELECT NAME , EMPLID FROM A WHERE EMPLID = 25 ' using " + PhoenixHBaseLoader.class.getName() + "('"+zkQuorum + "') ;"); + Iterator<Tuple> iterator = pigServer.openIterator("A"); + while (iterator.hasNext()) { + Tuple tuple = iterator.next(); + assertEquals("a5", tuple.get(0)); + assertEquals(25, tuple.get(1)); } } - - @Test - public void testLoadOfSaltTable() throws Exception { - final String TABLE = "TABLE11"; + + @Test + public void testLoadOfSaltTable() throws Exception { + final String TABLE = "TABLE11"; final String sourceTableddl = "CREATE TABLE " + TABLE + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER, SAL INTEGER) SALT_BUCKETS=2 "; - + conn.createStatement().execute(sourceTableddl); - + //prepare data with 10 rows having age 25 and the other 30. final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?,?)"; PreparedStatement stmt = conn.prepareStatement(dml); @@ -670,32 +650,27 @@ public class PhoenixHBaseLoaderIT extends BasePigIT { stmt.setInt(3, 30); stmt.setInt(4, 10 * 3 * k++); } - + stmt.execute(); } conn.commit(); - + final Data data = Storage.resetData(pigServer); List<Tuple> expectedList = new ArrayList<Tuple>(); expectedList.add(Storage.tuple(25,10)); expectedList.add(Storage.tuple(30,10)); - + pigServer.setBatchOn(); pigServer.registerQuery(String.format( "A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE, zkQuorum)); - + pigServer.registerQuery("B = GROUP A BY AGE;"); pigServer.registerQuery("C = FOREACH B GENERATE group,COUNT(A);"); pigServer.registerQuery("STORE C INTO 'out' using mock.Storage();"); pigServer.executeBatch(); - + List<Tuple> actualList = data.get("out"); assertEquals(expectedList.size(), actualList.size()); - } - - private void dropTable(String tableFullName) throws SQLException { - Preconditions.checkNotNull(conn); - conn.createStatement().execute(String.format("DROP TABLE IF EXISTS %s",tableFullName)); } } \ No newline at end of file
