PHOENIX-2419 Fix flapping Pig tests
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e9d44986 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e9d44986 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e9d44986 Branch: refs/heads/txn Commit: e9d449865449ce0ad5406b3680d264db77d991ef Parents: 7afa91a Author: James Taylor <[email protected]> Authored: Sun Nov 15 18:25:15 2015 -0800 Committer: James Taylor <[email protected]> Committed: Mon Nov 16 09:02:16 2015 -0800 ---------------------------------------------------------------------- .../java/org/apache/phoenix/pig/BasePigIT.java | 83 ++++ .../phoenix/pig/PhoenixHBaseLoaderIT.java | 423 ++++++++----------- .../phoenix/pig/PhoenixHBaseStorerIT.java | 43 +- .../phoenix/pig/udf/ReserveNSequenceTestIT.java | 62 +-- 4 files changed, 288 insertions(+), 323 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/e9d44986/phoenix-pig/src/it/java/org/apache/phoenix/pig/BasePigIT.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/BasePigIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/BasePigIT.java new file mode 100644 index 0000000..94ccc25 --- /dev/null +++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/BasePigIT.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.pig; + +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; +import static org.apache.phoenix.util.TestUtil.LOCALHOST; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.Map; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; +import org.apache.phoenix.end2end.Shadower; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.data.TupleFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; + +import com.google.common.collect.Maps; + +public class BasePigIT extends BaseHBaseManagedTimeIT { + protected TupleFactory tupleFactory; + protected String zkQuorum; + protected Connection conn; + protected Configuration conf; + protected PigServer pigServer; + + @BeforeClass + @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class) + public static void doSetup() throws Exception { + Map<String,String> props = Maps.newHashMapWithExpectedSize(3); + props.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); + // Must update config before starting server + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Before + public void setUp() throws Exception { + conf = getTestClusterConfig(); + conf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); + pigServer = new PigServer(ExecType.LOCAL, conf); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + conn = DriverManager.getConnection(getUrl(), props); + zkQuorum = LOCALHOST + JDBC_PROTOCOL_SEPARATOR + getZKClientPort(conf); + tupleFactory = TupleFactory.getInstance(); + } + + @After + public void tearDown() throws Exception { + if(conn != null) { + conn.close(); + } + if (pigServer != null) { + pigServer.shutdown(); + } + } + + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/e9d44986/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java index 2f33b5f..606282a 100644 --- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java +++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java @@ -19,47 +19,32 @@ */ package org.apache.phoenix.pig; -import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; -import static org.apache.phoenix.util.TestUtil.LOCALHOST; -import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import java.sql.Connection; -import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; -import java.sql.SQLException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; -import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.SchemaUtil; -import org.apache.pig.ExecType; -import org.apache.pig.PigServer; import org.apache.pig.builtin.mock.Storage; import org.apache.pig.builtin.mock.Storage.Data; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; -import org.junit.After; -import org.junit.Before; import org.junit.Test; -import com.google.common.base.Preconditions; - /** * * Test class to run all the integration tests against a virtual map reduce cluster. */ -public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT { - +public class PhoenixHBaseLoaderIT extends BasePigIT { + private static final Log LOG = LogFactory.getLog(PhoenixHBaseLoaderIT.class); private static final String SCHEMA_NAME = "T"; private static final String TABLE_NAME = "A"; @@ -67,17 +52,6 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT { private static final String TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, TABLE_NAME); private static final String CASE_SENSITIVE_TABLE_NAME = SchemaUtil.getEscapedArgument("a"); private static final String CASE_SENSITIVE_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME,CASE_SENSITIVE_TABLE_NAME); - private String zkQuorum; - private Connection conn; - private PigServer pigServer; - - @Before - public void setUp() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - conn = DriverManager.getConnection(getUrl(), props); - zkQuorum = LOCALHOST + JDBC_PROTOCOL_SEPARATOR + getZKClientPort(getTestClusterConfig()); - pigServer = new PigServer(ExecType.LOCAL, getTestClusterConfig()); - } /** * Validates the schema returned for a table with Pig data types. @@ -94,7 +68,7 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT { pigServer.registerQuery(String.format( "A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE, zkQuorum)); - + final Schema schema = pigServer.dumpSchema("A"); List<FieldSchema> fields = schema.getFields(); assertEquals(4, fields.size()); @@ -114,18 +88,18 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT { */ @Test public void testSchemaForTableWithSpecificColumns() throws Exception { - + //create the table final String TABLE = "TABLE2"; final String ddl = "CREATE TABLE " + TABLE + " (ID INTEGER NOT NULL PRIMARY KEY,NAME VARCHAR, AGE INTEGER) "; conn.createStatement().execute(ddl); - + final String selectColumns = "ID,NAME"; pigServer.registerQuery(String.format( "A = load 'hbase://table/%s/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE, selectColumns, zkQuorum)); - + Schema schema = pigServer.dumpSchema("A"); List<FieldSchema> fields = schema.getFields(); assertEquals(2, fields.size()); @@ -134,29 +108,29 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT { assertTrue(fields.get(1).alias.equalsIgnoreCase("NAME")); assertTrue(fields.get(1).type == DataType.CHARARRAY); } - + /** * Validates the schema returned when a SQL SELECT query is given as part of LOAD . * @throws Exception */ @Test public void testSchemaForQuery() throws Exception { - - //create the table. + + //create the table. final String TABLE = "TABLE3"; String ddl = String.format("CREATE TABLE " + TABLE + - " (A_STRING VARCHAR NOT NULL, A_DECIMAL DECIMAL NOT NULL, CF1.A_INTEGER INTEGER, CF2.A_DOUBLE DOUBLE" + " (A_STRING VARCHAR NOT NULL, A_DECIMAL DECIMAL NOT NULL, CF1.A_INTEGER INTEGER, CF2.A_DOUBLE DOUBLE" + " CONSTRAINT pk PRIMARY KEY (A_STRING, A_DECIMAL))\n", TABLE); conn.createStatement().execute(ddl); - - + + //sql query for LOAD final String sqlQuery = "SELECT A_STRING,CF1.A_INTEGER,CF2.A_DOUBLE FROM " + TABLE; pigServer.registerQuery(String.format( "A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery, zkQuorum)); - + //assert the schema. Schema schema = pigServer.dumpSchema("A"); List<FieldSchema> fields = schema.getFields(); @@ -168,14 +142,14 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT { assertTrue(fields.get(2).alias.equalsIgnoreCase("a_double")); assertTrue(fields.get(2).type == DataType.DOUBLE); } - + /** * Validates the schema when it is given as part of LOAD..AS * @throws Exception */ @Test public void testSchemaForTableWithAlias() throws Exception { - + //create the table. final String TABLE = "S.TABLE4"; String ddl = "CREATE TABLE " + TABLE @@ -185,13 +159,13 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT { //select query given as part of LOAD. final String sqlQuery = "SELECT A_STRING,A_DECIMAL,CF1.A_INTEGER,CF2.A_DOUBLE FROM " + TABLE; - + LOG.info(String.format("Generated SQL Query [%s]",sqlQuery)); - + pigServer.registerQuery(String.format( "raw = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s') AS (a:chararray,b:bigdecimal,c:int,d:double);", sqlQuery, zkQuorum)); - + //test the schema. Schema schema = pigServer.dumpSchema("raw"); List<FieldSchema> fields = schema.getFields(); @@ -205,19 +179,19 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT { assertTrue(fields.get(3).alias.equalsIgnoreCase("d")); assertTrue(fields.get(3).type == DataType.DOUBLE); } - + /** * @throws Exception */ @Test public void testDataForTable() throws Exception { - - //create the table - String ddl = "CREATE TABLE " + CASE_SENSITIVE_TABLE_FULL_NAME + + //create the table + String ddl = "CREATE TABLE " + CASE_SENSITIVE_TABLE_FULL_NAME + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER) "; - + conn.createStatement().execute(ddl); - + //prepare data with 10 rows having age 25 and the other 30. final String dml = "UPSERT INTO " + CASE_SENSITIVE_TABLE_FULL_NAME + " VALUES(?,?,?)"; PreparedStatement stmt = conn.prepareStatement(dml); @@ -229,13 +203,13 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT { stmt.execute(); } conn.commit(); - + //load data and filter rows whose age is > 25 pigServer.registerQuery(String.format( "A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", CASE_SENSITIVE_TABLE_FULL_NAME, zkQuorum)); pigServer.registerQuery("B = FILTER A BY AGE > 25;"); - + final Iterator<Tuple> iterator = pigServer.openIterator("B"); int recordsRead = 0; while (iterator.hasNext()) { @@ -245,19 +219,19 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT { } assertEquals(rows/2, recordsRead); } - + /** * @throws Exception */ @Test public void testDataForSQLQuery() throws Exception { - - //create the table - String ddl = "CREATE TABLE " + TABLE_FULL_NAME + + //create the table + String ddl = "CREATE TABLE " + TABLE_FULL_NAME + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER) "; - + conn.createStatement().execute(ddl); - + //prepare data with 10 rows having age 25 and the other 30. final String dml = "UPSERT INTO " + TABLE_FULL_NAME + " VALUES(?,?,?)"; PreparedStatement stmt = conn.prepareStatement(dml); @@ -269,14 +243,14 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT { stmt.execute(); } conn.commit(); - + //sql query final String sqlQuery = " SELECT ID,NAME,AGE FROM " + TABLE_FULL_NAME + " WHERE AGE > 25"; //load data and filter rows whose age is > 25 pigServer.registerQuery(String.format( "A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery, zkQuorum)); - + final Iterator<Tuple> iterator = pigServer.openIterator("A"); int recordsRead = 0; while (iterator.hasNext()) { @@ -285,21 +259,21 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT { } assertEquals(rows/2, recordsRead); } - + /** * * @throws Exception */ @Test public void testForNonPKSQLQuery() throws Exception { - - //create the table + + //create the table final String TABLE = "TABLE5"; String ddl = "CREATE TABLE " + TABLE + " ( ID VARCHAR PRIMARY KEY, FOO VARCHAR, BAR INTEGER, BAZ UNSIGNED_INT)"; - + conn.createStatement().execute(ddl); - + //upsert data. final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?,?) "; PreparedStatement stmt = conn.prepareStatement(dml); @@ -313,16 +287,16 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT { stmt.setInt(3,-2); stmt.setInt(4,2); stmt.execute(); - + conn.commit(); - + //sql query final String sqlQuery = String.format(" SELECT FOO, BAZ FROM %s WHERE BAR = -1 " , TABLE); - + pigServer.registerQuery(String.format( "A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery, zkQuorum)); - + final Iterator<Tuple> iterator = pigServer.openIterator("A"); int recordsRead = 0; while (iterator.hasNext()) { @@ -332,7 +306,7 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT { recordsRead++; } assertEquals(1, recordsRead); - + //test the schema. Test for PHOENIX-1123 Schema schema = pigServer.dumpSchema("A"); List<FieldSchema> fields = schema.getFields(); @@ -342,20 +316,20 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT { assertTrue(fields.get(1).alias.equalsIgnoreCase("BAZ")); assertTrue(fields.get(1).type == DataType.INTEGER); } - + /** * @throws Exception */ @Test public void testGroupingOfDataForTable() throws Exception { - - //create the table + + //create the table final String TABLE = "TABLE6"; String ddl = "CREATE TABLE " + TABLE + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER, SAL INTEGER) "; - + conn.createStatement().execute(ddl); - + //prepare data with 10 rows having age 25 and the other 30. final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?,?)"; PreparedStatement stmt = conn.prepareStatement(dml); @@ -371,135 +345,123 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT { stmt.setInt(3, 30); stmt.setInt(4, 10 * 3 * k++); } - + stmt.execute(); } conn.commit(); - + //prepare the mock storage with expected output final Data data = Storage.resetData(pigServer); List<Tuple> expectedList = new ArrayList<Tuple>(); expectedList.add(Storage.tuple(0,180)); expectedList.add(Storage.tuple(0,270)); - - //load data and filter rows whose age is > 25 + + //load data and filter rows whose age is > 25 pigServer.setBatchOn(); pigServer.registerQuery(String.format( "A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE, zkQuorum)); - + pigServer.registerQuery("B = GROUP A BY AGE;"); pigServer.registerQuery("C = FOREACH B GENERATE MIN(A.SAL),MAX(A.SAL);"); pigServer.registerQuery("STORE C INTO 'out' using mock.Storage();"); pigServer.executeBatch(); - + List<Tuple> actualList = data.get("out"); assertEquals(expectedList, actualList); } - + @Test public void testTimestampForSQLQuery() throws Exception { - try { - //create the table - String ddl = "CREATE TABLE TIMESTAMP_T (MYKEY VARCHAR,DATE_STP TIMESTAMP CONSTRAINT PK PRIMARY KEY (MYKEY)) "; - conn.createStatement().execute(ddl); - - final String dml = "UPSERT INTO TIMESTAMP_T VALUES('foo',TO_TIMESTAMP('2006-04-12 00:00:00'))"; - conn.createStatement().execute(dml); - conn.commit(); - - //sql query - final String sqlQuery = " SELECT mykey, year(DATE_STP) FROM TIMESTAMP_T "; - pigServer.registerQuery(String.format( + //create the table + String ddl = "CREATE TABLE TIMESTAMP_T (MYKEY VARCHAR,DATE_STP TIMESTAMP CONSTRAINT PK PRIMARY KEY (MYKEY)) "; + conn.createStatement().execute(ddl); + + final String dml = "UPSERT INTO TIMESTAMP_T VALUES('foo',TO_TIMESTAMP('2006-04-12 00:00:00'))"; + conn.createStatement().execute(dml); + conn.commit(); + + //sql query + final String sqlQuery = " SELECT mykey, year(DATE_STP) FROM TIMESTAMP_T "; + pigServer.registerQuery(String.format( "A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery, zkQuorum)); - final Iterator<Tuple> iterator = pigServer.openIterator("A"); - while (iterator.hasNext()) { - Tuple tuple = iterator.next(); - assertEquals("foo", tuple.get(0)); - assertEquals(2006, tuple.get(1)); - } - } finally { - dropTable("TIMESTAMP_T"); + final Iterator<Tuple> iterator = pigServer.openIterator("A"); + while (iterator.hasNext()) { + Tuple tuple = iterator.next(); + assertEquals("foo", tuple.get(0)); + assertEquals(2006, tuple.get(1)); } } - + @Test public void testDateForSQLQuery() throws Exception { - try { - //create the table - String ddl = "CREATE TABLE DATE_T (MYKEY VARCHAR,DATE_STP Date CONSTRAINT PK PRIMARY KEY (MYKEY)) "; - conn.createStatement().execute(ddl); - - final String dml = "UPSERT INTO DATE_T VALUES('foo',TO_DATE('2004-03-10 10:00:00'))"; - conn.createStatement().execute(dml); - conn.commit(); - - //sql query - final String sqlQuery = " SELECT mykey, hour(DATE_STP) FROM DATE_T "; - pigServer.registerQuery(String.format( + //create the table + String ddl = "CREATE TABLE DATE_T (MYKEY VARCHAR,DATE_STP Date CONSTRAINT PK PRIMARY KEY (MYKEY)) "; + conn.createStatement().execute(ddl); + + final String dml = "UPSERT INTO DATE_T VALUES('foo',TO_DATE('2004-03-10 10:00:00'))"; + conn.createStatement().execute(dml); + conn.commit(); + + //sql query + final String sqlQuery = " SELECT mykey, hour(DATE_STP) FROM DATE_T "; + pigServer.registerQuery(String.format( "A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery, zkQuorum)); - final Iterator<Tuple> iterator = pigServer.openIterator("A"); - while (iterator.hasNext()) { - Tuple tuple = iterator.next(); - assertEquals("foo", tuple.get(0)); - assertEquals(10, tuple.get(1)); - } - } finally { - dropTable("DATE_T"); + final Iterator<Tuple> iterator = pigServer.openIterator("A"); + while (iterator.hasNext()) { + Tuple tuple = iterator.next(); + assertEquals("foo", tuple.get(0)); + assertEquals(10, tuple.get(1)); } } @Test public void testTimeForSQLQuery() throws Exception { - try { - //create the table - String ddl = "CREATE TABLE TIME_T (MYKEY VARCHAR,DATE_STP TIME CONSTRAINT PK PRIMARY KEY (MYKEY)) "; - conn.createStatement().execute(ddl); - - final String dml = "UPSERT INTO TIME_T VALUES('foo',TO_TIME('2008-05-16 00:30:00'))"; - conn.createStatement().execute(dml); - conn.commit(); - - //sql query - final String sqlQuery = " SELECT mykey, minute(DATE_STP) FROM TIME_T "; - pigServer.registerQuery(String.format( + //create the table + String ddl = "CREATE TABLE TIME_T (MYKEY VARCHAR,DATE_STP TIME CONSTRAINT PK PRIMARY KEY (MYKEY)) "; + conn.createStatement().execute(ddl); + + final String dml = "UPSERT INTO TIME_T VALUES('foo',TO_TIME('2008-05-16 00:30:00'))"; + conn.createStatement().execute(dml); + conn.commit(); + + //sql query + final String sqlQuery = " SELECT mykey, minute(DATE_STP) FROM TIME_T "; + pigServer.registerQuery(String.format( "A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery, zkQuorum)); - final Iterator<Tuple> iterator = pigServer.openIterator("A"); - while (iterator.hasNext()) { - Tuple tuple = iterator.next(); - assertEquals("foo", tuple.get(0)); - assertEquals(30, tuple.get(1)); - } - } finally { - dropTable("TIME_T"); + final Iterator<Tuple> iterator = pigServer.openIterator("A"); + while (iterator.hasNext()) { + Tuple tuple = iterator.next(); + assertEquals("foo", tuple.get(0)); + assertEquals(30, tuple.get(1)); } } - + /** * Tests both {@link PhoenixHBaseLoader} and {@link PhoenixHBaseStorage} * @throws Exception */ @Test public void testLoadAndStore() throws Exception { - - //create the tables + + //create the tables final String TABLE = "TABLE7"; final String sourceTableddl = "CREATE TABLE " + TABLE + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER, SAL INTEGER) "; - + final String targetTable = "AGGREGATE"; final String targetTableddl = "CREATE TABLE " + targetTable - + "(AGE INTEGER NOT NULL PRIMARY KEY , MIN_SAL INTEGER , MAX_SAL INTEGER) "; - + + "(AGE INTEGER NOT NULL PRIMARY KEY , MIN_SAL INTEGER , MAX_SAL INTEGER) "; + conn.createStatement().execute(sourceTableddl); conn.createStatement().execute(targetTableddl); - + //prepare data with 10 rows having age 25 and the other 30. final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?,?)"; PreparedStatement stmt = conn.prepareStatement(dml); @@ -515,25 +477,25 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT { stmt.setInt(3, 30); stmt.setInt(4, 10 * 3 * k++); } - + stmt.execute(); } conn.commit(); - - - //load data and filter rows whose age is > 25 + + + //load data and filter rows whose age is > 25 pigServer.setBatchOn(); pigServer.registerQuery(String.format( "A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE, zkQuorum)); - + pigServer.registerQuery("B = GROUP A BY AGE;"); pigServer.registerQuery("C = FOREACH B GENERATE group as AGE,MIN(A.SAL),MAX(A.SAL);"); pigServer.registerQuery("STORE C INTO 'hbase://" + targetTable + "' using " + PhoenixHBaseStorage.class.getName() + "('" - + zkQuorum + "', '-batchSize 1000');"); + + zkQuorum + "', '-batchSize 1000');"); pigServer.executeBatch(); - + //validate the data with what is stored. final String selectQuery = "SELECT AGE , MIN_SAL ,MAX_SAL FROM " + targetTable + " ORDER BY AGE"; final ResultSet rs = conn.createStatement().executeQuery(selectQuery); @@ -546,25 +508,25 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT { assertEquals(0, rs.getInt("MIN_SAL")); assertEquals(270, rs.getInt("MAX_SAL")); } - - /** + + /** * Test for Sequence * @throws Exception */ @Test public void testDataForSQLQueryWithSequences() throws Exception { - - //create the table + + //create the table final String TABLE = "TABLE8"; String ddl = "CREATE TABLE " + TABLE + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER) "; - + conn.createStatement().execute(ddl); - + String sequenceDdl = "CREATE SEQUENCE my_sequence"; - + conn.createStatement().execute(sequenceDdl); - + //prepare data with 10 rows having age 25 and the other 30. final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?)"; PreparedStatement stmt = conn.prepareStatement(dml); @@ -576,14 +538,14 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT { stmt.execute(); } conn.commit(); - + //sql query load data and filter rows whose age is > 25 final String sqlQuery = " SELECT NEXT VALUE FOR my_sequence AS my_seq,ID,NAME,AGE FROM " + TABLE + " WHERE AGE > 25"; pigServer.registerQuery(String.format( "A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery, zkQuorum)); - - + + Iterator<Tuple> iterator = pigServer.openIterator("A"); int recordsRead = 0; while (iterator.hasNext()) { @@ -592,17 +554,17 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT { } assertEquals(rows/2, recordsRead); } - - @Test + + @Test public void testDataForSQLQueryWithFunctions() throws Exception { - - //create the table - final String TABLE = "TABLE9"; - String ddl = "CREATE TABLE " + TABLE + + //create the table + final String TABLE = "TABLE9"; + String ddl = "CREATE TABLE " + TABLE + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR) "; - + conn.createStatement().execute(ddl); - + final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?)"; PreparedStatement stmt = conn.prepareStatement(dml); int rows = 20; @@ -612,15 +574,15 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT { stmt.execute(); } conn.commit(); - + //sql query final String sqlQuery = " SELECT UPPER(NAME) AS n FROM " + TABLE + " ORDER BY ID" ; pigServer.registerQuery(String.format( "A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery, zkQuorum)); - - + + Iterator<Tuple> iterator = pigServer.openIterator("A"); int i = 0; while (iterator.hasNext()) { @@ -629,53 +591,49 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT { assertEquals("A" + i, name); i++; } - + } - @Test - public void testDataFromIndexTable() throws Exception { - try { - //create the table - String ddl = "CREATE TABLE " + TABLE_NAME - + " (ID INTEGER NOT NULL, NAME VARCHAR NOT NULL, EMPLID INTEGER CONSTRAINT pk PRIMARY KEY (ID, NAME)) IMMUTABLE_ROWS=true"; - - conn.createStatement().execute(ddl); - - //create a index table - String indexDdl = " CREATE INDEX " + INDEX_NAME + " ON " + TABLE_NAME + " (EMPLID) INCLUDE (NAME) "; - conn.createStatement().execute(indexDdl); - - //upsert the data. - final String dml = "UPSERT INTO " + TABLE_NAME + " VALUES(?,?,?)"; - PreparedStatement stmt = conn.prepareStatement(dml); - int rows = 20; - for(int i = 0 ; i < rows; i++) { - stmt.setInt(1, i); - stmt.setString(2, "a"+i); - stmt.setInt(3, i * 5); - stmt.execute(); - } - conn.commit(); - pigServer.registerQuery("A = load 'hbase://query/SELECT NAME , EMPLID FROM A WHERE EMPLID = 25 ' using " + PhoenixHBaseLoader.class.getName() + "('"+zkQuorum + "') ;"); - Iterator<Tuple> iterator = pigServer.openIterator("A"); - while (iterator.hasNext()) { - Tuple tuple = iterator.next(); - assertEquals("a5", tuple.get(0)); - assertEquals(25, tuple.get(1)); - } - } finally { - dropTable(TABLE_NAME); - dropTable(INDEX_NAME); + + @Test + public void testDataFromIndexTable() throws Exception { + //create the table + String ddl = "CREATE TABLE " + TABLE_NAME + + " (ID INTEGER NOT NULL, NAME VARCHAR NOT NULL, EMPLID INTEGER CONSTRAINT pk PRIMARY KEY (ID, NAME)) IMMUTABLE_ROWS=true"; + + conn.createStatement().execute(ddl); + + //create a index table + String indexDdl = " CREATE INDEX " + INDEX_NAME + " ON " + TABLE_NAME + " (EMPLID) INCLUDE (NAME) "; + conn.createStatement().execute(indexDdl); + + //upsert the data. + final String dml = "UPSERT INTO " + TABLE_NAME + " VALUES(?,?,?)"; + PreparedStatement stmt = conn.prepareStatement(dml); + int rows = 20; + for(int i = 0 ; i < rows; i++) { + stmt.setInt(1, i); + stmt.setString(2, "a"+i); + stmt.setInt(3, i * 5); + stmt.execute(); + } + conn.commit(); + pigServer.registerQuery("A = load 'hbase://query/SELECT NAME , EMPLID FROM A WHERE EMPLID = 25 ' using " + PhoenixHBaseLoader.class.getName() + "('"+zkQuorum + "') ;"); + Iterator<Tuple> iterator = pigServer.openIterator("A"); + while (iterator.hasNext()) { + Tuple tuple = iterator.next(); + assertEquals("a5", tuple.get(0)); + assertEquals(25, tuple.get(1)); } } - - @Test - public void testLoadOfSaltTable() throws Exception { - final String TABLE = "TABLE11"; + + @Test + public void testLoadOfSaltTable() throws Exception { + final String TABLE = "TABLE11"; final String sourceTableddl = "CREATE TABLE " + TABLE + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER, SAL INTEGER) SALT_BUCKETS=2 "; - + conn.createStatement().execute(sourceTableddl); - + //prepare data with 10 rows having age 25 and the other 30. final String dml = "UPSERT INTO " + TABLE + " VALUES(?,?,?,?)"; PreparedStatement stmt = conn.prepareStatement(dml); @@ -691,40 +649,27 @@ public class PhoenixHBaseLoaderIT extends BaseHBaseManagedTimeIT { stmt.setInt(3, 30); stmt.setInt(4, 10 * 3 * k++); } - + stmt.execute(); } conn.commit(); - + final Data data = Storage.resetData(pigServer); List<Tuple> expectedList = new ArrayList<Tuple>(); expectedList.add(Storage.tuple(25,10)); expectedList.add(Storage.tuple(30,10)); - + pigServer.setBatchOn(); pigServer.registerQuery(String.format( "A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE, zkQuorum)); - + pigServer.registerQuery("B = GROUP A BY AGE;"); pigServer.registerQuery("C = FOREACH B GENERATE group,COUNT(A);"); pigServer.registerQuery("STORE C INTO 'out' using mock.Storage();"); pigServer.executeBatch(); - + List<Tuple> actualList = data.get("out"); assertEquals(expectedList.size(), actualList.size()); - } - - @After - public void tearDown() throws Exception { - if(conn != null) { - conn.close(); - } - pigServer.shutdown(); - } - - private void dropTable(String tableFullName) throws SQLException { - Preconditions.checkNotNull(conn); - conn.createStatement().execute(String.format("DROP TABLE IF EXISTS %s",tableFullName)); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/e9d44986/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java index 9106cdd..9e9434c 100644 --- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java +++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java @@ -19,69 +19,28 @@ */ package org.apache.phoenix.pig; -import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; -import static org.apache.phoenix.util.TestUtil.LOCALHOST; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import java.sql.Connection; -import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; import java.util.Collection; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; import org.apache.phoenix.util.SchemaUtil; -import org.apache.pig.ExecType; -import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS; import org.apache.pig.builtin.mock.Storage; import org.apache.pig.builtin.mock.Storage.Data; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; import org.joda.time.DateTime; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import com.google.common.collect.Lists; -public class PhoenixHBaseStorerIT extends BaseHBaseManagedTimeIT { - - private static TupleFactory tupleFactory; - private static Connection conn; - private static PigServer pigServer; - private static String zkQuorum; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - conn = DriverManager.getConnection(getUrl()); - zkQuorum = LOCALHOST + JDBC_PROTOCOL_SEPARATOR + getZKClientPort(getTestClusterConfig()); - // Pig variables - tupleFactory = TupleFactory.getInstance(); - } - - @Before - public void setUp() throws Exception { - pigServer = new PigServer(ExecType.LOCAL, getTestClusterConfig()); - } - - @After - public void tearDown() throws Exception { - pigServer.shutdown(); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - conn.close(); - } - +public class PhoenixHBaseStorerIT extends BasePigIT { /** * Basic test - writes data to a Phoenix table and compares the data written * to expected http://git-wip-us.apache.org/repos/asf/phoenix/blob/e9d44986/phoenix-pig/src/it/java/org/apache/phoenix/pig/udf/ReserveNSequenceTestIT.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/udf/ReserveNSequenceTestIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/udf/ReserveNSequenceTestIT.java index cea1a8a..30ce132 100644 --- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/udf/ReserveNSequenceTestIT.java +++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/udf/ReserveNSequenceTestIT.java @@ -17,8 +17,6 @@ */ package org.apache.phoenix.pig.udf; -import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; -import static org.apache.phoenix.util.TestUtil.LOCALHOST; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -29,17 +27,12 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.Properties; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; +import org.apache.phoenix.pig.BasePigIT; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.util.UDFContext; import org.junit.After; -import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -47,37 +40,33 @@ import org.junit.rules.ExpectedException; /** * Test class to run all the Pig Sequence UDF integration tests against a virtual map reduce cluster. */ -public class ReserveNSequenceTestIT extends BaseHBaseManagedTimeIT { +public class ReserveNSequenceTestIT extends BasePigIT { private static final String CREATE_SEQUENCE_SYNTAX = "CREATE SEQUENCE %s START WITH %s INCREMENT BY %s MINVALUE %s MAXVALUE %s CACHE %s"; private static final String SEQUENCE_NAME = "my_schema.my_sequence"; private static final long MAX_VALUE = 10; - private static TupleFactory TF; - private static Connection globalConn; - private static String zkQuorum; - private static Configuration conf; private static UDFContext udfContext; @Rule public ExpectedException thrown = ExpectedException.none(); - @BeforeClass - public static void setUpBeforeClass() throws Exception { - conf = getTestClusterConfig(); - zkQuorum = LOCALHOST + JDBC_PROTOCOL_SEPARATOR + getZKClientPort(getTestClusterConfig()); - conf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum); - globalConn = DriverManager.getConnection(getUrl()); - // Pig variables - TF = TupleFactory.getInstance(); - } - + @Override @Before - public void setUp() throws SQLException { - createSequence(globalConn); + public void setUp() throws Exception { + super.setUp(); + createSequence(conn); createUdfContext(); } + @Override + @After + public void tearDown() throws Exception { + udfContext.reset(); + dropSequence(conn); + super.tearDown(); + } + @Test public void testReserve() throws Exception { doTest(new UDFTestProperties(1)); @@ -161,7 +150,7 @@ public class ReserveNSequenceTestIT extends BaseHBaseManagedTimeIT { doTest(tenantConn, props); // validate global sequence value is still set to 1 - assertEquals(1L, getNextSequenceValue(globalConn)); + assertEquals(1L, getNextSequenceValue(conn)); } finally { dropSequence(tenantConn); } @@ -174,27 +163,27 @@ public class ReserveNSequenceTestIT extends BaseHBaseManagedTimeIT { */ @Test public void testMultipleTuples() throws Exception { - Tuple tuple = TF.newTuple(2); + Tuple tuple = tupleFactory.newTuple(2); tuple.set(0, 2L); tuple.set(1, SEQUENCE_NAME); - final String tentantId = globalConn.getClientInfo(PhoenixRuntime.TENANT_ID_ATTRIB); + final String tentantId = conn.getClientInfo(PhoenixRuntime.TENANT_ID_ATTRIB); ReserveNSequence udf = new ReserveNSequence(zkQuorum, tentantId); for (int i = 0; i < 2; i++) { udf.exec(tuple); } - long nextValue = getNextSequenceValue(globalConn); + long nextValue = getNextSequenceValue(conn); assertEquals(5L, nextValue); } private void doTest(UDFTestProperties props) throws Exception { - doTest(globalConn, props); + doTest(conn, props); } private void doTest(Connection conn, UDFTestProperties props) throws Exception { setCurrentValue(conn, props.getCurrentValue()); - Tuple tuple = TF.newTuple(3); + Tuple tuple = tupleFactory.newTuple(3); tuple.set(0, props.getNumToReserve()); tuple.set(1, props.getSequenceName()); tuple.set(2, zkQuorum); @@ -251,17 +240,6 @@ public class ReserveNSequenceTestIT extends BaseHBaseManagedTimeIT { conn.commit(); } - @After - public void tearDown() throws Exception { - udfContext.reset(); - dropSequence(globalConn); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - globalConn.close(); - } - /** * Static class to define properties for the test */
