Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 22d825766 -> d04b4d65a
Changes to make builds stable again

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d04b4d65
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d04b4d65
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d04b4d65

Branch: refs/heads/4.x-HBase-0.98
Commit: d04b4d65a1504022f09fe7b9cf42ab9b33ca6dcf
Parents: 22d8257
Author: Samarth <[email protected]>
Authored: Thu Jan 7 22:23:56 2016 -0800
Committer: Samarth <[email protected]>
Committed: Thu Jan 7 22:25:30 2016 -0800

----------------------------------------------------------------------
 .../end2end/mapreduce/CsvBulkLoadToolIT.java   | 372 +++++++++++++++++++
 .../phoenix/end2end/mapreduce/IndexToolIT.java | 339 +++++++++++++++++
 .../phoenix/mapreduce/CsvBulkLoadToolIT.java   | 371 ------------------
 .../apache/phoenix/mapreduce/IndexToolIT.java  | 339 -----------------
 pom.xml                                        |  11 +-
 5 files changed, 718 insertions(+), 714 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d04b4d65/phoenix-core/src/it/java/org/apache/phoenix/end2end/mapreduce/CsvBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/mapreduce/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/mapreduce/CsvBulkLoadToolIT.java
new file mode 100644
index 0000000..1bc36d0
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/mapreduce/CsvBulkLoadToolIT.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.phoenix.end2end.mapreduce; + +import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster; +import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.PrintWriter; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.mapred.FileAlreadyExistsException; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.mapreduce.CsvBulkLoadTool; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.util.DateUtil; +import org.apache.phoenix.util.PhoenixRuntime; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(NeedsOwnMiniClusterTest.class) +public class CsvBulkLoadToolIT { + + // We use HBaseTestUtil because we need to start up a MapReduce cluster as well + private static HBaseTestingUtility hbaseTestUtil; + private static String zkQuorum; + private static Connection conn; + + @BeforeClass + public static void setUp() throws Exception { + hbaseTestUtil = new HBaseTestingUtility(); + Configuration conf = hbaseTestUtil.getConfiguration(); + setUpConfigForMiniCluster(conf); + // Since we're using the real PhoenixDriver in this test, remove the + // extra JDBC argument that causes the test driver to be used. 
+ conf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); + hbaseTestUtil.startMiniCluster(); + hbaseTestUtil.startMiniMapReduceCluster(); + + Class.forName(PhoenixDriver.class.getName()); + DriverManager.registerDriver(PhoenixDriver.INSTANCE); + zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort(); + conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + try { + if (conn != null) conn.close(); + } finally { + try { + DriverManager.deregisterDriver(PhoenixDriver.INSTANCE); + } finally { + try { + hbaseTestUtil.shutdownMiniMapReduceCluster(); + } finally { + hbaseTestUtil.shutdownMiniCluster(); + } + } + } + } + + @Test + public void testBasicImport() throws Exception { + + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE TABLE1 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, T DATE) SPLIT ON (1,2)"); + + FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration()); + FSDataOutputStream outputStream = fs.create(new Path("/tmp/input1.csv")); + PrintWriter printWriter = new PrintWriter(outputStream); + printWriter.println("1,Name 1,1970/01/01"); + printWriter.println("2,Name 2,1970/01/02"); + printWriter.close(); + + CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool(); + csvBulkLoadTool.setConf(new Configuration(hbaseTestUtil.getConfiguration())); + csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd"); + int exitCode = csvBulkLoadTool.run(new String[] { + "--input", "/tmp/input1.csv", + "--table", "table1", + "--zookeeper", zkQuorum}); + assertEquals(0, exitCode); + + ResultSet rs = stmt.executeQuery("SELECT id, name, t FROM table1 ORDER BY id"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Name 1", rs.getString(2)); + assertEquals(DateUtil.parseDate("1970-01-01"), rs.getDate(3)); + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertEquals("Name 2", rs.getString(2)); + assertEquals(DateUtil.parseDate("1970-01-02"), rs.getDate(3)); + assertFalse(rs.next()); + + rs.close(); + stmt.close(); + } + + @Test + public void testFullOptionImport() throws Exception { + + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE TABLE2 (ID INTEGER NOT NULL PRIMARY KEY, " + + "NAME VARCHAR, NAMES VARCHAR ARRAY)"); + + FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration()); + FSDataOutputStream outputStream = fs.create(new Path("/tmp/input2.csv")); + PrintWriter printWriter = new PrintWriter(outputStream); + printWriter.println("1|Name 1a;Name 1b"); + printWriter.println("2|Name 2a;Name 2b"); + printWriter.close(); + + CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool(); + csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration()); + int exitCode = csvBulkLoadTool.run(new String[] { + "--input", "/tmp/input2.csv", + "--table", "table2", + "--zookeeper", zkQuorum, + "--delimiter", "|", + "--array-delimiter", ";", + "--import-columns", "ID,NAMES"}); + assertEquals(0, exitCode); + + ResultSet rs = stmt.executeQuery("SELECT id, names FROM table2 ORDER BY id"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertArrayEquals(new Object[] { "Name 1a", "Name 1b" }, (Object[]) rs.getArray(2).getArray()); + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertArrayEquals(new Object[] { "Name 2a", "Name 2b" }, (Object[]) rs.getArray(2).getArray()); + assertFalse(rs.next()); + + 
rs.close(); + stmt.close(); + } + + @Test + public void testMultipleInputFiles() throws Exception { + + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE TABLE7 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, T DATE) SPLIT ON (1,2)"); + + FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration()); + FSDataOutputStream outputStream = fs.create(new Path("/tmp/input1.csv")); + PrintWriter printWriter = new PrintWriter(outputStream); + printWriter.println("1,Name 1,1970/01/01"); + printWriter.close(); + outputStream = fs.create(new Path("/tmp/input2.csv")); + printWriter = new PrintWriter(outputStream); + printWriter.println("2,Name 2,1970/01/02"); + printWriter.close(); + + CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool(); + csvBulkLoadTool.setConf(new Configuration(hbaseTestUtil.getConfiguration())); + csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd"); + int exitCode = csvBulkLoadTool.run(new String[] { + "--input", "/tmp/input1.csv,/tmp/input2.csv", + "--table", "table7", + "--zookeeper", zkQuorum}); + assertEquals(0, exitCode); + + ResultSet rs = stmt.executeQuery("SELECT id, name, t FROM table7 ORDER BY id"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Name 1", rs.getString(2)); + assertEquals(DateUtil.parseDate("1970-01-01"), rs.getDate(3)); + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertEquals("Name 2", rs.getString(2)); + assertEquals(DateUtil.parseDate("1970-01-02"), rs.getDate(3)); + assertFalse(rs.next()); + + rs.close(); + stmt.close(); + } + + @Test + public void testImportWithIndex() throws Exception { + + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE TABLE3 (ID INTEGER NOT NULL PRIMARY KEY, " + + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)"); + String ddl = "CREATE INDEX TABLE3_IDX ON TABLE3 " + + " (FIRST_NAME ASC)" + + " INCLUDE (LAST_NAME)"; + stmt.execute(ddl); + + FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration()); + FSDataOutputStream outputStream = fs.create(new Path("/tmp/input3.csv")); + PrintWriter printWriter = new PrintWriter(outputStream); + printWriter.println("1,FirstName 1,LastName 1"); + printWriter.println("2,FirstName 2,LastName 2"); + printWriter.close(); + + CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool(); + csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration()); + int exitCode = csvBulkLoadTool.run(new String[] { + "--input", "/tmp/input3.csv", + "--table", "table3", + "--zookeeper", zkQuorum}); + assertEquals(0, exitCode); + + ResultSet rs = stmt.executeQuery("SELECT id, FIRST_NAME FROM TABLE3 where first_name='FirstName 2'"); + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertEquals("FirstName 2", rs.getString(2)); + + rs.close(); + stmt.close(); + } + + @Test + public void testImportWithLocalIndex() throws Exception { + + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE TABLE6 (ID INTEGER NOT NULL PRIMARY KEY, " + + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)"); + String ddl = "CREATE LOCAL INDEX TABLE6_IDX ON TABLE6 " + + " (FIRST_NAME ASC)"; + stmt.execute(ddl); + ddl = "CREATE LOCAL INDEX TABLE6_IDX2 ON TABLE6 " + " (LAST_NAME ASC)"; + stmt.execute(ddl); + + FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration()); + FSDataOutputStream outputStream = fs.create(new Path("/tmp/input3.csv")); + PrintWriter printWriter = new PrintWriter(outputStream); + printWriter.println("1,FirstName 1,LastName 1"); + printWriter.println("2,FirstName 2,LastName 2"); + printWriter.close(); + + 
CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool(); + csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration()); + try { + csvBulkLoadTool.run(new String[] { + "--input", "/tmp/input3.csv", + "--table", "table6", + "--zookeeper", zkQuorum}); + fail("Csv bulk load currently has issues with local indexes."); + } catch( UnsupportedOperationException ise) { + assertEquals("Local indexes not supported by Bulk Loader",ise.getMessage()); + } + + } + + @Test + public void testImportOneIndexTable() throws Exception { + testImportOneIndexTable("TABLE4", false); + } + + //@Test + public void testImportOneLocalIndexTable() throws Exception { + testImportOneIndexTable("TABLE5", true); + } + + public void testImportOneIndexTable(String tableName, boolean localIndex) throws Exception { + + String indexTableName = String.format("%s_IDX", tableName); + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE " + tableName + "(ID INTEGER NOT NULL PRIMARY KEY, " + + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)"); + String ddl = + "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexTableName + " ON " + + tableName + "(FIRST_NAME ASC)"; + stmt.execute(ddl); + + FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration()); + FSDataOutputStream outputStream = fs.create(new Path("/tmp/input4.csv")); + PrintWriter printWriter = new PrintWriter(outputStream); + printWriter.println("1,FirstName 1,LastName 1"); + printWriter.println("2,FirstName 2,LastName 2"); + printWriter.close(); + + CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool(); + csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration()); + int exitCode = csvBulkLoadTool.run(new String[] { + "--input", "/tmp/input4.csv", + "--table", tableName, + "--index-table", indexTableName, + "--zookeeper", zkQuorum }); + assertEquals(0, exitCode); + + ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName); + assertFalse(rs.next()); + rs = stmt.executeQuery("SELECT FIRST_NAME FROM " + tableName + " where FIRST_NAME='FirstName 1'"); + assertTrue(rs.next()); + assertEquals("FirstName 1", rs.getString(1)); + + rs.close(); + stmt.close(); + } + + @Test + public void testInvalidArguments() { + String tableName = "TABLE8"; + CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool(); + csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration()); + try { + csvBulkLoadTool.run(new String[] { + "--input", "/tmp/input4.csv", + "--table", tableName, + "--zookeeper", zkQuorum }); + fail(String.format("Table %s not created, hence should fail",tableName)); + } catch (Exception ex) { + assertTrue(ex instanceof IllegalArgumentException); + assertTrue(ex.getMessage().contains(String.format("Table %s not found", tableName))); + } + } + + @Test + public void testAlreadyExistsOutputPath() { + String tableName = "TABLE9"; + String outputPath = "/tmp/output/tabl9"; + try { + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE " + tableName + "(ID INTEGER NOT NULL PRIMARY KEY, " + + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)"); + + FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration()); + fs.create(new Path(outputPath)); + FSDataOutputStream outputStream = fs.create(new Path("/tmp/input9.csv")); + PrintWriter printWriter = new PrintWriter(outputStream); + printWriter.println("1,FirstName 1,LastName 1"); + printWriter.println("2,FirstName 2,LastName 2"); + printWriter.close(); + + CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool(); + csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration()); + csvBulkLoadTool.run(new String[] { + 
"--input", "/tmp/input9.csv", + "--output", outputPath, + "--table", tableName, + "--zookeeper", zkQuorum }); + + fail(String.format("Output path %s already exists. hence, should fail",outputPath)); + } catch (Exception ex) { + assertTrue(ex instanceof FileAlreadyExistsException); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/d04b4d65/phoenix-core/src/it/java/org/apache/phoenix/end2end/mapreduce/IndexToolIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/mapreduce/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/mapreduce/IndexToolIT.java new file mode 100644 index 0000000..e5696f0 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/mapreduce/IndexToolIT.java @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end.mapreduce; + +import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; +import java.util.Properties; +import java.util.UUID; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.mapreduce.index.IndexTool; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.util.MetaDataUtil; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.SchemaUtil; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Lists; + +/** + * Tests for the {@link IndexTool} + */ +@Category(NeedsOwnMiniClusterTest.class) +public class IndexToolIT { + + private static HBaseTestingUtility hbaseTestUtil; + private static String zkQuorum; + + @BeforeClass + public static void setUp() throws Exception { + hbaseTestUtil = new HBaseTestingUtility(); + Configuration conf = hbaseTestUtil.getConfiguration(); + conf.setBoolean("hbase.defaults.for.version.skip", true); + // Since we're using the real PhoenixDriver in this test, remove the + // extra JDBC argument that causes the test driver to be used. 
+ conf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); + setUpConfigForMiniCluster(conf); + hbaseTestUtil.startMiniCluster(); + hbaseTestUtil.startMiniMapReduceCluster(); + Class.forName(PhoenixDriver.class.getName()); + DriverManager.registerDriver(PhoenixDriver.INSTANCE); + zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort(); + } + + @Test + public void testImmutableGlobalIndex() throws Exception { + testSecondaryIndex("SCHEMA", "DATA_TABLE1", true, false); + } + + @Test + public void testImmutableLocalIndex() throws Exception { + testSecondaryIndex("SCHEMA", "DATA_TABLE2", true, true); + } + + @Test + public void testMutableGlobalIndex() throws Exception { + testSecondaryIndex("SCHEMA", "DATA_TABLE3", false, false); + } + + @Test + public void testMutableLocalIndex() throws Exception { + testSecondaryIndex("SCHEMA", "DATA_TABLE4", false, true); + } + + @Test + public void testImmutableGlobalIndexDirectApi() throws Exception { + testSecondaryIndex("SCHEMA", "DATA_TABLE5", true, false, true); + } + + @Test + public void testImmutableLocalIndexDirectApi() throws Exception { + testSecondaryIndex("SCHEMA", "DATA_TABLE6", true, true, true); + } + + @Test + public void testMutableGlobalIndexDirectApi() throws Exception { + testSecondaryIndex("SCHEMA", "DATA_TABLE7", false, false, true); + } + + @Test + public void testMutableLocalIndexDirectApi() throws Exception { + testSecondaryIndex("SCHEMA", "DATA_TABLE8", false, true, true); + } + + public void testSecondaryIndex(final String schemaName, final String dataTable, final boolean isImmutable , final boolean isLocal) throws Exception { + testSecondaryIndex(schemaName, dataTable, isImmutable, isLocal, false); + } + + public void testSecondaryIndex(final String schemaName, final String dataTable, final boolean isImmutable , final boolean isLocal, final boolean directApi) throws Exception { + + final String fullTableName = SchemaUtil.getTableName(schemaName, dataTable); + final String indxTable = String.format("%s_%s",dataTable,"INDX"); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum,props); + Statement stmt = conn.createStatement(); + try { + + stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) %s", fullTableName, (isImmutable ? "IMMUTABLE_ROWS=true" :""))); + String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullTableName); + PreparedStatement stmt1 = conn.prepareStatement(upsertQuery); + + int id = 1; + // insert two rows + upsertRow(stmt1, id++); + upsertRow(stmt1, id++); + conn.commit(); + + stmt.execute(String.format("CREATE %s INDEX %s ON %s (LPAD(UPPER(NAME),8,'x')||'_xyz') ASYNC ", (isLocal ? "LOCAL" : ""), indxTable, fullTableName)); + + //verify rows are fetched from data table. + String selectSql = String.format("SELECT LPAD(UPPER(NAME),8,'x')||'_xyz',ID FROM %s", fullTableName); + ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql); + String actualExplainPlan = QueryUtil.getExplainPlan(rs); + + //assert we are pulling from data table. 
+ assertEquals(String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s", fullTableName), actualExplainPlan); + + rs = stmt1.executeQuery(selectSql); + assertTrue(rs.next()); + assertEquals("xxUNAME1_xyz", rs.getString(1)); + assertTrue(rs.next()); + assertEquals("xxUNAME2_xyz", rs.getString(1)); + + //run the index MR job. + final IndexTool indexingTool = new IndexTool(); + indexingTool.setConf(new Configuration(hbaseTestUtil.getConfiguration())); + + final String[] cmdArgs = getArgValues(schemaName, dataTable, indxTable, directApi); + int status = indexingTool.run(cmdArgs); + assertEquals(0, status); + + // insert two more rows + upsertRow(stmt1, 3); + upsertRow(stmt1, 4); + conn.commit(); + + rs = stmt1.executeQuery("SELECT * FROM "+SchemaUtil.getTableName(schemaName, indxTable)); + + //assert we are pulling from index table. + rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql); + actualExplainPlan = QueryUtil.getExplainPlan(rs); + assertExplainPlan(actualExplainPlan,schemaName,dataTable,indxTable,isLocal); + + rs = stmt.executeQuery(selectSql); +// assertTrue(rs.next()); +// assertEquals("xxUNAME1_xyz", rs.getString(1)); +// assertEquals(1, rs.getInt(2)); +// +// assertTrue(rs.next()); +// assertEquals("xxUNAME2_xyz", rs.getString(1)); +// assertEquals(2, rs.getInt(2)); +// +// assertTrue(rs.next()); +// assertEquals("xxUNAME3_xyz", rs.getString(1)); +// assertEquals(3, rs.getInt(2)); +// +// assertTrue(rs.next()); +// assertEquals("xxUNAME4_xyz", rs.getString(1)); +// assertEquals(4, rs.getInt(2)); +// +// assertFalse(rs.next()); + + conn.createStatement().execute(String.format("DROP INDEX %s ON %s",indxTable , fullTableName)); + } finally { + conn.close(); + } + } + + + /** + * This test is to assert that updates that happen to rows of a mutable table after an index is created in ASYNC mode and before + * the MR job runs, do show up in the index table . + * @throws Exception + */ + @Test + public void testMutalbleIndexWithUpdates() throws Exception { + + final String dataTable = "DATA_TABLE5"; + final String indxTable = String.format("%s_%s",dataTable,"INDX"); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum,props); + Statement stmt = conn.createStatement(); + try { + + stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER)",dataTable)); + String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)",dataTable); + PreparedStatement stmt1 = conn.prepareStatement(upsertQuery); + + int id = 1; + // insert two rows + upsertRow(stmt1, id++); + upsertRow(stmt1, id++); + conn.commit(); + + stmt.execute(String.format("CREATE INDEX %s ON %s (UPPER(NAME)) ASYNC ", indxTable,dataTable)); + + //update a row + stmt1.setInt(1, 1); + stmt1.setString(2, "uname" + String.valueOf(10)); + stmt1.setInt(3, 95050 + 1); + stmt1.executeUpdate(); + conn.commit(); + + //verify rows are fetched from data table. + String selectSql = String.format("SELECT UPPER(NAME),ID FROM %s",dataTable); + ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql); + String actualExplainPlan = QueryUtil.getExplainPlan(rs); + + //assert we are pulling from data table. 
+ assertEquals(String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s",dataTable),actualExplainPlan); + + rs = stmt1.executeQuery(selectSql); + assertTrue(rs.next()); + assertEquals("UNAME10", rs.getString(1)); + assertTrue(rs.next()); + assertEquals("UNAME2", rs.getString(1)); + + //run the index MR job. + final IndexTool indexingTool = new IndexTool(); + indexingTool.setConf(new Configuration(hbaseTestUtil.getConfiguration())); + + final String[] cmdArgs = getArgValues(null, dataTable,indxTable); + int status = indexingTool.run(cmdArgs); + assertEquals(0, status); + + //assert we are pulling from index table. + rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql); + actualExplainPlan = QueryUtil.getExplainPlan(rs); + assertExplainPlan(actualExplainPlan,null,dataTable,indxTable,false); + + rs = stmt.executeQuery(selectSql); + assertTrue(rs.next()); + assertEquals("UNAME10", rs.getString(1)); + assertEquals(1, rs.getInt(2)); + + assertTrue(rs.next()); + assertEquals("UNAME2", rs.getString(1)); + assertEquals(2, rs.getInt(2)); + conn.createStatement().execute(String.format("DROP INDEX %s ON %s",indxTable , dataTable)); + } finally { + conn.close(); + } + } + + private void assertExplainPlan(final String actualExplainPlan, String schemaName, String dataTable, + String indxTable, boolean isLocal) { + + String expectedExplainPlan = ""; + if(isLocal) { + final String localIndexName = MetaDataUtil.getLocalIndexTableName(SchemaUtil.getTableName(schemaName, dataTable)); + expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY RANGE SCAN OVER %s [-32768]" + + "\n SERVER FILTER BY FIRST KEY ONLY", localIndexName); + } else { + expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s" + + "\n SERVER FILTER BY FIRST KEY ONLY",SchemaUtil.getTableName(schemaName, indxTable)); + } + assertEquals(expectedExplainPlan,actualExplainPlan); + } + + private String[] getArgValues(String schemaName, String dataTable, String indxTable) { + return getArgValues(schemaName, dataTable, indxTable, false); + } + + private String[] getArgValues(String schemaName, String dataTable, String indxTable, boolean directApi) { + final List<String> args = Lists.newArrayList(); + if (schemaName!=null) { + args.add("-s"); + args.add(schemaName); + } + args.add("-dt"); + args.add(dataTable); + args.add("-it"); + args.add(indxTable); + if(directApi) { + args.add("-direct"); + // Need to run this job in foreground for the test to be deterministic + args.add("-runfg"); + } + + args.add("-op"); + args.add("/tmp/"+UUID.randomUUID().toString()); + return args.toArray(new String[0]); + } + + private void upsertRow(PreparedStatement stmt, int i) throws SQLException { + // insert row + stmt.setInt(1, i); + stmt.setString(2, "uname" + String.valueOf(i)); + stmt.setInt(3, 95050 + i); + stmt.executeUpdate(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + try { + DriverManager.deregisterDriver(PhoenixDriver.INSTANCE); + } finally { + try { + hbaseTestUtil.shutdownMiniMapReduceCluster(); + } finally { + hbaseTestUtil.shutdownMiniCluster(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/d04b4d65/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java deleted file mode 100644 
index 2970d56..0000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java +++ /dev/null @@ -1,371 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.mapreduce; - -import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster; -import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.PrintWriter; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.Statement; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.mapred.FileAlreadyExistsException; -import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; -import org.apache.phoenix.jdbc.PhoenixDriver; -import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.util.DateUtil; -import org.apache.phoenix.util.PhoenixRuntime; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category(NeedsOwnMiniClusterTest.class) -public class CsvBulkLoadToolIT { - - // We use HBaseTestUtil because we need to start up a MapReduce cluster as well - private static HBaseTestingUtility hbaseTestUtil; - private static String zkQuorum; - private static Connection conn; - - @BeforeClass - public static void setUp() throws Exception { - hbaseTestUtil = new HBaseTestingUtility(); - Configuration conf = hbaseTestUtil.getConfiguration(); - setUpConfigForMiniCluster(conf); - // Since we're using the real PhoenixDriver in this test, remove the - // extra JDBC argument that causes the test driver to be used. 
- conf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); - hbaseTestUtil.startMiniCluster(); - hbaseTestUtil.startMiniMapReduceCluster(); - - Class.forName(PhoenixDriver.class.getName()); - DriverManager.registerDriver(PhoenixDriver.INSTANCE); - zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort(); - conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL - + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - try { - if (conn != null) conn.close(); - } finally { - try { - DriverManager.deregisterDriver(PhoenixDriver.INSTANCE); - } finally { - try { - hbaseTestUtil.shutdownMiniMapReduceCluster(); - } finally { - hbaseTestUtil.shutdownMiniCluster(); - } - } - } - } - - @Test - public void testBasicImport() throws Exception { - - Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE TABLE1 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, T DATE) SPLIT ON (1,2)"); - - FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration()); - FSDataOutputStream outputStream = fs.create(new Path("/tmp/input1.csv")); - PrintWriter printWriter = new PrintWriter(outputStream); - printWriter.println("1,Name 1,1970/01/01"); - printWriter.println("2,Name 2,1970/01/02"); - printWriter.close(); - - CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool(); - csvBulkLoadTool.setConf(new Configuration(hbaseTestUtil.getConfiguration())); - csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd"); - int exitCode = csvBulkLoadTool.run(new String[] { - "--input", "/tmp/input1.csv", - "--table", "table1", - "--zookeeper", zkQuorum}); - assertEquals(0, exitCode); - - ResultSet rs = stmt.executeQuery("SELECT id, name, t FROM table1 ORDER BY id"); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("Name 1", rs.getString(2)); - assertEquals(DateUtil.parseDate("1970-01-01"), rs.getDate(3)); - assertTrue(rs.next()); - assertEquals(2, rs.getInt(1)); - assertEquals("Name 2", rs.getString(2)); - assertEquals(DateUtil.parseDate("1970-01-02"), rs.getDate(3)); - assertFalse(rs.next()); - - rs.close(); - stmt.close(); - } - - @Test - public void testFullOptionImport() throws Exception { - - Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE TABLE2 (ID INTEGER NOT NULL PRIMARY KEY, " + - "NAME VARCHAR, NAMES VARCHAR ARRAY)"); - - FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration()); - FSDataOutputStream outputStream = fs.create(new Path("/tmp/input2.csv")); - PrintWriter printWriter = new PrintWriter(outputStream); - printWriter.println("1|Name 1a;Name 1b"); - printWriter.println("2|Name 2a;Name 2b"); - printWriter.close(); - - CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool(); - csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration()); - int exitCode = csvBulkLoadTool.run(new String[] { - "--input", "/tmp/input2.csv", - "--table", "table2", - "--zookeeper", zkQuorum, - "--delimiter", "|", - "--array-delimiter", ";", - "--import-columns", "ID,NAMES"}); - assertEquals(0, exitCode); - - ResultSet rs = stmt.executeQuery("SELECT id, names FROM table2 ORDER BY id"); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertArrayEquals(new Object[] { "Name 1a", "Name 1b" }, (Object[]) rs.getArray(2).getArray()); - assertTrue(rs.next()); - assertEquals(2, rs.getInt(1)); - assertArrayEquals(new Object[] { "Name 2a", "Name 2b" }, (Object[]) rs.getArray(2).getArray()); - assertFalse(rs.next()); - - 
rs.close(); - stmt.close(); - } - - @Test - public void testMultipleInputFiles() throws Exception { - - Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE TABLE7 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, T DATE) SPLIT ON (1,2)"); - - FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration()); - FSDataOutputStream outputStream = fs.create(new Path("/tmp/input1.csv")); - PrintWriter printWriter = new PrintWriter(outputStream); - printWriter.println("1,Name 1,1970/01/01"); - printWriter.close(); - outputStream = fs.create(new Path("/tmp/input2.csv")); - printWriter = new PrintWriter(outputStream); - printWriter.println("2,Name 2,1970/01/02"); - printWriter.close(); - - CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool(); - csvBulkLoadTool.setConf(new Configuration(hbaseTestUtil.getConfiguration())); - csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd"); - int exitCode = csvBulkLoadTool.run(new String[] { - "--input", "/tmp/input1.csv,/tmp/input2.csv", - "--table", "table7", - "--zookeeper", zkQuorum}); - assertEquals(0, exitCode); - - ResultSet rs = stmt.executeQuery("SELECT id, name, t FROM table7 ORDER BY id"); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("Name 1", rs.getString(2)); - assertEquals(DateUtil.parseDate("1970-01-01"), rs.getDate(3)); - assertTrue(rs.next()); - assertEquals(2, rs.getInt(1)); - assertEquals("Name 2", rs.getString(2)); - assertEquals(DateUtil.parseDate("1970-01-02"), rs.getDate(3)); - assertFalse(rs.next()); - - rs.close(); - stmt.close(); - } - - @Test - public void testImportWithIndex() throws Exception { - - Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE TABLE3 (ID INTEGER NOT NULL PRIMARY KEY, " + - "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)"); - String ddl = "CREATE INDEX TABLE3_IDX ON TABLE3 " - + " (FIRST_NAME ASC)" - + " INCLUDE (LAST_NAME)"; - stmt.execute(ddl); - - FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration()); - FSDataOutputStream outputStream = fs.create(new Path("/tmp/input3.csv")); - PrintWriter printWriter = new PrintWriter(outputStream); - printWriter.println("1,FirstName 1,LastName 1"); - printWriter.println("2,FirstName 2,LastName 2"); - printWriter.close(); - - CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool(); - csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration()); - int exitCode = csvBulkLoadTool.run(new String[] { - "--input", "/tmp/input3.csv", - "--table", "table3", - "--zookeeper", zkQuorum}); - assertEquals(0, exitCode); - - ResultSet rs = stmt.executeQuery("SELECT id, FIRST_NAME FROM TABLE3 where first_name='FirstName 2'"); - assertTrue(rs.next()); - assertEquals(2, rs.getInt(1)); - assertEquals("FirstName 2", rs.getString(2)); - - rs.close(); - stmt.close(); - } - - @Test - public void testImportWithLocalIndex() throws Exception { - - Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE TABLE6 (ID INTEGER NOT NULL PRIMARY KEY, " + - "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)"); - String ddl = "CREATE LOCAL INDEX TABLE6_IDX ON TABLE6 " - + " (FIRST_NAME ASC)"; - stmt.execute(ddl); - ddl = "CREATE LOCAL INDEX TABLE6_IDX2 ON TABLE6 " + " (LAST_NAME ASC)"; - stmt.execute(ddl); - - FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration()); - FSDataOutputStream outputStream = fs.create(new Path("/tmp/input3.csv")); - PrintWriter printWriter = new PrintWriter(outputStream); - printWriter.println("1,FirstName 1,LastName 1"); - printWriter.println("2,FirstName 2,LastName 2"); - printWriter.close(); - - 
CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool(); - csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration()); - try { - csvBulkLoadTool.run(new String[] { - "--input", "/tmp/input3.csv", - "--table", "table6", - "--zookeeper", zkQuorum}); - fail("Csv bulk load currently has issues with local indexes."); - } catch( UnsupportedOperationException ise) { - assertEquals("Local indexes not supported by Bulk Loader",ise.getMessage()); - } - - } - - @Test - public void testImportOneIndexTable() throws Exception { - testImportOneIndexTable("TABLE4", false); - } - - //@Test - public void testImportOneLocalIndexTable() throws Exception { - testImportOneIndexTable("TABLE5", true); - } - - public void testImportOneIndexTable(String tableName, boolean localIndex) throws Exception { - - String indexTableName = String.format("%s_IDX", tableName); - Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + tableName + "(ID INTEGER NOT NULL PRIMARY KEY, " - + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)"); - String ddl = - "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexTableName + " ON " - + tableName + "(FIRST_NAME ASC)"; - stmt.execute(ddl); - - FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration()); - FSDataOutputStream outputStream = fs.create(new Path("/tmp/input4.csv")); - PrintWriter printWriter = new PrintWriter(outputStream); - printWriter.println("1,FirstName 1,LastName 1"); - printWriter.println("2,FirstName 2,LastName 2"); - printWriter.close(); - - CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool(); - csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration()); - int exitCode = csvBulkLoadTool.run(new String[] { - "--input", "/tmp/input4.csv", - "--table", tableName, - "--index-table", indexTableName, - "--zookeeper", zkQuorum }); - assertEquals(0, exitCode); - - ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName); - assertFalse(rs.next()); - rs = stmt.executeQuery("SELECT FIRST_NAME FROM " + tableName + " where FIRST_NAME='FirstName 1'"); - assertTrue(rs.next()); - assertEquals("FirstName 1", rs.getString(1)); - - rs.close(); - stmt.close(); - } - - @Test - public void testInvalidArguments() { - String tableName = "TABLE8"; - CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool(); - csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration()); - try { - csvBulkLoadTool.run(new String[] { - "--input", "/tmp/input4.csv", - "--table", tableName, - "--zookeeper", zkQuorum }); - fail(String.format("Table %s not created, hence should fail",tableName)); - } catch (Exception ex) { - assertTrue(ex instanceof IllegalArgumentException); - assertTrue(ex.getMessage().contains(String.format("Table %s not found", tableName))); - } - } - - @Test - public void testAlreadyExistsOutputPath() { - String tableName = "TABLE9"; - String outputPath = "/tmp/output/tabl9"; - try { - Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + tableName + "(ID INTEGER NOT NULL PRIMARY KEY, " - + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)"); - - FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration()); - fs.create(new Path(outputPath)); - FSDataOutputStream outputStream = fs.create(new Path("/tmp/input9.csv")); - PrintWriter printWriter = new PrintWriter(outputStream); - printWriter.println("1,FirstName 1,LastName 1"); - printWriter.println("2,FirstName 2,LastName 2"); - printWriter.close(); - - CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool(); - csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration()); - csvBulkLoadTool.run(new String[] { - 
"--input", "/tmp/input9.csv", - "--output", outputPath, - "--table", tableName, - "--zookeeper", zkQuorum }); - - fail(String.format("Output path %s already exists. hence, should fail",outputPath)); - } catch (Exception ex) { - assertTrue(ex instanceof FileAlreadyExistsException); - } - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/d04b4d65/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java deleted file mode 100644 index c88a5f4..0000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java +++ /dev/null @@ -1,339 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.mapreduce; - -import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster; -import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.List; -import java.util.Properties; -import java.util.UUID; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; -import org.apache.phoenix.jdbc.PhoenixDriver; -import org.apache.phoenix.mapreduce.index.IndexTool; -import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.util.MetaDataUtil; -import org.apache.phoenix.util.PhoenixRuntime; -import org.apache.phoenix.util.PropertiesUtil; -import org.apache.phoenix.util.QueryUtil; -import org.apache.phoenix.util.SchemaUtil; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import com.google.common.collect.Lists; - -/** - * Tests for the {@link IndexTool} - */ -@Category(NeedsOwnMiniClusterTest.class) -public class IndexToolIT { - - private static HBaseTestingUtility hbaseTestUtil; - private static String zkQuorum; - - @BeforeClass - public static void setUp() throws Exception { - hbaseTestUtil = new HBaseTestingUtility(); - Configuration conf = hbaseTestUtil.getConfiguration(); - conf.setBoolean("hbase.defaults.for.version.skip", true); - // Since we're using the real PhoenixDriver in this test, remove the - // extra JDBC argument that causes the test driver to be used. 
- conf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); - setUpConfigForMiniCluster(conf); - hbaseTestUtil.startMiniCluster(); - hbaseTestUtil.startMiniMapReduceCluster(); - Class.forName(PhoenixDriver.class.getName()); - DriverManager.registerDriver(PhoenixDriver.INSTANCE); - zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort(); - } - - @Test - public void testImmutableGlobalIndex() throws Exception { - testSecondaryIndex("SCHEMA", "DATA_TABLE1", true, false); - } - - @Test - public void testImmutableLocalIndex() throws Exception { - testSecondaryIndex("SCHEMA", "DATA_TABLE2", true, true); - } - - @Test - public void testMutableGlobalIndex() throws Exception { - testSecondaryIndex("SCHEMA", "DATA_TABLE3", false, false); - } - - @Test - public void testMutableLocalIndex() throws Exception { - testSecondaryIndex("SCHEMA", "DATA_TABLE4", false, true); - } - - @Test - public void testImmutableGlobalIndexDirectApi() throws Exception { - testSecondaryIndex("SCHEMA", "DATA_TABLE5", true, false, true); - } - - @Test - public void testImmutableLocalIndexDirectApi() throws Exception { - testSecondaryIndex("SCHEMA", "DATA_TABLE6", true, true, true); - } - - @Test - public void testMutableGlobalIndexDirectApi() throws Exception { - testSecondaryIndex("SCHEMA", "DATA_TABLE7", false, false, true); - } - - @Test - public void testMutableLocalIndexDirectApi() throws Exception { - testSecondaryIndex("SCHEMA", "DATA_TABLE8", false, true, true); - } - - public void testSecondaryIndex(final String schemaName, final String dataTable, final boolean isImmutable , final boolean isLocal) throws Exception { - testSecondaryIndex(schemaName, dataTable, isImmutable, isLocal, false); - } - - public void testSecondaryIndex(final String schemaName, final String dataTable, final boolean isImmutable , final boolean isLocal, final boolean directApi) throws Exception { - - final String fullTableName = SchemaUtil.getTableName(schemaName, dataTable); - final String indxTable = String.format("%s_%s",dataTable,"INDX"); - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum,props); - Statement stmt = conn.createStatement(); - try { - - stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) %s", fullTableName, (isImmutable ? "IMMUTABLE_ROWS=true" :""))); - String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullTableName); - PreparedStatement stmt1 = conn.prepareStatement(upsertQuery); - - int id = 1; - // insert two rows - upsertRow(stmt1, id++); - upsertRow(stmt1, id++); - conn.commit(); - - stmt.execute(String.format("CREATE %s INDEX %s ON %s (LPAD(UPPER(NAME),8,'x')||'_xyz') ASYNC ", (isLocal ? "LOCAL" : ""), indxTable, fullTableName)); - - //verify rows are fetched from data table. - String selectSql = String.format("SELECT LPAD(UPPER(NAME),8,'x')||'_xyz',ID FROM %s", fullTableName); - ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql); - String actualExplainPlan = QueryUtil.getExplainPlan(rs); - - //assert we are pulling from data table. 
- assertEquals(String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s", fullTableName), actualExplainPlan); - - rs = stmt1.executeQuery(selectSql); - assertTrue(rs.next()); - assertEquals("xxUNAME1_xyz", rs.getString(1)); - assertTrue(rs.next()); - assertEquals("xxUNAME2_xyz", rs.getString(1)); - - //run the index MR job. - final IndexTool indexingTool = new IndexTool(); - indexingTool.setConf(new Configuration(hbaseTestUtil.getConfiguration())); - - final String[] cmdArgs = getArgValues(schemaName, dataTable, indxTable, directApi); - int status = indexingTool.run(cmdArgs); - assertEquals(0, status); - - // insert two more rows - upsertRow(stmt1, 3); - upsertRow(stmt1, 4); - conn.commit(); - - rs = stmt1.executeQuery("SELECT * FROM "+SchemaUtil.getTableName(schemaName, indxTable)); - - //assert we are pulling from index table. - rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql); - actualExplainPlan = QueryUtil.getExplainPlan(rs); - assertExplainPlan(actualExplainPlan,schemaName,dataTable,indxTable,isLocal); - - rs = stmt.executeQuery(selectSql); -// assertTrue(rs.next()); -// assertEquals("xxUNAME1_xyz", rs.getString(1)); -// assertEquals(1, rs.getInt(2)); -// -// assertTrue(rs.next()); -// assertEquals("xxUNAME2_xyz", rs.getString(1)); -// assertEquals(2, rs.getInt(2)); -// -// assertTrue(rs.next()); -// assertEquals("xxUNAME3_xyz", rs.getString(1)); -// assertEquals(3, rs.getInt(2)); -// -// assertTrue(rs.next()); -// assertEquals("xxUNAME4_xyz", rs.getString(1)); -// assertEquals(4, rs.getInt(2)); -// -// assertFalse(rs.next()); - - conn.createStatement().execute(String.format("DROP INDEX %s ON %s",indxTable , fullTableName)); - } finally { - conn.close(); - } - } - - - /** - * This test is to assert that updates that happen to rows of a mutable table after an index is created in ASYNC mode and before - * the MR job runs, do show up in the index table . - * @throws Exception - */ - @Test - public void testMutalbleIndexWithUpdates() throws Exception { - - final String dataTable = "DATA_TABLE5"; - final String indxTable = String.format("%s_%s",dataTable,"INDX"); - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum,props); - Statement stmt = conn.createStatement(); - try { - - stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER)",dataTable)); - String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)",dataTable); - PreparedStatement stmt1 = conn.prepareStatement(upsertQuery); - - int id = 1; - // insert two rows - upsertRow(stmt1, id++); - upsertRow(stmt1, id++); - conn.commit(); - - stmt.execute(String.format("CREATE INDEX %s ON %s (UPPER(NAME)) ASYNC ", indxTable,dataTable)); - - //update a row - stmt1.setInt(1, 1); - stmt1.setString(2, "uname" + String.valueOf(10)); - stmt1.setInt(3, 95050 + 1); - stmt1.executeUpdate(); - conn.commit(); - - //verify rows are fetched from data table. - String selectSql = String.format("SELECT UPPER(NAME),ID FROM %s",dataTable); - ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql); - String actualExplainPlan = QueryUtil.getExplainPlan(rs); - - //assert we are pulling from data table. 
- assertEquals(String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s",dataTable),actualExplainPlan); - - rs = stmt1.executeQuery(selectSql); - assertTrue(rs.next()); - assertEquals("UNAME10", rs.getString(1)); - assertTrue(rs.next()); - assertEquals("UNAME2", rs.getString(1)); - - //run the index MR job. - final IndexTool indexingTool = new IndexTool(); - indexingTool.setConf(new Configuration(hbaseTestUtil.getConfiguration())); - - final String[] cmdArgs = getArgValues(null, dataTable,indxTable); - int status = indexingTool.run(cmdArgs); - assertEquals(0, status); - - //assert we are pulling from index table. - rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql); - actualExplainPlan = QueryUtil.getExplainPlan(rs); - assertExplainPlan(actualExplainPlan,null,dataTable,indxTable,false); - - rs = stmt.executeQuery(selectSql); - assertTrue(rs.next()); - assertEquals("UNAME10", rs.getString(1)); - assertEquals(1, rs.getInt(2)); - - assertTrue(rs.next()); - assertEquals("UNAME2", rs.getString(1)); - assertEquals(2, rs.getInt(2)); - conn.createStatement().execute(String.format("DROP INDEX %s ON %s",indxTable , dataTable)); - } finally { - conn.close(); - } - } - - private void assertExplainPlan(final String actualExplainPlan, String schemaName, String dataTable, - String indxTable, boolean isLocal) { - - String expectedExplainPlan = ""; - if(isLocal) { - final String localIndexName = MetaDataUtil.getLocalIndexTableName(SchemaUtil.getTableName(schemaName, dataTable)); - expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY RANGE SCAN OVER %s [-32768]" - + "\n SERVER FILTER BY FIRST KEY ONLY", localIndexName); - } else { - expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s" - + "\n SERVER FILTER BY FIRST KEY ONLY",SchemaUtil.getTableName(schemaName, indxTable)); - } - assertEquals(expectedExplainPlan,actualExplainPlan); - } - - private String[] getArgValues(String schemaName, String dataTable, String indxTable) { - return getArgValues(schemaName, dataTable, indxTable, false); - } - - private String[] getArgValues(String schemaName, String dataTable, String indxTable, boolean directApi) { - final List<String> args = Lists.newArrayList(); - if (schemaName!=null) { - args.add("-s"); - args.add(schemaName); - } - args.add("-dt"); - args.add(dataTable); - args.add("-it"); - args.add(indxTable); - if(directApi) { - args.add("-direct"); - // Need to run this job in foreground for the test to be deterministic - args.add("-runfg"); - } - - args.add("-op"); - args.add("/tmp/"+UUID.randomUUID().toString()); - return args.toArray(new String[0]); - } - - private void upsertRow(PreparedStatement stmt, int i) throws SQLException { - // insert row - stmt.setInt(1, i); - stmt.setString(2, "uname" + String.valueOf(i)); - stmt.setInt(3, 95050 + i); - stmt.executeUpdate(); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - try { - DriverManager.deregisterDriver(PhoenixDriver.INSTANCE); - } finally { - try { - hbaseTestUtil.shutdownMiniMapReduceCluster(); - } finally { - hbaseTestUtil.shutdownMiniCluster(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/d04b4d65/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index b47feb7..eaa14aa 100644 --- a/pom.xml +++ b/pom.xml @@ -121,8 +121,8 @@ <!-- Plugin versions --> <maven-eclipse-plugin.version>2.9</maven-eclipse-plugin.version> 
     <maven-build-helper-plugin.version>1.9.1</maven-build-helper-plugin.version>
-    <maven-surefire-plugin.version>2.19</maven-surefire-plugin.version>
-    <maven-failsafe-plugin.version>2.19</maven-failsafe-plugin.version>
+    <maven-surefire-plugin.version>2.19.1</maven-surefire-plugin.version>
+    <maven-failsafe-plugin.version>2.19.1</maven-failsafe-plugin.version>
     <maven-dependency-plugin.version>2.1</maven-dependency-plugin.version>
     <maven.assembly.version>2.5.2</maven.assembly.version>
@@ -130,7 +130,7 @@
     <!-- Plugin options -->
     <numForkedUT>3</numForkedUT>
-    <numForkedIT>7</numForkedIT>
+    <numForkedIT>5</numForkedIT>

     <!-- Set default encoding so multi-byte tests work correctly on the Mac -->
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -269,6 +269,7 @@
           <configuration>
             <encoding>UTF-8</encoding>
             <forkCount>${numForkedIT}</forkCount>
+            <runOrder>alphabetical</runOrder>
             <reuseForks>true</reuseForks>
             <argLine>-enableassertions -Xmx2000m -XX:MaxPermSize=128m -Djava.security.egd=file:/dev/./urandom "-Djava.library.path=${hadoop.library.path}${path.separator}${java.library.path}"</argLine>
             <redirectTestOutputToFile>${test.output.tofile}</redirectTestOutputToFile>
@@ -285,6 +286,7 @@
           <configuration>
             <encoding>UTF-8</encoding>
             <forkCount>${numForkedIT}</forkCount>
+            <runOrder>alphabetical</runOrder>
             <reuseForks>true</reuseForks>
             <argLine>-enableassertions -Xmx2000m -XX:MaxPermSize=128m -Djava.security.egd=file:/dev/./urandom "-Djava.library.path=${hadoop.library.path}${path.separator}${java.library.path}"</argLine>
             <redirectTestOutputToFile>${test.output.tofile}</redirectTestOutputToFile>
@@ -301,7 +303,8 @@
           <configuration>
             <encoding>UTF-8</encoding>
             <forkCount>${numForkedIT}</forkCount>
-            <reuseForks>true</reuseForks>
+            <runOrder>alphabetical</runOrder>
+            <reuseForks>false</reuseForks>
             <argLine>-enableassertions -Xmx2000m -XX:MaxPermSize=128m -Djava.security.egd=file:/dev/./urandom "-Djava.library.path=${hadoop.library.path}${path.separator}${java.library.path}"</argLine>
             <redirectTestOutputToFile>${test.output.tofile}</redirectTestOutputToFile>
             <testSourceDirectory>${basedir}/src/it/java</testSourceDirectory>
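For anyone reproducing this stabilization outside the diff, the integration-test knobs above consolidate into a maven-failsafe-plugin configuration along these lines. This is a minimal sketch assembled only from the hunks shown: the plugin coordinates and the surrounding <plugins>/<executions> boilerplate are assumed rather than part of this commit, and ${numForkedIT} / ${test.output.tofile} refer to the POM properties visible above.

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-failsafe-plugin</artifactId>
      <version>2.19.1</version>
      <configuration>
        <encoding>UTF-8</encoding>
        <!-- Fewer parallel forks (numForkedIT drops from 7 to 5) reduces contention between concurrent mini-clusters -->
        <forkCount>${numForkedIT}</forkCount>
        <!-- Alphabetical ordering makes the integration-test schedule deterministic from run to run -->
        <runOrder>alphabetical</runOrder>
        <!-- The last hunk stops reusing forked JVMs for that execution, so each test class starts in a fresh JVM -->
        <reuseForks>false</reuseForks>
        <redirectTestOutputToFile>${test.output.tofile}</redirectTestOutputToFile>
      </configuration>
    </plugin>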
