http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
new file mode 100644
index 0000000..3738e5b
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
@@ -0,0 +1,884 @@
+package org.apache.phoenix.end2end.index;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.phoenix.compile.ColumnResolver;
+import org.apache.phoenix.compile.FromCompiler;
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.parse.NamedTableNode;
+import org.apache.phoenix.parse.TableName;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class IndexIT extends BaseHBaseManagedTimeIT {
+       
+       private final boolean localIndex;
+       private final String tableDDLOptions;
+       
+       /**
+        * Captures the parameters of one test run and derives the table DDL options from them.
+        *
+        * @param localIndex    whether the tests create a LOCAL index
+        * @param mutable       when false, IMMUTABLE_ROWS=true is added to the table options
+        * @param transactional when true, TRANSACTIONAL=true is added to the table options
+        */
+       public IndexIT(boolean localIndex, boolean mutable, boolean transactional) {
+           this.localIndex = localIndex;
+           StringBuilder ddlOptions = new StringBuilder();
+           if (!mutable) {
+               ddlOptions.append(" IMMUTABLE_ROWS=true ");
+           }
+           if (transactional) {
+               // A comma is only needed when another option already precedes this one.
+               if (ddlOptions.length() > 0) {
+                   ddlOptions.append(",");
+               }
+               ddlOptions.append(" TRANSACTIONAL=true ");
+           }
+           this.tableDDLOptions = ddlOptions.toString();
+       }
+       
+       @Parameters(name="localIndex = {0} , mutable = {1} , transactional = 
{2}")
+    public static Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] {     
+                 { false, false, false }, { false, false, true }, { false, 
true, false }, { false, true, true }, 
+                 { true, false, false }, { true, false, true }, { true, true, 
false }, { true, true, true }
+           });
+    }
+
+       /**
+        * Indexes (char_col1, int_col1) with long_col1 and long_col2 included, checks the EXPLAIN
+        * plan and the rows of a projection over the indexed columns, then drops the index and
+        * expects a query against the index name to fail with TABLE_UNDEFINED.
+        */
+       @Test
+       public void testIndexWithNullableFixedWithCols() throws Exception {
+           Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+           try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+               conn.setAutoCommit(false);
+               // Timestamp suffixes give each parameterized run its own table and index names.
+               String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+               String indexName = "IDX" + "_" + System.currentTimeMillis();
+               String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+
+               String createTable = "CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
+               Statement ddlStatement = conn.createStatement();
+               ddlStatement.execute(createTable);
+               BaseTest.populateTestTable(fullTableName);
+               String createIndex = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON " + fullTableName
+                       + " (char_col1 ASC, int_col1 ASC)"
+                       + " INCLUDE (long_col1, long_col2)";
+               ddlStatement.execute(createIndex);
+
+               String query = "SELECT d.char_col1, int_col1 from " + fullTableName + " as d";
+               String expectedPlan;
+               if (localIndex) {
+                   expectedPlan = "CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + tableName + " [-32768]\n"
+                           + "    SERVER FILTER BY FIRST KEY ONLY\n"
+                           + "CLIENT MERGE SORT";
+               } else {
+                   expectedPlan = "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + indexName + "\n"
+                           + "    SERVER FILTER BY FIRST KEY ONLY";
+               }
+               ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+               assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs));
+
+               rs = conn.createStatement().executeQuery(query);
+               assertTrue(rs.next());
+               assertEquals("chara", rs.getString(1));
+               // The first row is additionally read back by column label.
+               assertEquals("chara", rs.getString("char_col1"));
+               assertEquals(2, rs.getInt(2));
+               assertTrue(rs.next());
+               assertEquals("chara", rs.getString(1));
+               assertEquals(3, rs.getInt(2));
+               assertTrue(rs.next());
+               assertEquals("chara", rs.getString(1));
+               assertEquals(4, rs.getInt(2));
+               assertFalse(rs.next());
+
+               conn.createStatement().execute("DROP INDEX " + indexName + " ON " + fullTableName);
+
+               // The data table must still be readable once the index is gone.
+               query = "SELECT char_col1, int_col1 from " + fullTableName;
+               rs = conn.createStatement().executeQuery(query);
+               assertTrue(rs.next());
+
+               // Querying the dropped index by name has to be rejected.
+               query = "SELECT char_col1, int_col1 from " + indexName;
+               try {
+                   conn.createStatement().executeQuery(query);
+                   fail();
+               } catch (SQLException e) {
+                   assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), e.getErrorCode());
+               }
+           }
+       }
+    
+    @Test
+    public void testDeleteFromAllPKColumnIndex() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+               conn.setAutoCommit(false);
+               // create unique table and index names for each parameterized 
test
+               String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + 
System.currentTimeMillis();
+               String indexName = "IDX"  + "_" + System.currentTimeMillis();
+               String fullTableName = 
SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+               String fullIndexName = 
SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+               String ddl ="CREATE TABLE " + fullTableName + 
BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
+               Statement stmt = conn.createStatement();
+               stmt.execute(ddl);
+               BaseTest.populateTestTable(fullTableName);
+               ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + 
indexName + " ON " + fullTableName
+                           + " (long_pk, varchar_pk)"
+                           + " INCLUDE (long_col1, long_col2)";
+               stmt.execute(ddl);
+               
+               ResultSet rs;
+               
+               rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM 
" + fullTableName);
+               assertTrue(rs.next());
+               assertEquals(3,rs.getInt(1));
+               rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM 
" + fullIndexName);
+               assertTrue(rs.next());
+               assertEquals(3,rs.getInt(1));
+               
+               String dml = "DELETE from " + fullTableName + " WHERE long_col2 
= 4";
+               assertEquals(1,conn.createStatement().executeUpdate(dml));
+               conn.commit();
+               
+               String query = "SELECT /*+ NO_INDEX */ long_pk FROM " + 
fullTableName;
+               rs = conn.createStatement().executeQuery(query);
+               assertTrue(rs.next());
+               assertEquals(1L, rs.getLong(1));
+               assertTrue(rs.next());
+               assertEquals(3L, rs.getLong(1));
+               assertFalse(rs.next());
+               
+               query = "SELECT long_pk FROM " + fullTableName;
+               rs = conn.createStatement().executeQuery(query);
+               assertTrue(rs.next());
+               assertEquals(1L, rs.getLong(1));
+               assertTrue(rs.next());
+               assertEquals(3L, rs.getLong(1));
+               assertFalse(rs.next());
+               
+               query = "SELECT * FROM " + fullIndexName;
+               rs = conn.createStatement().executeQuery(query);
+               assertTrue(rs.next());
+               assertEquals(1L, rs.getLong(1));
+               assertTrue(rs.next());
+               assertEquals(3L, rs.getLong(1));
+               assertFalse(rs.next());
+               
+               conn.createStatement().execute("DROP INDEX " + indexName + " ON 
" + fullTableName);
+        }
+    }
+    
+    //TODO ENABLE THIS TEST AFTER MERGING MASTER TO SEE IF THE SCAN IS CREATED CORRECTLY
+//    @Test
+//    public void testDeleteFromNonPKColumnIndex() throws Exception {
+//        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+//        // create unique table and index names for each parameterized test
+//        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + 
System.currentTimeMillis();
+//        String indexName = "IDX"  + "_" + System.currentTimeMillis();
+//        String fullTableName = 
SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+//        String fullIndexName = 
SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+//        String ddl ="CREATE TABLE " + fullTableName + 
BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
+//        try (Connection conn = DriverManager.getConnection(getUrl(), props)) 
{
+//             conn.setAutoCommit(false);
+//             Statement stmt = conn.createStatement();
+//             stmt.execute(ddl);
+//             BaseTest.populateTestTable(fullTableName);
+//             ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + 
indexName + " ON " + fullTableName
+//                         + " (long_col1, long_col2)"
+//                         + " INCLUDE (decimal_col1, decimal_col2)";
+//             stmt.execute(ddl);
+//        }
+//        try (Connection conn = DriverManager.getConnection(getUrl(), props)) 
{    
+//             ResultSet rs;
+//             
+//             rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM 
" + fullTableName);
+//             assertTrue(rs.next());
+//             assertEquals(3,rs.getInt(1));
+//             rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM 
" + fullIndexName);
+//             assertTrue(rs.next());
+//             assertEquals(3,rs.getInt(1));
+//             
+//             String dml = "DELETE from " + fullTableName + " WHERE long_col2 
= 4";
+//             assertEquals(1,conn.createStatement().executeUpdate(dml));
+//             conn.commit();
+//
+//             // query the data table
+//             String query = "SELECT /*+ NO_INDEX */ long_pk FROM " + 
fullTableName;
+//             rs = conn.createStatement().executeQuery(query);
+//             assertTrue(rs.next());
+//             assertEquals(1L, rs.getLong(1));
+//             assertTrue(rs.next());
+//             assertEquals(3L, rs.getLong(1));
+//             assertFalse(rs.next());
+//             
+//             // query the index table
+//             query = "SELECT long_pk FROM " + fullTableName + " ORDER BY 
long_col1";
+//             rs = conn.createStatement().executeQuery(query);
+//             assertTrue(rs.next());
+//             assertEquals(1L, rs.getLong(1));
+//             assertTrue(rs.next());
+//             assertEquals(3L, rs.getLong(1));
+//             assertFalse(rs.next());
+//             
+//             conn.createStatement().execute("DROP INDEX " + indexName + " ON 
" + fullTableName);
+//        }
+//    }
+    
+    /**
+     * Indexes int_col2 and checks that a GROUP BY int_col2 count query returns a first
+     * group whose count is 1.
+     */
+    @Test
+    public void testGroupByCount() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(false);
+            // Timestamp suffixes give each parameterized run its own table and index names.
+            String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+            String indexName = "IDX" + "_" + System.currentTimeMillis();
+            String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+
+            String createTable = "CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
+            Statement ddlStatement = conn.createStatement();
+            ddlStatement.execute(createTable);
+            BaseTest.populateTestTable(fullTableName);
+            String createIndex = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName
+                    + " ON " + fullTableName + " (int_col2)";
+            ddlStatement.execute(createIndex);
+
+            String groupByQuery = "SELECT int_col2, COUNT(*) FROM " + fullTableName + " GROUP BY int_col2";
+            ResultSet rs = conn.createStatement().executeQuery(groupByQuery);
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(2));
+        }
+    }
+
+    /**
+     * Indexes int_col2 and checks that SELECT DISTINCT int_col2 with an int_col2 > 0 filter
+     * returns 3, 4 and 5, in that order, and nothing else.
+     */
+    @Test
+    public void testSelectDistinctOnTableWithSecondaryImmutableIndex() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(false);
+            // Timestamp suffixes give each parameterized run its own table and index names.
+            String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+            String indexName = "IDX" + "_" + System.currentTimeMillis();
+            String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+
+            String createTable = "CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
+            Statement tableStatement = conn.createStatement();
+            tableStatement.execute(createTable);
+            BaseTest.populateTestTable(fullTableName);
+
+            // The index DDL goes through a PreparedStatement here.
+            String createIndex = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName
+                    + " ON " + fullTableName + " (int_col2)";
+            PreparedStatement indexStatement = conn.prepareStatement(createIndex);
+            indexStatement.execute();
+
+            String distinctQuery = "SELECT distinct int_col2 FROM " + fullTableName + " where int_col2 > 0";
+            ResultSet rs = conn.createStatement().executeQuery(distinctQuery);
+            for (int expected : new int[] { 3, 4, 5 }) {
+                assertTrue(rs.next());
+                assertEquals(expected, rs.getInt(1));
+            }
+            assertFalse(rs.next());
+        }
+    }
+
+    /**
+     * Indexes int_col1 and checks that an IN (1, 2, 3, 4) filter on that column returns
+     * 2, 3 and 4, in that order, and nothing else.
+     */
+    @Test
+    public void testInClauseWithIndexOnColumnOfUsignedIntType() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(false);
+            // Timestamp suffixes give each parameterized run its own table and index names.
+            String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+            String indexName = "IDX" + "_" + System.currentTimeMillis();
+            String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+
+            String createTable = "CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
+            Statement ddlStatement = conn.createStatement();
+            ddlStatement.execute(createTable);
+            BaseTest.populateTestTable(fullTableName);
+            String createIndex = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName
+                    + " ON " + fullTableName + " (int_col1)";
+            ddlStatement.execute(createIndex);
+
+            String inQuery = "SELECT int_col1 FROM " + fullTableName + " where int_col1 IN (1, 2, 3, 4)";
+            ResultSet rs = conn.createStatement().executeQuery(inQuery);
+            for (int expected : new int[] { 2, 3, 4 }) {
+                assertTrue(rs.next());
+                assertEquals(expected, rs.getInt(1));
+            }
+            assertFalse(rs.next());
+        }
+    }
+    
+    /**
+     * Creates a table with DEFAULT_COLUMN_FAMILY='A', indexes v1 (including v2) and inspects
+     * the resolved index PTable: default family name, MULTI_TENANT and IMMUTABLE_ROWS must
+     * not be set on the index; in the localIndex runs the salt bucket count and DISABLE_WAL
+     * passed in the index options must be.
+     */
+    @Test
+    public void createIndexOnTableWithSpecifiedDefaultCF() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(false);
+            String query;
+            ResultSet rs;
+            // Timestamp suffixes give each parameterized run its own table and index names.
+            String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+            String indexName = "IDX" + "_" + System.currentTimeMillis();
+            String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+            String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+            String ddl = "CREATE TABLE " + fullTableName
+                    + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) DEFAULT_COLUMN_FAMILY='A'"
+                    + (!tableDDLOptions.isEmpty() ? "," + tableDDLOptions : "");
+            Statement stmt = conn.createStatement();
+            stmt.execute(ddl);
+
+            // Query the table under the same qualified name it was created with, so the check
+            // does not depend on the default schema name being empty.
+            query = "SELECT * FROM " + fullTableName;
+            rs = conn.createStatement().executeQuery(query);
+            assertFalse(rs.next());
+
+            // NOTE(review): the statement below never adds the LOCAL keyword, even in the
+            // localIndex runs; only the option list differs. Confirm this is intended.
+            String options = localIndex ? "SALT_BUCKETS=10, MULTI_TENANT=true, IMMUTABLE_ROWS=true, DISABLE_WAL=true" : "";
+            conn.createStatement().execute(
+                    "CREATE INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2) " + options);
+            query = "SELECT * FROM " + fullIndexName;
+            rs = conn.createStatement().executeQuery(query);
+            assertFalse(rs.next());
+
+            //check options set correctly on index
+            TableName indexTableName = TableName.create(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+            NamedTableNode indexNode = NamedTableNode.create(null, indexTableName, null);
+            ColumnResolver resolver = FromCompiler.getResolver(indexNode, conn.unwrap(PhoenixConnection.class));
+            PTable indexTable = resolver.getTables().get(0).getTable();
+            // Can't set IMMUTABLE_ROWS, MULTI_TENANT or DEFAULT_COLUMN_FAMILY_NAME on an index
+            assertNull(indexTable.getDefaultFamilyName());
+            assertFalse(indexTable.isMultiTenant());
+            assertFalse(indexTable.isImmutableRows());
+            if (localIndex) {
+                assertEquals(10, indexTable.getBucketNum().intValue());
+                assertTrue(indexTable.isWALDisabled());
+            }
+        }
+    }
+    
+    /**
+     * Indexes date_col on the multi-column-family test table and checks, for an int_pk
+     * projection and for a date_col projection ordered by date_col, both the EXPLAIN plan
+     * and the returned rows.
+     */
+    @Test
+    public void testIndexWithNullableDateCol() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(false);
+            // Timestamp suffixes give each parameterized run its own table and index names.
+            String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+            String indexName = "IDX" + "_" + System.currentTimeMillis();
+            String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+            String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+            Date date = new Date(System.currentTimeMillis());
+
+            createMultiCFTestTable(fullTableName, tableDDLOptions);
+            populateMultiCFTestTable(fullTableName, date);
+            String createIndex = "CREATE " + (localIndex ? " LOCAL " : "") + " INDEX "
+                    + indexName + " ON " + fullTableName + " (date_col)";
+            PreparedStatement indexStatement = conn.prepareStatement(createIndex);
+            indexStatement.execute();
+
+            // Both queries below are expected to produce this same plan.
+            String expectedPlan;
+            if (localIndex) {
+                expectedPlan = "CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName + " [-32768]\n"
+                        + "    SERVER FILTER BY FIRST KEY ONLY\n"
+                        + "CLIENT MERGE SORT";
+            } else {
+                expectedPlan = "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName + "\n"
+                        + "    SERVER FILTER BY FIRST KEY ONLY";
+            }
+
+            String query = "SELECT int_pk from " + fullTableName;
+            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs));
+
+            rs = conn.createStatement().executeQuery(query);
+            for (int expectedPk : new int[] { 2, 1, 3 }) {
+                assertTrue(rs.next());
+                assertEquals(expectedPk, rs.getInt(1));
+            }
+            assertFalse(rs.next());
+
+            query = "SELECT date_col from " + fullTableName + " order by date_col";
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs));
+
+            // Expected dates are the populated date, then one and two days later.
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(date, rs.getDate(1));
+            assertTrue(rs.next());
+            assertEquals(new Date(date.getTime() + TestUtil.MILLIS_IN_DAY), rs.getDate(1));
+            assertTrue(rs.next());
+            assertEquals(new Date(date.getTime() + 2 * TestUtil.MILLIS_IN_DAY), rs.getDate(1));
+            assertFalse(rs.next());
+        }
+    }
+    
+    @Test
+    public void testSelectAllAndAliasWithIndex() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+               conn.setAutoCommit(false);
+               String query;
+               ResultSet rs;
+               // create unique table and index names for each parameterized 
test
+               String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + 
System.currentTimeMillis();
+               String indexName = "IDX"  + "_" + System.currentTimeMillis();
+               String fullTableName = 
SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+               String fullIndexName = 
SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+               
+               String ddl = "CREATE TABLE " + fullTableName + " (k VARCHAR NOT 
NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + tableDDLOptions;
+                       conn.createStatement().execute(ddl);
+               query = "SELECT * FROM " + fullTableName;
+               rs = conn.createStatement().executeQuery(query);
+               assertFalse(rs.next());
+               
+               ddl = "CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + 
indexName + " ON " + fullTableName + " (v2 DESC) INCLUDE (v1)";
+               conn.createStatement().execute(ddl);
+               query = "SELECT * FROM " + fullIndexName;
+               rs = conn.createStatement().executeQuery(query);
+               assertFalse(rs.next());
+
+               PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
fullTableName + " VALUES(?,?,?)");
+               stmt.setString(1,"a");
+               stmt.setString(2, "x");
+               stmt.setString(3, "1");
+               stmt.execute();
+               stmt.setString(1,"b");
+               stmt.setString(2, "y");
+               stmt.setString(3, "2");
+               stmt.execute();
+               conn.commit();
+               
+               query = "SELECT * FROM " + fullTableName;
+               rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+               if(localIndex){
+                   assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER 
_LOCAL_IDX_" + fullTableName+" [-32768]\nCLIENT MERGE SORT", 
QueryUtil.getExplainPlan(rs));
+               } else {
+                   assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + 
fullIndexName, QueryUtil.getExplainPlan(rs));
+               }
+
+               rs = conn.createStatement().executeQuery(query);
+               assertTrue(rs.next());
+               assertEquals("b",rs.getString(1));
+               assertEquals("y",rs.getString(2));
+               assertEquals("2",rs.getString(3));
+               assertEquals("b",rs.getString("k"));
+               assertEquals("y",rs.getString("v1"));
+               assertEquals("2",rs.getString("v2"));
+               assertTrue(rs.next());
+               assertEquals("a",rs.getString(1));
+               assertEquals("x",rs.getString(2));
+               assertEquals("1",rs.getString(3));
+               assertEquals("a",rs.getString("k"));
+               assertEquals("x",rs.getString("v1"));
+               assertEquals("1",rs.getString("v2"));
+               assertFalse(rs.next());
+               
+               query = "SELECT v1 as foo FROM " + fullTableName + " WHERE v2 = 
'1' ORDER BY foo";
+               rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+               if(localIndex){
+                   assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER 
_LOCAL_IDX_" +fullTableName + " [-32768,~'1']\n" + 
+                           "    SERVER SORTED BY [\"V1\"]\n" + 
+                           "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
+               } else {
+                   assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " 
+fullIndexName + " [~'1']\n" + 
+                           "    SERVER SORTED BY [\"V1\"]\n" + 
+                           "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
+               }
+
+               rs = conn.createStatement().executeQuery(query);
+               assertTrue(rs.next());
+               assertEquals("x",rs.getString(1));
+               assertEquals("x",rs.getString("foo"));
+               assertFalse(rs.next());
+        }
+    }
+    
+    @Test
+    public void testSelectCF() throws Exception {
+       Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+               conn.setAutoCommit(false);
+               String query;
+               ResultSet rs;
+               // create unique table and index names for each parameterized 
test
+               String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + 
System.currentTimeMillis();
+               String indexName = "IDX"  + "_" + System.currentTimeMillis();
+               String fullTableName = 
SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+               String fullIndexName = 
SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+               
+               String ddl = "CREATE TABLE " + fullTableName + " (k VARCHAR NOT 
NULL PRIMARY KEY, a.v1 VARCHAR, a.v2 VARCHAR, b.v1 VARCHAR) " + tableDDLOptions;
+                       conn.createStatement().execute(ddl);
+               query = "SELECT * FROM " + fullTableName;
+               rs = conn.createStatement().executeQuery(query);
+               assertFalse(rs.next());
+               ddl = "CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + 
indexName + " ON " + fullTableName + " (v2 DESC) INCLUDE (a.v1)";
+               conn.createStatement().execute(ddl);
+               query = "SELECT * FROM " + fullIndexName;
+               rs = conn.createStatement().executeQuery(query);
+               assertFalse(rs.next());
+       
+               PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
fullTableName + " VALUES(?,?,?,?)");
+               stmt.setString(1,"a");
+               stmt.setString(2, "x");
+               stmt.setString(3, "1");
+               stmt.setString(4, "A");
+               stmt.execute();
+               stmt.setString(1,"b");
+               stmt.setString(2, "y");
+               stmt.setString(3, "2");
+               stmt.setString(4, "B");
+               stmt.execute();
+               conn.commit();
+               
+               query = "SELECT * FROM " + fullTableName;
+               rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+               assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + 
fullTableName, QueryUtil.getExplainPlan(rs));
+       
+               query = "SELECT a.* FROM " + fullTableName;
+               rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+               if(localIndex) {
+                   assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER 
_LOCAL_IDX_" + fullTableName+" [-32768]\nCLIENT MERGE SORT", 
QueryUtil.getExplainPlan(rs));
+               } else {
+                   assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + 
fullIndexName, QueryUtil.getExplainPlan(rs));
+               }
+               rs = conn.createStatement().executeQuery(query);
+               assertTrue(rs.next());
+               assertEquals("y",rs.getString(1));
+               assertEquals("2",rs.getString(2));
+               assertEquals("y",rs.getString("v1"));
+               assertEquals("2",rs.getString("v2"));
+               assertTrue(rs.next());
+               assertEquals("x",rs.getString(1));
+               assertEquals("1",rs.getString(2));
+               assertEquals("x",rs.getString("v1"));
+               assertEquals("1",rs.getString("v2"));
+               assertFalse(rs.next());
+        }
+    }
+    
+    /**
+     * Checks that the data table can still be written after its index is dropped.
+     * <p>
+     * Creates a table (k, v1, v2) and an index on (v1, v2) -- a LOCAL index when
+     * {@code localIndex} is set -- writes one row, confirms the index row, drops
+     * the index, then upserts a new v1 for the same key and verifies the data
+     * table holds exactly one row with the updated value.
+     */
+    @Test
+    public void testUpsertAfterIndexDrop() throws Exception {
+       Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+               conn.setAutoCommit(false);
+               String query;
+               ResultSet rs;
+               // create unique table and index names for each parameterized 
test
+               String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + 
System.currentTimeMillis();
+               String indexName = "IDX"  + "_" + System.currentTimeMillis();
+               String fullTableName = 
SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+               String fullIndexName = 
SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+    
+               // make sure that the tables are empty, but reachable
+               conn.createStatement().execute(
+                 "CREATE TABLE " + fullTableName
+                     + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 
VARCHAR)" + tableDDLOptions);
+               query = "SELECT * FROM " + fullTableName;
+               rs = conn.createStatement().executeQuery(query);
+               assertFalse(rs.next());
+           
+               conn.createStatement().execute(
+                 "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + 
indexName + " ON " + fullTableName + " (v1, v2)");
+               query = "SELECT * FROM " + fullIndexName;
+               rs = conn.createStatement().executeQuery(query);
+               assertFalse(rs.next());
+           
+               // load some data into the table
+               PreparedStatement stmt =
+                   conn.prepareStatement("UPSERT INTO " + fullTableName + " 
VALUES(?,?,?)");
+               stmt.setString(1, "a");
+               stmt.setString(2, "x");
+               stmt.setString(3, "1");
+               stmt.execute();
+               conn.commit();
+               
+               // make sure the index is working as expected
+               query = "SELECT * FROM " + fullIndexName;
+               rs = conn.createStatement().executeQuery(query);
+               assertTrue(rs.next());
+               assertEquals("x", rs.getString(1));
+               assertEquals("1", rs.getString(2));
+               assertEquals("a", rs.getString(3));
+               assertFalse(rs.next());
+       
+               // drop the index, then overwrite v1 of the existing row "a"
+               String ddl = "DROP INDEX " + indexName + " ON " + fullTableName;
+               stmt = conn.prepareStatement(ddl);
+               stmt.execute();
+               
+               stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + 
"(k, v1) VALUES(?,?)");
+               stmt.setString(1, "a");
+               stmt.setString(2, "y");
+               stmt.execute();
+               conn.commit();
+           
+               query = "SELECT * FROM " + fullTableName;
+           
+               // check that the data table matches as expected
+               rs = conn.createStatement().executeQuery(query);
+               assertTrue(rs.next());
+               assertEquals("a", rs.getString(1));
+               assertEquals("y", rs.getString(2));
+               assertFalse(rs.next());
+        }
+    }
+    
+    /**
+     * Writes rows on both sides of a table split point and checks the index.
+     * <p>
+     * The table is created with MAX_FILESIZE=1 and MEMSTORE_FLUSHSIZE=1, plus any
+     * {@code tableDDLOptions}, and is pre-split with SPLIT ON ('b'). Three rows
+     * (a, b, c) are upserted in one commit. The test then verifies the index
+     * rows, the EXPLAIN plan for {@code SELECT *} on the data table (which is
+     * expected to be served from the index), and the rows returned by that query.
+     */
+    @Test
+    public void testMultipleUpdatesAcrossRegions() throws Exception {
+       Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+               conn.setAutoCommit(false);
+               String query;
+               ResultSet rs;
+               // create unique table and index names for each parameterized 
test
+               String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + 
System.currentTimeMillis();
+               String indexName = "IDX"  + "_" + System.currentTimeMillis();
+               String fullTableName = 
SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+               String fullIndexName = 
SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+    
+               // make sure that the tables are empty, but reachable
+               conn.createStatement().execute(
+                 "CREATE TABLE " + fullTableName
+                     + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 
VARCHAR) "  + HTableDescriptor.MAX_FILESIZE + "=1, " + 
HTableDescriptor.MEMSTORE_FLUSHSIZE + "=1 " 
+                                 + (!tableDDLOptions.isEmpty() ? "," + 
tableDDLOptions : "") + "SPLIT ON ('b')");
+               query = "SELECT * FROM " + fullTableName;
+               rs = conn.createStatement().executeQuery(query);
+               assertFalse(rs.next());
+       
+               conn.createStatement().execute(
+                         "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + 
indexName + " ON " + fullTableName + " (v1, v2)");
+               query = "SELECT * FROM " + fullIndexName;
+               rs = conn.createStatement().executeQuery(query);
+               assertFalse(rs.next());
+           
+               // load some data into the table
+               PreparedStatement stmt =
+                   conn.prepareStatement("UPSERT INTO " + fullTableName + " 
VALUES(?,?,?)");
+               stmt.setString(1, "a");
+               stmt.setString(2, "x");
+               stmt.setString(3, "1");
+               stmt.execute();
+               stmt.setString(1, "b");
+               stmt.setString(2, "y");
+               stmt.setString(3, "2");
+               stmt.execute();
+               stmt.setString(1, "c");
+               stmt.setString(2, "z");
+               stmt.setString(3, "3");
+               stmt.execute();
+               conn.commit();
+               
+               // make sure the index is working as expected
+               query = "SELECT * FROM " + fullIndexName;
+               rs = conn.createStatement().executeQuery(query);
+               assertTrue(rs.next());
+               assertEquals("x", rs.getString(1));
+               assertEquals("1", rs.getString(2));
+               assertEquals("a", rs.getString(3));
+               assertTrue(rs.next());
+               assertEquals("y", rs.getString(1));
+               assertEquals("2", rs.getString(2));
+               assertEquals("b", rs.getString(3));
+               assertTrue(rs.next());
+               assertEquals("z", rs.getString(1));
+               assertEquals("3", rs.getString(2));
+               assertEquals("c", rs.getString(3));
+               assertFalse(rs.next());
+       
+               // the plan differs by index type: a local index is scanned through
+               // the _LOCAL_IDX_ table, a global index through its own table
+               query = "SELECT * FROM " + fullTableName;
+               rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+               if (localIndex) {
+                   assertEquals("CLIENT PARALLEL 2-WAY RANGE SCAN OVER 
_LOCAL_IDX_" + fullTableName+" [-32768]\n"
+                              + "    SERVER FILTER BY FIRST KEY ONLY\n"
+                              + "CLIENT MERGE SORT",
+                       QueryUtil.getExplainPlan(rs));
+               } else {
+                   assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + 
fullIndexName + "\n"
+                           + "    SERVER FILTER BY FIRST KEY ONLY",
+                       QueryUtil.getExplainPlan(rs));
+               }
+           
+               // check that the data table matches as expected
+               rs = conn.createStatement().executeQuery(query);
+               assertTrue(rs.next());
+               assertEquals("a", rs.getString(1));
+               assertEquals("x", rs.getString(2));
+               assertEquals("1", rs.getString(3));
+               assertTrue(rs.next());
+               assertEquals("b", rs.getString(1));
+               assertEquals("y", rs.getString(2));
+               assertEquals("2", rs.getString(3));
+               assertTrue(rs.next());
+               assertEquals("c", rs.getString(1));
+               assertEquals("z", rs.getString(2));
+               assertEquals("3", rs.getString(3));
+               assertFalse(rs.next());
+        }
+    }
+    
+    /**
+     * Exercises an index over quoted (case-sensitive) column names.
+     * <p>
+     * The table declares the quoted columns {@code "V1"} and {@code "v2"}; the
+     * index is on ("v2") and INCLUDEs ("V1"). After upserting two rows the test
+     * checks the EXPLAIN plan and results for an equality filter on "v2", and
+     * for a projection that aliases the columns (foo1, foo, "Foo1") with
+     * ORDER BY foo, reading values both by position and by column label.
+     */
+    @Test
+    public void testIndexWithCaseSensitiveCols() throws Exception {
+       Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+               conn.setAutoCommit(false);
+               String query;
+               ResultSet rs;
+               // create unique table and index names for each parameterized 
test
+               String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + 
System.currentTimeMillis();
+               String indexName = "IDX"  + "_" + System.currentTimeMillis();
+               String fullTableName = 
SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+               String fullIndexName = 
SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+               
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + " 
(k VARCHAR NOT NULL PRIMARY KEY, \"V1\" VARCHAR, \"v2\" 
VARCHAR)"+tableDDLOptions);
+            query = "SELECT * FROM "+fullTableName;
+            rs = conn.createStatement().executeQuery(query);
+            assertFalse(rs.next());
+            conn.createStatement().execute(
+                         "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + 
indexName + " ON " + fullTableName + "(\"v2\") INCLUDE (\"V1\")");
+            query = "SELECT * FROM "+fullIndexName;
+            rs = conn.createStatement().executeQuery(query);
+            assertFalse(rs.next());
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
fullTableName + " VALUES(?,?,?)");
+            stmt.setString(1,"a");
+            stmt.setString(2, "x");
+            stmt.setString(3, "1");
+            stmt.execute();
+            stmt.setString(1,"b");
+            stmt.setString(2, "y");
+            stmt.setString(3, "2");
+            stmt.execute();
+            conn.commit();
+
+            // equality filter on the indexed column: expect a range scan on the index
+            query = "SELECT * FROM " + fullTableName + " WHERE \"v2\" = '1'";
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            if(localIndex){
+                assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER 
_LOCAL_IDX_" + fullTableName + " [-32768,'1']\n"
+                           + "CLIENT MERGE SORT", 
QueryUtil.getExplainPlan(rs));
+            } else {
+                assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + 
fullIndexName + " ['1']", QueryUtil.getExplainPlan(rs));
+            }
+
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("a",rs.getString(1));
+            assertEquals("x",rs.getString(2));
+            assertEquals("1",rs.getString(3));
+            assertEquals("a",rs.getString("k"));
+            assertEquals("x",rs.getString("V1"));
+            assertEquals("1",rs.getString("v2"));
+            assertFalse(rs.next());
+
+            // same columns projected several times under quoted and unquoted aliases
+            query = "SELECT \"V1\", \"V1\" as foo1, \"v2\" as foo, \"v2\" as 
\"Foo1\", \"v2\" FROM " + fullTableName + " ORDER BY foo";
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            if(localIndex){
+                assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER 
_LOCAL_IDX_" + fullTableName + " [-32768]\nCLIENT MERGE SORT",
+                    QueryUtil.getExplainPlan(rs));
+            } else {
+                assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER 
"+fullIndexName, QueryUtil.getExplainPlan(rs));
+            }
+
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("x",rs.getString(1));
+            assertEquals("x",rs.getString("V1"));
+            assertEquals("x",rs.getString(2));
+            assertEquals("x",rs.getString("foo1"));
+            assertEquals("1",rs.getString(3));
+            assertEquals("1",rs.getString("Foo"));
+            assertEquals("1",rs.getString(4));
+            assertEquals("1",rs.getString("Foo1"));
+            assertEquals("1",rs.getString(5));
+            assertEquals("1",rs.getString("v2"));
+            assertTrue(rs.next());
+            assertEquals("y",rs.getString(1));
+            assertEquals("y",rs.getString("V1"));
+            assertEquals("y",rs.getString(2));
+            assertEquals("y",rs.getString("foo1"));
+            assertEquals("2",rs.getString(3));
+            assertEquals("2",rs.getString("Foo"));
+            assertEquals("2",rs.getString(4));
+            assertEquals("2",rs.getString("Foo1"));
+            assertEquals("2",rs.getString(5));
+            assertEquals("2",rs.getString("v2"));
+            assertFalse(rs.next());
+        } 
+    }
+
+    /**
+     * Runs a COUNT with an IN filter against an indexed BIGINT column.
+     * <p>
+     * Creates a table with PK1 CHAR(2) and CF1.COL1 BIGINT plus an index on COL1,
+     * then runs the query without loading any data. Only the presence of a
+     * result row is asserted; the count value itself is not checked.
+     */
+    @Test
+    public void testInFilterOnIndexedTable() throws Exception {
+       Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+               conn.setAutoCommit(false);
+               String query;
+               ResultSet rs;
+               // create unique table and index names for each parameterized 
test
+               String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + 
System.currentTimeMillis();
+               String indexName = "IDX"  + "_" + System.currentTimeMillis();
+               String fullTableName = 
SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+               
+               String ddl = "CREATE TABLE " + fullTableName +"  (PK1 CHAR(2) 
NOT NULL PRIMARY KEY, CF1.COL1 BIGINT) " + tableDDLOptions;
+               conn.createStatement().execute(ddl);
+               ddl = "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + 
indexName + " ON " + fullTableName + "(COL1)";
+               conn.createStatement().execute(ddl);
+       
+               query = "SELECT COUNT(COL1) FROM " + fullTableName +" WHERE 
COL1 IN (1,25,50,75,100)"; 
+               rs = conn.createStatement().executeQuery(query);
+               assertTrue(rs.next());
+        } 
+    }
+
+    /**
+     * Verifies an index keyed on a DECIMAL column with included DECIMAL columns.
+     * <p>
+     * The table is created and filled by {@code createMultiCFTestTable} and
+     * {@code populateMultiCFTestTable}, which are defined outside this method.
+     * The index is on (decimal_pk) INCLUDE (decimal_col1, decimal_col2). The
+     * test checks the EXPLAIN plan for a projection of those three columns and
+     * then the three expected rows of values.
+     */
+    @Test
+    public void testIndexWithDecimalCol() throws Exception {
+       Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+               conn.setAutoCommit(false);
+               String query;
+               ResultSet rs;
+               // create unique table and index names for each parameterized 
test
+               String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + 
System.currentTimeMillis();
+               String indexName = "IDX"  + "_" + System.currentTimeMillis();
+               String fullTableName = 
SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+               String fullIndexName = 
SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+            Date date = new Date(System.currentTimeMillis());
+            
+            createMultiCFTestTable(fullTableName, tableDDLOptions);
+            populateMultiCFTestTable(fullTableName, date);
+            String ddl = null;
+            ddl = "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + 
indexName + " ON " + fullTableName + " (decimal_pk) INCLUDE (decimal_col1, 
decimal_col2)";
+            PreparedStatement stmt = conn.prepareStatement(ddl);
+            stmt.execute();
+            
+            query = "SELECT decimal_pk, decimal_col1, decimal_col2 from " + 
fullTableName ;
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            if(localIndex) {
+                assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER 
_LOCAL_IDX_" + fullTableName+" [-32768]\nCLIENT MERGE SORT", 
QueryUtil.getExplainPlan(rs));
+            } else {
+                assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + 
fullIndexName, QueryUtil.getExplainPlan(rs));
+            }
+            
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(new BigDecimal("1.1"), rs.getBigDecimal(1));
+            assertEquals(new BigDecimal("2.1"), rs.getBigDecimal(2));
+            assertEquals(new BigDecimal("3.1"), rs.getBigDecimal(3));
+            assertTrue(rs.next());
+            assertEquals(new BigDecimal("2.2"), rs.getBigDecimal(1));
+            assertEquals(new BigDecimal("3.2"), rs.getBigDecimal(2));
+            assertEquals(new BigDecimal("4.2"), rs.getBigDecimal(3));
+            assertTrue(rs.next());
+            assertEquals(new BigDecimal("3.3"), rs.getBigDecimal(1));
+            assertEquals(new BigDecimal("4.3"), rs.getBigDecimal(2));
+            assertEquals(new BigDecimal("5.3"), rs.getBigDecimal(3));
+            assertFalse(rs.next());
+        } 
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalMutableIndexIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalMutableIndexIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalMutableIndexIT.java
deleted file mode 100644
index fe17dbc..0000000
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalMutableIndexIT.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end.index;
-
-public class LocalMutableIndexIT extends BaseMutableIndexIT {
-
-    public LocalMutableIndexIT() {
-        super(true);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index d11c059..e2af717 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -26,11 +26,13 @@ import static 
org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
@@ -73,6 +75,9 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 /**
  * 
  * Test for failure of region server to write to index table.
@@ -84,9 +89,9 @@ import org.junit.experimental.categories.Category;
  */
 
 @Category(NeedsOwnMiniClusterTest.class)
+@RunWith(Parameterized.class)
 public class MutableIndexFailureIT extends BaseTest {
     private static final int NUM_SLAVES = 4;
-    private static String url;
     private static PhoenixTestDriver driver;
     private static HBaseTestingUtility util;
     private Timer scheduleTimer;
@@ -95,7 +100,15 @@ public class MutableIndexFailureIT extends BaseTest {
     private static final String INDEX_TABLE_NAME = "I";
     private static final String DATA_TABLE_FULL_NAME = 
SchemaUtil.getTableName(SCHEMA_NAME, "T");
     private static final String INDEX_TABLE_FULL_NAME = 
SchemaUtil.getTableName(SCHEMA_NAME, "I");
-
+    
+    private boolean transactional; 
+       private final String tableDDLOptions;
+    
+    /**
+     * Creates one parameterized instance of the test.
+     *
+     * @param transactional when true, {@code tableDDLOptions} is set to
+     *        " TRANSACTIONAL=true "; otherwise it is the empty string
+     */
+    public MutableIndexFailureIT(boolean transactional) {
+               this.transactional = transactional;
+               this.tableDDLOptions = transactional ? " TRANSACTIONAL=true " : 
"";
+    }
+    
     @Before
     public void doSetup() throws Exception {
         Configuration conf = HBaseConfiguration.create();
@@ -113,6 +126,15 @@ public class MutableIndexFailureIT extends BaseTest {
         url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + 
JDBC_PROTOCOL_SEPARATOR + clientPort
                 + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
         driver = initAndRegisterDriver(url, ReadOnlyProps.EMPTY_PROPS);
+               clusterInitialized = true;
+               setupTxManager();
+    }
+       
+       /**
+        * Supplies the constructor arguments for the Parameterized runner.
+        *
+        * @return two single-element parameter sets: {@code false} and {@code true}
+        */
+       @Parameters(name="transactional = {0}")
+    public static Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] {     
+                 { false}, {true } 
+           });
     }
 
     @After
@@ -133,227 +155,275 @@ public class MutableIndexFailureIT extends BaseTest {
 
     @Test(timeout=300000)
     public void testWriteFailureDisablesLocalIndex() throws Exception {
-        testWriteFailureDisablesIndex(true);
+       helpTestWriteFailureDisablesIndex(true);
     }
- 
+    
     @Test(timeout=300000)
-    public void testWriteFailureDisablesIndex() throws Exception {
-        testWriteFailureDisablesIndex(false);
+    public void testWriteFailureDisablesGlobalIndex() throws Exception {
+       helpTestWriteFailureDisablesIndex(false);
     }
-    
-    public void testWriteFailureDisablesIndex(boolean localIndex) throws 
Exception {
-        String query;
-        ResultSet rs;
 
+    private void helpTestWriteFailureDisablesIndex(boolean localIndex) throws 
Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = driver.connect(url, props);
-        conn.setAutoCommit(false);
-        conn.createStatement().execute(
-                "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL 
PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
-        query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
-
-        if(localIndex) {
-            conn.createStatement().execute(
-                "CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + 
DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
-            conn.createStatement().execute(
-                "CREATE LOCAL INDEX " + INDEX_TABLE_NAME+ "_2" + " ON " + 
DATA_TABLE_FULL_NAME + " (v2) INCLUDE (v1)");
-        } else {
-            conn.createStatement().execute(
-                "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + 
DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
+        try (Connection conn = driver.connect(url, props);) {
+               String query;
+               ResultSet rs;
+               conn.setAutoCommit(false);
+               conn.createStatement().execute(
+                       "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR 
NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions);
+               query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
+               rs = conn.createStatement().executeQuery(query);
+               assertFalse(rs.next());
+       
+               if(localIndex) {
+                   conn.createStatement().execute(
+                       "CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + 
DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
+                   conn.createStatement().execute(
+                       "CREATE LOCAL INDEX " + INDEX_TABLE_NAME+ "_2" + " ON " 
+ DATA_TABLE_FULL_NAME + " (v2) INCLUDE (v1)");
+               } else {
+                   conn.createStatement().execute(
+                       "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + 
DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
+               }
+                   
+               query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
+               rs = conn.createStatement().executeQuery(query);
+               assertFalse(rs.next());
+       
+               // Verify the metadata for index is correct.
+               rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
+                       new String[] { PTableType.INDEX.toString() });
+               assertTrue(rs.next());
+               assertEquals(INDEX_TABLE_NAME, rs.getString(3));
+               assertEquals(PIndexState.ACTIVE.toString(), 
rs.getString("INDEX_STATE"));
+               assertFalse(rs.next());
+               
+               PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
+               stmt.setString(1, "a");
+               stmt.setString(2, "x");
+               stmt.setString(3, "1");
+               stmt.execute();
+               conn.commit();
+       
+               TableName indexTable =
+                       TableName.valueOf(localIndex ? MetaDataUtil
+                               .getLocalIndexTableName(DATA_TABLE_FULL_NAME) : 
INDEX_TABLE_FULL_NAME);
+               HBaseAdmin admin = this.util.getHBaseAdmin();
+               HTableDescriptor indexTableDesc = 
admin.getTableDescriptor(indexTable);
+               try{
+                 admin.disableTable(indexTable);
+                 admin.deleteTable(indexTable);
+               } catch (TableNotFoundException ignore) {}
+       
+               stmt = conn.prepareStatement("UPSERT INTO " + 
DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
+               stmt.setString(1, "a2");
+               stmt.setString(2, "x2");
+               stmt.setString(3, "2");
+               stmt.execute();
+               
+               if (transactional) {
+                   try {
+                       conn.commit();
+                       fail();
+                   } catch (SQLException e1) {
+                               try {
+                                       conn.rollback();
+                                       fail();
+                               } catch (SQLException e2) {
+                                       // rollback fails as well because index 
is disabled
+                               }
+                   }
+               }
+               else {
+                       conn.commit();
+               }
+               
+               // Verify the metadata for index is correct.
+               rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
+                       new String[] { PTableType.INDEX.toString() });
+               assertTrue(rs.next());
+               assertEquals(INDEX_TABLE_NAME, rs.getString(3));
+               // if the table is transactional, the index will not be 
disabled if there is a failure
+               PIndexState indexState = transactional ? PIndexState.ACTIVE : 
PIndexState.DISABLE;
+                       assertEquals(indexState.toString(), 
rs.getString("INDEX_STATE"));
+               assertFalse(rs.next());
+               if(localIndex) {
+                   rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME+"_2",
+                       new String[] { PTableType.INDEX.toString() });
+                   assertTrue(rs.next());
+                   assertEquals(INDEX_TABLE_NAME+"_2", rs.getString(3));
+                   assertEquals(indexState.toString(), 
rs.getString("INDEX_STATE"));
+                   assertFalse(rs.next());
+               }
+       
+               // if the table is transactional the write to the index table  
will fail because the index has not been disabled
+               if (!transactional) {
+                       // Verify UPSERT on data table still work after index 
is disabled       
+                       stmt = conn.prepareStatement("UPSERT INTO " + 
DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
+                       stmt.setString(1, "a3");
+                       stmt.setString(2, "x3");
+                       stmt.setString(3, "3");
+                       stmt.execute();
+                       conn.commit();
+               }
+               
+               if (transactional) {
+                       // if the table was transactional there should be 1 row 
(written before the index was disabled)
+                       query = "SELECT /*+ NO_INDEX */ v2 FROM " + 
DATA_TABLE_FULL_NAME;
+                       rs = conn.createStatement().executeQuery("EXPLAIN " + 
query);
+                       String expectedPlan = "CLIENT PARALLEL 1-WAY FULL SCAN 
OVER " + DATA_TABLE_FULL_NAME;
+                               assertEquals(expectedPlan, 
QueryUtil.getExplainPlan(rs));
+                       rs = conn.createStatement().executeQuery(query);
+                       assertTrue(rs.next());
+                       assertEquals("1", rs.getString(1));
+                       assertFalse(rs.next());
+               } else {
+                       // if the table was not transactional there should be 
three rows (all writes to data table should succeed)
+                       query = "SELECT v2 FROM " + DATA_TABLE_FULL_NAME;
+                       rs = conn.createStatement().executeQuery("EXPLAIN " + 
query);
+                       String expectedPlan = "CLIENT PARALLEL 1-WAY FULL SCAN 
OVER " + DATA_TABLE_FULL_NAME;
+                               assertEquals(expectedPlan, 
QueryUtil.getExplainPlan(rs));
+                       rs = conn.createStatement().executeQuery(query);
+                       assertTrue(rs.next());
+                       assertEquals("1", rs.getString(1));
+                       assertTrue(rs.next());
+                       assertEquals("2", rs.getString(1));
+                       assertTrue(rs.next());
+                       assertEquals("3", rs.getString(1));
+                       assertFalse(rs.next());
+               }
+               
+               // recreate index table
+               admin.createTable(indexTableDesc);
+               do {
+                 rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
+                     new String[] { PTableType.INDEX.toString() });
+                 assertTrue(rs.next());
+                 
if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
+                     break;
+                 }
+                 if(localIndex) {
+                     rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME+"_2",
+                         new String[] { PTableType.INDEX.toString() });
+                     assertTrue(rs.next());
+                     
if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
+                         break;
+                     }
+                 }
+                 Thread.sleep(15 * 1000); // sleep 15 secs
+               } while(true);
+               
+               // Verify UPSERT on data table still work after index table is 
recreated       
+               stmt = conn.prepareStatement("UPSERT INTO " + 
DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
+               stmt.setString(1, "a4");
+               stmt.setString(2, "x4");
+               stmt.setString(3, "4");
+               stmt.execute();
+               conn.commit();
+               
+               // verify index table has data
+               query = "SELECT count(1) FROM " + INDEX_TABLE_FULL_NAME;
+               rs = conn.createStatement().executeQuery(query);
+               assertTrue(rs.next());
+               
+               // for txn tables there will be only one row in the index (a4)
+               // for non txn tables there will be three rows because we only 
partially build index from where we failed and the oldest 
+               // index row has been deleted when we dropped the index table 
during test
+               assertEquals( transactional ? 1: 3, rs.getInt(1));
         }
-            
-        query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
-
-        // Verify the metadata for index is correct.
-        rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
-                new String[] { PTableType.INDEX.toString() });
-        assertTrue(rs.next());
-        assertEquals(INDEX_TABLE_NAME, rs.getString(3));
-        assertEquals(PIndexState.ACTIVE.toString(), 
rs.getString("INDEX_STATE"));
-        assertFalse(rs.next());
-        
-        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
-        stmt.setString(1, "a");
-        stmt.setString(2, "x");
-        stmt.setString(3, "1");
-        stmt.execute();
-        conn.commit();
-
-        TableName indexTable =
-                TableName.valueOf(localIndex ? MetaDataUtil
-                        .getLocalIndexTableName(DATA_TABLE_FULL_NAME) : 
INDEX_TABLE_FULL_NAME);
-        HBaseAdmin admin = this.util.getHBaseAdmin();
-        HTableDescriptor indexTableDesc = admin.getTableDescriptor(indexTable);
-        try{
-          admin.disableTable(indexTable);
-          admin.deleteTable(indexTable);
-        } catch (TableNotFoundException ignore) {}
-
-        stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " 
VALUES(?,?,?)");
-        stmt.setString(1, "a2");
-        stmt.setString(2, "x2");
-        stmt.setString(3, "2");
-        stmt.execute();
-        try {
-            conn.commit();
-        } catch (SQLException e) {}
-
-        // Verify the metadata for index is correct.
-        rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
-                new String[] { PTableType.INDEX.toString() });
-        assertTrue(rs.next());
-        assertEquals(INDEX_TABLE_NAME, rs.getString(3));
-        assertEquals(PIndexState.DISABLE.toString(), 
rs.getString("INDEX_STATE"));
-        assertFalse(rs.next());
-        if(localIndex) {
-            rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME+"_2",
-                new String[] { PTableType.INDEX.toString() });
-            assertTrue(rs.next());
-            assertEquals(INDEX_TABLE_NAME+"_2", rs.getString(3));
-            assertEquals(PIndexState.DISABLE.toString(), 
rs.getString("INDEX_STATE"));
-            assertFalse(rs.next());
-        }
-
-        // Verify UPSERT on data table still work after index is disabled      
 
-        stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " 
VALUES(?,?,?)");
-        stmt.setString(1, "a3");
-        stmt.setString(2, "x3");
-        stmt.setString(3, "3");
-        stmt.execute();
-        conn.commit();
-        
-        query = "SELECT v2 FROM " + DATA_TABLE_FULL_NAME + " where v1='x3'";
-        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-        assertTrue(QueryUtil.getExplainPlan(rs).contains("CLIENT PARALLEL 
1-WAY FULL SCAN OVER " + DATA_TABLE_FULL_NAME));
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        
-        // recreate index table
-        admin.createTable(indexTableDesc);
-        do {
-          Thread.sleep(15 * 1000); // sleep 15 secs
-          rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
-              new String[] { PTableType.INDEX.toString() });
-          assertTrue(rs.next());
-          
if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
-              break;
-          }
-          if(localIndex) {
-              rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME+"_2",
-                  new String[] { PTableType.INDEX.toString() });
-              assertTrue(rs.next());
-              
if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
-                  break;
-              }
-          }
-        } while(true);
-        
-        // verify index table has data
-        query = "SELECT count(1) FROM " + INDEX_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        
-        // using 2 here because we only partially build index from where we 
failed and the oldest 
-        // index row has been deleted when we dropped the index table during 
test.
-        assertEquals(2, rs.getInt(1));
     }
     
     @Test(timeout=300000)
     public void testWriteFailureWithRegionServerDown() throws Exception {
-        String query;
-        ResultSet rs;
-
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = driver.connect(url, props);
-        conn.setAutoCommit(false);
-        conn.createStatement().execute(
-                "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL 
PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
-        query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
-
-        conn.createStatement().execute(
-                "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + 
DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
-        query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
-
-        // Verify the metadata for index is correct.
-        rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
-                new String[] { PTableType.INDEX.toString() });
-        assertTrue(rs.next());
-        assertEquals(INDEX_TABLE_NAME, rs.getString(3));
-        assertEquals(PIndexState.ACTIVE.toString(), 
rs.getString("INDEX_STATE"));
-        assertFalse(rs.next());
-        
-        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
-        stmt.setString(1, "a");
-        stmt.setString(2, "x");
-        stmt.setString(3, "1");
-        stmt.execute();
-        conn.commit();
-        
-        // find a RS which doesn't has CATALOG table
-        TableName catalogTable = TableName.valueOf("SYSTEM.CATALOG");
-        TableName indexTable = TableName.valueOf(INDEX_TABLE_FULL_NAME);
-        final HBaseCluster cluster = this.util.getHBaseCluster();
-        Collection<ServerName> rss = cluster.getClusterStatus().getServers();
-        HBaseAdmin admin = this.util.getHBaseAdmin();
-        List<HRegionInfo> regions = admin.getTableRegions(catalogTable);
-        ServerName catalogRS = 
cluster.getServerHoldingRegion(regions.get(0).getRegionName());
-        ServerName metaRS = cluster.getServerHoldingMeta();
-        ServerName rsToBeKilled = null;
-        
-        // find first RS isn't holding META or CATALOG table
-        for(ServerName curRS : rss) {
-            if(!curRS.equals(catalogRS) && !metaRS.equals(curRS)) {
-                rsToBeKilled = curRS;
-                break;
-            }
+        try (Connection conn = driver.connect(url, props);) {
+               String query;
+               ResultSet rs;
+               conn.setAutoCommit(false);
+               conn.createStatement().execute(
+                       "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR 
NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+               query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
+               rs = conn.createStatement().executeQuery(query);
+               assertFalse(rs.next());
+       
+               conn.createStatement().execute(
+                       "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + 
DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
+               query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
+               rs = conn.createStatement().executeQuery(query);
+               assertFalse(rs.next());
+       
+               // Verify the metadata for index is correct.
+               rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
+                       new String[] { PTableType.INDEX.toString() });
+               assertTrue(rs.next());
+               assertEquals(INDEX_TABLE_NAME, rs.getString(3));
+               assertEquals(PIndexState.ACTIVE.toString(), 
rs.getString("INDEX_STATE"));
+               assertFalse(rs.next());
+               
+               PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
+               stmt.setString(1, "a");
+               stmt.setString(2, "x");
+               stmt.setString(3, "1");
+               stmt.execute();
+               conn.commit();
+               
+               // find an RS which doesn't have the CATALOG table
+               TableName catalogTable = TableName.valueOf("SYSTEM.CATALOG");
+               TableName indexTable = TableName.valueOf(INDEX_TABLE_FULL_NAME);
+               final HBaseCluster cluster = this.util.getHBaseCluster();
+               Collection<ServerName> rss = 
cluster.getClusterStatus().getServers();
+               HBaseAdmin admin = this.util.getHBaseAdmin();
+               List<HRegionInfo> regions = admin.getTableRegions(catalogTable);
+               ServerName catalogRS = 
cluster.getServerHoldingRegion(regions.get(0).getRegionName());
+               ServerName metaRS = cluster.getServerHoldingMeta();
+               ServerName rsToBeKilled = null;
+               
+               // find the first RS that isn't holding the META or CATALOG table
+               for(ServerName curRS : rss) {
+                   if(!curRS.equals(catalogRS) && !metaRS.equals(curRS)) {
+                       rsToBeKilled = curRS;
+                       break;
+                   }
+               }
+               assertTrue(rsToBeKilled != null);
+               
+               regions = admin.getTableRegions(indexTable);
+               final HRegionInfo indexRegion = regions.get(0);
+               final ServerName dstRS = rsToBeKilled;
+               admin.move(indexRegion.getEncodedNameAsBytes(), 
Bytes.toBytes(rsToBeKilled.getServerName()));
+               this.util.waitFor(30000, 200, new Waiter.Predicate<Exception>() 
{
+                   @Override
+                   public boolean evaluate() throws Exception {
+                     ServerName sn = 
cluster.getServerHoldingRegion(indexRegion.getRegionName());
+                     return (sn != null && sn.equals(dstRS));
+                   }
+                 });
+               
+               // use a timer to send updates every 10ms
+               this.scheduleTimer = new Timer(true);
+               this.scheduleTimer.schedule(new 
SendingUpdatesScheduleTask(conn), 0, 10);
+               // let the timer send some updates
+               Thread.sleep(100);
+               
+               // kill RS hosting index table
+               this.util.getHBaseCluster().killRegionServer(rsToBeKilled);
+               
+               // wait for the index table to complete recovery
+               this.util.waitUntilAllRegionsAssigned(indexTable);
+               
+               // Poll the index metadata until the index state is ACTIVE again.
+               do {
+                 Thread.sleep(15 * 1000); // sleep 15 secs
+                 rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
+                     new String[] { PTableType.INDEX.toString() });
+                 assertTrue(rs.next());
+                 
if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
+                     break;
+                 }
+               } while(true);
+               this.scheduleTimer.cancel();
+               
+               assertEquals(cluster.getClusterStatus().getDeadServers(), 1);
         }
-        assertTrue(rsToBeKilled != null);
-        
-        regions = admin.getTableRegions(indexTable);
-        final HRegionInfo indexRegion = regions.get(0);
-        final ServerName dstRS = rsToBeKilled;
-        admin.move(indexRegion.getEncodedNameAsBytes(), 
Bytes.toBytes(rsToBeKilled.getServerName()));
-        this.util.waitFor(30000, 200, new Waiter.Predicate<Exception>() {
-            @Override
-            public boolean evaluate() throws Exception {
-              ServerName sn = 
cluster.getServerHoldingRegion(indexRegion.getRegionName());
-              return (sn != null && sn.equals(dstRS));
-            }
-          });
-        
-        // use timer sending updates in every 10ms
-        this.scheduleTimer = new Timer(true);
-        this.scheduleTimer.schedule(new SendingUpdatesScheduleTask(conn), 0, 
10);
-        // let timer sending some updates
-        Thread.sleep(100);
-        
-        // kill RS hosting index table
-        this.util.getHBaseCluster().killRegionServer(rsToBeKilled);
-        
-        // wait for index table completes recovery
-        this.util.waitUntilAllRegionsAssigned(indexTable);
-        
-        // Verify the metadata for index is correct.       
-        do {
-          Thread.sleep(15 * 1000); // sleep 15 secs
-          rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
-              new String[] { PTableType.INDEX.toString() });
-          assertTrue(rs.next());
-          
if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
-              break;
-          }
-        } while(true);
-        this.scheduleTimer.cancel();
-        
-        assertEquals(cluster.getClusterStatus().getDeadServers(), 1);
     }
     
     static class SendingUpdatesScheduleTask extends TimerTask {

Reply via email to