PHOENIX-1609-4.0

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9c889440
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9c889440
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9c889440

Branch: refs/heads/4.0
Commit: 9c8894407153dea46657f70fe09d8534c4f75d99
Parents: 04ceb5f
Author: ravimagham <ravimag...@apache.org>
Authored: Thu Mar 5 16:18:57 2015 -0800
Committer: ravimagham <ravimag...@apache.org>
Committed: Wed Mar 11 20:55:32 2015 -0700

----------------------------------------------------------------------
 .../apache/phoenix/mapreduce/IndexToolIT.java   | 296 ++++++++++++++++++
 phoenix-core/src/main/antlr3/PhoenixSQL.g       |   7 +-
 .../phoenix/compile/PostIndexDDLCompiler.java   |  36 ++-
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   8 +-
 .../phoenix/mapreduce/CsvBulkImportUtil.java    |   6 +-
 .../phoenix/mapreduce/CsvToKeyValueMapper.java  |  42 +--
 .../phoenix/mapreduce/PhoenixInputFormat.java   |   9 +-
 .../phoenix/mapreduce/PhoenixJobCounters.java   |  29 ++
 .../phoenix/mapreduce/index/IndexTool.java      | 302 +++++++++++++++++++
 .../mapreduce/index/PhoenixIndexDBWritable.java |  91 ++++++
 .../index/PhoenixIndexImportMapper.java         | 133 ++++++++
 .../phoenix/mapreduce/util/ConnectionUtil.java  |  14 +
 .../util/PhoenixConfigurationUtil.java          |  21 ++
 .../phoenix/parse/CreateIndexStatement.java     |   8 +-
 .../apache/phoenix/parse/ParseNodeFactory.java  |   4 +-
 .../apache/phoenix/schema/MetaDataClient.java   |   5 +
 .../org/apache/phoenix/util/ColumnInfo.java     |  13 +-
 .../java/org/apache/phoenix/util/QueryUtil.java |  46 ++-
 .../org/apache/phoenix/util/SchemaUtil.java     |   3 +
 .../mapreduce/CsvBulkImportUtilTest.java        |  14 +-
 .../mapreduce/CsvToKeyValueMapperTest.java      |  26 +-
 .../org/apache/phoenix/util/ColumnInfoTest.java |   8 +-
 .../org/apache/phoenix/util/QueryUtilTest.java  |   2 +-
 .../pig/util/PhoenixPigSchemaUtilTest.java      |   5 +-
 24 files changed, 1048 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
new file mode 100644
index 0000000..2b7b16b
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests for the {@link IndexTool}
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class IndexToolIT {
+    
+    private static HBaseTestingUtility hbaseTestUtil;
+    private static String zkQuorum;
+  
+    @BeforeClass
+    public static void setUp() throws Exception {
+        hbaseTestUtil = new HBaseTestingUtility();
+        Configuration conf = hbaseTestUtil.getConfiguration();
+        setUpConfigForMiniCluster(conf);
+        hbaseTestUtil.startMiniCluster();
+        hbaseTestUtil.startMiniMapReduceCluster();
+        Class.forName(PhoenixDriver.class.getName());
+        zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
+    }
+    
+    @Test
+    public void testImmutableGlobalIndex() throws Exception {
+        testSecondaryIndex("DATA_TABLE1", true, false);
+    }
+    
+    @Test
+    public void testImmutableLocalIndex() throws Exception {
+        testSecondaryIndex("DATA_TABLE2", true, true);
+    }
+    
+    @Test
+    public void testMutableGlobalIndex() throws Exception {
+        testSecondaryIndex("DATA_TABLE3", false, false);
+    }
+    
+    @Test
+    public void testMutableLocalIndex() throws Exception {
+        testSecondaryIndex("DATA_TABLE4", false, true);
+    }
+    
+    public void testSecondaryIndex(final String dataTable, final boolean isImmutable, final boolean isLocal) throws Exception {
+        
+        final String indxTable = String.format("%s_%s", dataTable, "INDX");
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum, props);
+        Statement stmt = conn.createStatement();
+        try {
+        
+            stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) %s", dataTable, (isImmutable ? "IMMUTABLE_ROWS=true" : "")));
+            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTable);
+            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+            
+            int id = 1;
+            // insert two rows
+            upsertRow(stmt1, id++);
+            upsertRow(stmt1, id++);
+            conn.commit();
+            
+            stmt.execute(String.format("CREATE %s INDEX %s ON %s (UPPER(NAME)) 
ASYNC ", (isLocal ? "LOCAL" : ""), indxTable,dataTable));
+   
+            //verify rows are fetched from data table.
+            String selectSql = String.format("SELECT UPPER(NAME),ID FROM %s", dataTable);
+            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+            String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+            
+            //assert we are pulling from data table.
+            assertEquals(String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s", dataTable), actualExplainPlan);
+            
+            rs = stmt1.executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("UNAME1", rs.getString(1));    
+            assertTrue(rs.next());
+            assertEquals("UNAME2", rs.getString(1));
+           
+            //run the index MR job.
+            final IndexTool indexingTool = new IndexTool();
+            indexingTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
+            
+            final String[] cmdArgs = getArgValues(dataTable,indxTable);
+            int status = indexingTool.run(cmdArgs);
+            assertEquals(0, status);
+            
+            // insert two more rows
+            upsertRow(stmt1, 3);
+            upsertRow(stmt1, 4);
+            conn.commit();
+
+            //assert we are pulling from index table.
+            rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+            actualExplainPlan = QueryUtil.getExplainPlan(rs);
+            assertExplainPlan(actualExplainPlan,dataTable,indxTable,isLocal);
+            
+            rs = stmt.executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("UNAME1", rs.getString(1));
+            assertEquals(1, rs.getInt(2));
+            
+            assertTrue(rs.next());
+            assertEquals("UNAME2", rs.getString(1));
+            assertEquals(2, rs.getInt(2));
+
+            assertTrue(rs.next());
+            assertEquals("UNAME3", rs.getString(1));
+            assertEquals(3, rs.getInt(2));
+            
+            assertTrue(rs.next());
+            assertEquals("UNAME4", rs.getString(1));
+            assertEquals(4, rs.getInt(2));
+      
+            assertFalse(rs.next());
+            
+            conn.createStatement().execute(String.format("DROP INDEX %s ON %s", indxTable, dataTable));
+        } finally {
+            conn.close();
+        }
+    }
+    
+    
+    /**
+     * Asserts that updates made to rows of a mutable table after an index is created in ASYNC mode,
+     * but before the MR job runs, still show up in the index table.
+     * @throws Exception
+     */
+    @Test
+    public void testMutableIndexWithUpdates() throws Exception {
+        
+        final String dataTable = "DATA_TABLE5";
+        final String indxTable = String.format("%s_%s",dataTable,"INDX");
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum, props);
+        Statement stmt = conn.createStatement();
+        try {
+        
+            stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL 
PRIMARY KEY, NAME VARCHAR, ZIP INTEGER)",dataTable));
+            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, 
?)",dataTable);
+            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+            
+            int id = 1;
+            // insert two rows
+            upsertRow(stmt1, id++);
+            upsertRow(stmt1, id++);
+            conn.commit();
+            
+            stmt.execute(String.format("CREATE INDEX %s ON %s (UPPER(NAME)) 
ASYNC ", indxTable,dataTable));
+            
+            //update a row 
+            stmt1.setInt(1, 1);
+            stmt1.setString(2, "uname" + String.valueOf(10));
+            stmt1.setInt(3, 95050 + 1);
+            stmt1.executeUpdate();
+            conn.commit();  
+            
+            //verify rows are fetched from data table.
+            String selectSql = String.format("SELECT UPPER(NAME),ID FROM %s", dataTable);
+            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+            String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+            
+            //assert we are pulling from data table.
+            assertEquals(String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s", dataTable), actualExplainPlan);
+            
+            rs = stmt1.executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("UNAME10", rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals("UNAME2", rs.getString(1));
+           
+            //run the index MR job.
+            final IndexTool indexingTool = new IndexTool();
+            indexingTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
+            
+            final String[] cmdArgs = getArgValues(dataTable,indxTable);
+            int status = indexingTool.run(cmdArgs);
+            assertEquals(0, status);
+            
+            //assert we are pulling from index table.
+            rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+            actualExplainPlan = QueryUtil.getExplainPlan(rs);
+            assertExplainPlan(actualExplainPlan,dataTable,indxTable,false);
+            
+            rs = stmt.executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("UNAME10", rs.getString(1));
+            assertEquals(1, rs.getInt(2));
+            
+            assertTrue(rs.next());
+            assertEquals("UNAME2", rs.getString(1));
+            assertEquals(2, rs.getInt(2));
+            conn.createStatement().execute(String.format("DROP INDEX %s ON %s", indxTable, dataTable));
+        } finally {
+            conn.close();
+        }
+    }
+    
+    private void assertExplainPlan(final String actualExplainPlan, String dataTable,
+            String indxTable, boolean isLocal) {
+        
+        String expectedExplainPlan = "";
+        if(isLocal) {
+            final String localIndexName = MetaDataUtil.getLocalIndexTableName(dataTable);
+            expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY RANGE SCAN OVER %s [-32768]"
+                + "\n    SERVER FILTER BY FIRST KEY ONLY"
+                + "\nCLIENT MERGE SORT", localIndexName);
+        } else {
+            expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s"
+                    + "\n    SERVER FILTER BY FIRST KEY ONLY", indxTable);
+        }
+        assertEquals(expectedExplainPlan,actualExplainPlan);
+    }
+
+    private String[] getArgValues(String dataTable, String indxTable) {
+        final List<String> args = Lists.newArrayList();
+        args.add("-dt");
+        args.add(dataTable);
+        args.add("-it");
+        args.add(indxTable);
+        args.add("-op");
+        args.add("/tmp/"+UUID.randomUUID().toString());
+        return args.toArray(new String[0]);
+    }
+
+    private void upsertRow(PreparedStatement stmt, int i) throws SQLException {
+        // insert row
+        stmt.setInt(1, i);
+        stmt.setString(2, "uname" + String.valueOf(i));
+        stmt.setInt(3, 95050 + i);
+        stmt.executeUpdate();
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        try {
+            PhoenixDriver.INSTANCE.close();
+        } finally {
+            try {
+                DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
+            } finally {                    
+                try {
+                    hbaseTestUtil.shutdownMiniMapReduceCluster();
+                } finally {
+                    hbaseTestUtil.shutdownMiniCluster();
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/antlr3/PhoenixSQL.g
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index 9e843a0..9750ee7 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -84,6 +84,7 @@ tokens
     WITHIN='within';
     SET='set';
     CAST='cast';
+    ACTIVE='active';
     USABLE='usable';
     UNUSABLE='unusable';
     DISABLE='disable';
@@ -109,6 +110,7 @@ tokens
     STATISTICS='statistics';    
     COLUMNS='columns';
     TRACE='trace';
+    ASYNC='async';
 }
 
 
@@ -405,9 +407,10 @@ create_index_node returns [CreateIndexStatement ret]
    :   CREATE l=LOCAL? INDEX (IF NOT ex=EXISTS)? i=index_name ON t=from_table_name
         (LPAREN ik=ik_constraint RPAREN)
         (INCLUDE (LPAREN icrefs=column_names RPAREN))?
+        (async=ASYNC)?
         (p=fam_properties)?
         (SPLIT ON v=value_expression_list)?
-        {ret = factory.createIndex(i, factory.namedTable(null,t), ik, icrefs, v, p, ex!=null, l==null ? IndexType.getDefault() : IndexType.LOCAL, getBindCount()); }
+        {ret = factory.createIndex(i, factory.namedTable(null,t), ik, icrefs, v, p, ex!=null, l==null ? IndexType.getDefault() : IndexType.LOCAL, async != null, getBindCount()); }
     ;
 
 // Parse a create sequence statement.
@@ -498,7 +501,7 @@ drop_index_node returns [DropIndexStatement ret]
 
 // Parse an alter index statement
 alter_index_node returns [AlterIndexStatement ret]
-    : ALTER INDEX (IF ex=EXISTS)? i=index_name ON t=from_table_name s=(USABLE | UNUSABLE | REBUILD | DISABLE)
+    : ALTER INDEX (IF ex=EXISTS)? i=index_name ON t=from_table_name s=(USABLE | UNUSABLE | REBUILD | DISABLE | ACTIVE)
      {ret = factory.alterIndex(factory.namedTable(null, TableName.create(t.getSchemaName(), i.getName())), t.getTableName(), ex!=null, PIndexState.valueOf(SchemaUtil.normalizeIdentifier(s.getText()))); }
     ;
 

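[Note: the two rules above add the optional ASYNC keyword to CREATE INDEX and an ACTIVE target state to ALTER INDEX. A minimal sketch of the resulting DDL flow over JDBC, with placeholder table and index names; with ASYNC, only the index metadata is created and population is deferred to the MR job:

    // Create the index shell only; the rows are populated later by IndexTool.
    stmt.execute("CREATE INDEX MY_INDX ON MY_TABLE (UPPER(NAME)) ASYNC");
    // Once IndexTool has bulk-loaded the index, the tool itself flips the state:
    stmt.execute("ALTER INDEX IF EXISTS MY_INDX ON MY_TABLE ACTIVE");
]
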
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
index c8cf28e..5836b99 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
@@ -28,6 +28,8 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.IndexUtil;
 
+import com.google.common.collect.Lists;
+
 
 /**
  * Class that compiles plan to generate initial data values after a DDL command for
@@ -36,10 +38,15 @@ import org.apache.phoenix.util.IndexUtil;
 public class PostIndexDDLCompiler {
     private final PhoenixConnection connection;
     private final TableRef dataTableRef;
-
+    private List<String> indexColumnNames;
+    private List<String> dataColumnNames;
+    private String selectQuery;
+    
     public PostIndexDDLCompiler(PhoenixConnection connection, TableRef dataTableRef) {
         this.connection = connection;
         this.dataTableRef = dataTableRef;
+        indexColumnNames = Lists.newArrayList();
+        dataColumnNames = Lists.newArrayList();
     }
 
     public MutationPlan compile(final PTable indexTable) throws SQLException {
@@ -66,8 +73,11 @@ public class PostIndexDDLCompiler {
         for (int i = posOffset; i < nIndexPKColumns; i++) {
             PColumn col = indexPKColumns.get(i);
             String indexColName = col.getName().getString();
-            dataColumns.append(col.getExpressionStr()).append(",");
+            String dataColName = col.getExpressionStr();
+            dataColumns.append(dataColName).append(",");
             indexColumns.append('"').append(indexColName).append("\",");
+            indexColumnNames.add(indexColName);
+            dataColumnNames.add(dataColName);
         }
         
         // Add the covered columns
@@ -82,6 +92,8 @@ public class PostIndexDDLCompiler {
                     }
                     dataColumns.append('"').append(dataColumnName).append("\",");
                     indexColumns.append('"').append(indexColName).append("\",");
+                    indexColumnNames.add(indexColName);
+                    dataColumnNames.add(dataColumnName);
                 }
             }
         }
@@ -93,11 +105,27 @@ public class PostIndexDDLCompiler {
         
         StringBuilder updateStmtStr = new StringBuilder();
         updateStmtStr.append("UPSERT /*+ NO_INDEX */ INTO ").append(schemaName.length() == 0 ? "" : '"' + schemaName + "\".").append('"').append(tableName).append("\"(")
-            .append(indexColumns).append(") SELECT ").append(dataColumns).append(" FROM ")
-            .append(schemaName.length() == 0 ? "" : '"' + schemaName + "\".").append('"').append(dataTableRef.getTable().getTableName().getString()).append('"');
+           .append(indexColumns).append(") ");
+        final StringBuilder selectQueryBuilder = new StringBuilder();
+        selectQueryBuilder.append(" SELECT ").append(dataColumns).append(" FROM ")
+        .append(schemaName.length() == 0 ? "" : '"' + schemaName + "\".").append('"').append(dataTableRef.getTable().getTableName().getString()).append('"');
+        this.selectQuery = selectQueryBuilder.toString();
+        updateStmtStr.append(this.selectQuery);
         
         final PhoenixStatement statement = new PhoenixStatement(connection);
         return statement.compileMutation(updateStmtStr.toString());
     }
 
+    public List<String> getIndexColumnNames() {
+        return indexColumnNames;
+    }
+
+    public List<String> getDataColumnNames() {
+        return dataColumnNames;
+    }
+
+    public String getSelectQuery() {
+        return selectQuery;
+    }
+
 }

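[Note: the new accessors let IndexTool (further down) reuse the exact column lists and select query the compiler derives instead of re-deriving them. Condensed usage, mirroring IndexTool.run():

    final PostIndexDDLCompiler ddlCompiler = new PostIndexDDLCompiler(pConnection, new TableRef(pdataTable));
    ddlCompiler.compile(pindexTable);                        // also populates the new fields
    final List<String> indexColumns = ddlCompiler.getIndexColumnNames();
    final String selectQuery = ddlCompiler.getSelectQuery(); // SELECT over the data table
]
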
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 746c0b7..4e61391 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -535,8 +535,8 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
     private static class ExecutableCreateIndexStatement extends CreateIndexStatement implements CompilableStatement {
 
         public ExecutableCreateIndexStatement(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits,
-                ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, int bindCount) {
-            super(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, bindCount);
+                ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, boolean async, int bindCount) {
+            super(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, async, bindCount);
         }
 
         @SuppressWarnings("unchecked")
@@ -879,8 +879,8 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
         
         @Override
         public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits,
-                ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, int bindCount) {
-            return new ExecutableCreateIndexStatement(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, bindCount);
+                ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, boolean async, int bindCount) {
+            return new ExecutableCreateIndexStatement(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, async, bindCount);
         }
         
         @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
index e62cbb8..8f0f7d5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
@@ -19,10 +19,12 @@ package org.apache.phoenix.mapreduce;
 
 import java.util.List;
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.util.ColumnInfo;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Collection of utility methods for setting up bulk import jobs.
  */
@@ -65,7 +67,7 @@ public class CsvBulkImportUtil {
      */
     public static void configurePreUpsertProcessor(Configuration conf,
             Class<? extends ImportPreUpsertKeyValueProcessor> processorClass) {
-        conf.setClass(CsvToKeyValueMapper.UPSERT_HOOK_CLASS_CONFKEY, processorClass,
+        conf.setClass(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, processorClass,
                 ImportPreUpsertKeyValueProcessor.class);
     }
 }

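[Note: callers keep registering the hook the same way; only the backing configuration key moved to PhoenixConfigurationUtil. For example, with a hypothetical processor class:

    CsvBulkImportUtil.configurePreUpsertProcessor(conf, MyKeyValueProcessor.class);
]
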
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
index 6ff7ba3..90cb854 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
@@ -28,15 +28,6 @@ import java.util.Properties;
 
 import javax.annotation.Nullable;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVParser;
 import org.apache.commons.csv.CSVRecord;
@@ -49,9 +40,9 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.util.CSVCommonsLoader;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -59,6 +50,15 @@ import org.apache.phoenix.util.csv.CsvUpsertExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
 /**
  * MapReduce mapper that converts CSV input lines into KeyValues that can be written to HFiles.
  * <p/>
@@ -73,9 +73,6 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes
 
     private static final String COUNTER_GROUP_NAME = "Phoenix MapReduce Import";
 
-    /** Configuration key for the class name of an ImportPreUpsertKeyValueProcessor */
-    public static final String UPSERT_HOOK_CLASS_CONFKEY = "phoenix.mapreduce.import.kvprocessor";
-
     /** Configuration key for the field delimiter for input csv records */
     public static final String FIELD_DELIMITER_CONFKEY = "phoenix.mapreduce.import.fielddelimiter";
 
@@ -136,7 +133,7 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes
         csvLineParser = new CsvLineParser(conf.get(FIELD_DELIMITER_CONFKEY).charAt(0), conf.get(QUOTE_CHAR_CONFKEY).charAt(0),
                 conf.get(ESCAPE_CHAR_CONFKEY).charAt(0));
 
-        preUpdateProcessor = loadPreUpsertProcessor(conf);
+        preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
         if(!conf.get(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY, "").isEmpty()){
                tableName = Bytes.toBytes(conf.get(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY));
         } else {
@@ -193,23 +190,6 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes
     }
 
     /**
-     * Load the configured ImportPreUpsertKeyValueProcessor, or supply a dummy processor.
-     */
-    @VisibleForTesting
-    static ImportPreUpsertKeyValueProcessor loadPreUpsertProcessor(Configuration conf) {
-        Class<? extends ImportPreUpsertKeyValueProcessor> processorClass = null;
-        try {
-            processorClass = conf.getClass(
-                    UPSERT_HOOK_CLASS_CONFKEY, DefaultImportPreUpsertKeyValueProcessor.class,
-                    ImportPreUpsertKeyValueProcessor.class);
-        } catch (Exception e) {
-            throw new IllegalStateException("Couldn't load upsert hook class", e);
-        }
-
-        return ReflectionUtils.newInstance(processorClass, conf);
-    }
-
-    /**
      * Build up the JDBC URL for connecting to Phoenix.
      *
      * @return the full JDBC URL for a Phoenix connection

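[Note: the removed helper now lives in PhoenixConfigurationUtil so that this mapper and PhoenixIndexImportMapper can share it. The relocated method is not part of the visible PhoenixConfigurationUtil hunk below, but presumably mirrors the removed body:

    public static ImportPreUpsertKeyValueProcessor loadPreUpsertProcessor(Configuration conf) {
        Class<? extends ImportPreUpsertKeyValueProcessor> processorClass = null;
        try {
            // UPSERT_HOOK_CLASS_CONFKEY also moved from CsvToKeyValueMapper to this class
            processorClass = conf.getClass(UPSERT_HOOK_CLASS_CONFKEY,
                    DefaultImportPreUpsertKeyValueProcessor.class,
                    ImportPreUpsertKeyValueProcessor.class);
        } catch (Exception e) {
            throw new IllegalStateException("Couldn't load upsert hook class", e);
        }
        return ReflectionUtils.newInstance(processorClass, conf);
    }
]
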
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index 7c67c2c..a83b9ae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -22,6 +22,7 @@ import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,6 +40,7 @@ import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.util.PhoenixRuntime;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -99,7 +101,12 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
     private QueryPlan getQueryPlan(final JobContext context, final Configuration configuration) throws IOException {
         Preconditions.checkNotNull(context);
         try{
-            final Connection connection = ConnectionUtil.getConnection(configuration);
+            final String currentScnValue = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+            final Properties overridingProps = new Properties();
+            if(currentScnValue != null) {
+                overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue);
+            }
+            final Connection connection = ConnectionUtil.getConnection(configuration, overridingProps);
             final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
             Preconditions.checkNotNull(selectStatement);
             final Statement statement = connection.createStatement();

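[Note: PhoenixInputFormat now honors an optional CURRENT_SCN_VALUE from the job configuration; IndexTool (below) sets it to the index timestamp plus one so that every mapper reads a frozen snapshot of the data table:

    // Job-side setup (see IndexTool.run() below); the timestamp comes from the index PTable.
    configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE, Long.toString(pindexTable.getTimeStamp() + 1));
]
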
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixJobCounters.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixJobCounters.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixJobCounters.java
new file mode 100644
index 0000000..4a869d9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixJobCounters.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+/**
+ *  Counters used during MapReduce jobs.
+ *
+ */
+public enum PhoenixJobCounters {
+
+    INPUT_RECORDS,
+    FAILED_RECORDS,
+    OUTPUT_RECORDS;
+}

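[Note: these counters are incremented from the mappers (see PhoenixIndexImportMapper below) and surface in the standard job counter output, e.g.:

    context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
]
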
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
new file mode 100644
index 0000000..d93ef9c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.compile.PostIndexDDLCompiler;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.apache.phoenix.parse.HintNode.Hint;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * An MR job to populate the index table from the data table.
+ *
+ */
+public class IndexTool extends Configured implements Tool {
+
+    private static final Logger LOG = LoggerFactory.getLogger(IndexTool.class);
+
+    private static final Option SCHEMA_NAME_OPTION = new Option("s", "schema", true, "Phoenix schema name (optional)");
+    private static final Option DATA_TABLE_OPTION = new Option("dt", "data-table", true, "Data table name (mandatory)");
+    private static final Option INDEX_TABLE_OPTION = new Option("it", "index-table", true, "Index table name (mandatory)");
+    private static final Option OUTPUT_PATH_OPTION = new Option("op", "output-path", true, "Output path where the files are written (mandatory)");
+    private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
+    
+    private static final String ALTER_INDEX_QUERY_TEMPLATE = "ALTER INDEX IF EXISTS %s ON %s %s";
+    private static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s_INDX_%s";
+    
+    
+    private Options getOptions() {
+        final Options options = new Options();
+        options.addOption(SCHEMA_NAME_OPTION);
+        options.addOption(DATA_TABLE_OPTION);
+        options.addOption(INDEX_TABLE_OPTION);
+        options.addOption(OUTPUT_PATH_OPTION);
+        options.addOption(HELP_OPTION);
+        return options;
+    }
+    
+    /**
+     * Parses the command line arguments, throwing an IllegalStateException if mandatory
+     * arguments are missing.
+     *
+     * @param args supplied command line arguments
+     * @return the parsed command line
+     */
+    private CommandLine parseOptions(String[] args) {
+
+        final Options options = getOptions();
+
+        CommandLineParser parser = new PosixParser();
+        CommandLine cmdLine = null;
+        try {
+            cmdLine = parser.parse(options, args);
+        } catch (ParseException e) {
+            printHelpAndExit("Error parsing command line options: " + e.getMessage(), options);
+        }
+
+        if (cmdLine.hasOption(HELP_OPTION.getOpt())) {
+            printHelpAndExit(options, 0);
+        }
+
+        if (!cmdLine.hasOption(DATA_TABLE_OPTION.getOpt())) {
+            throw new IllegalStateException(DATA_TABLE_OPTION.getLongOpt() + " is a mandatory "
+                   + "parameter");
+        }
+        
+        if (!cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) {
+            throw new IllegalStateException(INDEX_TABLE_OPTION.getLongOpt() + " is a mandatory "
+                   + "parameter");
+        }
+
+        if (!cmdLine.hasOption(OUTPUT_PATH_OPTION.getOpt())) {
+            throw new IllegalStateException(OUTPUT_PATH_OPTION.getLongOpt() + " is a mandatory "
+                   + "parameter");
+        }
+        return cmdLine;
+    }
+
+   
+    private void printHelpAndExit(String errorMessage, Options options) {
+        System.err.println(errorMessage);
+        printHelpAndExit(options, 1);
+    }
+
+    private void printHelpAndExit(Options options, int exitCode) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("help", options);
+        System.exit(exitCode);
+    }
+    
+    @Override
+    public int run(String[] args) throws Exception {
+        Connection connection = null;
+        try {
+            CommandLine cmdLine = null;
+            try {
+                cmdLine = parseOptions(args);
+            } catch (IllegalStateException e) {
+                printHelpAndExit(e.getMessage(), getOptions());
+            }
+            final Configuration configuration = HBaseConfiguration.addHbaseResources(getConf());
+            final String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
+            final String dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
+            final String indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
+            final String qDataTable = SchemaUtil.getTableName(schemaName, dataTable);
+            final String qIndexTable = SchemaUtil.getTableName(schemaName, indexTable);
+         
+            connection = ConnectionUtil.getConnection(configuration);
+            if(!isValidIndexTable(connection, dataTable, indexTable)) {
+                throw new IllegalArgumentException(String.format("%s is not an index table for %s", qIndexTable, qDataTable));
+            }
+            
+            final PTable pdataTable = PhoenixRuntime.getTable(connection, dataTable);
+            final PTable pindexTable = PhoenixRuntime.getTable(connection, indexTable);
+            
+            // this is set to ensure the index table remains consistent post population.
+            long indxTimestamp = pindexTable.getTimeStamp();
+            configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE, Long.toString(indxTimestamp + 1));
+            
+            // check if the index type is LOCAL; if so, use the logical index name computed from the dataTable name.
+            String logicalIndexTable = qIndexTable;
+            if(IndexType.LOCAL.equals(pindexTable.getIndexType())) {
+                logicalIndexTable = MetaDataUtil.getLocalIndexTableName(dataTable);
+            }
+            
+            final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class);
+            final PostIndexDDLCompiler ddlCompiler = new PostIndexDDLCompiler(pConnection, new TableRef(pdataTable));
+            ddlCompiler.compile(pindexTable);
+            
+            final List<String> indexColumns = ddlCompiler.getIndexColumnNames();
+            final String selectQuery = ddlCompiler.getSelectQuery();
+            final String upsertQuery = QueryUtil.constructUpsertStatement(indexTable, indexColumns, Hint.NO_INDEX);
+       
+            configuration.set(PhoenixConfigurationUtil.UPSERT_STATEMENT, upsertQuery);
+            PhoenixConfigurationUtil.setOutputTableName(configuration, logicalIndexTable);
+            PhoenixConfigurationUtil.setUpsertColumnNames(configuration, indexColumns.toArray(new String[indexColumns.size()]));
+            final List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, qIndexTable, indexColumns);
+            final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList);
+            configuration.set(PhoenixConfigurationUtil.UPSERT_COLUMN_INFO_KEY, encodedColumnInfos);
+            
+            final Path outputPath = new Path(cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt()), logicalIndexTable);
+            
+            final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, dataTable, indexTable);
+            final Job job = Job.getInstance(configuration, jobName);
+            job.setJarByClass(IndexTool.class);
+           
+            job.setMapperClass(PhoenixIndexImportMapper.class);
+            job.setMapOutputKeyClass(ImmutableBytesWritable.class); 
+            job.setMapOutputValueClass(KeyValue.class);
+            PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, dataTable, selectQuery);
+     
+            TableMapReduceUtil.initCredentials(job);
+            FileOutputFormat.setOutputPath(job, outputPath);
+            
+            final HTable htable = new HTable(configuration, logicalIndexTable);
+            HFileOutputFormat.configureIncrementalLoad(job, htable);
+
+            boolean status = job.waitForCompletion(true);
+            if (!status) {
+                LOG.error("Failed to run the IndexTool job. ");
+                htable.close();
+                return -1;
+            }
+            
+            LOG.info("Loading HFiles from {}", outputPath);
+            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(configuration);
+            loader.doBulkLoad(outputPath, htable);
+            htable.close();
+            
+            LOG.info("Removing output directory {}", outputPath);
+            if (!FileSystem.get(configuration).delete(outputPath, true)) {
+                LOG.error("Removing output directory {} failed", outputPath);
+            }
+            
+            // finally, update the index state to ACTIVE.
+            updateIndexState(connection, dataTable, indexTable, PIndexState.ACTIVE);
+            return 0;
+            
+        } catch (Exception ex) {
+           LOG.error(" An exception occured while performing the indexing job 
, error message {} ",ex.getMessage());
+           return -1;
+        } finally {
+            try {
+                if(connection != null) {
+                    connection.close();
+                }
+            } catch(SQLException sqle) {
+                LOG.error(" Failed to close connection ",sqle.getMessage());
+                throw new RuntimeException("Failed to close connection");
+            }
+        }
+    }
+
+    /**
+     * Checks for the validity of the index table passed to the job.
+     * @param connection
+     * @param masterTable
+     * @param indexTable
+     * @return
+     * @throws SQLException
+     */
+    private boolean isValidIndexTable(final Connection connection, final String masterTable, final String indexTable) throws SQLException {
+        final DatabaseMetaData dbMetaData = connection.getMetaData();
+        final String schemaName = SchemaUtil.getSchemaNameFromFullName(masterTable);
+        final String tableName = SchemaUtil.getTableNameFromFullName(masterTable);
+        
+        ResultSet rs = null;
+        try {
+            rs = dbMetaData.getIndexInfo(null, schemaName, tableName, false, false);
+            while(rs.next()) {
+                final String indexName = rs.getString(6);
+                if(indexTable.equalsIgnoreCase(indexName)) {
+                    return true;
+                }
+            }
+        } finally {
+            if(rs != null) {
+                rs.close();
+            }
+        }
+        return false;
+    }
+    
+    /**
+     * Updates the index state.
+     * @param connection
+     * @param masterTable
+     * @param indexTable
+     * @param state
+     * @throws SQLException
+     */
+    private void updateIndexState(Connection connection, final String masterTable, final String indexTable, PIndexState state) throws SQLException {
+        Preconditions.checkNotNull(connection);
+        final String alterQuery = String.format(ALTER_INDEX_QUERY_TEMPLATE, indexTable, masterTable, state.name());
+        connection.createStatement().execute(alterQuery);
+        LOG.info("Updated the status of the index {} to {}", indexTable, state.name());
+    }
+    }
+    
+    public static void main(final String[] args) throws Exception {
+        int result = ToolRunner.run(new IndexTool(), args);
+        System.exit(result);
+    }
+
+}

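[Note: launching the tool follows the standard ToolRunner pattern, as the IT above does. A programmatic sketch with placeholder table names and output path:

    // Equivalent to: hadoop jar phoenix-core-<version>.jar org.apache.phoenix.mapreduce.index.IndexTool
    //                -dt MY_TABLE -it MY_INDX -op /tmp/index-build
    final String[] args = { "-dt", "MY_TABLE", "-it", "MY_INDX", "-op", "/tmp/index-build" };
    final int exitCode = ToolRunner.run(HBaseConfiguration.create(), new IndexTool(), args);
]
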
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexDBWritable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexDBWritable.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexDBWritable.java
new file mode 100644
index 0000000..2be810a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexDBWritable.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.util.ColumnInfo;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * A {@link DBWritable} class that reads and writes records.
+ *
+ * 
+ */
+public class PhoenixIndexDBWritable  implements DBWritable { 
+    
+    private List<ColumnInfo> columnMetadata;
+    
+    private List<Object> values;
+    
+    private int columnCount = -1;
+    
+    @Override
+    public void write(PreparedStatement statement) throws SQLException {
+       Preconditions.checkNotNull(values);
+       Preconditions.checkNotNull(columnMetadata);
+       for(int i = 0 ; i < values.size() ; i++) {
+           Object value = values.get(i);
+           ColumnInfo columnInfo = columnMetadata.get(i);
+           if(value == null) {
+               statement.setNull(i + 1, columnInfo.getSqlType());
+           } else {
+               statement.setObject(i + 1, value , columnInfo.getSqlType());
+           }
+       }
+       
+    }
+
+    @Override
+    public void readFields(ResultSet resultSet) throws SQLException {
+        // we do this once per mapper.
+        if(columnCount == -1) {
+            this.columnCount = resultSet.getMetaData().getColumnCount();
+        }
+  
+        values = Lists.newArrayListWithCapacity(columnCount);
+        for(int i = 0 ; i < columnCount ; i++) {
+            Object value = resultSet.getObject(i + 1);
+            values.add(value);
+        }
+        
+    }
+
+    public List<ColumnInfo> getColumnMetadata() {
+        return columnMetadata;
+    }
+
+    public void setColumnMetadata(List<ColumnInfo> columnMetadata) {
+        this.columnMetadata = columnMetadata;
+    }
+
+    public List<Object> getValues() {
+        return values;
+    }
+
+    public void setValues(List<Object> values) {
+        this.values = values;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
new file mode 100644
index 0000000..7bf4bfc
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor;
+import org.apache.phoenix.mapreduce.PhoenixJobCounters;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mapper that hands rows from the data table over to the index table.
+ *
+ */
+public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, KeyValue> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PhoenixIndexImportMapper.class);
+    
+    private final PhoenixIndexDBWritable indxWritable = new PhoenixIndexDBWritable();
+    
+    private List<ColumnInfo> indxTblColumnMetadata;
+    
+    private Connection connection;
+    
+    private String indexTableName;
+    
+    private ImportPreUpsertKeyValueProcessor preUpdateProcessor;
+    
+    private PreparedStatement pStatement;
+    
+    @Override
+    protected void setup(final Context context) throws IOException, InterruptedException {
+        super.setup(context);
+        final Configuration configuration = context.getConfiguration();
+        try {
+            indxTblColumnMetadata = PhoenixConfigurationUtil.getUpsertColumnMetadataList(context.getConfiguration());
+            indxWritable.setColumnMetadata(indxTblColumnMetadata);
+            
+            preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(configuration);
+            indexTableName = PhoenixConfigurationUtil.getOutputTableName(configuration);
+            final Properties overrideProps = new Properties();
+            overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE));
+            connection = ConnectionUtil.getConnection(configuration, overrideProps);
+            connection.setAutoCommit(false);
+            final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
+            this.pStatement = connection.prepareStatement(upsertQuery);
+            
+        } catch (SQLException e) {
+            throw new RuntimeException(e.getMessage());
+        } 
+    }
+    
+    @Override
+    protected void map(NullWritable key, PhoenixIndexDBWritable record, 
Context context)
+            throws IOException, InterruptedException {
+       
+        context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
+        
+        try {
+           final ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
+           final List<Object> values = record.getValues();
+           indxWritable.setValues(values);
+           indxWritable.write(this.pStatement);
+           this.pStatement.execute();
+            
+           final Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator = PhoenixRuntime.getUncommittedDataIterator(connection, true);
+           while (uncommittedDataIterator.hasNext()) {
+                Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next();
+                if (Bytes.compareTo(Bytes.toBytes(indexTableName), kvPair.getFirst()) != 0) {
+                    // skip edits for other tables
+                    continue;
+                }
+                List<KeyValue> keyValueList = kvPair.getSecond();
+                keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
+                for (KeyValue kv : keyValueList) {
+                    outputKey.set(kv.getRowArray(), kv.getRowOffset(), 
kv.getRowLength());
+                    context.write(outputKey, kv);
+                }
+                
context.getCounter(PhoenixJobCounters.OUTPUT_RECORDS).increment(1);
+            }
+            connection.rollback();
+       } catch (SQLException e) {
+           LOG.error(" Error {}  while read/write of a record 
",e.getMessage());
+           context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+           throw new RuntimeException(e);
+        } 
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, 
InterruptedException {
+         super.cleanup(context);
+         if(connection != null) {
+             try {
+                connection.close();
+            } catch (SQLException e) {
+                LOG.error("Error {} while closing connection in the 
PhoenixIndexMapper class ",e.getMessage());
+            }
+         }
+    }
+}
\ No newline at end of file
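
For orientation: the mapper above emits (row key, KeyValue) pairs meant to be written
out as HFiles and bulk-loaded into the index table. The real job setup lives in the new
IndexTool driver added by this commit; the sketch below only illustrates the general
wiring, assuming stock HBase 0.98-era APIs (the job name, table handle, and input
configuration here are illustrative, not taken from IndexTool):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.phoenix.mapreduce.PhoenixInputFormat;
    import org.apache.phoenix.mapreduce.index.PhoenixIndexImportMapper;

    public class IndexImportJobSketch {
        public static Job createJob(Configuration conf, String indexTableName) throws Exception {
            Job job = Job.getInstance(conf, "phoenix-index-import");
            job.setJarByClass(IndexImportJobSketch.class);
            // Rows are read out of the data table through Phoenix...
            job.setInputFormatClass(PhoenixInputFormat.class);
            // ...and turned into index-table KeyValues by the mapper shown above.
            job.setMapperClass(PhoenixIndexImportMapper.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(KeyValue.class);
            // Configures total-order partitioning and the reducer that writes HFiles.
            HFileOutputFormat.configureIncrementalLoad(job, new HTable(conf, indexTableName));
            return job;
        }
    }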

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
index 364baf7..3234967 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
@@ -42,6 +42,17 @@ public class ConnectionUtil {
      * @throws SQLException
      */
     public static Connection getConnection(final Configuration configuration) throws SQLException {
+        return getConnection(configuration, null);
+    }
+
+    /**
+     * Used primarily in cases where we need to pass a few additional/overriding properties.
+     * @param configuration
+     * @param properties
+     * @return
+     * @throws SQLException
+     */
+    public static Connection getConnection(final Configuration configuration, final Properties properties) throws SQLException {
         Preconditions.checkNotNull(configuration);
         final Properties props = new Properties();
         Iterator<Map.Entry<String, String>> iterator = configuration.iterator();
@@ -51,6 +62,9 @@ public class ConnectionUtil {
                 props.setProperty(entry.getKey(), entry.getValue());
             }
         }
+        if (properties != null && !properties.isEmpty()) {
+            props.putAll(properties);
+        }
         final Connection conn = DriverManager.getConnection(QueryUtil.getUrl(configuration.get(HConstants.ZOOKEEPER_QUORUM)), props);
         return conn;
     }
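
A usage sketch of the new overload, mirroring how the mapper above pins its connection
to a point in time (the SCN literal is a made-up example, and `configuration` is assumed
to be the job's Hadoop Configuration):

    import java.sql.Connection;
    import java.util.Properties;

    import org.apache.phoenix.mapreduce.util.ConnectionUtil;
    import org.apache.phoenix.util.PhoenixRuntime;

    Properties overrideProps = new Properties();
    // Properties passed here win over anything copied from the Configuration.
    overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, "1425600000000"); // example SCN
    Connection conn = ConnectionUtil.getConnection(configuration, overrideProps);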

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 4d025ee..b8b64b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -33,7 +33,10 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.CsvToKeyValueMapper.DefaultImportPreUpsertKeyValueProcessor;
+import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor;
 import org.apache.phoenix.mapreduce.PhoenixInputFormat;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -83,6 +86,11 @@ public final class PhoenixConfigurationUtil {
 
     public static final String INPUT_CLASS = "phoenix.input.class";
     
+    public static final String CURRENT_SCN_VALUE = "phoenix.mr.currentscn.value";
+
+    /** Configuration key for the class name of an ImportPreUpsertKeyValueProcessor */
+    public static final String UPSERT_HOOK_CLASS_CONFKEY = "phoenix.mapreduce.import.kvprocessor";
+
     public enum SchemaType {
         TABLE,
         QUERY;
@@ -313,4 +321,17 @@ public final class PhoenixConfigurationUtil {
         //In order to have phoenix working on a secured cluster
         TableMapReduceUtil.initCredentials(job);
     }
+
+    public static ImportPreUpsertKeyValueProcessor loadPreUpsertProcessor(Configuration conf) {
+        Class<? extends ImportPreUpsertKeyValueProcessor> processorClass = null;
+        try {
+            processorClass = conf.getClass(
+                    UPSERT_HOOK_CLASS_CONFKEY, DefaultImportPreUpsertKeyValueProcessor.class,
+                    ImportPreUpsertKeyValueProcessor.class);
+        } catch (Exception e) {
+            throw new IllegalStateException("Couldn't load upsert hook class", e);
+        }
+
+        return ReflectionUtils.newInstance(processorClass, conf);
+    }
 }
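
Hooking in a custom processor stays a one-key configuration change now that loading
lives here rather than in CsvToKeyValueMapper; a sketch (MyKeyValueProcessor is a
hypothetical implementation):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor;
    import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;

    Configuration conf = new Configuration();
    // MyKeyValueProcessor is a hypothetical ImportPreUpsertKeyValueProcessor implementation.
    conf.setClass(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY,
            MyKeyValueProcessor.class, ImportPreUpsertKeyValueProcessor.class);
    // Falls back to CsvToKeyValueMapper.DefaultImportPreUpsertKeyValueProcessor when unset.
    ImportPreUpsertKeyValueProcessor processor =
            PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);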

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateIndexStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateIndexStatement.java
index bf76174..5f52f59 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateIndexStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateIndexStatement.java
@@ -35,10 +35,11 @@ public class CreateIndexStatement extends SingleTableStatement {
     private final ListMultimap<String,Pair<String,Object>> props;
     private final boolean ifNotExists;
     private final IndexType indexType;
+    private final boolean async;
 
     public CreateIndexStatement(NamedNode indexTableName, NamedTableNode dataTable, 
             IndexKeyConstraint indexKeyConstraint, List<ColumnName> includeColumns, List<ParseNode> splits,
-            ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, int bindCount) {
+            ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, boolean async, int bindCount) {
         super(dataTable, bindCount);
         this.indexTableName =TableName.create(dataTable.getName().getSchemaName(),indexTableName.getName());
         this.indexKeyConstraint = indexKeyConstraint == null ? IndexKeyConstraint.EMPTY : indexKeyConstraint;
@@ -47,6 +48,7 @@ public class CreateIndexStatement extends SingleTableStatement {
         this.props = props == null ? ArrayListMultimap.<String,Pair<String,Object>>create() : props;
         this.ifNotExists = ifNotExists;
         this.indexType = indexType;
+        this.async = async;
     }
 
     public IndexKeyConstraint getIndexConstraint() {
@@ -78,4 +80,8 @@ public class CreateIndexStatement extends SingleTableStatement {
         return indexType;
     }
 
+    public boolean isAsync() {
+        return async;
+    }
+
 }
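
The new flag carries the ASYNC keyword added to PhoenixSQL.g in this commit; a sketch of
what a client would issue (connection URL, table, and column names are made up):

    import java.sql.Connection;
    import java.sql.DriverManager;

    Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
    // ASYNC makes CREATE INDEX return immediately instead of populating the index;
    // population is handed off to the MapReduce job added in this commit.
    conn.createStatement().execute(
            "CREATE INDEX MY_IDX ON MY_SCHEMA.MY_TABLE (V1) ASYNC");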

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 4e8f792..2a4168d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -282,8 +282,8 @@ public class ParseNodeFactory {
         return new CreateTableStatement(tableName, props, columns, pkConstraint, splits, tableType, ifNotExists, baseTableName, tableTypeIdNode, bindCount);
     }
 
-    public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits, ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, int bindCount) {
-        return new CreateIndexStatement(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, bindCount);
+    public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits, ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, boolean async, int bindCount) {
+        return new CreateIndexStatement(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, async, bindCount);
     }
 
     public CreateSequenceStatement createSequence(TableName tableName, ParseNode startsWith,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index e133433..ab3c284 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1128,6 +1128,11 @@ public class MetaDataClient {
             return new MutationState(0,connection);
         }
 
+        // For an ASYNC index we return immediately; population is deferred to the MR job.
+        if (statement.isAsync()) {
+            return new MutationState(0, connection);
+        }
+
         // If our connection is at a fixed point-in-time, we need to open a new
         // connection so that our new index table is visible.
         if (connection.getSCN() != null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
index 55865c0..46350be 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
@@ -16,7 +16,6 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.types.PDataType;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
 
 /**
@@ -33,6 +32,9 @@ public class ColumnInfo {
     public ColumnInfo(String columnName, int sqlType) {
         Preconditions.checkNotNull(columnName, "columnName cannot be null");
         Preconditions.checkArgument(!columnName.isEmpty(), "columnName cannot be empty");
+        if (!columnName.startsWith(SchemaUtil.ESCAPE_CHARACTER)) {
+            columnName = SchemaUtil.getEscapedFullColumnName(columnName);
+        }
         this.columnName = columnName;
         this.sqlType = sqlType;
     }
@@ -63,7 +65,7 @@ public class ColumnInfo {
 
     @Override
     public String toString() {
-        return columnName + STR_SEPARATOR + getPDataType().getSqlTypeName();
+        return getPDataType().getSqlTypeName() + STR_SEPARATOR + columnName;
     }
 
     @Override
@@ -97,14 +99,15 @@ public class ColumnInfo {
      */
     public static ColumnInfo fromString(String stringRepresentation) {
         List<String> components =
-                Lists.newArrayList(Splitter.on(STR_SEPARATOR).split(stringRepresentation));
+                Lists.newArrayList(stringRepresentation.split(":", 2));
+
         if (components.size() != 2) {
             throw new IllegalArgumentException("Unparseable string: " + stringRepresentation);
         }
 
         return new ColumnInfo(
-                components.get(0),
-                PDataType.fromSqlTypeName(components.get(1)).getSqlType());
+                components.get(1),
+                PDataType.fromSqlTypeName(components.get(0)).getSqlType());
     }
 
 }
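
The serialized form is now type-first, and the parse splits on the first ':' only, so
escaped column names that themselves contain ':' survive the round trip; a sketch of the
behavior this diff implies (the exact escaped form comes from SchemaUtil):

    import java.sql.Types;
    import org.apache.phoenix.util.ColumnInfo;

    ColumnInfo info = new ColumnInfo("a.myColumn", Types.INTEGER);
    // The constructor escapes the name, so toString() yields something like
    // INTEGER:"a"."myColumn" (type first, then the escaped column name).
    String serialized = info.toString();
    // split(":", 2) breaks only on the first ':', leaving any ':' in the name intact.
    ColumnInfo roundTripped = ColumnInfo.fromString(serialized);
    assert info.equals(roundTripped);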

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
index a2953f2..b69dab5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -39,6 +39,8 @@ import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.parse.WildcardParseNode;
 import org.apache.phoenix.query.QueryServices;
 
@@ -110,21 +112,53 @@ public final class QueryUtil {
             throw new IllegalArgumentException("At least one column must be provided for upserts");
         }
 
+        final List<String> columnNames = Lists.transform(columnInfos, new Function<ColumnInfo,String>() {
+            @Override
+            public String apply(ColumnInfo columnInfo) {
+                return columnInfo.getColumnName();
+            }
+        });
+        return constructUpsertStatement(tableName, columnNames, null);
+
+    }
+
+    /**
+     * Generate an upsert statement based on a list of column names with parameter markers. The list
+     * of columns must contain at least one element.
+     *
+     * @param tableName name of the table for which the upsert statement is to be created
+     * @param columns list of columns to be included in the upsert statement
+     * @param hint hint to be added to the {@code UPSERT} statement
+     * @return the created {@code UPSERT} statement
+     */
+    public static String constructUpsertStatement(String tableName, List<String> columns, Hint hint) {
+
+        if (columns.isEmpty()) {
+            throw new IllegalArgumentException("At least one column must be provided for upserts");
+        }
+
+        String hintStr = "";
+        if (hint != null) {
+            final HintNode node = new HintNode(hint.name());
+            hintStr = node.toString();
+        }
+
         List<String> parameterList = Lists.newArrayList();
-        for (int i = 0; i < columnInfos.size(); i++) {
+        for (int i = 0; i < columns.size(); i++) {
             parameterList.add("?");
         }
         return String.format(
-                "UPSERT INTO %s (%s) VALUES (%s)",
+                "UPSERT %s INTO %s (%s) VALUES (%s)",
+                hintStr,
                 tableName,
                 Joiner.on(", ").join(
                         Iterables.transform(
-                                columnInfos,
-                                new Function<ColumnInfo, String>() {
+                                columns,
+                                new Function<String, String>() {
                                     @Nullable
                                     @Override
-                                    public String apply(@Nullable ColumnInfo columnInfo) {
-                                        return getEscapedFullColumnName(columnInfo.getColumnName());
+                                    public String apply(@Nullable String columnName) {
+                                        return getEscapedFullColumnName(columnName);
                                     }
                                 })),
                 Joiner.on(", ").join(parameterList));
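
A sketch of the two shapes the statement can now take (the exact hint comment rendering
comes from HintNode.toString(); table and column names here are made up):

    import com.google.common.collect.ImmutableList;
    import org.apache.phoenix.parse.HintNode.Hint;
    import org.apache.phoenix.util.QueryUtil;

    // Roughly: UPSERT /*+ NO_INDEX */ INTO MY_IDX ("ID", "NAME") VALUES (?, ?)
    String hinted = QueryUtil.constructUpsertStatement(
            "MY_IDX", ImmutableList.of("ID", "NAME"), Hint.NO_INDEX);
    // With a null hint the format string leaves a double space after UPSERT, which is
    // why the expected string in QueryUtilTest changes below.
    String unhinted = QueryUtil.constructUpsertStatement(
            "MY_IDX", ImmutableList.of("ID", "NAME"), null);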

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index c9574e3..1d986c6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -632,6 +632,9 @@ public class SchemaUtil {
     }
     
     public static String getEscapedFullColumnName(String fullColumnName) {
+        if (fullColumnName.startsWith(ESCAPE_CHARACTER)) {
+            return fullColumnName;
+        }
         int index = fullColumnName.indexOf(QueryConstants.NAME_SEPARATOR);
         if (index < 0) {
             return getEscapedArgument(fullColumnName); 
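
The early return makes escaping idempotent, which matters now that ColumnInfo escapes
names in its constructor; a small sketch:

    import org.apache.phoenix.util.SchemaUtil;

    String escaped = SchemaUtil.getEscapedFullColumnName("a.myColumn"); // "a"."myColumn"
    // A second pass now returns its input unchanged instead of double-quoting it.
    assert escaped.equals(SchemaUtil.getEscapedFullColumnName(escaped));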

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
index 4cb5732..a00e228 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
@@ -17,17 +17,19 @@
  */
 package org.apache.phoenix.mapreduce;
 
-import com.google.common.collect.ImmutableList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.ColumnInfo;
 import org.junit.Test;
 
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import com.google.common.collect.ImmutableList;
 
 public class CsvBulkImportUtilTest {
 
@@ -57,7 +59,7 @@ public class CsvBulkImportUtilTest {
     public void testConfigurePreUpsertProcessor() {
         Configuration conf = new Configuration();
         CsvBulkImportUtil.configurePreUpsertProcessor(conf, MockProcessor.class);
-        ImportPreUpsertKeyValueProcessor processor = CsvToKeyValueMapper.loadPreUpsertProcessor(conf);
+        ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
         assertEquals(MockProcessor.class, processor.getClass());
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
index ee9d0e1..4033a65 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
@@ -17,23 +17,25 @@
  */
 package org.apache.phoenix.mapreduce;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
 import org.apache.commons.csv.CSVRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.types.PIntegerArray;
 import org.apache.phoenix.schema.types.PUnsignedInt;
 import org.apache.phoenix.util.ColumnInfo;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 
 public class CsvToKeyValueMapperTest {
 
@@ -110,10 +112,10 @@ public class CsvToKeyValueMapperTest {
     @Test
     public void testLoadPreUpdateProcessor() {
         Configuration conf = new Configuration();
-        conf.setClass(CsvToKeyValueMapper.UPSERT_HOOK_CLASS_CONFKEY, MockUpsertProcessor.class,
+        conf.setClass(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, MockUpsertProcessor.class,
                 ImportPreUpsertKeyValueProcessor.class);
 
-        ImportPreUpsertKeyValueProcessor processor = CsvToKeyValueMapper.loadPreUpsertProcessor(conf);
+        ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
         assertEquals(MockUpsertProcessor.class, processor.getClass());
     }
 
@@ -121,7 +123,7 @@ public class CsvToKeyValueMapperTest {
     public void testLoadPreUpdateProcessor_NotConfigured() {
 
         Configuration conf = new Configuration();
-        ImportPreUpsertKeyValueProcessor processor = CsvToKeyValueMapper.loadPreUpsertProcessor(conf);
+        ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
 
         assertEquals(CsvToKeyValueMapper.DefaultImportPreUpsertKeyValueProcessor.class,
                 processor.getClass());
@@ -130,9 +132,9 @@ public class CsvToKeyValueMapperTest {
     @Test(expected=IllegalStateException.class)
     public void testLoadPreUpdateProcessor_ClassNotFound() {
         Configuration conf = new Configuration();
-        conf.set(CsvToKeyValueMapper.UPSERT_HOOK_CLASS_CONFKEY, "MyUndefinedClass");
+        conf.set(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, "MyUndefinedClass");
 
-        CsvToKeyValueMapper.loadPreUpsertProcessor(conf);
+        PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
     }
 
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/test/java/org/apache/phoenix/util/ColumnInfoTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/ColumnInfoTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/ColumnInfoTest.java
index 931d6fd..7f460cd 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/ColumnInfoTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/ColumnInfoTest.java
@@ -30,7 +30,7 @@ public class ColumnInfoTest {
 
     @Test
     public void testToFromStringRoundTrip() {
-        ColumnInfo columnInfo = new ColumnInfo("myColumn", Types.INTEGER);
+        ColumnInfo columnInfo = new ColumnInfo("a.myColumn", Types.INTEGER);
         assertEquals(columnInfo, ColumnInfo.fromString(columnInfo.toString()));
     }
 
@@ -49,4 +49,10 @@ public class ColumnInfoTest {
             assertEquals(SQLExceptionCode.ILLEGAL_DATA.getErrorCode(), sqlE.getErrorCode());
         }
     }
+
+    @Test
+    public void testToFromColonInColumnName() {
+        ColumnInfo columnInfo = new ColumnInfo(":myColumn", Types.INTEGER);
+        assertEquals(columnInfo, ColumnInfo.fromString(columnInfo.toString()));
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
index 33e3b5a..beabaf1 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
@@ -38,7 +38,7 @@ public class QueryUtilTest {
     @Test
     public void testConstructUpsertStatement_ColumnInfos() {
         assertEquals(
-                "UPSERT INTO MYTAB (\"ID\", \"NAME\") VALUES (?, ?)",
+                "UPSERT  INTO MYTAB (\"ID\", \"NAME\") VALUES (?, ?)",
                 QueryUtil.constructUpsertStatement("MYTAB", ImmutableList.of(ID_COLUMN, NAME_COLUMN)));
 
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9c889440/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
index 7a861b9..abfb442 100644
--- a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
+++ b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
@@ -35,6 +35,7 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
 import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.data.DataType;
@@ -65,10 +66,10 @@ public class PhoenixPigSchemaUtilTest {
         
         // expected schema.
         final ResourceFieldSchema[] fields = new ResourceFieldSchema[2];
-        fields[0] = new ResourceFieldSchema().setName("ID")
+        fields[0] = new ResourceFieldSchema().setName(SchemaUtil.getEscapedFullColumnName("ID"))
                                                 .setType(DataType.LONG);
 
-        fields[1] = new ResourceFieldSchema().setName("NAME")
+        fields[1] = new ResourceFieldSchema().setName(SchemaUtil.getEscapedFullColumnName("NAME"))
                                                 .setType(DataType.CHARARRAY);
         final ResourceSchema expected = new ResourceSchema().setFields(fields);
         
