http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index e2af717,364b358..aee729f
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@@ -73,11 -71,9 +73,12 @@@ import org.apache.phoenix.util.SchemaUt
  import org.apache.phoenix.util.StringUtil;
  import org.junit.After;
  import org.junit.Before;
+ import org.junit.Ignore;
  import org.junit.Test;
  import org.junit.experimental.categories.Category;
 +import org.junit.runner.RunWith;
 +import org.junit.runners.Parameterized;
 +import org.junit.runners.Parameterized.Parameters;
  /**
   * 
   * Test for failure of region server to write to index table.
@@@ -153,277 -132,233 +154,281 @@@ public class MutableIndexFailureIT exte
          }
      }
  
+     @Ignore("See PHOENIX-2331")
      @Test(timeout=300000)
      public void testWriteFailureDisablesLocalIndex() throws Exception {
 -        testWriteFailureDisablesIndex(true);
 +      helpTestWriteFailureDisablesIndex(true);
      }
-     
+  
+     @Ignore("See PHOENIX-2332")
      @Test(timeout=300000)
 -    public void testWriteFailureDisablesIndex() throws Exception {
 -        testWriteFailureDisablesIndex(false);
 +    public void testWriteFailureDisablesGlobalIndex() throws Exception {
 +      helpTestWriteFailureDisablesIndex(false);
      }
 -    
 -    public void testWriteFailureDisablesIndex(boolean localIndex) throws 
Exception {
 -        String query;
 -        ResultSet rs;
  
 +    private void helpTestWriteFailureDisablesIndex(boolean localIndex) throws 
Exception {
          Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
 -        Connection conn = driver.connect(url, props);
 -        conn.setAutoCommit(false);
 -        conn.createStatement().execute(
 -                "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT 
NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
 -        query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
 -        rs = conn.createStatement().executeQuery(query);
 -        assertFalse(rs.next());
 -
 -        if(localIndex) {
 -            conn.createStatement().execute(
 -                "CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + 
DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
 -            conn.createStatement().execute(
 -                "CREATE LOCAL INDEX " + INDEX_TABLE_NAME+ "_2" + " ON " + 
DATA_TABLE_FULL_NAME + " (v2) INCLUDE (v1)");
 -        } else {
 -            conn.createStatement().execute(
 -                "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + 
DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
 +        try (Connection conn = driver.connect(url, props);) {
 +              String query;
 +              ResultSet rs;
 +              conn.setAutoCommit(false);
 +              conn.createStatement().execute(
 +                      "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR 
NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions);
 +              query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
 +              rs = conn.createStatement().executeQuery(query);
 +              assertFalse(rs.next());
 +      
 +              if(localIndex) {
 +                  conn.createStatement().execute(
 +                      "CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + 
DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
 +                  conn.createStatement().execute(
 +                      "CREATE LOCAL INDEX " + INDEX_TABLE_NAME+ "_2" + " ON " 
+ DATA_TABLE_FULL_NAME + " (v2) INCLUDE (v1)");
 +              } else {
 +                  conn.createStatement().execute(
 +                      "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + 
DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
 +              }
 +                  
 +              query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
 +              rs = conn.createStatement().executeQuery(query);
 +              assertFalse(rs.next());
 +      
 +              // Verify the metadata for index is correct.
 +              rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
 +                      new String[] { PTableType.INDEX.toString() });
 +              assertTrue(rs.next());
 +              assertEquals(INDEX_TABLE_NAME, rs.getString(3));
 +              assertEquals(PIndexState.ACTIVE.toString(), 
rs.getString("INDEX_STATE"));
 +              assertFalse(rs.next());
 +              
 +              PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
 +              stmt.setString(1, "a");
 +              stmt.setString(2, "x");
 +              stmt.setString(3, "1");
 +              stmt.execute();
 +              conn.commit();
 +      
 +              TableName indexTable =
 +                      TableName.valueOf(localIndex ? MetaDataUtil
 +                              .getLocalIndexTableName(DATA_TABLE_FULL_NAME) : 
INDEX_TABLE_FULL_NAME);
 +              HBaseAdmin admin = this.util.getHBaseAdmin();
 +              HTableDescriptor indexTableDesc = 
admin.getTableDescriptor(indexTable);
 +              try{
 +                admin.disableTable(indexTable);
 +                admin.deleteTable(indexTable);
 +              } catch (TableNotFoundException ignore) {}
 +      
 +              stmt = conn.prepareStatement("UPSERT INTO " + 
DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
 +              stmt.setString(1, "a2");
 +              stmt.setString(2, "x2");
 +              stmt.setString(3, "2");
 +              stmt.execute();
 +              
 +              if (transactional) {
 +                  try {
 +                      conn.commit();
 +                      fail();
 +                  } catch (SQLException e1) {
 +                              try {
 +                                      conn.rollback();
 +                                      fail();
 +                              } catch (SQLException e2) {
 +                                      // rollback fails as well because index 
is disabled
 +                              }
 +                  }
 +              }
 +              else {
 +                      conn.commit();
 +              }
 +              
 +              // Verify the metadata for index is correct.
 +              rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
 +                      new String[] { PTableType.INDEX.toString() });
 +              assertTrue(rs.next());
 +              assertEquals(INDEX_TABLE_NAME, rs.getString(3));
 +              // if the table is transactional, the index will not be 
disabled if there is a failure
 +              PIndexState indexState = transactional ? PIndexState.ACTIVE : 
PIndexState.DISABLE;
 +                      assertEquals(indexState.toString(), 
rs.getString("INDEX_STATE"));
 +              assertFalse(rs.next());
 +              if(localIndex) {
 +                  rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME+"_2",
 +                      new String[] { PTableType.INDEX.toString() });
 +                  assertTrue(rs.next());
 +                  assertEquals(INDEX_TABLE_NAME+"_2", rs.getString(3));
 +                  assertEquals(indexState.toString(), 
rs.getString("INDEX_STATE"));
 +                  assertFalse(rs.next());
 +              }
 +      
 +              // if the table is transactional the write to the index table  
will fail because the index has not been disabled
 +              if (!transactional) {
 +                      // Verify UPSERT on data table still work after index 
is disabled       
 +                      stmt = conn.prepareStatement("UPSERT INTO " + 
DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
 +                      stmt.setString(1, "a3");
 +                      stmt.setString(2, "x3");
 +                      stmt.setString(3, "3");
 +                      stmt.execute();
 +                      conn.commit();
 +              }
 +              
 +              if (transactional) {
 +                      // if the table was transactional there should be 1 row 
(written before the index was disabled)
 +                      query = "SELECT /*+ NO_INDEX */ v2 FROM " + 
DATA_TABLE_FULL_NAME;
 +                      rs = conn.createStatement().executeQuery("EXPLAIN " + 
query);
 +                      String expectedPlan = "CLIENT PARALLEL 1-WAY FULL SCAN 
OVER " + DATA_TABLE_FULL_NAME;
 +                              assertEquals(expectedPlan, 
QueryUtil.getExplainPlan(rs));
 +                      rs = conn.createStatement().executeQuery(query);
 +                      assertTrue(rs.next());
 +                      assertEquals("1", rs.getString(1));
 +                      assertFalse(rs.next());
 +              } else {
 +                      // if the table was not transactional there should be 
three rows (all writes to data table should succeed)
 +                      query = "SELECT v2 FROM " + DATA_TABLE_FULL_NAME;
 +                      rs = conn.createStatement().executeQuery("EXPLAIN " + 
query);
 +                      String expectedPlan = "CLIENT PARALLEL 1-WAY FULL SCAN 
OVER " + DATA_TABLE_FULL_NAME;
 +                              assertEquals(expectedPlan, 
QueryUtil.getExplainPlan(rs));
 +                      rs = conn.createStatement().executeQuery(query);
 +                      assertTrue(rs.next());
 +                      assertEquals("1", rs.getString(1));
 +                      assertTrue(rs.next());
 +                      assertEquals("2", rs.getString(1));
 +                      assertTrue(rs.next());
 +                      assertEquals("3", rs.getString(1));
 +                      assertFalse(rs.next());
 +              }
 +              
 +              // recreate index table
 +              admin.createTable(indexTableDesc);
 +              do {
 +                rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
 +                    new String[] { PTableType.INDEX.toString() });
 +                assertTrue(rs.next());
 +                
if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
 +                    break;
 +                }
 +                if(localIndex) {
 +                    rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME+"_2",
 +                        new String[] { PTableType.INDEX.toString() });
 +                    assertTrue(rs.next());
 +                    
if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
 +                        break;
 +                    }
 +                }
 +                Thread.sleep(15 * 1000); // sleep 15 secs
 +              } while(true);
 +              
 +              // Verify UPSERT on data table still work after index table is 
recreated       
 +              stmt = conn.prepareStatement("UPSERT INTO " + 
DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
 +              stmt.setString(1, "a4");
 +              stmt.setString(2, "x4");
 +              stmt.setString(3, "4");
 +              stmt.execute();
 +              conn.commit();
 +              
 +              // verify index table has data
 +              query = "SELECT count(1) FROM " + INDEX_TABLE_FULL_NAME;
 +              rs = conn.createStatement().executeQuery(query);
 +              assertTrue(rs.next());
 +              
 +              // for txn tables there will be only one row in the index (a4)
 +              // for non txn tables there will be three rows because we only 
partially build index from where we failed and the oldest 
 +              // index row has been deleted when we dropped the index table 
during test
 +              assertEquals( transactional ? 1: 3, rs.getInt(1));
          }
 -            
 -        query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
 -        rs = conn.createStatement().executeQuery(query);
 -        assertFalse(rs.next());
 -
 -        // Verify the metadata for index is correct.
 -        rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
 -                new String[] { PTableType.INDEX.toString() });
 -        assertTrue(rs.next());
 -        assertEquals(INDEX_TABLE_NAME, rs.getString(3));
 -        assertEquals(PIndexState.ACTIVE.toString(), 
rs.getString("INDEX_STATE"));
 -        assertFalse(rs.next());
 -        
 -        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
 -        stmt.setString(1, "a");
 -        stmt.setString(2, "x");
 -        stmt.setString(3, "1");
 -        stmt.execute();
 -        conn.commit();
 -
 -        TableName indexTable =
 -                TableName.valueOf(localIndex ? MetaDataUtil
 -                        .getLocalIndexTableName(DATA_TABLE_FULL_NAME) : 
INDEX_TABLE_FULL_NAME);
 -        HBaseAdmin admin = this.util.getHBaseAdmin();
 -        HTableDescriptor indexTableDesc = 
admin.getTableDescriptor(indexTable);
 -        try{
 -          admin.disableTable(indexTable);
 -          admin.deleteTable(indexTable);
 -        } catch (TableNotFoundException ignore) {}
 -
 -        stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + 
" VALUES(?,?,?)");
 -        stmt.setString(1, "a2");
 -        stmt.setString(2, "x2");
 -        stmt.setString(3, "2");
 -        stmt.execute();
 -        try {
 -            conn.commit();
 -        } catch (SQLException e) {}
 -
 -        // Verify the metadata for index is correct.
 -        rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
 -                new String[] { PTableType.INDEX.toString() });
 -        assertTrue(rs.next());
 -        assertEquals(INDEX_TABLE_NAME, rs.getString(3));
 -        assertEquals(PIndexState.DISABLE.toString(), 
rs.getString("INDEX_STATE"));
 -        assertFalse(rs.next());
 -        if(localIndex) {
 -            rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME+"_2",
 -                new String[] { PTableType.INDEX.toString() });
 -            assertTrue(rs.next());
 -            assertEquals(INDEX_TABLE_NAME+"_2", rs.getString(3));
 -            assertEquals(PIndexState.DISABLE.toString(), 
rs.getString("INDEX_STATE"));
 -            assertFalse(rs.next());
 -        }
 -
 -        // Verify UPSERT on data table still work after index is disabled     
  
 -        stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + 
" VALUES(?,?,?)");
 -        stmt.setString(1, "a3");
 -        stmt.setString(2, "x3");
 -        stmt.setString(3, "3");
 -        stmt.execute();
 -        conn.commit();
 -        
 -        query = "SELECT v2 FROM " + DATA_TABLE_FULL_NAME + " where v1='x3'";
 -        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
 -        assertTrue(QueryUtil.getExplainPlan(rs).contains("CLIENT PARALLEL 
1-WAY FULL SCAN OVER " + DATA_TABLE_FULL_NAME));
 -        rs = conn.createStatement().executeQuery(query);
 -        assertTrue(rs.next());
 -        
 -        // recreate index table
 -        admin.createTable(indexTableDesc);
 -        do {
 -          Thread.sleep(15 * 1000); // sleep 15 secs
 -          rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
 -              new String[] { PTableType.INDEX.toString() });
 -          assertTrue(rs.next());
 -          
if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
 -              break;
 -          }
 -          if(localIndex) {
 -              rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME+"_2",
 -                  new String[] { PTableType.INDEX.toString() });
 -              assertTrue(rs.next());
 -              
if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
 -                  break;
 -              }
 -          }
 -        } while(true);
 -        
 -        // verify index table has data
 -        query = "SELECT count(1) FROM " + INDEX_TABLE_FULL_NAME;
 -        rs = conn.createStatement().executeQuery(query);
 -        assertTrue(rs.next());
 -        
 -        // using 2 here because we only partially build index from where we 
failed and the oldest 
 -        // index row has been deleted when we dropped the index table during 
test.
 -        assertEquals(2, rs.getInt(1));
      }
      
      @Test(timeout=300000)
      public void testWriteFailureWithRegionServerDown() throws Exception {
 -        String query;
 -        ResultSet rs;
 -
          Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
 -        Connection conn = driver.connect(url, props);
 -        conn.setAutoCommit(false);
 -        conn.createStatement().execute(
 -                "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT 
NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
 -        query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
 -        rs = conn.createStatement().executeQuery(query);
 -        assertFalse(rs.next());
 -
 -        conn.createStatement().execute(
 -                "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + 
DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
 -        query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
 -        rs = conn.createStatement().executeQuery(query);
 -        assertFalse(rs.next());
 -
 -        // Verify the metadata for index is correct.
 -        rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
 -                new String[] { PTableType.INDEX.toString() });
 -        assertTrue(rs.next());
 -        assertEquals(INDEX_TABLE_NAME, rs.getString(3));
 -        assertEquals(PIndexState.ACTIVE.toString(), 
rs.getString("INDEX_STATE"));
 -        assertFalse(rs.next());
 -        
 -        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
 -        stmt.setString(1, "a");
 -        stmt.setString(2, "x");
 -        stmt.setString(3, "1");
 -        stmt.execute();
 -        conn.commit();
 -        
 -        // find a RS which doesn't has CATALOG table
 -        TableName catalogTable = TableName.valueOf("SYSTEM.CATALOG");
 -        TableName indexTable = TableName.valueOf(INDEX_TABLE_FULL_NAME);
 -        final HBaseCluster cluster = this.util.getHBaseCluster();
 -        Collection<ServerName> rss = cluster.getClusterStatus().getServers();
 -        HBaseAdmin admin = this.util.getHBaseAdmin();
 -        List<HRegionInfo> regions = admin.getTableRegions(catalogTable);
 -        ServerName catalogRS = 
cluster.getServerHoldingRegion(regions.get(0).getTable(),
 -                regions.get(0).getRegionName());
 -        ServerName metaRS = cluster.getServerHoldingMeta();
 -        ServerName rsToBeKilled = null;
 -        
 -        // find first RS isn't holding META or CATALOG table
 -        for(ServerName curRS : rss) {
 -            if(!curRS.equals(catalogRS) && !metaRS.equals(curRS)) {
 -                rsToBeKilled = curRS;
 -                break;
 -            }
 +        try (Connection conn = driver.connect(url, props);) {
 +              String query;
 +              ResultSet rs;
 +              conn.setAutoCommit(false);
 +              conn.createStatement().execute(
 +                      "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR 
NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
 +              query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
 +              rs = conn.createStatement().executeQuery(query);
 +              assertFalse(rs.next());
 +      
 +              conn.createStatement().execute(
 +                      "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + 
DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
 +              query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
 +              rs = conn.createStatement().executeQuery(query);
 +              assertFalse(rs.next());
 +      
 +              // Verify the metadata for index is correct.
 +              rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
 +                      new String[] { PTableType.INDEX.toString() });
 +              assertTrue(rs.next());
 +              assertEquals(INDEX_TABLE_NAME, rs.getString(3));
 +              assertEquals(PIndexState.ACTIVE.toString(), 
rs.getString("INDEX_STATE"));
 +              assertFalse(rs.next());
 +              
 +              PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
 +              stmt.setString(1, "a");
 +              stmt.setString(2, "x");
 +              stmt.setString(3, "1");
 +              stmt.execute();
 +              conn.commit();
 +              
 +              // find a RS which doesn't has CATALOG table
 +              TableName catalogTable = TableName.valueOf("SYSTEM.CATALOG");
 +              TableName indexTable = TableName.valueOf(INDEX_TABLE_FULL_NAME);
 +              final HBaseCluster cluster = this.util.getHBaseCluster();
 +              Collection<ServerName> rss = 
cluster.getClusterStatus().getServers();
 +              HBaseAdmin admin = this.util.getHBaseAdmin();
 +              List<HRegionInfo> regions = admin.getTableRegions(catalogTable);
-               ServerName catalogRS = 
cluster.getServerHoldingRegion(regions.get(0).getRegionName());
++              ServerName catalogRS = 
cluster.getServerHoldingRegion(regions.get(0).getTable(),
++                      regions.get(0).getRegionName());
 +              ServerName metaRS = cluster.getServerHoldingMeta();
 +              ServerName rsToBeKilled = null;
 +              
 +              // find first RS isn't holding META or CATALOG table
 +              for(ServerName curRS : rss) {
 +                  if(!curRS.equals(catalogRS) && !metaRS.equals(curRS)) {
 +                      rsToBeKilled = curRS;
 +                      break;
 +                  }
 +              }
 +              assertTrue(rsToBeKilled != null);
 +              
 +              regions = admin.getTableRegions(indexTable);
 +              final HRegionInfo indexRegion = regions.get(0);
 +              final ServerName dstRS = rsToBeKilled;
 +              admin.move(indexRegion.getEncodedNameAsBytes(), 
Bytes.toBytes(rsToBeKilled.getServerName()));
 +              this.util.waitFor(30000, 200, new Waiter.Predicate<Exception>() 
{
 +                  @Override
 +                  public boolean evaluate() throws Exception {
-                     ServerName sn = 
cluster.getServerHoldingRegion(indexRegion.getRegionName());
++                    ServerName sn = 
cluster.getServerHoldingRegion(indexRegion.getTable(),
++                              indexRegion.getRegionName());
 +                    return (sn != null && sn.equals(dstRS));
 +                  }
 +                });
 +              
 +              // use timer sending updates in every 10ms
 +              this.scheduleTimer = new Timer(true);
 +              this.scheduleTimer.schedule(new 
SendingUpdatesScheduleTask(conn), 0, 10);
 +              // let timer sending some updates
 +              Thread.sleep(100);
 +              
 +              // kill RS hosting index table
 +              this.util.getHBaseCluster().killRegionServer(rsToBeKilled);
 +              
 +              // wait for index table completes recovery
 +              this.util.waitUntilAllRegionsAssigned(indexTable);
 +              
 +              // Verify the metadata for index is correct.       
 +              do {
 +                Thread.sleep(15 * 1000); // sleep 15 secs
 +                rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
 +                    new String[] { PTableType.INDEX.toString() });
 +                assertTrue(rs.next());
 +                
if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
 +                    break;
 +                }
 +              } while(true);
 +              this.scheduleTimer.cancel();
 +              
 +              assertEquals(cluster.getClusterStatus().getDeadServers(), 1);
          }
 -        assertTrue(rsToBeKilled != null);
 -        
 -        regions = admin.getTableRegions(indexTable);
 -        final HRegionInfo indexRegion = regions.get(0);
 -        final ServerName dstRS = rsToBeKilled;
 -        admin.move(indexRegion.getEncodedNameAsBytes(), 
Bytes.toBytes(rsToBeKilled.getServerName()));
 -        this.util.waitFor(30000, 200, new Waiter.Predicate<Exception>() {
 -            @Override
 -            public boolean evaluate() throws Exception {
 -              ServerName sn = 
cluster.getServerHoldingRegion(indexRegion.getTable(),
 -                      indexRegion.getRegionName());
 -              return (sn != null && sn.equals(dstRS));
 -            }
 -          });
 -        
 -        // use timer sending updates in every 10ms
 -        this.scheduleTimer = new Timer(true);
 -        this.scheduleTimer.schedule(new SendingUpdatesScheduleTask(conn), 0, 
10);
 -        // let timer sending some updates
 -        Thread.sleep(100);
 -        
 -        // kill RS hosting index table
 -        this.util.getHBaseCluster().killRegionServer(rsToBeKilled);
 -        
 -        // wait for index table completes recovery
 -        this.util.waitUntilAllRegionsAssigned(indexTable);
 -        
 -        // Verify the metadata for index is correct.       
 -        do {
 -          Thread.sleep(15 * 1000); // sleep 15 secs
 -          rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
 -              new String[] { PTableType.INDEX.toString() });
 -          assertTrue(rs.next());
 -          
if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
 -              break;
 -          }
 -        } while(true);
 -        this.scheduleTimer.cancel();
 -        
 -        assertEquals(cluster.getClusterStatus().getDeadServers(), 1);
      }
      
      static class SendingUpdatesScheduleTask extends TimerTask {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
index 0000000,e0f0a3c..b3b3316
mode 000000,100644..100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@@ -1,0 -1,318 +1,318 @@@
+ /*
+  * Copyright 2014 The Apache Software Foundation
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.phoenix.execute;
+ 
+ import static com.google.common.collect.Lists.newArrayList;
+ import static com.google.common.collect.Sets.newHashSet;
+ import static java.util.Collections.singletonList;
+ import static org.apache.phoenix.query.BaseTest.initAndRegisterDriver;
+ import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+ import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+ import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+ import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
+ import static 
org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
+ import static org.apache.phoenix.util.TestUtil.LOCALHOST;
+ import static org.junit.Assert.assertArrayEquals;
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.fail;
+ 
+ import java.io.IOException;
+ import java.sql.Connection;
+ import java.sql.Driver;
+ import java.sql.ResultSet;
+ import java.sql.SQLException;
+ import java.sql.Statement;
+ import java.util.Comparator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Properties;
+ 
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.hbase.DoNotRetryIOException;
+ import org.apache.hadoop.hbase.HBaseIOException;
+ import org.apache.hadoop.hbase.HBaseTestingUtility;
+ import org.apache.hadoop.hbase.client.Delete;
+ import org.apache.hadoop.hbase.client.Durability;
+ import org.apache.hadoop.hbase.client.Put;
+ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+ import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+ import org.apache.hadoop.hbase.util.Bytes;
+ import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+ import org.apache.phoenix.hbase.index.Indexer;
+ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+ import org.apache.phoenix.jdbc.PhoenixConnection;
+ import org.apache.phoenix.query.QueryServices;
+ import org.apache.phoenix.schema.TableRef;
+ import org.apache.phoenix.util.PhoenixRuntime;
+ import org.apache.phoenix.util.ReadOnlyProps;
+ import org.junit.AfterClass;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ import org.junit.experimental.categories.Category;
+ 
+ import com.google.common.base.Preconditions;
+ import com.google.common.collect.Maps;
+ 
+ @Category(NeedsOwnMiniClusterTest.class)
+ public class PartialCommitIT {
+     
+     private static final String TABLE_NAME_TO_FAIL = 
"b_failure_table".toUpperCase();
+     private static final byte[] ROW_TO_FAIL = Bytes.toBytes("fail me");
+     private static final String UPSERT_TO_FAIL = "upsert into " + 
TABLE_NAME_TO_FAIL + " values ('" + Bytes.toString(ROW_TO_FAIL) + "', 'boom!')";
+     private static final String UPSERT_SELECT_TO_FAIL = "upsert into " + 
TABLE_NAME_TO_FAIL + " select k, c from a_success_table";
+     private static final String DELETE_TO_FAIL = "delete from " + 
TABLE_NAME_TO_FAIL + "  where k='z'";
+     private static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
+     private static String url;
+     private static Driver driver;
+     private static final Properties props = new Properties();
+     
+     static {
+         props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 10);
+     }
+     
+     @BeforeClass
+     public static void setupCluster() throws Exception {
+       Configuration conf = TEST_UTIL.getConfiguration();
+       setUpConfigForMiniCluster(conf);
+       conf.setClass("hbase.coprocessor.region.classes", 
FailingRegionObserver.class, RegionObserver.class);
+       conf.setBoolean("hbase.coprocessor.abortonerror", false);
+       conf.setBoolean(Indexer.CHECK_VERSION_CONF_KEY, false);
+       TEST_UTIL.startMiniCluster();
+       String clientPort = 
TEST_UTIL.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
+       url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + 
JDBC_PROTOCOL_SEPARATOR + clientPort
+               + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
+ 
+       Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+       // Must update config before starting server
+       props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+       driver = initAndRegisterDriver(url, new 
ReadOnlyProps(props.entrySet().iterator()));
+       createTablesWithABitOfData();
+     }
+     
+     private static void createTablesWithABitOfData() throws Exception {
+         Properties props = new Properties();
+         props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 10);
+ 
+         try (Connection con = driver.connect(url, new Properties())) {
+             Statement sta = con.createStatement();
+             sta.execute("create table a_success_table (k varchar primary key, 
c varchar)");
+             sta.execute("create table b_failure_table (k varchar primary key, 
c varchar)");
+             sta.execute("create table c_success_table (k varchar primary key, 
c varchar)");
+             con.commit();
+         }
+ 
+         props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 100);
+ 
+         try (Connection con = driver.connect(url, new Properties())) {
+             con.setAutoCommit(false);
+             Statement sta = con.createStatement();
+             for (String table : newHashSet("a_success_table", 
TABLE_NAME_TO_FAIL, "c_success_table")) {
+                 sta.execute("upsert into " + table + " values ('z', 'z')");
+                 sta.execute("upsert into " + table + " values ('zz', 'zz')");
+                 sta.execute("upsert into " + table + " values ('zzz', 
'zzz')");
+             }
+             con.commit();
+         }
+     }
+     
+     @AfterClass
+     public static void teardownCluster() throws Exception {
+       TEST_UTIL.shutdownMiniCluster();
+     }
+     
+     @Test
+     public void testNoFailure() {
+         testPartialCommit(singletonList("upsert into a_success_table values 
('testNoFailure', 'a')"), 0, new int[0], false,
+                                         singletonList("select count(*) from 
a_success_table where k='testNoFailure'"), singletonList(new Integer(1)));
+     }
+     
+     @Test
+     public void testUpsertFailure() {
+         testPartialCommit(newArrayList("upsert into a_success_table values 
('testUpsertFailure1', 'a')", 
+                                        UPSERT_TO_FAIL, 
+                                        "upsert into a_success_table values 
('testUpsertFailure2', 'b')"), 
+                                        1, new int[]{1}, true,
+                                        newArrayList("select count(*) from 
a_success_table where k like 'testUpsertFailure_'",
+                                                     "select count(*) from " + 
TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), 
+                                        newArrayList(new Integer(2), new 
Integer(0)));
+     }
+     
+     @Test
+     public void testUpsertSelectFailure() throws SQLException {
+         props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 100);
+ 
+         try (Connection con = driver.connect(url, new Properties())) {
+             con.createStatement().execute("upsert into a_success_table values 
('" + Bytes.toString(ROW_TO_FAIL) + "', 'boom!')");
+             con.commit();
+         }
+         
+         testPartialCommit(newArrayList("upsert into a_success_table values 
('testUpsertSelectFailure', 'a')", 
+                                        UPSERT_SELECT_TO_FAIL), 
+                                        1, new int[]{1}, true,
+                                        newArrayList("select count(*) from 
a_success_table where k in ('testUpsertSelectFailure', '" + 
Bytes.toString(ROW_TO_FAIL) + "')",
+                                                     "select count(*) from " + 
TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), 
+                                        newArrayList(new Integer(2), new 
Integer(0)));
+     }
+     
+     @Test
+     public void testDeleteFailure() {
+         testPartialCommit(newArrayList("upsert into a_success_table values 
('testDeleteFailure1', 'a')", 
+                                        DELETE_TO_FAIL,
+                                        "upsert into a_success_table values 
('testDeleteFailure2', 'b')"), 
+                                        1, new int[]{1}, true,
+                                        newArrayList("select count(*) from 
a_success_table where k like 'testDeleteFailure_'",
+                                                     "select count(*) from " + 
TABLE_NAME_TO_FAIL + " where k = 'z'"), 
+                                        newArrayList(new Integer(2), new 
Integer(1)));
+     }
+     
+     /**
+      * {@link MutationState} keeps mutations ordered lexicographically by 
table name.
+      */
+     @Test
+     public void testOrderOfMutationsIsPredicatable() {
+         testPartialCommit(newArrayList("upsert into c_success_table values 
('testOrderOfMutationsIsPredicatable', 'c')", // will fail because 
c_success_table is after b_failure_table by table sort order
+                                        UPSERT_TO_FAIL, 
+                                        "upsert into a_success_table values 
('testOrderOfMutationsIsPredicatable', 'a')"), // will succeed because 
a_success_table is before b_failure_table by table sort order
+                                        2, new int[]{0,1}, true,
+                                        newArrayList("select count(*) from 
c_success_table where k='testOrderOfMutationsIsPredicatable'",
+                                                     "select count(*) from 
a_success_table where k='testOrderOfMutationsIsPredicatable'",
+                                                     "select count(*) from " + 
TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), 
+                                        newArrayList(new Integer(0), new 
Integer(1), new Integer(0)));
+     }
+     
+     @Test
+     public void checkThatAllStatementTypesMaintainOrderInConnection() {
+         testPartialCommit(newArrayList("upsert into a_success_table values 
('k', 'checkThatAllStatementTypesMaintainOrderInConnection')", 
+                                        "upsert into a_success_table select k, 
c from c_success_table",
+                                        DELETE_TO_FAIL,
+                                        "select * from a_success_table", 
+                                        UPSERT_TO_FAIL), 
+                                        2, new int[]{2,4}, true,
+                                        newArrayList("select count(*) from 
a_success_table where k='testOrderOfMutationsIsPredicatable' or k like 'z%'", 
// rows left: zz, zzz, checkThatAllStatementTypesMaintainOrderInConnection
+                                                     "select count(*) from " + 
TABLE_NAME_TO_FAIL + " where k = '" + ROW_TO_FAIL + "'",
+                                                     "select count(*) from " + 
TABLE_NAME_TO_FAIL + " where k = 'z'"), 
+                                        newArrayList(new Integer(4), new 
Integer(0), new Integer(1)));
+     }
+     
+     private void testPartialCommit(List<String> statements, int failureCount, 
int[] expectedUncommittedStatementIndexes, boolean willFail,
+                                    List<String> 
countStatementsForVerification, List<Integer> expectedCountsForVerification) {
+         Preconditions.checkArgument(countStatementsForVerification.size() == 
expectedCountsForVerification.size());
+         
+         try (Connection con = 
getConnectionWithTableOrderPreservingMutationState()) {
+             con.setAutoCommit(false);
+             Statement sta = con.createStatement();
+             for (String statement : statements) {
+                 sta.execute(statement);
+             }
+             try {
+                 con.commit();
+                 if (willFail) {
+                     fail("Expected at least one statement in the list to 
fail");
+                 } else {
+                     assertEquals(0, 
con.unwrap(PhoenixConnection.class).getStatementExecutionCounter()); // should 
have been reset to 0 in commit()
+                 }
+             } catch (SQLException sqle) {
+                 if (!willFail) {
+                     fail("Expected no statements to fail");
+                 }
+                 assertEquals(CommitException.class, sqle.getClass());
+                 int[] uncommittedStatementIndexes = 
((CommitException)sqle).getUncommittedStatementIndexes();
+                 assertEquals(failureCount, 
uncommittedStatementIndexes.length);
+                 assertArrayEquals(expectedUncommittedStatementIndexes, 
uncommittedStatementIndexes);
+             }
+             
+             // verify data in HBase
+             for (int i = 0; i < countStatementsForVerification.size(); i++) {
+                 String countStatement = countStatementsForVerification.get(i);
+                 ResultSet rs = sta.executeQuery(countStatement);
+                 if (!rs.next()) {
+                     fail("Expected a single row from count query");
+                 }
+                 assertEquals(expectedCountsForVerification.get(i).intValue(), 
rs.getInt(1));
+             }
+         } catch (SQLException e) {
+             fail(e.toString());
+         }
+     }
+     
+     private PhoenixConnection 
getConnectionWithTableOrderPreservingMutationState() throws SQLException {
+         Connection con = driver.connect(url, new Properties());
+         PhoenixConnection phxCon = new 
PhoenixConnection(con.unwrap(PhoenixConnection.class));
+         final 
Map<TableRef,Map<ImmutableBytesPtr,MutationState.RowMutationState>> mutations = 
Maps.newTreeMap(new TableRefComparator());
+         return new PhoenixConnection(phxCon) {
+             @Override
+             protected MutationState newMutationState(int maxSize) {
 -                return new MutationState(maxSize, this, mutations);
++                return new MutationState(maxSize, this, mutations, null);
+             };
+         };
+     }
+     
+     public static class FailingRegionObserver extends SimpleRegionObserver {
+         @Override
+         public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, 
Put put, WALEdit edit,
+                 final Durability durability) throws HBaseIOException {
+             if (shouldFailUpsert(c, put)) {
+                 // throwing anything other than instances of IOException 
result
+                 // in this coprocessor being unloaded
+                 // DoNotRetryIOException tells HBase not to retry this 
mutation
+                 // multiple times
+                 throw new DoNotRetryIOException();
+             }
+         }
+         
+         @Override
+         public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c,
+                 Delete delete, WALEdit edit, Durability durability) throws 
IOException {
+             if (shouldFailDelete(c, delete)) {
+                 // throwing anything other than instances of IOException 
result
+                 // in this coprocessor being unloaded
+                 // DoNotRetryIOException tells HBase not to retry this 
mutation
+                 // multiple times
+                 throw new DoNotRetryIOException();
+             }
+         }
+         
+         private boolean 
shouldFailUpsert(ObserverContext<RegionCoprocessorEnvironment> c, Put put) {
+             String tableName = 
c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
+             return TABLE_NAME_TO_FAIL.equals(tableName) && 
Bytes.equals(ROW_TO_FAIL, put.getRow());
+         }
+         
+         private boolean 
shouldFailDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete 
delete) {
+             String tableName = 
c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
+             return TABLE_NAME_TO_FAIL.equals(tableName) &&  
+                 // Phoenix deletes are sent as Mutations with empty values
+                 
delete.getFamilyCellMap().firstEntry().getValue().get(0).getValueLength() == 0 
&&
+                 
delete.getFamilyCellMap().firstEntry().getValue().get(0).getQualifierLength() 
== 0;
+         }
+     }
+     
+     /**
+      * Used for ordering {@link MutationState#mutations} map.
+      */
+     private static class TableRefComparator implements Comparator<TableRef> {
+         @Override
+         public int compare(TableRef tr1, TableRef tr2) {
+             return 
tr1.getTable().getPhysicalName().getString().compareTo(tr2.getTable().getPhysicalName().getString());
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
index 1cdd508,6b2309e..2396719
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
@@@ -312,11 -312,11 +312,11 @@@ public class EndToEndCoveredColumnsInde
      HTable primary = new HTable(UTIL.getConfiguration(), tableNameBytes);
  
      // overwrite the codec so we can verify the current state
-     HRegion region = 
UTIL.getMiniHBaseCluster().getRegions(tableNameBytes).get(0);
+     Region region = 
UTIL.getMiniHBaseCluster().getRegions(tableNameBytes).get(0);
      Indexer indexer =
          (Indexer) 
region.getCoprocessorHost().findCoprocessor(Indexer.class.getName());
 -    CoveredColumnsIndexBuilder builder =
 -        (CoveredColumnsIndexBuilder) indexer.getBuilderForTesting();
 +    NonTxIndexBuilder builder =
 +        (NonTxIndexBuilder) indexer.getBuilderForTesting();
      VerifyingIndexCodec codec = new VerifyingIndexCodec();
      builder.setIndexCodecForTesting(codec);
  

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateFunctionCompiler.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/compile/CreateFunctionCompiler.java
index 0000000,138c75d..acf01dc
mode 000000,100644..100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateFunctionCompiler.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateFunctionCompiler.java
@@@ -1,0 -1,82 +1,72 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.phoenix.compile;
+ 
+ import java.sql.ParameterMetaData;
+ import java.sql.SQLException;
+ import java.sql.SQLFeatureNotSupportedException;
+ import java.util.Collections;
+ 
+ import org.apache.hadoop.hbase.client.Scan;
+ import org.apache.phoenix.execute.MutationState;
+ import org.apache.phoenix.jdbc.PhoenixConnection;
+ import org.apache.phoenix.jdbc.PhoenixStatement;
+ import org.apache.phoenix.parse.CreateFunctionStatement;
+ import org.apache.phoenix.schema.MetaDataClient;
+ 
+ public class CreateFunctionCompiler {
+ 
+     private final PhoenixStatement statement;
+     
+     public CreateFunctionCompiler(PhoenixStatement statement) {
+         this.statement = statement;
+     }
+ 
+     public MutationPlan compile(final CreateFunctionStatement create) throws 
SQLException {
+         final PhoenixConnection connection = statement.getConnection();
+         PhoenixConnection connectionToBe = connection;
+         final StatementContext context = new StatementContext(statement);
+         final MetaDataClient client = new MetaDataClient(connectionToBe);
+         
 -        return new MutationPlan() {
 -
 -            @Override
 -            public ParameterMetaData getParameterMetaData() {
 -                return context.getBindManager().getParameterMetaData();
 -            }
++        return new BaseMutationPlan(context, create.getOperation()) {
+ 
+             @Override
+             public MutationState execute() throws SQLException {
+                 try {
+                     return client.createFunction(create);
+                 } finally {
+                     if (client.getConnection() != connection) {
+                         client.getConnection().close();
+                     }
+                 }
+             }
+ 
+             @Override
+             public ExplainPlan getExplainPlan() throws SQLException {
+                 return new ExplainPlan(Collections.singletonList("CREATE"
+                         + (create.getFunctionInfo().isReplace() ? " OR 
REPLACE" : "")
+                         + " FUNCTION"));
+             }
+ 
+             @Override
 -            public PhoenixConnection getConnection() {
 -                return connection;
 -            }
 -            
 -            @Override
+             public StatementContext getContext() {
+                 return context;
+             }
+         };
+     }
+ }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
index 49e9059,a5adc49..1ffd36e
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
@@@ -40,7 -41,7 +40,8 @@@ import org.apache.phoenix.expression.Ro
  import 
org.apache.phoenix.expression.visitor.StatelessTraverseNoExpressionVisitor;
  import org.apache.phoenix.jdbc.PhoenixConnection;
  import org.apache.phoenix.jdbc.PhoenixStatement;
 +import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+ import org.apache.phoenix.parse.BindParseNode;
  import org.apache.phoenix.parse.ColumnParseNode;
  import org.apache.phoenix.parse.CreateTableStatement;
  import org.apache.phoenix.parse.ParseNode;
@@@ -62,12 -67,11 +67,13 @@@ import com.google.common.collect.Iterat
  
  
  public class CreateTableCompiler {
+     private static final PDatum VARBINARY_DATUM = new VarbinaryDatum();
      private final PhoenixStatement statement;
 +    private final Operation operation;
      
 -    public CreateTableCompiler(PhoenixStatement statement) {
 +    public CreateTableCompiler(PhoenixStatement statement, Operation 
operation) {
          this.statement = statement;
 +        this.operation = operation;
      }
  
      public MutationPlan compile(final CreateTableStatement create) throws 
SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index f0ba8e9,96588d1..bbc16b2
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@@ -1,5 -1,5 +1,4 @@@
  /*
-- * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
@@@ -90,15 -91,14 +92,16 @@@ public class DeleteCompiler 
      private static ParseNodeFactory FACTORY = new ParseNodeFactory();
      
      private final PhoenixStatement statement;
 +    private final Operation operation;
      
 -    public DeleteCompiler(PhoenixStatement statement) {
 +    public DeleteCompiler(PhoenixStatement statement, Operation operation) {
          this.statement = statement;
 +        this.operation = operation;
      }
      
-     private static MutationState deleteRows(PhoenixStatement statement, 
TableRef targetTableRef, TableRef indexTableRef, ResultIterator iterator, 
RowProjector projector, TableRef sourceTableRef) throws SQLException {
+     private static MutationState deleteRows(StatementContext childContext, 
TableRef targetTableRef, TableRef indexTableRef, ResultIterator iterator, 
RowProjector projector, TableRef sourceTableRef) throws SQLException {
          PTable table = targetTableRef.getTable();
+         PhoenixStatement statement = childContext.getStatement();
          PhoenixConnection connection = statement.getConnection();
          PName tenantId = connection.getTenantId();
          byte[] tenantIdBytes = null;
@@@ -345,12 -331,12 +353,12 @@@
                          hint, false, aliasedNodes, delete.getWhere(), 
                          Collections.<ParseNode>emptyList(), null, 
                          delete.getOrderBy(), delete.getLimit(),
-                         delete.getBindCount(), false, false);
+                         delete.getBindCount(), false, false, 
Collections.<SelectStatement>emptyList(), delete.getUdfParseNodes());
 -                select = StatementNormalizer.normalize(select, resolver);
 -                SelectStatement transformedSelect = 
SubqueryRewriter.transform(select, resolver, connection);
 +                select = StatementNormalizer.normalize(select, resolverToBe);
 +                SelectStatement transformedSelect = 
SubqueryRewriter.transform(select, resolverToBe, connection);
                  if (transformedSelect != select) {
 -                    resolver = 
FromCompiler.getResolverForQuery(transformedSelect, connection);
 -                    select = StatementNormalizer.normalize(transformedSelect, 
resolver);
 +                    resolverToBe = 
FromCompiler.getResolverForQuery(transformedSelect, connection);
 +                    select = StatementNormalizer.normalize(transformedSelect, 
resolverToBe);
                  }
                  parallelIteratorFactory = hasLimit ? null : new 
DeletingParallelIteratorFactory(connection);
                  QueryOptimizer optimizer = new QueryOptimizer(services);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index 855915d,dd92b00..a9dd616
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@@ -492,7 -651,7 +658,7 @@@ public class FromCompiler 
                      PTableType.SUBQUERY, null, 
MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM,
                      null, null, columns, null, null, 
Collections.<PTable>emptyList(),
                      false, Collections.<PName>emptyList(), null, null, false, 
false, false, null,
--                    null, null, false);
++                    null, null, false, false);
  
              String alias = subselectNode.getAlias();
              TableRef tableRef = new TableRef(alias, t, 
MetaDataProtocol.MIN_TABLE_TIMESTAMP, false);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index daec761,e4ca5d9..b55e4aa
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@@ -1296,8 -1297,12 +1297,12 @@@ public class JoinCompiler 
          }
          
          return PTableImpl.makePTable(left.getTenantId(), left.getSchemaName(),
-                 
PNameFactory.newName(SchemaUtil.getTableName(left.getName().getString(), 
right.getName().getString())), left.getType(), left.getIndexState(), 
left.getTimeStamp(), left.getSequenceNumber(), left.getPKName(), 
left.getBucketNum(), merged,
-                 left.getParentSchemaName(), left.getParentTableName(), 
left.getIndexes(), left.isImmutableRows(), Collections.<PName>emptyList(), 
null, null, PTable.DEFAULT_DISABLE_WAL, left.isMultiTenant(), 
left.getStoreNulls(), left.getViewType(), left.getViewIndexId(), 
left.getIndexType(), left.isTransactional());
+                 
PNameFactory.newName(SchemaUtil.getTableName(left.getName().getString(), 
right.getName().getString())),
+                 left.getType(), left.getIndexState(), left.getTimeStamp(), 
left.getSequenceNumber(), left.getPKName(), 
+                 left.getBucketNum(), merged,left.getParentSchemaName(), 
left.getParentTableName(), left.getIndexes(),
+                 left.isImmutableRows(), Collections.<PName>emptyList(), null, 
null, PTable.DEFAULT_DISABLE_WAL,
+                 left.isMultiTenant(), left.getStoreNulls(), 
left.getViewType(), left.getViewIndexId(), left.getIndexType(),
 -                left.rowKeyOrderOptimizable());
++                left.rowKeyOrderOptimizable(), left.isTransactional());
      }
  
  }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
index 0000000,f93ab03..25bf35f
mode 000000,100644..100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
@@@ -1,0 -1,233 +1,245 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.phoenix.compile;
+ 
+ import java.io.IOException;
+ import java.sql.ParameterMetaData;
+ import java.sql.SQLException;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.List;
++import java.util.Set;
+ 
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.LocatedFileStatus;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.fs.RemoteIterator;
+ import org.apache.hadoop.hbase.Cell;
+ import org.apache.hadoop.hbase.CellUtil;
+ import org.apache.hadoop.hbase.HConstants;
+ import org.apache.hadoop.hbase.KeyValue.Type;
+ import org.apache.hadoop.hbase.client.Result;
+ import org.apache.hadoop.hbase.client.Scan;
+ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+ import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+ import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+ import org.apache.phoenix.expression.Determinism;
+ import org.apache.phoenix.expression.Expression;
+ import org.apache.phoenix.expression.LiteralExpression;
+ import org.apache.phoenix.expression.RowKeyColumnExpression;
+ import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
+ import org.apache.phoenix.iterate.ParallelScanGrouper;
+ import org.apache.phoenix.iterate.ResultIterator;
+ import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
+ import org.apache.phoenix.jdbc.PhoenixStatement;
++import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+ import org.apache.phoenix.parse.FilterableStatement;
+ import org.apache.phoenix.parse.LiteralParseNode;
+ import org.apache.phoenix.parse.ParseNodeFactory;
+ import org.apache.phoenix.query.HBaseFactoryProvider;
+ import org.apache.phoenix.query.KeyRange;
+ import org.apache.phoenix.query.QueryServices;
+ import org.apache.phoenix.schema.PColumn;
+ import org.apache.phoenix.schema.PColumnImpl;
+ import org.apache.phoenix.schema.PNameFactory;
+ import org.apache.phoenix.schema.RowKeyValueAccessor;
+ import org.apache.phoenix.schema.SortOrder;
+ import org.apache.phoenix.schema.TableRef;
+ import org.apache.phoenix.schema.tuple.ResultTuple;
+ import org.apache.phoenix.schema.tuple.Tuple;
+ import org.apache.phoenix.schema.types.PVarchar;
+ import org.apache.phoenix.util.ByteUtil;
+ import org.apache.phoenix.util.SizedUtil;
+ 
+ public class ListJarsQueryPlan implements QueryPlan {
+ 
+     private PhoenixStatement stmt = null;
+     private StatementContext context = null;
+     private boolean first = true;
+ 
+     private static final RowProjector JARS_PROJECTOR;
+     
+     static {
+         List<ExpressionProjector> projectedColumns = new 
ArrayList<ExpressionProjector>();
+         PColumn column =
+                 new PColumnImpl(PNameFactory.newName("jar_location"), null,
+                         PVarchar.INSTANCE, null, null, false, 0, 
SortOrder.getDefault(), 0, null,
+                         false, null, false);
+         List<PColumn> columns = new ArrayList<PColumn>();
+         columns.add(column);
+         Expression expression =
+                 new RowKeyColumnExpression(column, new 
RowKeyValueAccessor(columns, 0));
+         projectedColumns.add(new ExpressionProjector("jar_location", "", 
expression,
+                 true));
+         int estimatedByteSize = SizedUtil.KEY_VALUE_SIZE;
+         JARS_PROJECTOR = new RowProjector(projectedColumns, 
estimatedByteSize, false);
+     }
+ 
+     public ListJarsQueryPlan(PhoenixStatement stmt) {
+         this.stmt = stmt;
+         this.context = new StatementContext(stmt);
+     }
+ 
+     @Override
+     public StatementContext getContext() {
+         return this.context;
+     }
+ 
+     @Override
+     public ParameterMetaData getParameterMetaData() {
+         return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA;
+     }
+ 
+     @Override
+     public ExplainPlan getExplainPlan() throws SQLException {
+         return ExplainPlan.EMPTY_PLAN;
+     }
+     
+     @Override
+     public ResultIterator iterator() throws SQLException {
+         return iterator(DefaultParallelScanGrouper.getInstance());
+     }
+ 
+     @Override
+     public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws 
SQLException {
+         return new ResultIterator() {
+             private RemoteIterator<LocatedFileStatus> listFiles = null;
+ 
+             @Override
+             public void close() throws SQLException {
+                 
+             }
+             
+             @Override
+             public Tuple next() throws SQLException {
+                 try {
+                     if(first) {
+                         String dynamicJarsDir =
+                                 
stmt.getConnection().getQueryServices().getProps()
+                                         
.get(QueryServices.DYNAMIC_JARS_DIR_KEY);
+                         if(dynamicJarsDir == null) {
+                             throw new 
SQLException(QueryServices.DYNAMIC_JARS_DIR_KEY
+                                     + " is not configured for the listing the 
jars.");
+                         }
+                         dynamicJarsDir =
+                                 dynamicJarsDir.endsWith("/") ? dynamicJarsDir 
: dynamicJarsDir + '/';
+                         Configuration conf = 
HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+                         Path dynamicJarsDirPath = new Path(dynamicJarsDir);
+                         FileSystem fs = 
dynamicJarsDirPath.getFileSystem(conf);
+                         listFiles = fs.listFiles(dynamicJarsDirPath, true);
+                         first = false;
+                     }
+                     if(listFiles == null || !listFiles.hasNext()) return null;
+                     ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                     ParseNodeFactory factory = new ParseNodeFactory();
+                     LiteralParseNode literal =
+                             
factory.literal(listFiles.next().getPath().toString());
+                     LiteralExpression expression =
+                             LiteralExpression.newConstant(literal.getValue(), 
PVarchar.INSTANCE,
+                                 Determinism.ALWAYS);
+                     expression.evaluate(null, ptr);
+                     byte[] rowKey = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                     Cell cell =
+                             CellUtil.createCell(rowKey, 
HConstants.EMPTY_BYTE_ARRAY,
+                                 HConstants.EMPTY_BYTE_ARRAY, 
System.currentTimeMillis(),
+                                 Type.Put.getCode(), 
HConstants.EMPTY_BYTE_ARRAY);
+                     List<Cell> cells = new ArrayList<Cell>(1);
+                     cells.add(cell);
+                     return new ResultTuple(Result.create(cells));
+                 } catch (IOException e) {
+                     throw new SQLException(e);
+                 }  
+             }
+             
+             @Override
+             public void explain(List<String> planSteps) {
+             }
+         };
+     }
+     
+     @Override
+     public long getEstimatedSize() {
+         return PVarchar.INSTANCE.getByteSize();
+     }
+ 
+     @Override
+     public TableRef getTableRef() {
+         return null;
+     }
+ 
+     @Override
+     public RowProjector getProjector() {
+         return JARS_PROJECTOR;
+     }
+ 
+     @Override
+     public Integer getLimit() {
+         return null;
+     }
+ 
+     @Override
+     public OrderBy getOrderBy() {
+         return OrderBy.EMPTY_ORDER_BY;
+     }
+ 
+     @Override
+     public GroupBy getGroupBy() {
+         return GroupBy.EMPTY_GROUP_BY;
+     }
+ 
+     @Override
+     public List<KeyRange> getSplits() {
+         return Collections.emptyList();
+     }
+ 
+     @Override
+     public List<List<Scan>> getScans() {
+         return Collections.emptyList();
+     }
+ 
+     @Override
+     public FilterableStatement getStatement() {
+         return null;
+     }
+ 
+     @Override
+     public boolean isDegenerate() {
+         return false;
+     }
+ 
+     @Override
+     public boolean isRowKeyOrdered() {
+         return false;
+     }
+     
+     @Override
+     public boolean useRoundRobinIterator() {
+         return false;
+     }
++
++      @Override
++      public Set<TableRef> getSourceRefs() {
++              return Collections.<TableRef>emptySet();
++      }
++
++      @Override
++      public Operation getOperation() {
++              return stmt.getUpdateOperation();
++      }
+ }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
index 9a50067,630760c..6a53f80
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
@@@ -53,21 -53,34 +53,34 @@@ public abstract class MutatingParallelI
      /**
       * Method that does the actual mutation work
       */
-     abstract protected MutationState mutate(ResultIterator iterator, 
PhoenixConnection connection) throws SQLException;
+     abstract protected MutationState mutate(StatementContext parentContext, 
ResultIterator iterator, PhoenixConnection connection) throws SQLException;
      
      @Override
-     public PeekingResultIterator newIterator(ResultIterator iterator, Scan 
scan) throws SQLException {
-         final PhoenixConnection connection = new 
PhoenixConnection(this.connection);
-         MutationState state = mutate(iterator, connection);
 -    public PeekingResultIterator newIterator(final StatementContext 
parentContext, ResultIterator iterator, Scan scan, String tableName) throws 
SQLException {
++    public PeekingResultIterator newIterator(StatementContext parentContext, 
ResultIterator iterator, Scan scan, String tableName) throws SQLException {
+         final PhoenixConnection clonedConnection = new 
PhoenixConnection(this.connection);
+         
+         MutationState state = mutate(parentContext, iterator, 
clonedConnection);
+         
          long totalRowCount = state.getUpdateCount();
-         if (connection.getAutoCommit()) {
-             connection.getMutationState().join(state);
-             connection.commit();
-             ConnectionQueryServices services = connection.getQueryServices();
-             int maxSize = 
services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
-             state = new MutationState(maxSize, connection, totalRowCount);
+         if (clonedConnection.getAutoCommit()) {
+             clonedConnection.getMutationState().join(state);
+             clonedConnection.commit();
+             ConnectionQueryServices services = 
clonedConnection.getQueryServices();
+             int maxSize = 
services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB, 
QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+             /*
+              * Everything that was mutated as part of the clonedConnection 
has been committed. However, we want to
+              * report the mutation work done using this clonedConnection as 
part of the overall mutation work of the
+              * parent connection. So we need to set those metrics in the 
empty mutation state so that they could be
+              * combined with the parent connection's mutation metrics (as 
part of combining mutation state) in the
+              * close() method of the iterator being returned. Don't combine 
the read metrics in parent context yet
+              * though because they are possibly being concurrently modified 
by other threads at this stage. Instead we
+              * will get hold of the read metrics when all the mutating 
iterators are done.
+              */
+             state = MutationState.emptyMutationState(maxSize, 
clonedConnection);
+             
state.getMutationMetricQueue().combineMetricQueues(clonedConnection.getMutationState().getMutationMetricQueue());
          }
          final MutationState finalState = state;
+         
          byte[] value = PLong.INSTANCE.toBytes(totalRowCount);
          KeyValue keyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, 
SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length);
          final Tuple tuple = new SingleKeyValueTuple(keyValue);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index 1f35b10,d75fe38..96b057d
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@@ -31,16 -32,18 +31,18 @@@ import org.apache.phoenix.execute.Aggre
  import org.apache.phoenix.execute.MutationState;
  import org.apache.phoenix.iterate.ResultIterator;
  import org.apache.phoenix.jdbc.PhoenixConnection;
 -import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
  import org.apache.phoenix.jdbc.PhoenixStatement;
 +import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+ import org.apache.phoenix.parse.PFunction;
  import org.apache.phoenix.parse.SelectStatement;
  import org.apache.phoenix.query.QueryConstants;
  import org.apache.phoenix.schema.AmbiguousColumnException;
  import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
  import org.apache.phoenix.schema.ColumnNotFoundException;
  import org.apache.phoenix.schema.ColumnRef;
++import org.apache.phoenix.schema.FunctionNotFoundException;
  import org.apache.phoenix.schema.PColumn;
  import org.apache.phoenix.schema.PColumnFamily;
 -import org.apache.phoenix.schema.types.PLong;
  import org.apache.phoenix.schema.TableNotFoundException;
  import org.apache.phoenix.schema.TableRef;
  import org.apache.phoenix.schema.tuple.Tuple;
@@@ -79,31 -80,23 +81,47 @@@ public class PostDDLCompiler 
  
      public MutationPlan compile(final List<TableRef> tableRefs, final byte[] 
emptyCF, final byte[] projectCF, final List<PColumn> deleteList,
              final long timestamp) throws SQLException {
 -        
 -        return new MutationPlan() {
 -            
 -            @Override
 -            public PhoenixConnection getConnection() {
 -                return connection;
 -            }
 -            
 -            @Override
 -            public ParameterMetaData getParameterMetaData() {
 -                return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA;
 -            }
 -            
 -            @Override
 -            public ExplainPlan getExplainPlan() throws SQLException {
 -                return ExplainPlan.EMPTY_PLAN;
 -            }
 +        PhoenixStatement statement = new PhoenixStatement(connection);
 +        final StatementContext context = new StatementContext(
 +                statement, 
 +                new ColumnResolver() {
 +
 +                    @Override
 +                    public List<TableRef> getTables() {
 +                        return tableRefs;
 +                    }
 +
 +                    @Override
 +                    public TableRef resolveTable(String schemaName, String 
tableName) throws SQLException {
 +                        throw new UnsupportedOperationException();
 +                    }
 +
 +                    @Override
 +                    public ColumnRef resolveColumn(String schemaName, String 
tableName, String colName)
 +                            throws SQLException {
 +                        throw new UnsupportedOperationException();
 +                    }
++
++                                      @Override
++                                      public List<PFunction> getFunctions() {
++                                              return 
Collections.<PFunction>emptyList();
++                                      }
++
++                                      @Override
++                                      public PFunction resolveFunction(String 
functionName)
++                                                      throws SQLException {
++                                              throw new 
FunctionNotFoundException(functionName);
++                                      }
++
++                                      @Override
++                                      public boolean hasUDFs() {
++                                              return false;
++                                      }
 +                    
 +                },
 +                scan,
 +                new SequenceManager(statement));
 +        return new BaseMutationPlan(context, Operation.UPSERT /* FIXME */) {
              
              @Override
              public MutationState execute() throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index 97b68a5,1c0c469..136ee21
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@@ -45,11 -47,12 +47,13 @@@ public interface QueryPlan extends Stat
       */
      public ResultIterator iterator() throws SQLException;
      
+     public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws 
SQLException;
+     
      public long getEstimatedSize();
      
 -    // TODO: change once joins are supported
 +    @Deprecated
      TableRef getTableRef();
 +    
      /**
       * Returns projector used to formulate resultSet row
       */

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index cd42fd1,50ec919..a9754b3
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@@ -37,10 -38,12 +39,12 @@@ import org.apache.phoenix.expression.De
  import org.apache.phoenix.expression.Expression;
  import org.apache.phoenix.expression.LiteralExpression;
  import org.apache.phoenix.expression.RowKeyColumnExpression;
+ import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
+ import org.apache.phoenix.iterate.ParallelScanGrouper;
  import org.apache.phoenix.iterate.ResultIterator;
  import org.apache.phoenix.jdbc.PhoenixConnection;
 -import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
  import org.apache.phoenix.jdbc.PhoenixStatement;
 +import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
  import org.apache.phoenix.metrics.MetricInfo;
  import org.apache.phoenix.parse.FilterableStatement;
  import org.apache.phoenix.parse.LiteralParseNode;
@@@ -104,11 -100,21 +106,16 @@@ public class TraceQueryPlan implements 
  
      @Override
      public ParameterMetaData getParameterMetaData() {
 -        return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA;
 -    }
 -
 -    @Override
 -    public ExplainPlan getExplainPlan() throws SQLException {
 -        return ExplainPlan.EMPTY_PLAN;
 +        return context.getBindManager().getParameterMetaData();
      }
- 
+     
      @Override
      public ResultIterator iterator() throws SQLException {
+       return iterator(DefaultParallelScanGrouper.getInstance());
+     }
+ 
+     @Override
+     public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws 
SQLException {
          final PhoenixConnection conn = stmt.getConnection();
          if (conn.getTraceScope() == null && !traceStatement.isTraceOn()) {
              return ResultIterator.EMPTY_ITERATOR;
@@@ -223,9 -233,9 +239,14 @@@
      public boolean isRowKeyOrdered() {
          return false;
      }
 +
 +    @Override
 +    public ExplainPlan getExplainPlan() throws SQLException {
 +        return ExplainPlan.EMPTY_PLAN;
 +    }
+     
+     @Override
+     public boolean useRoundRobinIterator() {
+         return false;
+     }
  }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
index d15ee7f,c6aa546..551b05c
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
@@@ -151,7 -152,7 +152,7 @@@ public class TupleProjectionCompiler 
                  table.getBucketNum(), projectedColumns, 
table.getParentSchemaName(),
                  table.getParentName(), table.getIndexes(), 
table.isImmutableRows(), Collections.<PName>emptyList(), null, null,
                  table.isWALDisabled(), table.isMultiTenant(), 
table.getStoreNulls(), table.getViewType(), table.getViewIndexId(),
-                 table.getIndexType(), table.isTransactional());
 -                table.getIndexType(), table.rowKeyOrderOptimizable());
++                table.getIndexType(), table.rowKeyOrderOptimizable(), 
table.isTransactional());
      }
  
      public static PTable createProjectedTable(TableRef tableRef, 
List<ColumnRef> sourceColumnRefs, boolean retainPKColumns) throws SQLException {
@@@ -178,7 -179,7 +179,7 @@@
                      retainPKColumns ? table.getBucketNum() : null, 
projectedColumns, null,
                      null, Collections.<PTable>emptyList(), 
table.isImmutableRows(), Collections.<PName>emptyList(), null, null,
                      table.isWALDisabled(), table.isMultiTenant(), 
table.getStoreNulls(), table.getViewType(), table.getViewIndexId(),
-                     null, table.isTransactional());
 -                    null, table.rowKeyOrderOptimizable());
++                    null, table.rowKeyOrderOptimizable(), 
table.isTransactional());
      }
  
      // For extracting column references from single select statement

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
index 0000000,afca97a..298303d
mode 000000,100644..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
@@@ -1,0 -1,89 +1,89 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.phoenix.compile;
+ 
+ import java.sql.SQLException;
+ import java.util.ArrayList;
+ import java.util.List;
+ 
+ import org.apache.hadoop.hbase.HConstants;
+ import org.apache.phoenix.exception.SQLExceptionCode;
+ import org.apache.phoenix.exception.SQLExceptionInfo;
+ import org.apache.phoenix.expression.Expression;
+ import org.apache.phoenix.jdbc.PhoenixStatement;
+ import org.apache.phoenix.parse.AliasedNode;
+ import org.apache.phoenix.schema.PColumn;
+ import org.apache.phoenix.schema.PColumnImpl;
+ import org.apache.phoenix.schema.PName;
+ import org.apache.phoenix.schema.PNameFactory;
+ import org.apache.phoenix.schema.PTable;
+ import org.apache.phoenix.schema.PTableImpl;
+ import org.apache.phoenix.schema.PTableType;
+ import org.apache.phoenix.schema.TableRef;
+ import org.apache.phoenix.schema.types.PDataType;
+ 
+ public class UnionCompiler {
+     private static final PName UNION_FAMILY_NAME = 
PNameFactory.newName("unionFamilyName");
+     private static final PName UNION_SCHEMA_NAME = 
PNameFactory.newName("unionSchemaName");
+     private static final PName UNION_TABLE_NAME = 
PNameFactory.newName("unionTableName");
+ 
+     public static List<QueryPlan> checkProjectionNumAndTypes(List<QueryPlan> 
selectPlans) throws SQLException {
+         QueryPlan plan = selectPlans.get(0);
+         int columnCount = plan.getProjector().getColumnCount();
+         List<? extends ColumnProjector> projectors = 
plan.getProjector().getColumnProjectors();
+         List<PDataType> selectTypes = new ArrayList<PDataType>();
+         for (ColumnProjector pro : projectors) {
+             selectTypes.add(pro.getExpression().getDataType());
+         }
+ 
+         for (int i = 1;  i < selectPlans.size(); i++) {     
+             plan = selectPlans.get(i);
+             if (columnCount !=plan.getProjector().getColumnCount()) {
+                 throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.SELECT_COLUMN_NUM_IN_UNIONALL_DIFFS).setMessage(".").build().buildException();
+             }
+             List<? extends ColumnProjector> pros =  
plan.getProjector().getColumnProjectors();
+             for (int j = 0; j < columnCount; j++) {
+                 PDataType type = pros.get(j).getExpression().getDataType();
+                 if (!type.isCoercibleTo(selectTypes.get(j))) {
+                     throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.SELECT_COLUMN_TYPE_IN_UNIONALL_DIFFS).setMessage(".").build().buildException();
+                 }
+             }
+         }
+         return selectPlans;
+     }
+ 
+     public static TableRef contructSchemaTable(PhoenixStatement statement, 
QueryPlan plan, List<AliasedNode> selectNodes) throws SQLException {
+         List<PColumn> projectedColumns = new ArrayList<PColumn>();
+         for (int i=0; i< plan.getProjector().getColumnCount(); i++) {
+             ColumnProjector colProj = 
plan.getProjector().getColumnProjector(i);
+             Expression sourceExpression = colProj.getExpression();
+             String name = selectNodes == null ? colProj.getName() : 
selectNodes.get(i).getAlias();
+             PColumnImpl projectedColumn = new 
PColumnImpl(PNameFactory.newName(name), UNION_FAMILY_NAME,
+                     sourceExpression.getDataType(), 
sourceExpression.getMaxLength(), sourceExpression.getScale(), 
sourceExpression.isNullable(),
+                     i, sourceExpression.getSortOrder(), 500, null, false, 
sourceExpression.toString(), false);
+             projectedColumns.add(projectedColumn);
+         }
+         Long scn = statement.getConnection().getSCN();
+         PTable tempTable = 
PTableImpl.makePTable(statement.getConnection().getTenantId(), 
UNION_SCHEMA_NAME, UNION_TABLE_NAME, 
+                 PTableType.SUBQUERY, null, HConstants.LATEST_TIMESTAMP, scn 
== null ? HConstants.LATEST_TIMESTAMP : scn, null, null,
+                         projectedColumns, null, null, null,
 -                        true, null, null, null, true, true, true, null, null, 
null, false);
++                        true, null, null, null, true, true, true, null, null, 
null, false, false);
+         TableRef tableRef = new TableRef(null, tempTable, 0, false);
+         return tableRef;
+     }
+ }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 09db9c1,56087c0..6b30ed8
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@@ -205,13 -242,22 +243,24 @@@ public class UpsertCompiler 
      }
      
      private final PhoenixStatement statement;
 +    private final Operation operation;
      
 -    public UpsertCompiler(PhoenixStatement statement) {
 +    public UpsertCompiler(PhoenixStatement statement, Operation operation) {
          this.statement = statement;
 +        this.operation = operation;
      }
      
+     private static LiteralParseNode getNodeForRowTimestampColumn(PColumn col) 
{
+         PDataType type = col.getDataType();
+         long dummyValue = 0L;
+         if (type.isCoercibleTo(PTimestamp.INSTANCE)) {
+             return new LiteralParseNode(new Timestamp(dummyValue), 
PTimestamp.INSTANCE);
+         } else if (type == PLong.INSTANCE || type == PUnsignedLong.INSTANCE) {
+             return new LiteralParseNode(dummyValue, PLong.INSTANCE);
+         }
+         throw new IllegalArgumentException();
+     }
+     
      public MutationPlan compile(UpsertStatement upsert) throws SQLException {
          final PhoenixConnection connection = statement.getConnection();
          ConnectionQueryServices services = connection.getQueryServices();
@@@ -403,12 -463,12 +468,12 @@@
                      if (! (select.isAggregate() || select.isDistinct() || 
select.getLimit() != null || select.hasSequence()) ) {
                          // We can pipeline the upsert select instead of 
spooling everything to disk first,
                          // if we don't have any post processing that's 
required.
-                         parallelIteratorFactoryToBe = new 
UpsertingParallelIteratorFactory(connection, tableRefToBe);
+                         parallelIteratorFactoryToBe = new 
UpsertingParallelIteratorFactory(connection, tableRefToBe, 
useServerTimestampToBe);
                          // If we're in the else, then it's not an aggregate, 
distinct, limited, or sequence using query,
                          // so we might be able to run it entirely on the 
server side.
-                         // Can't run on same server for transactional data, 
as we need the row keys for the data
-                         // that is being upserted for conflict detection 
purposes.
-                         runOnServer = sameTable && isAutoCommit && 
!table.isTransactional() && !(table.isImmutableRows() && 
!table.getIndexes().isEmpty());
+                         // For a table with row timestamp column, we can't 
guarantee that the row key will reside in the
+                         // region space managed by region servers. So we bail 
out on executing on server side.
 -                        runOnServer = sameTable && isAutoCommit && 
!(table.isImmutableRows() && !table.getIndexes().isEmpty()) && 
table.getRowTimestampColPos() == -1;
++                        runOnServer = sameTable && isAutoCommit && 
!table.isTransactional() && !(table.isImmutableRows() && 
!table.getIndexes().isEmpty()) && table.getRowTimestampColPos() == -1;
                      }
                      // If we may be able to run on the server, add a hint 
that favors using the data table
                      // if all else is equal.

Reply via email to