PHOENIX-1609-4.0
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9c889440 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9c889440 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9c889440 Branch: refs/heads/4.0 Commit: 9c8894407153dea46657f70fe09d8534c4f75d99 Parents: 04ceb5f Author: ravimagham <ravimag...@apache.org> Authored: Thu Mar 5 16:18:57 2015 -0800 Committer: ravimagham <ravimag...@apache.org> Committed: Wed Mar 11 20:55:32 2015 -0700 ---------------------------------------------------------------------- .../apache/phoenix/mapreduce/IndexToolIT.java | 296 ++++++++++++++++++ phoenix-core/src/main/antlr3/PhoenixSQL.g | 7 +- .../phoenix/compile/PostIndexDDLCompiler.java | 36 ++- .../apache/phoenix/jdbc/PhoenixStatement.java | 8 +- .../phoenix/mapreduce/CsvBulkImportUtil.java | 6 +- .../phoenix/mapreduce/CsvToKeyValueMapper.java | 42 +-- .../phoenix/mapreduce/PhoenixInputFormat.java | 9 +- .../phoenix/mapreduce/PhoenixJobCounters.java | 29 ++ .../phoenix/mapreduce/index/IndexTool.java | 302 +++++++++++++++++++ .../mapreduce/index/PhoenixIndexDBWritable.java | 91 ++++++ .../index/PhoenixIndexImportMapper.java | 133 ++++++++ .../phoenix/mapreduce/util/ConnectionUtil.java | 14 + .../util/PhoenixConfigurationUtil.java | 21 ++ .../phoenix/parse/CreateIndexStatement.java | 8 +- .../apache/phoenix/parse/ParseNodeFactory.java | 4 +- .../apache/phoenix/schema/MetaDataClient.java | 5 + .../org/apache/phoenix/util/ColumnInfo.java | 13 +- .../java/org/apache/phoenix/util/QueryUtil.java | 46 ++- .../org/apache/phoenix/util/SchemaUtil.java | 3 + .../mapreduce/CsvBulkImportUtilTest.java | 14 +- .../mapreduce/CsvToKeyValueMapperTest.java | 26 +- .../org/apache/phoenix/util/ColumnInfoTest.java | 8 +- .../org/apache/phoenix/util/QueryUtilTest.java | 2 +- .../pig/util/PhoenixPigSchemaUtilTest.java | 5 +- 24 files changed, 1048 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java new file mode 100644 index 0000000..2b7b16b --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.mapreduce; + +import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; +import java.util.Properties; +import java.util.UUID; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.mapreduce.index.IndexTool; +import org.apache.phoenix.util.MetaDataUtil; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Lists; + +/** + * Tests for the {@link IndexTool} + */ +@Category(NeedsOwnMiniClusterTest.class) +public class IndexToolIT { + + private static HBaseTestingUtility hbaseTestUtil; + private static String zkQuorum; + + @BeforeClass + public static void setUp() throws Exception { + hbaseTestUtil = new HBaseTestingUtility(); + Configuration conf = hbaseTestUtil.getConfiguration(); + setUpConfigForMiniCluster(conf); + hbaseTestUtil.startMiniCluster(); + hbaseTestUtil.startMiniMapReduceCluster(); + Class.forName(PhoenixDriver.class.getName()); + zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort(); + } + + @Test + public void testImmutableGlobalIndex() throws Exception { + testSecondaryIndex("DATA_TABLE1",true, false); + } + + @Test + public void testImmutableLocalIndex() throws Exception { + testSecondaryIndex("DATA_TABLE2",true, true); + } + + @Test + public void testMutableGlobalIndex() throws Exception { + testSecondaryIndex("DATA_TABLE3",false, false); + } + + @Test + public void testMutableLocalIndex() throws Exception { + testSecondaryIndex("DATA_TABLE4",false, true); + } + + public void testSecondaryIndex(final String dataTable , final boolean isImmutable , final boolean isLocal) throws Exception { + + final String indxTable = String.format("%s_%s",dataTable,"INDX"); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum,props); + Statement stmt = conn.createStatement(); + try { + + stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) %s",dataTable, (isImmutable ? "IMMUTABLE_ROWS=true" :""))); + String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)",dataTable); + PreparedStatement stmt1 = conn.prepareStatement(upsertQuery); + + int id = 1; + // insert two rows + upsertRow(stmt1, id++); + upsertRow(stmt1, id++); + conn.commit(); + + stmt.execute(String.format("CREATE %s INDEX %s ON %s (UPPER(NAME)) ASYNC ", (isLocal ? "LOCAL" : ""), indxTable,dataTable)); + + //verify rows are fetched from data table. 
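+ // With ASYNC, the CREATE INDEX above only registers the index; until the MR job populates it, the optimizer should keep serving queries from the data table.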
+ String selectSql = String.format("SELECT UPPER(NAME),ID FROM %s",dataTable); + ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql); + String actualExplainPlan = QueryUtil.getExplainPlan(rs); + + //assert we are pulling from data table. + assertEquals(String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s",dataTable),actualExplainPlan); + + rs = stmt1.executeQuery(selectSql); + assertTrue(rs.next()); + assertEquals("UNAME1", rs.getString(1)); + assertTrue(rs.next()); + assertEquals("UNAME2", rs.getString(1)); + + //run the index MR job. + final IndexTool indexingTool = new IndexTool(); + indexingTool.setConf(new Configuration(hbaseTestUtil.getConfiguration())); + + final String[] cmdArgs = getArgValues(dataTable,indxTable); + int status = indexingTool.run(cmdArgs); + assertEquals(0, status); + + // insert two more rows + upsertRow(stmt1, 3); + upsertRow(stmt1, 4); + conn.commit(); + + //assert we are pulling from index table. + rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql); + actualExplainPlan = QueryUtil.getExplainPlan(rs); + assertExplainPlan(actualExplainPlan,dataTable,indxTable,isLocal); + + rs = stmt.executeQuery(selectSql); + assertTrue(rs.next()); + assertEquals("UNAME1", rs.getString(1)); + assertEquals(1, rs.getInt(2)); + + assertTrue(rs.next()); + assertEquals("UNAME2", rs.getString(1)); + assertEquals(2, rs.getInt(2)); + + assertTrue(rs.next()); + assertEquals("UNAME3", rs.getString(1)); + assertEquals(3, rs.getInt(2)); + + assertTrue(rs.next()); + assertEquals("UNAME4", rs.getString(1)); + assertEquals(4, rs.getInt(2)); + + assertFalse(rs.next()); + + conn.createStatement().execute(String.format("DROP INDEX %s ON %s",indxTable , dataTable)); + } finally { + conn.close(); + } + } + + + /** + * Asserts that updates made to rows of a mutable table after an index is created in ASYNC mode, but before + * the MR job runs, still show up in the index table. + * @throws Exception + */ + @Test + public void testMutableIndexWithUpdates() throws Exception { + + final String dataTable = "DATA_TABLE5"; + final String indxTable = String.format("%s_%s",dataTable,"INDX"); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum,props); + Statement stmt = conn.createStatement(); + try { + + stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER)",dataTable)); + String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)",dataTable); + PreparedStatement stmt1 = conn.prepareStatement(upsertQuery); + + int id = 1; + // insert two rows + upsertRow(stmt1, id++); + upsertRow(stmt1, id++); + conn.commit(); + + stmt.execute(String.format("CREATE INDEX %s ON %s (UPPER(NAME)) ASYNC ", indxTable,dataTable)); + + //update a row + stmt1.setInt(1, 1); + stmt1.setString(2, "uname" + String.valueOf(10)); + stmt1.setInt(3, 95050 + 1); + stmt1.executeUpdate(); + conn.commit(); + + //verify rows are fetched from data table. + String selectSql = String.format("SELECT UPPER(NAME),ID FROM %s",dataTable); + ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql); + String actualExplainPlan = QueryUtil.getExplainPlan(rs); + + //assert we are pulling from data table.
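+ // The post-ASYNC update above is not yet reflected in the index, so the plan must still be a full scan of the data table.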
+ assertEquals(String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s",dataTable),actualExplainPlan); + + rs = stmt1.executeQuery(selectSql); + assertTrue(rs.next()); + assertEquals("UNAME10", rs.getString(1)); + assertTrue(rs.next()); + assertEquals("UNAME2", rs.getString(1)); + + //run the index MR job. + final IndexTool indexingTool = new IndexTool(); + indexingTool.setConf(new Configuration(hbaseTestUtil.getConfiguration())); + + final String[] cmdArgs = getArgValues(dataTable,indxTable); + int status = indexingTool.run(cmdArgs); + assertEquals(0, status); + + //assert we are pulling from index table. + rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql); + actualExplainPlan = QueryUtil.getExplainPlan(rs); + assertExplainPlan(actualExplainPlan,dataTable,indxTable,false); + + rs = stmt.executeQuery(selectSql); + assertTrue(rs.next()); + assertEquals("UNAME10", rs.getString(1)); + assertEquals(1, rs.getInt(2)); + + assertTrue(rs.next()); + assertEquals("UNAME2", rs.getString(1)); + assertEquals(2, rs.getInt(2)); + conn.createStatement().execute(String.format("DROP INDEX %s ON %s",indxTable , dataTable)); + } finally { + conn.close(); + } + } + + private void assertExplainPlan(final String actualExplainPlan, String dataTable, + String indxTable, boolean isLocal) { + + String expectedExplainPlan = ""; + if(isLocal) { + final String localIndexName = MetaDataUtil.getLocalIndexTableName(dataTable); + expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY RANGE SCAN OVER %s [-32768]" + + "\n SERVER FILTER BY FIRST KEY ONLY" + + "\nCLIENT MERGE SORT", localIndexName); + } else { + expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s" + + "\n SERVER FILTER BY FIRST KEY ONLY",indxTable); + } + assertEquals(expectedExplainPlan,actualExplainPlan); + } + + private String[] getArgValues(String dataTable, String indxTable) { + final List<String> args = Lists.newArrayList(); + args.add("-dt"); + args.add(dataTable); + args.add("-it"); + args.add(indxTable); + args.add("-op"); + args.add("/tmp/"+UUID.randomUUID().toString()); + return args.toArray(new String[0]); + } + + private void upsertRow(PreparedStatement stmt, int i) throws SQLException { + // insert row + stmt.setInt(1, i); + stmt.setString(2, "uname" + String.valueOf(i)); + stmt.setInt(3, 95050 + i); + stmt.executeUpdate(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + try { + PhoenixDriver.INSTANCE.close(); + } finally { + try { + DriverManager.deregisterDriver(PhoenixDriver.INSTANCE); + } finally { + try { + hbaseTestUtil.shutdownMiniMapReduceCluster(); + } finally { + hbaseTestUtil.shutdownMiniCluster(); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/antlr3/PhoenixSQL.g ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g index 9e843a0..9750ee7 100644 --- a/phoenix-core/src/main/antlr3/PhoenixSQL.g +++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g @@ -84,6 +84,7 @@ tokens WITHIN='within'; SET='set'; CAST='cast'; + ACTIVE='active'; USABLE='usable'; UNUSABLE='unusable'; DISABLE='disable'; @@ -109,6 +110,7 @@ tokens STATISTICS='statistics'; COLUMNS='columns'; TRACE='trace'; + ASYNC='async'; } @@ -405,9 +407,10 @@ create_index_node returns [CreateIndexStatement ret] : CREATE l=LOCAL? INDEX (IF NOT ex=EXISTS)? 
i=index_name ON t=from_table_name (LPAREN ik=ik_constraint RPAREN) (INCLUDE (LPAREN icrefs=column_names RPAREN))? + (async=ASYNC)? (p=fam_properties)? (SPLIT ON v=value_expression_list)? - {ret = factory.createIndex(i, factory.namedTable(null,t), ik, icrefs, v, p, ex!=null, l==null ? IndexType.getDefault() : IndexType.LOCAL, getBindCount()); } + {ret = factory.createIndex(i, factory.namedTable(null,t), ik, icrefs, v, p, ex!=null, l==null ? IndexType.getDefault() : IndexType.LOCAL, async != null, getBindCount()); } ; // Parse a create sequence statement. @@ -498,7 +501,7 @@ drop_index_node returns [DropIndexStatement ret] // Parse a alter index statement alter_index_node returns [AlterIndexStatement ret] - : ALTER INDEX (IF ex=EXISTS)? i=index_name ON t=from_table_name s=(USABLE | UNUSABLE | REBUILD | DISABLE) + : ALTER INDEX (IF ex=EXISTS)? i=index_name ON t=from_table_name s=(USABLE | UNUSABLE | REBUILD | DISABLE | ACTIVE) {ret = factory.alterIndex(factory.namedTable(null, TableName.create(t.getSchemaName(), i.getName())), t.getTableName(), ex!=null, PIndexState.valueOf(SchemaUtil.normalizeIdentifier(s.getText()))); } ; http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java index c8cf28e..5836b99 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java @@ -28,6 +28,8 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.IndexUtil; +import com.google.common.collect.Lists; + /** * Class that compiles plan to generate initial data values after a DDL command for @@ -36,10 +38,15 @@ import org.apache.phoenix.util.IndexUtil; public class PostIndexDDLCompiler { private final PhoenixConnection connection; private final TableRef dataTableRef; - + private List<String> indexColumnNames; + private List<String> dataColumnNames; + private String selectQuery; + public PostIndexDDLCompiler(PhoenixConnection connection, TableRef dataTableRef) { this.connection = connection; this.dataTableRef = dataTableRef; + indexColumnNames = Lists.newArrayList(); + dataColumnNames = Lists.newArrayList(); } public MutationPlan compile(final PTable indexTable) throws SQLException { @@ -66,8 +73,11 @@ public class PostIndexDDLCompiler { for (int i = posOffset; i < nIndexPKColumns; i++) { PColumn col = indexPKColumns.get(i); String indexColName = col.getName().getString(); - dataColumns.append(col.getExpressionStr()).append(","); + String dataColName = col.getExpressionStr(); + dataColumns.append(dataColName).append(","); indexColumns.append('"').append(indexColName).append("\","); + indexColumnNames.add(indexColName); + dataColumnNames.add(dataColName); } // Add the covered columns @@ -82,6 +92,8 @@ public class PostIndexDDLCompiler { } dataColumns.append('"').append(dataColumnName).append("\","); indexColumns.append('"').append(indexColName).append("\","); + indexColumnNames.add(indexColName); + dataColumnNames.add(dataColumnName); } } } @@ -93,11 +105,27 @@ public class PostIndexDDLCompiler { StringBuilder updateStmtStr = new StringBuilder(); updateStmtStr.append("UPSERT /*+ NO_INDEX */ INTO 
").append(schemaName.length() == 0 ? "" : '"' + schemaName + "\".").append('"').append(tableName).append("\"(") - .append(indexColumns).append(") SELECT ").append(dataColumns).append(" FROM ") - .append(schemaName.length() == 0 ? "" : '"' + schemaName + "\".").append('"').append(dataTableRef.getTable().getTableName().getString()).append('"'); + .append(indexColumns).append(") "); + final StringBuilder selectQueryBuilder = new StringBuilder(); + selectQueryBuilder.append(" SELECT ").append(dataColumns).append(" FROM ") + .append(schemaName.length() == 0 ? "" : '"' + schemaName + "\".").append('"').append(dataTableRef.getTable().getTableName().getString()).append('"'); + this.selectQuery = selectQueryBuilder.toString(); + updateStmtStr.append(this.selectQuery); final PhoenixStatement statement = new PhoenixStatement(connection); return statement.compileMutation(updateStmtStr.toString()); } + public List<String> getIndexColumnNames() { + return indexColumnNames; + } + + public List<String> getDataColumnNames() { + return dataColumnNames; + } + + public String getSelectQuery() { + return selectQuery; + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 746c0b7..4e61391 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -535,8 +535,8 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho private static class ExecutableCreateIndexStatement extends CreateIndexStatement implements CompilableStatement { public ExecutableCreateIndexStatement(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits, - ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, int bindCount) { - super(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, bindCount); + ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, boolean async, int bindCount) { + super(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, async , bindCount); } @SuppressWarnings("unchecked") @@ -879,8 +879,8 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho @Override public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits, - ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, int bindCount) { - return new ExecutableCreateIndexStatement(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, bindCount); + ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, boolean async, int bindCount) { + return new ExecutableCreateIndexStatement(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, async, bindCount); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java 
---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java index e62cbb8..8f0f7d5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java @@ -19,10 +19,12 @@ package org.apache.phoenix.mapreduce; import java.util.List; -import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.util.ColumnInfo; +import com.google.common.base.Preconditions; + /** * Collection of utility methods for setting up bulk import jobs. */ @@ -65,7 +67,7 @@ public class CsvBulkImportUtil { */ public static void configurePreUpsertProcessor(Configuration conf, Class<? extends ImportPreUpsertKeyValueProcessor> processorClass) { - conf.setClass(CsvToKeyValueMapper.UPSERT_HOOK_CLASS_CONFKEY, processorClass, + conf.setClass(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, processorClass, ImportPreUpsertKeyValueProcessor.class); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java index 6ff7ba3..90cb854 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java @@ -28,15 +28,6 @@ import java.util.Properties; import javax.annotation.Nullable; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; - import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVRecord; @@ -49,9 +40,9 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.util.CSVCommonsLoader; import org.apache.phoenix.util.ColumnInfo; import org.apache.phoenix.util.PhoenixRuntime; @@ -59,6 +50,15 @@ import org.apache.phoenix.util.csv.CsvUpsertExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + /** * MapReduce mapper that converts CSV input lines into KeyValues that can be written to HFiles. 
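* When {@code INDEX_TABLE_NAME_CONFKEY} is set, the generated KeyValues target that index table rather than the base data table (see the {@code setup()} change below).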
* <p/> @@ -73,9 +73,6 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes private static final String COUNTER_GROUP_NAME = "Phoenix MapReduce Import"; - /** Configuration key for the class name of an ImportPreUpsertKeyValueProcessor */ - public static final String UPSERT_HOOK_CLASS_CONFKEY = "phoenix.mapreduce.import.kvprocessor"; - /** Configuration key for the field delimiter for input csv records */ public static final String FIELD_DELIMITER_CONFKEY = "phoenix.mapreduce.import.fielddelimiter"; @@ -136,7 +133,7 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes csvLineParser = new CsvLineParser(conf.get(FIELD_DELIMITER_CONFKEY).charAt(0), conf.get(QUOTE_CHAR_CONFKEY).charAt(0), conf.get(ESCAPE_CHAR_CONFKEY).charAt(0)); - preUpdateProcessor = loadPreUpsertProcessor(conf); + preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf); if(!conf.get(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY, "").isEmpty()){ tableName = Bytes.toBytes(conf.get(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY)); } else { @@ -193,23 +190,6 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes } /** - * Load the configured ImportPreUpsertKeyValueProcessor, or supply a dummy processor. - */ - @VisibleForTesting - static ImportPreUpsertKeyValueProcessor loadPreUpsertProcessor(Configuration conf) { - Class<? extends ImportPreUpsertKeyValueProcessor> processorClass = null; - try { - processorClass = conf.getClass( - UPSERT_HOOK_CLASS_CONFKEY, DefaultImportPreUpsertKeyValueProcessor.class, - ImportPreUpsertKeyValueProcessor.class); - } catch (Exception e) { - throw new IllegalStateException("Couldn't load upsert hook class", e); - } - - return ReflectionUtils.newInstance(processorClass, conf); - } - - /** * Build up the JDBC URL for connecting to Phoenix. 
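* The ZooKeeper quorum is expected to come from the job configuration.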
* * @return the full JDBC URL for a Phoenix connection http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java index 7c67c2c..a83b9ae 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java @@ -22,6 +22,7 @@ import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; import java.util.List; +import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,6 +40,7 @@ import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.mapreduce.util.ConnectionUtil; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.util.PhoenixRuntime; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -99,7 +101,12 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr private QueryPlan getQueryPlan(final JobContext context,final Configuration configuration) throws IOException { Preconditions.checkNotNull(context); try{ - final Connection connection = ConnectionUtil.getConnection(configuration); + final String currentScnValue = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE); + final Properties overridingProps = new Properties(); + if(currentScnValue != null) { + overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue); + } + final Connection connection = ConnectionUtil.getConnection(configuration,overridingProps); final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration); Preconditions.checkNotNull(selectStatement); final Statement statement = connection.createStatement(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixJobCounters.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixJobCounters.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixJobCounters.java new file mode 100644 index 0000000..4a869d9 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixJobCounters.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.mapreduce; + +/** + * Counters used during Map Reduce jobs + * + */ +public enum PhoenixJobCounters { + + INPUT_RECORDS, + FAILED_RECORDS, + OUTPUT_RECORDS; +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java new file mode 100644 index 0000000..d93ef9c --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce.index; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.phoenix.compile.PostIndexDDLCompiler; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder; +import org.apache.phoenix.mapreduce.util.ConnectionUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; +import org.apache.phoenix.parse.HintNode.Hint; +import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.MetaDataUtil; +import org.apache.phoenix.util.PhoenixRuntime; +import 
org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.SchemaUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * An MR job to populate the index table from the data table. + * + */ +public class IndexTool extends Configured implements Tool { + + private static final Logger LOG = LoggerFactory.getLogger(IndexTool.class); + + private static final Option SCHEMA_NAME_OPTION = new Option("s", "schema", true, "Phoenix schema name (optional)"); + private static final Option DATA_TABLE_OPTION = new Option("dt", "data-table", true, "Data table name (mandatory)"); + private static final Option INDEX_TABLE_OPTION = new Option("it", "index-table", true, "Index table name(mandatory)"); + private static final Option OUTPUT_PATH_OPTION = new Option("op", "output-path", true, "Output path where the files are written(mandatory)"); + private static final Option HELP_OPTION = new Option("h", "help", false, "Help"); + + private static final String ALTER_INDEX_QUERY_TEMPLATE = "ALTER INDEX IF EXISTS %s ON %s %s"; + private static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s_INDX_%s"; + + + private Options getOptions() { + final Options options = new Options(); + options.addOption(SCHEMA_NAME_OPTION); + options.addOption(DATA_TABLE_OPTION); + options.addOption(INDEX_TABLE_OPTION); + options.addOption(OUTPUT_PATH_OPTION); + options.addOption(HELP_OPTION); + return options; + } + + /** + * Parses the commandline arguments, throws IllegalStateException if mandatory arguments are + * missing. + * + * @param args supplied command line arguments + * @return the parsed command line + */ + private CommandLine parseOptions(String[] args) { + + final Options options = getOptions(); + + CommandLineParser parser = new PosixParser(); + CommandLine cmdLine = null; + try { + cmdLine = parser.parse(options, args); + } catch (ParseException e) { + printHelpAndExit("Error parsing command line options: "+ e.getMessage(), options); + } + + if (cmdLine.hasOption(HELP_OPTION.getOpt())) { + printHelpAndExit(options, 0); + } + + if (!cmdLine.hasOption(DATA_TABLE_OPTION.getOpt())) { + throw new IllegalStateException(DATA_TABLE_OPTION.getLongOpt() + " is a mandatory " + + "parameter"); + } + + if (!cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) { + throw new IllegalStateException(INDEX_TABLE_OPTION.getLongOpt() + " is a mandatory " + + "parameter"); + } + + if (!cmdLine.hasOption(OUTPUT_PATH_OPTION.getOpt())) { + throw new IllegalStateException(OUTPUT_PATH_OPTION.getLongOpt() + " is a mandatory " + + "parameter"); + } + return cmdLine; + } + + + private void printHelpAndExit(String errorMessage, Options options) { + System.err.println(errorMessage); + printHelpAndExit(options, 1); + } + + private void printHelpAndExit(Options options, int exitCode) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("help", options); + System.exit(exitCode); + } + + @Override + public int run(String[] args) throws Exception { + Connection connection = null; + try { + CommandLine cmdLine = null; + try { + cmdLine = parseOptions(args); + } catch (IllegalStateException e) { + printHelpAndExit(e.getMessage(), getOptions()); + } + final Configuration configuration = HBaseConfiguration.addHbaseResources(getConf()); + final String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt()); + final String dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt()); + final String indexTable = 
cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt()); + final String qDataTable = SchemaUtil.getTableName(schemaName, dataTable); + final String qIndexTable = SchemaUtil.getTableName(schemaName, indexTable); + + connection = ConnectionUtil.getConnection(configuration); + if(!isValidIndexTable(connection, dataTable, indexTable)) { + throw new IllegalArgumentException(String.format("%s is not an index table for %s",qIndexTable,qDataTable)); + } + + final PTable pdataTable = PhoenixRuntime.getTable(connection, dataTable); + final PTable pindexTable = PhoenixRuntime.getTable(connection, indexTable); + + // the SCN is set to ensure the index table remains consistent post population. + long indxTimestamp = pindexTable.getTimeStamp(); + configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE,Long.toString(indxTimestamp + 1)); + + // if the index type is LOCAL, use the logical index table name computed from the dataTable name. + String logicalIndexTable = qIndexTable; + if(IndexType.LOCAL.equals(pindexTable.getIndexType())) { + logicalIndexTable = MetaDataUtil.getLocalIndexTableName(dataTable); + } + + final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class); + final PostIndexDDLCompiler ddlCompiler = new PostIndexDDLCompiler(pConnection,new TableRef(pdataTable)); + ddlCompiler.compile(pindexTable); + + final List<String> indexColumns = ddlCompiler.getIndexColumnNames(); + final String selectQuery = ddlCompiler.getSelectQuery(); + final String upsertQuery = QueryUtil.constructUpsertStatement(indexTable, indexColumns, Hint.NO_INDEX); + + configuration.set(PhoenixConfigurationUtil.UPSERT_STATEMENT, upsertQuery); + PhoenixConfigurationUtil.setOutputTableName(configuration, logicalIndexTable); + PhoenixConfigurationUtil.setUpsertColumnNames(configuration,indexColumns.toArray(new String[indexColumns.size()])); + final List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, qIndexTable, indexColumns); + final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList); + configuration.set(PhoenixConfigurationUtil.UPSERT_COLUMN_INFO_KEY, encodedColumnInfos); + + final Path outputPath = new Path(cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt()),logicalIndexTable); + + final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE,dataTable,indexTable); + final Job job = Job.getInstance(configuration, jobName); + job.setJarByClass(IndexTool.class); + + job.setMapperClass(PhoenixIndexImportMapper.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(KeyValue.class); + PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,dataTable,selectQuery); + + TableMapReduceUtil.initCredentials(job); + FileOutputFormat.setOutputPath(job, outputPath); + + final HTable htable = new HTable(configuration, logicalIndexTable); + HFileOutputFormat.configureIncrementalLoad(job, htable); + + boolean status = job.waitForCompletion(true); + if (!status) { + LOG.error("Failed to run the IndexTool job."); + htable.close(); + return -1; + } + + LOG.info("Loading HFiles from {}", outputPath); + LoadIncrementalHFiles loader = new LoadIncrementalHFiles(configuration); + loader.doBulkLoad(outputPath, htable); + htable.close(); + + LOG.info("Removing output directory {}", outputPath); + if (!FileSystem.get(configuration).delete(outputPath, true)) { + LOG.error("Removing output directory {} failed", outputPath); + } + + // finally update the index state to ACTIVE.
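+ // (issues ALTER INDEX IF EXISTS ... ACTIVE via ALTER_INDEX_QUERY_TEMPLATE so the optimizer can start using the freshly populated index)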
+ updateIndexState(connection,dataTable,indexTable,PIndexState.ACTIVE); + return 0; + + } catch (Exception ex) { + LOG.error(" An exception occurred while performing the indexing job: {} ",ex.getMessage()); + return -1; + } finally { + try { + if(connection != null) { + connection.close(); + } + } catch(SQLException sqle) { + LOG.error(" Failed to close connection: {} ",sqle.getMessage()); + throw new RuntimeException("Failed to close connection", sqle); + } + } + } + + /** + * Checks for the validity of the index table passed to the job. + * @param connection + * @param masterTable + * @param indexTable + * @return + * @throws SQLException + */ + private boolean isValidIndexTable(final Connection connection, final String masterTable, final String indexTable) throws SQLException { + final DatabaseMetaData dbMetaData = connection.getMetaData(); + final String schemaName = SchemaUtil.getSchemaNameFromFullName(masterTable); + final String tableName = SchemaUtil.getTableNameFromFullName(masterTable); + + ResultSet rs = null; + try { + rs = dbMetaData.getIndexInfo(null, schemaName, tableName, false, false); + while(rs.next()) { + final String indexName = rs.getString(6); + if(indexTable.equalsIgnoreCase(indexName)) { + return true; + } + } + } finally { + if(rs != null) { + rs.close(); + } + } + return false; + } + + /** + * Updates the index state. + * @param connection + * @param masterTable + * @param indexTable + * @param state + * @throws SQLException + */ + private void updateIndexState(Connection connection, final String masterTable , final String indexTable, PIndexState state) throws SQLException { + Preconditions.checkNotNull(connection); + final String alterQuery = String.format(ALTER_INDEX_QUERY_TEMPLATE,indexTable,masterTable,state.name()); + connection.createStatement().execute(alterQuery); + LOG.info(" Updated the status of the index {} to {} " , indexTable , state.name()); + } + + public static void main(final String[] args) throws Exception { + int result = ToolRunner.run(new IndexTool(), args); + System.exit(result); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexDBWritable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexDBWritable.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexDBWritable.java new file mode 100644 index 0000000..2be810a --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexDBWritable.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.phoenix.mapreduce.index; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; + +import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.phoenix.util.ColumnInfo; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +/** + * A {@link DBWritable} class that reads and writes records. + * + * + */ +public class PhoenixIndexDBWritable implements DBWritable { + + private List<ColumnInfo> columnMetadata; + + private List<Object> values; + + private int columnCount = -1; + + @Override + public void write(PreparedStatement statement) throws SQLException { + Preconditions.checkNotNull(values); + Preconditions.checkNotNull(columnMetadata); + for(int i = 0 ; i < values.size() ; i++) { + Object value = values.get(i); + ColumnInfo columnInfo = columnMetadata.get(i); + if(value == null) { + statement.setNull(i + 1, columnInfo.getSqlType()); + } else { + statement.setObject(i + 1, value , columnInfo.getSqlType()); + } + } + + } + + @Override + public void readFields(ResultSet resultSet) throws SQLException { + // we do this once per mapper. + if(columnCount == -1) { + this.columnCount = resultSet.getMetaData().getColumnCount(); + } + + values = Lists.newArrayListWithCapacity(columnCount); + for(int i = 0 ; i < columnCount ; i++) { + Object value = resultSet.getObject(i + 1); + values.add(value); + } + + } + + public List<ColumnInfo> getColumnMetadata() { + return columnMetadata; + } + + public void setColumnMetadata(List<ColumnInfo> columnMetadata) { + this.columnMetadata = columnMetadata; + } + + public List<Object> getValues() { + return values; + } + + public void setValues(List<Object> values) { + this.values = values; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java new file mode 100644 index 0000000..7bf4bfc --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.phoenix.mapreduce.index; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor; +import org.apache.phoenix.mapreduce.PhoenixJobCounters; +import org.apache.phoenix.mapreduce.util.ConnectionUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.PhoenixRuntime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Mapper that hands over rows from data table to the index table. + * + */ +public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, KeyValue> { + + private static final Logger LOG = LoggerFactory.getLogger(PhoenixIndexImportMapper.class); + + private final PhoenixIndexDBWritable indxWritable = new PhoenixIndexDBWritable(); + + private List<ColumnInfo> indxTblColumnMetadata ; + + private Connection connection; + + private String indexTableName; + + private ImportPreUpsertKeyValueProcessor preUpdateProcessor; + + private PreparedStatement pStatement; + + @Override + protected void setup(final Context context) throws IOException, InterruptedException { + super.setup(context); + final Configuration configuration = context.getConfiguration(); + try { + indxTblColumnMetadata = PhoenixConfigurationUtil.getUpsertColumnMetadataList(context.getConfiguration()); + indxWritable.setColumnMetadata(indxTblColumnMetadata); + + preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(configuration); + indexTableName = PhoenixConfigurationUtil.getOutputTableName(configuration); + final Properties overrideProps = new Properties (); + overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE)); + connection = ConnectionUtil.getConnection(configuration,overrideProps); + connection.setAutoCommit(false); + final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration); + this.pStatement = connection.prepareStatement(upsertQuery); + + } catch (SQLException e) { + throw new RuntimeException(e.getMessage()); + } + } + + @Override + protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context) + throws IOException, InterruptedException { + + context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1); + + try { + final ImmutableBytesWritable outputKey = new ImmutableBytesWritable(); + final List<Object> values = record.getValues(); + indxWritable.setValues(values); + indxWritable.write(this.pStatement); + this.pStatement.execute(); + + final Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator = PhoenixRuntime.getUncommittedDataIterator(connection, true); + while (uncommittedDataIterator.hasNext()) { + Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next(); + if (Bytes.compareTo(Bytes.toBytes(indexTableName), kvPair.getFirst()) != 0) { + // skip edits for other tables + continue; + } + List<KeyValue> keyValueList = kvPair.getSecond(); + 
keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList); + for (KeyValue kv : keyValueList) { + outputKey.set(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()); + context.write(outputKey, kv); + } + context.getCounter(PhoenixJobCounters.OUTPUT_RECORDS).increment(1); + } + connection.rollback(); + } catch (SQLException e) { + LOG.error(" Error {} while read/write of a record ",e.getMessage()); + context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1); + throw new RuntimeException(e); + } + } + + @Override + protected void cleanup(Context context) throws IOException, InterruptedException { + super.cleanup(context); + if(connection != null) { + try { + connection.close(); + } catch (SQLException e) { + LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ",e.getMessage()); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java index 364baf7..3234967 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java @@ -42,6 +42,17 @@ public class ConnectionUtil { * @throws SQLException */ public static Connection getConnection(final Configuration configuration) throws SQLException { + return getConnection(configuration, null); + } + + /** + * Used primarily in cases where we need to pass few additional/overriding properties + * @param configuration + * @param properties + * @return + * @throws SQLException + */ + public static Connection getConnection(final Configuration configuration , final Properties properties) throws SQLException { Preconditions.checkNotNull(configuration); final Properties props = new Properties(); Iterator<Map.Entry<String, String>> iterator = configuration.iterator(); @@ -51,6 +62,9 @@ public class ConnectionUtil { props.setProperty(entry.getKey(), entry.getValue()); } } + if(properties != null && !properties.isEmpty()) { + props.putAll(properties); + } final Connection conn = DriverManager.getConnection(QueryUtil.getUrl(configuration.get(HConstants.ZOOKEEPER_QUORUM)), props); return conn; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java index 4d025ee..b8b64b2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java @@ -33,7 +33,10 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable; import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.phoenix.jdbc.PhoenixConnection; +import 
org.apache.phoenix.mapreduce.CsvToKeyValueMapper.DefaultImportPreUpsertKeyValueProcessor; +import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor; import org.apache.phoenix.mapreduce.PhoenixInputFormat; import org.apache.phoenix.util.ColumnInfo; import org.apache.phoenix.util.PhoenixRuntime; @@ -83,6 +86,11 @@ public final class PhoenixConfigurationUtil { public static final String INPUT_CLASS = "phoenix.input.class"; + public static final String CURRENT_SCN_VALUE = "phoenix.mr.currentscn.value"; + + /** Configuration key for the class name of an ImportPreUpsertKeyValueProcessor */ + public static final String UPSERT_HOOK_CLASS_CONFKEY = "phoenix.mapreduce.import.kvprocessor"; + public enum SchemaType { TABLE, QUERY; @@ -313,4 +321,17 @@ public final class PhoenixConfigurationUtil { //In order to have phoenix working on a secured cluster TableMapReduceUtil.initCredentials(job); } + + public static ImportPreUpsertKeyValueProcessor loadPreUpsertProcessor(Configuration conf) { + Class<? extends ImportPreUpsertKeyValueProcessor> processorClass = null; + try { + processorClass = conf.getClass( + UPSERT_HOOK_CLASS_CONFKEY, DefaultImportPreUpsertKeyValueProcessor.class, + ImportPreUpsertKeyValueProcessor.class); + } catch (Exception e) { + throw new IllegalStateException("Couldn't load upsert hook class", e); + } + + return ReflectionUtils.newInstance(processorClass, conf); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateIndexStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateIndexStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateIndexStatement.java index bf76174..5f52f59 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateIndexStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateIndexStatement.java @@ -35,10 +35,11 @@ public class CreateIndexStatement extends SingleTableStatement { private final ListMultimap<String,Pair<String,Object>> props; private final boolean ifNotExists; private final IndexType indexType; + private final boolean async; public CreateIndexStatement(NamedNode indexTableName, NamedTableNode dataTable, IndexKeyConstraint indexKeyConstraint, List<ColumnName> includeColumns, List<ParseNode> splits, - ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, int bindCount) { + ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, boolean async, int bindCount) { super(dataTable, bindCount); this.indexTableName =TableName.create(dataTable.getName().getSchemaName(),indexTableName.getName()); this.indexKeyConstraint = indexKeyConstraint == null ? IndexKeyConstraint.EMPTY : indexKeyConstraint; @@ -47,6 +48,7 @@ public class CreateIndexStatement extends SingleTableStatement { this.props = props == null ? 
ArrayListMultimap.<String,Pair<String,Object>>create() : props; this.ifNotExists = ifNotExists; this.indexType = indexType; + this.async = async; } public IndexKeyConstraint getIndexConstraint() { @@ -78,4 +80,8 @@ public class CreateIndexStatement extends SingleTableStatement { return indexType; } + public boolean isAsync() { + return async; + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java index 4e8f792..2a4168d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java @@ -282,8 +282,8 @@ public class ParseNodeFactory { return new CreateTableStatement(tableName, props, columns, pkConstraint, splits, tableType, ifNotExists, baseTableName, tableTypeIdNode, bindCount); } - public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits, ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, int bindCount) { - return new CreateIndexStatement(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, bindCount); + public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits, ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, boolean async, int bindCount) { + return new CreateIndexStatement(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, async, bindCount); } public CreateSequenceStatement createSequence(TableName tableName, ParseNode startsWith, http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index e133433..ab3c284 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -1128,6 +1128,11 @@ public class MetaDataClient { return new MutationState(0,connection); } + + // In the async case, we return immediately; the MR job that populates the index is triggered separately. + if(statement.isAsync()) { + return new MutationState(0, connection); + } + + // If our connection is at a fixed point-in-time, we need to open a new // connection so that our new index table is visible.
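// (The async case returned above, so only synchronous index builds reach this point.)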
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
index 55865c0..46350be 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
@@ -16,7 +16,6 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.types.PDataType;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
 
 /**
@@ -33,6 +32,9 @@ public class ColumnInfo {
     public ColumnInfo(String columnName, int sqlType) {
         Preconditions.checkNotNull(columnName, "columnName cannot be null");
         Preconditions.checkArgument(!columnName.isEmpty(), "columnName cannot be empty");
+        if(!columnName.startsWith(SchemaUtil.ESCAPE_CHARACTER)) {
+            columnName = SchemaUtil.getEscapedFullColumnName(columnName);
+        }
         this.columnName = columnName;
         this.sqlType = sqlType;
     }
@@ -63,7 +65,7 @@ public class ColumnInfo {
 
     @Override
     public String toString() {
-        return columnName + STR_SEPARATOR + getPDataType().getSqlTypeName();
+        return getPDataType().getSqlTypeName() + STR_SEPARATOR + columnName ;
     }
 
     @Override
@@ -97,14 +99,15 @@ public class ColumnInfo {
      */
     public static ColumnInfo fromString(String stringRepresentation) {
         List<String> components =
-                Lists.newArrayList(Splitter.on(STR_SEPARATOR).split(stringRepresentation));
+                Lists.newArrayList(stringRepresentation.split(":",2));
+
         if (components.size() != 2) {
             throw new IllegalArgumentException("Unparseable string: " + stringRepresentation);
         }
         return new ColumnInfo(
-                components.get(0),
-                PDataType.fromSqlTypeName(components.get(1)).getSqlType());
+                components.get(1),
+                PDataType.fromSqlTypeName(components.get(0)).getSqlType());
     }
 
 }
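
The net effect of the ColumnInfo changes, roughly (the exact quoting comes from
SchemaUtil.getEscapedFullColumnName):

    ColumnInfo info = new ColumnInfo("a.myColumn", Types.INTEGER);
    // toString() is now type-first, along the lines of: INTEGER:"a"."myColumn"
    String serialized = info.toString();
    // split(":", 2) breaks only on the first colon, so a ':' inside the
    // (escaped) column name now round-trips safely.
    ColumnInfo restored = ColumnInfo.fromString(serialized);
    assert info.equals(restored);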
The list of + * {@code ColumnInfo}s must contain at least one element. + * + * @param tableName name of the table for which the upsert statement is to be created + * @param columns list of columns to be included in the upsert statement + * @param Hint hint to be added to the UPSERT statement. + * @return the created {@code UPSERT} statement + */ + public static String constructUpsertStatement(String tableName, List<String> columns, Hint hint) { + + if (columns.isEmpty()) { + throw new IllegalArgumentException("At least one column must be provided for upserts"); + } + + String hintStr = ""; + if(hint != null) { + final HintNode node = new HintNode(hint.name()); + hintStr = node.toString(); + } + List<String> parameterList = Lists.newArrayList(); - for (int i = 0; i < columnInfos.size(); i++) { + for (int i = 0; i < columns.size(); i++) { parameterList.add("?"); } return String.format( - "UPSERT INTO %s (%s) VALUES (%s)", + "UPSERT %s INTO %s (%s) VALUES (%s)", + hintStr, tableName, Joiner.on(", ").join( Iterables.transform( - columnInfos, - new Function<ColumnInfo, String>() { + columns, + new Function<String, String>() { @Nullable @Override - public String apply(@Nullable ColumnInfo columnInfo) { - return getEscapedFullColumnName(columnInfo.getColumnName()); + public String apply(@Nullable String columnName) { + return getEscapedFullColumnName(columnName); } })), Joiner.on(", ").join(parameterList)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java index c9574e3..1d986c6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java @@ -632,6 +632,9 @@ public class SchemaUtil { } public static String getEscapedFullColumnName(String fullColumnName) { + if(fullColumnName.startsWith(ESCAPE_CHARACTER)) { + return fullColumnName; + } int index = fullColumnName.indexOf(QueryConstants.NAME_SEPARATOR); if (index < 0) { return getEscapedArgument(fullColumnName); http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java index 4cb5732..a00e228 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java @@ -17,17 +17,19 @@ */ package org.apache.phoenix.mapreduce; -import com.google.common.collect.ImmutableList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.util.List; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.util.ColumnInfo; import org.junit.Test; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; +import com.google.common.collect.ImmutableList; public class 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
index 4cb5732..a00e228 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
@@ -17,17 +17,19 @@
  */
 package org.apache.phoenix.mapreduce;
 
-import com.google.common.collect.ImmutableList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.ColumnInfo;
 import org.junit.Test;
 
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import com.google.common.collect.ImmutableList;
 
 public class CsvBulkImportUtilTest {
 
@@ -57,7 +59,7 @@ public class CsvBulkImportUtilTest {
     public void testConfigurePreUpsertProcessor() {
         Configuration conf = new Configuration();
         CsvBulkImportUtil.configurePreUpsertProcessor(conf, MockProcessor.class);
-        ImportPreUpsertKeyValueProcessor processor = CsvToKeyValueMapper.loadPreUpsertProcessor(conf);
+        ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
         assertEquals(MockProcessor.class, processor.getClass());
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
index ee9d0e1..4033a65 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
@@ -17,23 +17,25 @@
  */
 package org.apache.phoenix.mapreduce;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
 import org.apache.commons.csv.CSVRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.types.PIntegerArray;
 import org.apache.phoenix.schema.types.PUnsignedInt;
 import org.apache.phoenix.util.ColumnInfo;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 
 public class CsvToKeyValueMapperTest {
 
@@ -110,10 +112,10 @@ public class CsvToKeyValueMapperTest {
     @Test
     public void testLoadPreUpdateProcessor() {
         Configuration conf = new Configuration();
-        conf.setClass(CsvToKeyValueMapper.UPSERT_HOOK_CLASS_CONFKEY, MockUpsertProcessor.class,
+        conf.setClass(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, MockUpsertProcessor.class,
                 ImportPreUpsertKeyValueProcessor.class);
 
-        ImportPreUpsertKeyValueProcessor processor = CsvToKeyValueMapper.loadPreUpsertProcessor(conf);
+        ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
         assertEquals(MockUpsertProcessor.class, processor.getClass());
     }
 
@@ -121,7 +123,7 @@ public class CsvToKeyValueMapperTest {
     public void testLoadPreUpdateProcessor_NotConfigured() {
         Configuration conf = new Configuration();
 
-        ImportPreUpsertKeyValueProcessor processor = CsvToKeyValueMapper.loadPreUpsertProcessor(conf);
+        ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
 
         assertEquals(CsvToKeyValueMapper.DefaultImportPreUpsertKeyValueProcessor.class,
                 processor.getClass());
@@ -130,9 +132,9 @@ public class CsvToKeyValueMapperTest {
     @Test(expected=IllegalStateException.class)
     public void testLoadPreUpdateProcessor_ClassNotFound() {
         Configuration conf = new Configuration();
-        conf.set(CsvToKeyValueMapper.UPSERT_HOOK_CLASS_CONFKEY, "MyUndefinedClass");
+        conf.set(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, "MyUndefinedClass");
 
-        CsvToKeyValueMapper.loadPreUpsertProcessor(conf);
+        PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/test/java/org/apache/phoenix/util/ColumnInfoTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/ColumnInfoTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/ColumnInfoTest.java
index 931d6fd..7f460cd 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/ColumnInfoTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/ColumnInfoTest.java
@@ -30,7 +30,7 @@ public class ColumnInfoTest {
 
     @Test
     public void testToFromStringRoundTrip() {
-        ColumnInfo columnInfo = new ColumnInfo("myColumn", Types.INTEGER);
+        ColumnInfo columnInfo = new ColumnInfo("a.myColumn", Types.INTEGER);
         assertEquals(columnInfo, ColumnInfo.fromString(columnInfo.toString()));
     }
 
@@ -49,4 +49,10 @@ public class ColumnInfoTest {
             assertEquals(SQLExceptionCode.ILLEGAL_DATA.getErrorCode(), sqlE.getErrorCode());
         }
     }
+
+    @Test
+    public void testToFromColonInColumnName() {
+        ColumnInfo columnInfo = new ColumnInfo(":myColumn", Types.INTEGER);
+        assertEquals(columnInfo, ColumnInfo.fromString(columnInfo.toString()));
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
index 33e3b5a..beabaf1 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
@@ -38,7 +38,7 @@ public class QueryUtilTest {
     @Test
     public void testConstructUpsertStatement_ColumnInfos() {
         assertEquals(
-                "UPSERT INTO MYTAB (\"ID\", \"NAME\") VALUES (?, ?)",
+                "UPSERT  INTO MYTAB (\"ID\", \"NAME\") VALUES (?, ?)",
                 QueryUtil.constructUpsertStatement("MYTAB", ImmutableList.of(ID_COLUMN, NAME_COLUMN)));
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
index 7a861b9..abfb442 100644
--- a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
+++ b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
@@ -35,6 +35,7 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
 import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.data.DataType;
@@ -65,10 +66,10 @@ public class PhoenixPigSchemaUtilTest {
 
         // expected schema.
         final ResourceFieldSchema[] fields = new ResourceFieldSchema[2];
-        fields[0] = new ResourceFieldSchema().setName("ID")
+        fields[0] = new ResourceFieldSchema().setName(SchemaUtil.getEscapedFullColumnName("ID"))
                 .setType(DataType.LONG);
-        fields[1] = new ResourceFieldSchema().setName("NAME")
+        fields[1] = new ResourceFieldSchema().setName(SchemaUtil.getEscapedFullColumnName("NAME"))
                 .setType(DataType.CHARARRAY);
         final ResourceSchema expected = new ResourceSchema().setFields(fields);
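
One consequence of the SchemaUtil change above is that escaping is now
idempotent, which is what lets already-quoted names (from ColumnInfo or the
Pig schema) pass through unchanged. Expected behavior, as read from the diff:

    SchemaUtil.getEscapedFullColumnName("ID");          // -> "ID" (quoted)
    SchemaUtil.getEscapedFullColumnName("a.myColumn");  // -> "a"."myColumn"
    SchemaUtil.getEscapedFullColumnName("\"ID\"");      // returned as-is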