http://git-wip-us.apache.org/repos/asf/phoenix/blob/e89337f8/phoenix-core/src/it/java/org/apache/phoenix/end2end/ScanQueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ScanQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ScanQueryIT.java
index 9b28bad..b553816 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ScanQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ScanQueryIT.java
@@ -20,7 +20,6 @@ package org.apache.phoenix.end2end;
 import static org.apache.phoenix.util.TestUtil.A_VALUE;
 import static org.apache.phoenix.util.TestUtil.B_VALUE;
 import static org.apache.phoenix.util.TestUtil.C_VALUE;
-import static org.apache.phoenix.util.TestUtil.E_VALUE;
 import static org.apache.phoenix.util.TestUtil.ROW1;
 import static org.apache.phoenix.util.TestUtil.ROW2;
 import static org.apache.phoenix.util.TestUtil.ROW3;
@@ -39,10 +38,8 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Properties;
 import java.util.Set;
 
@@ -53,7 +50,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-import com.google.common.collect.Lists;
 import com.google.common.primitives.Doubles;
 import com.google.common.primitives.Floats;
 
@@ -66,8 +62,8 @@ public class ScanQueryIT extends BaseQueryIT {
         return QueryIT.data();
     }
 
-    public ScanQueryIT(String indexDDL) {
-        super(indexDDL);
+    public ScanQueryIT(String indexDDL, boolean mutable, boolean columnEncoded) {
+        super(indexDDL, mutable, columnEncoded);
     }
     
     @Test
@@ -440,57 +436,4 @@ public class ScanQueryIT extends BaseQueryIT {
             conn.close();
         }
     }
-    
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testPointInTimeLimitedScan() throws Exception {
-        // Override value that was set at creation time
-        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 1); // Run query at timestamp 5
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection upsertConn = DriverManager.getConnection(url, props);
-        String upsertStmt =
-            "upsert into " +
-            "ATABLE(" +
-            "    ORGANIZATION_ID, " +
-            "    ENTITY_ID, " +
-            "    A_INTEGER) " +
-            "VALUES (?, ?, ?)";
-        upsertConn.setAutoCommit(true); // Test auto commit
-        // Insert all rows at ts
-        PreparedStatement stmt = upsertConn.prepareStatement(upsertStmt);
-        stmt.setString(1, tenantId);
-        stmt.setString(2, ROW1);
-        stmt.setInt(3, 6);
-        stmt.execute(); // should commit too
-        upsertConn.close();
-
-        // Override value again, but should be ignored since it's past the SCN
-        url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 3);
-        upsertConn = DriverManager.getConnection(url, props);
-        upsertConn.setAutoCommit(true); // Test auto commit
-        // Insert all rows at ts
-        stmt = upsertConn.prepareStatement(upsertStmt);
-        stmt.setString(1, tenantId);
-        stmt.setString(2, ROW1);
-        stmt.setInt(3, 0);
-        stmt.execute(); // should commit too
-        upsertConn.close();
-        
-        String query = "SELECT a_integer,b_string FROM atable WHERE organization_id=? and a_integer <= 5 limit 2";
-        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2));
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        PreparedStatement statement = conn.prepareStatement(query);
-        statement.setString(1, tenantId);
-        ResultSet rs = statement.executeQuery();
-        List<List<Object>> expectedResultsA = Lists.newArrayList(
-                Arrays.<Object>asList(2, C_VALUE),
-                Arrays.<Object>asList( 3, E_VALUE));
-        List<List<Object>> expectedResultsB = Lists.newArrayList(
-                Arrays.<Object>asList( 5, C_VALUE),
-                Arrays.<Object>asList(4, B_VALUE));
-        // Since we're not ordering and we may be using a descending index, we don't
-        // know which rows we'll get back.
-        assertOneOfValuesEqualsResultSet(rs, expectedResultsA,expectedResultsB);
-       conn.close();
-    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e89337f8/phoenix-core/src/it/java/org/apache/phoenix/end2end/TopNIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TopNIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TopNIT.java
index ca1cd86..39e8cb6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TopNIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TopNIT.java
@@ -50,7 +50,7 @@ public class TopNIT extends BaseClientManagedTimeIT {
         long ts = nextTimestamp();
         String tenantId = getOrganizationId();
 
-        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts, getUrl());
+        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts, getUrl(), null);
         String query = "SELECT entity_id FROM aTable ORDER BY b_string, entity_id LIMIT 5";
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
@@ -80,7 +80,7 @@ public class TopNIT extends BaseClientManagedTimeIT {
     public void testDescMultiOrderByExpr() throws Exception {
         long ts = nextTimestamp();
         String tenantId = getOrganizationId();
-        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts, getUrl());
+        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts, getUrl(), null);
         String query = "SELECT entity_id FROM aTable ORDER BY b_string || entity_id desc LIMIT 5";
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
@@ -119,7 +119,7 @@ public class TopNIT extends BaseClientManagedTimeIT {
     private void testTopNDelete(boolean autoCommit) throws Exception {
         long ts = nextTimestamp();
         String tenantId = getOrganizationId();
-        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts, getUrl());
+        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts, getUrl(), null);
         String query = "DELETE FROM aTable ORDER BY b_string, entity_id LIMIT 5";
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e89337f8/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
index 763f11b..154110a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
@@ -89,7 +89,7 @@ public class UpsertSelectIT extends BaseClientManagedTimeIT {
     private void testUpsertSelect(boolean createIndex) throws Exception {
         long ts = nextTimestamp();
         String tenantId = getOrganizationId();
-        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), 
null, ts-1, getUrl());
+        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), 
null, ts-1, getUrl(), null);
 
         ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, 
CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
         String indexName = "IDX1";
@@ -210,7 +210,7 @@ public class UpsertSelectIT extends BaseClientManagedTimeIT 
{
     public void testUpsertSelectEmptyPKColumn() throws Exception {
         long ts = nextTimestamp();
         String tenantId = getOrganizationId();
-        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), 
null, ts-1, getUrl());
+        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), 
null, ts-1, getUrl(), null);
         ensureTableCreated(getUrl(), PTSDB_NAME, PTSDB_NAME, ts-1);
         Properties props = new Properties();
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts 
+ 1)); // Execute at timestamp 1
@@ -386,7 +386,7 @@ public class UpsertSelectIT extends BaseClientManagedTimeIT 
{
     private void testUpsertSelectForAgg(boolean autoCommit) throws Exception {
         long ts = nextTimestamp();
         String tenantId = getOrganizationId();
-        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), 
null, ts-1, getUrl());
+        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), 
null, ts-1, getUrl(), null);
         ensureTableCreated(getUrl(), PTSDB_NAME, PTSDB_NAME, ts-1);
         Properties props = new Properties();
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts 
+ 1)); // Execute at timestamp 1
@@ -462,7 +462,7 @@ public class UpsertSelectIT extends BaseClientManagedTimeIT 
{
         byte[][] splits = new byte[][] { PInteger.INSTANCE.toBytes(1), 
PInteger.INSTANCE.toBytes(2),
                 PInteger.INSTANCE.toBytes(3), PInteger.INSTANCE.toBytes(4)};
         long ts = nextTimestamp();
-        ensureTableCreated(getUrl(),"IntKeyTest", "IntKeyTest", splits, ts-2);
+        ensureTableCreated(getUrl(),"IntKeyTest", "IntKeyTest", splits, ts-2, 
null);
         Properties props = new Properties();
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts 
+ 1));
         Connection conn = DriverManager.getConnection(getUrl(), props);
@@ -602,7 +602,7 @@ public class UpsertSelectIT extends BaseClientManagedTimeIT 
{
         byte[][] splits = new byte[][] { PInteger.INSTANCE.toBytes(1), 
PInteger.INSTANCE.toBytes(2),
                 PInteger.INSTANCE.toBytes(3), PInteger.INSTANCE.toBytes(4)};
         long ts = nextTimestamp();
-        ensureTableCreated(getUrl(),"IntKeyTest", "IntKeyTest", splits,ts-2);
+        ensureTableCreated(getUrl(),"IntKeyTest", "IntKeyTest", splits,ts-2, 
null);
         Properties props = new Properties();
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts 
+ 1));
         Connection conn = DriverManager.getConnection(getUrl(), props);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e89337f8/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
index 3e0e3af..5fe4988 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
@@ -63,7 +63,7 @@ public class UpsertValuesIT extends BaseClientManagedTimeIT {
     @Test
     public void testGroupByWithLimitOverRowKey() throws Exception {
         long ts = nextTimestamp();
-        ensureTableCreated(getUrl(),TestUtil.PTSDB_NAME,TestUtil.PTSDB_NAME, 
null, ts-2);
+        ensureTableCreated(getUrl(),TestUtil.PTSDB_NAME,TestUtil.PTSDB_NAME, 
null, ts-2, null);
         Properties props = new Properties();
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts 
+ 10));
         Connection conn = DriverManager.getConnection(getUrl(), props);
@@ -96,7 +96,7 @@ public class UpsertValuesIT extends BaseClientManagedTimeIT {
     public void testUpsertDateValues() throws Exception {
         long ts = nextTimestamp();
         Date now = new Date(System.currentTimeMillis());
-        
ensureTableCreated(getUrl(),TestUtil.PTSDB_NAME,TestUtil.PTSDB_NAME,null, ts-2);
+        
ensureTableCreated(getUrl(),TestUtil.PTSDB_NAME,TestUtil.PTSDB_NAME,null, ts-2, 
null);
         Properties props = new Properties();
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts 
+ 1)); // Execute at timestamp 1
         Connection conn = DriverManager.getConnection(getUrl(), props);
@@ -125,7 +125,7 @@ public class UpsertValuesIT extends BaseClientManagedTimeIT 
{
     @Test
     public void testUpsertValuesWithExpression() throws Exception {
         long ts = nextTimestamp();
-        ensureTableCreated(getUrl(),"IntKeyTest","IntKeyTest", null, ts-2);
+        ensureTableCreated(getUrl(),"IntKeyTest","IntKeyTest", null, ts-2, 
null);
         Properties props = new Properties();
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts 
+ 1)); // Execute at timestamp 1
         Connection conn = DriverManager.getConnection(getUrl(), props);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e89337f8/phoenix-core/src/it/java/org/apache/phoenix/end2end/VariableLengthPKIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/VariableLengthPKIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/VariableLengthPKIT.java
index 6a62673..753f2c8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/VariableLengthPKIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/VariableLengthPKIT.java
@@ -58,7 +58,7 @@ public class VariableLengthPKIT extends 
BaseClientManagedTimeIT {
     }
 
     protected static void initGroupByRowKeyColumns(long ts) throws Exception {
-        ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2);
+        ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2, null);
 
         // Insert all rows at ts
         String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" 
+ ts;
@@ -85,7 +85,7 @@ public class VariableLengthPKIT extends 
BaseClientManagedTimeIT {
     }
 
     protected static void initTableValues(byte[][] splits, long ts) throws 
Exception {
-        ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, splits, ts-2);
+        ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, splits, ts-2, 
null);
 
         // Insert all rows at ts
         String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" 
+ ts;
@@ -106,7 +106,7 @@ public class VariableLengthPKIT extends 
BaseClientManagedTimeIT {
         stmt.setBigDecimal(4, new BigDecimal(.5));
         stmt.execute();
 
-        ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, splits, ts-2);
+        ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, splits, ts-2, 
null);
         conn.setAutoCommit(false);
 
         // Insert all rows at ts
@@ -431,7 +431,7 @@ public class VariableLengthPKIT extends 
BaseClientManagedTimeIT {
     @Test
     public void testNullValueEqualityScan() throws Exception {
         long ts = nextTimestamp();
-        ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2);
+        ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2, null);
 
         // Insert all rows at ts
         String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" 
+ ts;
@@ -459,7 +459,7 @@ public class VariableLengthPKIT extends 
BaseClientManagedTimeIT {
     @Test
     public void testVarLengthPKColScan() throws Exception {
         long ts = nextTimestamp();
-        ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2);
+        ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2, null);
 
         String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" 
+ ts; // Insert at timestamp 0
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -489,7 +489,7 @@ public class VariableLengthPKIT extends 
BaseClientManagedTimeIT {
     @Test
     public void testEscapedQuoteScan() throws Exception {
         long ts = nextTimestamp();
-        ensureTableCreated(getUrl(), PTSDB_NAME, PTSDB_NAME, null, ts-2);
+        ensureTableCreated(getUrl(), PTSDB_NAME, PTSDB_NAME, null, ts-2, null);
 
         String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" 
+ ts; // Insert at timestamp 0
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -527,7 +527,7 @@ public class VariableLengthPKIT extends 
BaseClientManagedTimeIT {
     }
 
     private static void initPtsdbTableValues(long ts) throws Exception {
-        ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2);
+        ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2, null);
 
         String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" 
+ ts; // Insert at timestamp 0
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -560,7 +560,7 @@ public class VariableLengthPKIT extends 
BaseClientManagedTimeIT {
     }
 
     private static void initPtsdbTableValues2(long ts, Date d) throws 
Exception {
-        ensureTableCreated(getUrl(),PTSDB2_NAME, PTSDB2_NAME, null, ts-2);
+        ensureTableCreated(getUrl(),PTSDB2_NAME, PTSDB2_NAME, null, ts-2, 
null);
 
         String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" 
+ ts; // Insert at timestamp 0
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -696,7 +696,7 @@ public class VariableLengthPKIT extends 
BaseClientManagedTimeIT {
     @Test
     public void testBatchUpsert() throws Exception {
         long ts = nextTimestamp();
-        ensureTableCreated(getUrl(),PTSDB2_NAME, PTSDB2_NAME, null, ts-2);
+        ensureTableCreated(getUrl(),PTSDB2_NAME, PTSDB2_NAME, null, ts-2, 
null);
         Date d = new Date(ts);
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
@@ -874,7 +874,7 @@ public class VariableLengthPKIT extends 
BaseClientManagedTimeIT {
     @Test
     public void testMissingPKColumn() throws Exception {
         long ts = nextTimestamp();
-        ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2);
+        ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2, null);
 
         String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" 
+ ts; // Insert at timestamp 0
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -894,7 +894,7 @@ public class VariableLengthPKIT extends 
BaseClientManagedTimeIT {
     @Test
     public void testNoKVColumn() throws Exception {
         long ts = nextTimestamp();
-        ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, null, ts-2);
+        ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, null, ts-2, 
null);
 
         String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" 
+ ts; // Insert at timestamp 0
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -914,7 +914,7 @@ public class VariableLengthPKIT extends 
BaseClientManagedTimeIT {
     // Broken, since we don't know if insert vs update. @Test
     public void testMissingKVColumn() throws Exception {
         long ts = nextTimestamp();
-        ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, null, ts-2);
+        ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, null, ts-2, 
null);
 
         String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" 
+ ts; // Insert at timestamp 0
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -942,7 +942,7 @@ public class VariableLengthPKIT extends 
BaseClientManagedTimeIT {
     @Test
     public void testTooShortKVColumn() throws Exception {
         long ts = nextTimestamp();
-        ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, null, ts-2);
+        ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, null, ts-2, 
null);
 
         String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" 
+ ts; // Insert at timestamp 0
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -978,7 +978,7 @@ public class VariableLengthPKIT extends 
BaseClientManagedTimeIT {
     @Test
     public void testTooShortPKColumn() throws Exception {
         long ts = nextTimestamp();
-        ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, null, ts-2);
+        ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, null, ts-2, 
null);
 
         String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" 
+ ts; // Insert at timestamp 0
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -1014,7 +1014,7 @@ public class VariableLengthPKIT extends 
BaseClientManagedTimeIT {
     @Test
     public void testTooLongPKColumn() throws Exception {
         long ts = nextTimestamp();
-        ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, null, ts-2);
+        ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, null, ts-2, 
null);
 
         String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" 
+ ts; // Insert at timestamp 0
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -1051,7 +1051,7 @@ public class VariableLengthPKIT extends 
BaseClientManagedTimeIT {
     @Test
     public void testTooLongKVColumn() throws Exception {
         long ts = nextTimestamp();
-        ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, null, ts-2);
+        ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, null, ts-2, 
null);
 
         String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" 
+ ts; // Insert at timestamp 0
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -1481,7 +1481,7 @@ public class VariableLengthPKIT extends 
BaseClientManagedTimeIT {
     @Test
     public void testLikeOnColumn() throws Exception {
         long ts = nextTimestamp();
-        ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2);
+        ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2, null);
 
         // Insert all rows at ts
         String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" 
+ ts;
@@ -1598,7 +1598,7 @@ public class VariableLengthPKIT extends 
BaseClientManagedTimeIT {
     @Test
     public void testILikeOnColumn() throws Exception {
         long ts = nextTimestamp();
-        ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2);
+        ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2, null);
 
         // Insert all rows at ts
         String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" 
+ ts;
@@ -1730,7 +1730,7 @@ public class VariableLengthPKIT extends 
BaseClientManagedTimeIT {
     @Test
     public void testIsNullInPK() throws Exception {
         long ts = nextTimestamp();
-        ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2);
+        ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2, null);
 
         // Insert all rows at ts
         String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" 
+ ts;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e89337f8/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
index 3ee9721..510cbe8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
@@ -52,6 +52,7 @@ import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
@@ -70,6 +71,7 @@ import com.google.common.collect.Maps;
 public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT {
 
     private final boolean localIndex;
+    private final boolean columnEncoded;
     private final String tableDDLOptions;
 
     private volatile boolean stopThreads = false;
@@ -78,9 +80,15 @@ public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT {
     private static String INDEX_DDL;
     public static final AtomicInteger NUM_ROWS = new AtomicInteger(0);
 
-    public ImmutableIndexIT(boolean localIndex, boolean transactional) {
-        this.localIndex = localIndex;
+    public ImmutableIndexIT(boolean localIndex, boolean transactional, boolean columnEncoded) {
         StringBuilder optionBuilder = new StringBuilder("IMMUTABLE_ROWS=true");
+        this.localIndex = localIndex;
+        this.columnEncoded = columnEncoded;
+        if (!columnEncoded) {
+            if (optionBuilder.length()!=0)
+                optionBuilder.append(",");
+            optionBuilder.append("COLUMN_ENCODED_BYTES=0,IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN);
+        }
         if (transactional) {
             optionBuilder.append(", TRANSACTIONAL=true");
         }
@@ -98,11 +106,13 @@ public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT {
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
     }
 
-    @Parameters(name="ImmutableIndexIT_localIndex={0},transactional={1}") // name is used by failsafe as file name in reports
+    @Parameters(name="ImmutableIndexIT_localIndex={0},transactional={1},columnEncoded={2}") // name is used by failsafe as file name in reports
     public static Collection<Boolean[]> data() {
                return Arrays.asList(new Boolean[][] { 
-                               { false, false }, { false, true },
-                               { true, false }, { true, true } });
+                               { false, false, false }, { false, false, true },
+                               { false, true, false }, { false, true, true },
+                               { true, false, false }, { true, false, true },
+                { true, true, false }, { true, true, true } });
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e89337f8/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
index 3a72088..3f90936 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
@@ -62,6 +62,7 @@ import org.apache.phoenix.parse.NamedTableNode;
 import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.util.DateUtil;
@@ -84,26 +85,39 @@ public class IndexIT extends ParallelStatsDisabledIT {
     private final boolean mutable;
     private final String tableDDLOptions;
 
-    public IndexIT(boolean localIndex, boolean mutable, boolean transactional) {
+    public IndexIT(boolean localIndex, boolean mutable, boolean transactional, boolean columnEncoded) {
         this.localIndex = localIndex;
         this.transactional = transactional;
         this.mutable = mutable;
         StringBuilder optionBuilder = new StringBuilder();
-        if (!mutable)
-            optionBuilder.append(" IMMUTABLE_ROWS=true ");
+        if (!columnEncoded) {
+            if (optionBuilder.length()!=0)
+                optionBuilder.append(",");
+            optionBuilder.append("COLUMN_ENCODED_BYTES=0");
+        }
+        if (!mutable) {
+            if (optionBuilder.length()!=0)
+                optionBuilder.append(",");
+            optionBuilder.append("IMMUTABLE_ROWS=true");
+            if (!columnEncoded) {
+                optionBuilder.append(",IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN);
+            }
+        }
         if (transactional) {
-            if (!(optionBuilder.length()==0))
+            if (optionBuilder.length()!=0)
                 optionBuilder.append(",");
             optionBuilder.append(" TRANSACTIONAL=true ");
         }
         this.tableDDLOptions = optionBuilder.toString();
     }
 
-    @Parameters(name="IndexIT_localIndex={0},mutable={1},transactional={2}") // name is used by failsafe as file name in reports
+    @Parameters(name="IndexIT_localIndex={0},mutable={1},transactional={2},columnEncoded={3}") // name is used by failsafe as file name in reports
     public static Collection<Boolean[]> data() {
         return Arrays.asList(new Boolean[][] {
-                 { false, false, false }, { false, false, true }, { false, true, false }, { false, true, true },
-                 { true, false, false }, { true, false, true }, { true, true, false }, { true, true, true }
+                { false, false, false, false }, { false, false, false, true }, { false, false, true, false }, { false, false, true, true }, 
+                { false, true, false, false }, { false, true, false, true }, { false, true, true, false }, { false, true, true, true }, 
+                { true, false, false, false }, { true, false, false, true }, { true, false, true, false }, { true, false, true, true }, 
+                { true, true, false, false }, { true, true, false, true }, { true, true, true, false }, { true, true, true, true } 
            });
     }
 
@@ -780,7 +794,7 @@ public class IndexIT extends ParallelStatsDisabledIT {
             conn.createStatement().execute(
                     "CREATE TABLE " + testTable
                     + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) "
-                    + (!tableDDLOptions.isEmpty() ? tableDDLOptions : "") + "SPLIT ON ('b')");
+                    + (!tableDDLOptions.isEmpty() ? tableDDLOptions : "") + " SPLIT ON ('b')");
             query = "SELECT * FROM " + testTable;
             rs = conn.createStatement().executeQuery(query);
             assertFalse(rs.next());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e89337f8/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
index d1ab61e..dd1b4ae 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
@@ -68,12 +68,17 @@ public class MutableIndexIT extends ParallelStatsDisabledIT 
{
     protected final boolean localIndex;
     private final String tableDDLOptions;
        
-    public MutableIndexIT(boolean localIndex, boolean transactional) {
+    public MutableIndexIT(boolean localIndex, boolean transactional, boolean 
columnEncoded) {
                this.localIndex = localIndex;
                StringBuilder optionBuilder = new StringBuilder();
                if (transactional) {
                        optionBuilder.append("TRANSACTIONAL=true");
                }
+               if (!columnEncoded) {
+            if (optionBuilder.length()!=0)
+                optionBuilder.append(",");
+            optionBuilder.append("COLUMN_ENCODED_BYTES=0");
+        }
                this.tableDDLOptions = optionBuilder.toString();
        }
     
@@ -88,11 +93,13 @@ public class MutableIndexIT extends ParallelStatsDisabledIT 
{
         return getConnection(props);
     }
     
-       @Parameters(name="MutableIndexIT_localIndex={0},transactional={1}") // 
name is used by failsafe as file name in reports
+       
@Parameters(name="MutableIndexIT_localIndex={0},transactional={1},columnEncoded={2}")
 // name is used by failsafe as file name in reports
     public static Collection<Boolean[]> data() {
-        return Arrays.asList(new Boolean[][] {
-                { false, false }, { false, true }, { true, false }, { true, 
true }
-           });
+        return Arrays.asList(new Boolean[][] { 
+                { false, false, false }, { false, false, true },
+                { false, true, false }, { false, true, true },
+                { true, false, false }, { true, false, true },
+                { true, true, false }, { true, true, true } });
     }
     
     @Test

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e89337f8/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java
index 29f3758..5ae11bf 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java
@@ -58,7 +58,7 @@ public class SaltedTableIT extends BaseClientManagedTimeIT {
         // 4abc123jkl444
         try {
             // Upsert with no column specifies.
-            ensureTableCreated(getUrl(), TABLE_WITH_SALTING, 
TABLE_WITH_SALTING, splits, ts-2);
+            ensureTableCreated(getUrl(), TABLE_WITH_SALTING, 
TABLE_WITH_SALTING, splits, ts-2, null);
             String query = "UPSERT INTO " + TABLE_WITH_SALTING + " 
VALUES(?,?,?,?,?)";
             PreparedStatement stmt = conn.prepareStatement(query);
             stmt.setInt(1, 1);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e89337f8/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
new file mode 100644
index 0000000..cdf7f1d
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.tx;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.PhoenixTransactionalProcessor;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.apache.tephra.TxConstants;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Lists;
+
+@RunWith(Parameterized.class)
+public class ParameterizedTransactionIT extends ParallelStatsDisabledIT {
+    
+    private final String tableDDLOptions;
+
+    /**
+     * Assembles the DDL option string appended to every CREATE TABLE issued by these tests.
+     * The string always starts with TRANSACTIONAL=true; column encoding is switched off and
+     * immutable rows are switched on according to the runner-supplied parameters.
+     *
+     * @param mutable       when false, IMMUTABLE_ROWS=true is added
+     * @param columnEncoded when false, COLUMN_ENCODED_BYTES=0 is added, and for immutable tables
+     *                      the ONE_CELL_PER_COLUMN storage scheme is requested as well
+     */
+    public ParameterizedTransactionIT(boolean mutable, boolean columnEncoded) {
+        String options = "TRANSACTIONAL=true";
+        if (!columnEncoded) {
+            options += ",COLUMN_ENCODED_BYTES=0";
+        }
+        if (!mutable) {
+            options += ",IMMUTABLE_ROWS=true";
+            if (!columnEncoded) {
+                options += ",IMMUTABLE_STORAGE_SCHEME="
+                        + PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN;
+            }
+        }
+        this.tableDDLOptions = options;
+    }
+    
+    /**
+     * Supplies every (mutable, columnEncoded) combination to the parameterized runner.
+     * The name attribute is used by failsafe as the file name in reports.
+     */
+    @Parameters(name="TransactionIT_mutable={0},columnEncoded={1}")
+    public static Collection<Boolean[]> data() {
+        Boolean[][] combinations = {
+            { false, false },
+            { false, true },
+            { true, false },
+            { true, true },
+        };
+        return Arrays.asList(combinations);
+    }
+    
+    /**
+     * Checks that a connection can read the rows it has upserted before committing, and that
+     * the same rows are still returned after the commit.
+     */
+    @Test
+    public void testReadOwnWrites() throws Exception {
+        String tableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + generateUniqueName();
+        String selectAll = "SELECT * FROM " + tableName;
+        String createDdl = "create table " + tableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions;
+        String upsertSql = "UPSERT INTO " + tableName
+                + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
+        try (Connection connection = DriverManager.getConnection(getUrl())) {
+            connection.createStatement().execute(createDdl);
+            connection.setAutoCommit(false);
+            // the freshly created table returns no rows
+            ResultSet results = connection.createStatement().executeQuery(selectAll);
+            assertFalse(results.next());
+
+            // stage rows 1 and 2 without committing
+            PreparedStatement upsert = connection.prepareStatement(upsertSql);
+            for (int row = 1; row <= 2; row++) {
+                TestUtil.setRowKeyColumns(upsert, row);
+                upsert.execute();
+            }
+
+            // both rows must be readable although commit has not been called yet
+            results = connection.createStatement().executeQuery(selectAll);
+            for (int row = 1; row <= 2; row++) {
+                TestUtil.validateRowKeyColumns(results, row);
+            }
+            assertFalse(results.next());
+
+            connection.commit();
+
+            // and they must still be readable once committed
+            results = connection.createStatement().executeQuery(selectAll);
+            for (int row = 1; row <= 2; row++) {
+                TestUtil.validateRowKeyColumns(results, row);
+            }
+            assertFalse(results.next());
+        }
+    }
+    
+    /**
+     * Upserts two rows, reads them back without committing, and then closes the connection
+     * explicitly while the upserts are still uncommitted. The follow-up check against the
+     * transaction manager's invalid list is currently disabled (see the commented-out lines).
+     */
+    @Test
+    public void testTxnClosedCorrecty() throws Exception {
+        String tableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + generateUniqueName();
+        String selectAll = "SELECT * FROM " + tableName;
+        String createDdl = "create table " + tableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions;
+        String upsertSql = "UPSERT INTO " + tableName
+                + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
+        try (Connection connection = DriverManager.getConnection(getUrl())) {
+            connection.createStatement().execute(createDdl);
+            connection.setAutoCommit(false);
+            ResultSet results = connection.createStatement().executeQuery(selectAll);
+            assertFalse(results.next());
+
+            // stage rows 1 and 2 without committing
+            PreparedStatement upsert = connection.prepareStatement(upsertSql);
+            for (int row = 1; row <= 2; row++) {
+                TestUtil.setRowKeyColumns(upsert, row);
+                upsert.execute();
+            }
+
+            // the uncommitted rows are readable on this connection
+            results = connection.createStatement().executeQuery(selectAll);
+            for (int row = 1; row <= 2; row++) {
+                TestUtil.validateRowKeyColumns(results, row);
+            }
+            // Long currentTx =
+            //     rs.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp();
+            assertFalse(results.next());
+
+            // close explicitly with the upserts still pending
+            connection.close();
+            // start new connection
+            // conn.createStatement().executeQuery(selectSql);
+            // assertFalse("This transaction should not be on the invalid transactions",
+            // txManager.getCurrentState().getInvalid().contains(currentTx));
+        }
+    }
+    
+    /**
+     * Runs a single-table SELECT with auto-commit enabled against a freshly created table and
+     * expects an empty result.
+     */
+    @Test
+    public void testAutoCommitQuerySingleTable() throws Exception {
+        String tableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + generateUniqueName();
+        String createDdl = "create table " + tableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions;
+        String query = "SELECT * FROM " + tableName;
+        try (Connection connection = DriverManager.getConnection(getUrl())) {
+            connection.createStatement().execute(createDdl);
+            connection.setAutoCommit(true);
+            // nothing was upserted, so no rows may come back
+            ResultSet results = connection.createStatement().executeQuery(query);
+            assertFalse(results.next());
+        }
+    }
+    
+    /**
+     * Runs a join of a freshly created table with itself while auto-commit is enabled and
+     * expects an empty result.
+     */
+    @Test
+    public void testAutoCommitQueryMultiTables() throws Exception {
+        String tableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + generateUniqueName();
+        String createDdl = "create table " + tableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions;
+        String joinQuery = "SELECT * FROM " + tableName + " x JOIN " + tableName
+                + " y ON (x.long_pk = y.int_pk)";
+        try (Connection connection = DriverManager.getConnection(getUrl())) {
+            connection.createStatement().execute(createDdl);
+            connection.setAutoCommit(true);
+            // nothing was upserted, so the join yields no rows
+            ResultSet results = connection.createStatement().executeQuery(joinQuery);
+            assertFalse(results.next());
+        }
+    }
+    
+    /**
+     * Creates two empty tables and joins each with itself on
+     * {@code x.varchar_pk = y.a.varchar_col1}, expecting no rows. The second table declares
+     * {@code varchar_col1} in both column families a and b, whereas the first one uses distinct
+     * column names.
+     */
+    @Test
+    public void testSelfJoin() throws Exception {
+        String t1 = generateUniqueName();
+        String t2 = generateUniqueName();
+        String sharedColumns = " (varchar_pk VARCHAR NOT NULL primary key, a.varchar_col1 VARCHAR, ";
+        try (Connection connection = DriverManager.getConnection(getUrl())) {
+            connection.createStatement().execute(
+                    "create table " + t1 + sharedColumns + "b.varchar_col2 VARCHAR)" + tableDDLOptions);
+            connection.createStatement().execute(
+                    "create table " + t2 + sharedColumns + "b.varchar_col1 VARCHAR)" + tableDDLOptions);
+            // both tables are empty, so neither self join may return a row
+            for (String table : new String[] { t1, t2 }) {
+                ResultSet results = connection.createStatement().executeQuery(
+                        "SELECT * FROM " + table + " x JOIN " + table
+                        + " y ON (x.varchar_pk = y.a.varchar_col1)");
+                assertFalse(results.next());
+            }
+        }
+    }
+    
+    /**
+     * Upserts the same row key through two independent connections, each setting a different
+     * non-key column, and commits both.
+     * <p>
+     * For a table with immutable rows the second commit is expected to succeed. Otherwise it is
+     * expected to fail with {@link SQLExceptionCode#TRANSACTION_CONFLICT_EXCEPTION}.
+     *
+     * @param fullTableName name of an existing table created from TestUtil.TEST_TABLE_SCHEMA
+     */
+    private void testRowConflicts(String fullTableName) throws Exception {
+        try (Connection conn1 = DriverManager.getConnection(getUrl());
+                Connection conn2 = DriverManager.getConnection(getUrl())) {
+            conn1.setAutoCommit(false);
+            conn2.setAutoCommit(false);
+            String selectSql = "SELECT * FROM " + fullTableName;
+            ResultSet rs = conn1.createStatement().executeQuery(selectSql);
+            boolean immutableRows = conn1.unwrap(PhoenixConnection.class)
+                    .getTable(new PTableKey(null, fullTableName)).isImmutableRows();
+            assertFalse(rs.next());
+            // upsert row using conn1
+            String upsertSql = "UPSERT INTO " + fullTableName
+                    + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, a.int_col1)"
+                    + " VALUES(?, ?, ?, ?, ?, ?, ?)";
+            PreparedStatement stmt = conn1.prepareStatement(upsertSql);
+            TestUtil.setRowKeyColumns(stmt, 1);
+            stmt.setInt(7, 10);
+            stmt.execute();
+            // upsert the same row key using conn2
+            upsertSql = "UPSERT INTO " + fullTableName
+                    + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, b.int_col2)"
+                    + " VALUES(?, ?, ?, ?, ?, ?, ?)";
+            stmt = conn2.prepareStatement(upsertSql);
+            TestUtil.setRowKeyColumns(stmt, 1);
+            stmt.setInt(7, 11);
+            stmt.execute();
+
+            conn1.commit();
+            // the second commit must fail unless the table has immutable rows
+            try {
+                conn2.commit();
+                if (!immutableRows) {
+                    fail("Expected a transaction conflict on the second commit");
+                }
+            } catch (SQLException e) {
+                if (immutableRows) {
+                    // rethrow instead of fail() so the unexpected failure keeps its cause
+                    throw e;
+                }
+                assertEquals(SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode(),
+                        e.getErrorCode());
+            }
+        }
+    }
+    
+    /**
+     * Creates a table with the parameterized DDL options and runs the shared row-conflict
+     * scenario against it.
+     */
+    @Test
+    public void testRowConflictDetected() throws Exception {
+        String transTableName = generateUniqueName();
+        String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName;
+        // try-with-resources: the DDL connection used to be left open
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute(
+                    "create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions);
+            testRowConflicts(fullTableName);
+        }
+    }
+    
+    /**
+     * Creates a table, switches it to IMMUTABLE_ROWS=true via ALTER TABLE, and runs the shared
+     * row-conflict scenario against it.
+     */
+    @Test
+    public void testNoConflictDetectionForImmutableRows() throws Exception {
+        String transTableName = generateUniqueName();
+        String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName;
+        // try-with-resources: the DDL connection used to be left open
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute(
+                    "create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions);
+            conn.createStatement().execute(
+                    "ALTER TABLE " + fullTableName + " SET IMMUTABLE_ROWS=true");
+            testRowConflicts(fullTableName);
+        }
+    }
+    
+    /**
+     * Creates a table with three rows and an index on it, rewrites each row's empty key value
+     * directly through HBase, issues {@code ALTER TABLE ... SET TRANSACTIONAL=true}, and then
+     * checks that the data table and the index both carry the transactional coprocessor, that
+     * old and new rows are readable, and that a rollback discards only the uncommitted upsert.
+     * <p>
+     * NOTE(review): tableDDLOptions always begins with TRANSACTIONAL=true, so the table looks
+     * transactional from creation on - confirm that the ALTER still exercises the conversion.
+     */
+    @Test
+    public void testNonTxToTxTable() throws Exception {
+        String nonTxTableName = generateUniqueName();
+        String txCoprocessor = PhoenixTransactionalProcessor.class.getName();
+
+        // try-with-resources: the connection used to be left open
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+            conn.createStatement().execute("CREATE TABLE " + nonTxTableName
+                    + "(k INTEGER PRIMARY KEY, v VARCHAR)" + tableDDLOptions);
+            conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (1)");
+            conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (2, 'a')");
+            conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (3, 'b')");
+            conn.commit();
+
+            String index = generateUniqueName();
+            conn.createStatement().execute(
+                    "CREATE INDEX " + index + " ON " + nonTxTableName + "(v)");
+            // Reset empty column value to an empty value like it is pre-transactions
+            HTableInterface htable =
+                    pconn.getQueryServices().getTable(Bytes.toBytes(nonTxTableName));
+            List<Put> puts = Lists.newArrayList(
+                    new Put(PInteger.INSTANCE.toBytes(1)),
+                    new Put(PInteger.INSTANCE.toBytes(2)),
+                    new Put(PInteger.INSTANCE.toBytes(3)));
+            for (Put put : puts) {
+                put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                        QueryConstants.EMPTY_COLUMN_BYTES, ByteUtil.EMPTY_BYTE_ARRAY);
+            }
+            htable.put(puts);
+
+            conn.createStatement().execute(
+                    "ALTER TABLE " + nonTxTableName + " SET TRANSACTIONAL=true");
+
+            // data table and index must both have the transactional coprocessor installed
+            htable = pconn.getQueryServices().getTable(Bytes.toBytes(nonTxTableName));
+            assertTrue(htable.getTableDescriptor().getCoprocessors().contains(txCoprocessor));
+            htable = pconn.getQueryServices().getTable(Bytes.toBytes(index));
+            assertTrue(htable.getTableDescriptor().getCoprocessors().contains(txCoprocessor));
+
+            conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (4, 'c')");
+            ResultSet rs = conn.createStatement().executeQuery(
+                    "SELECT /*+ NO_INDEX */ k FROM " + nonTxTableName + " WHERE v IS NULL");
+            assertTrue(pconn.getTable(new PTableKey(null, nonTxTableName)).isTransactional());
+            assertIntKeys(rs, 1);
+            conn.commit();
+
+            conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (5, 'd')");
+            rs = conn.createStatement().executeQuery("SELECT k FROM " + nonTxTableName);
+            assertTrue(pconn.getTable(new PTableKey(null, index)).isTransactional());
+            assertIntKeys(rs, 1, 2, 3, 4, 5);
+            conn.rollback();
+
+            // row 5 was never committed, so it is gone after the rollback
+            rs = conn.createStatement().executeQuery("SELECT k FROM " + nonTxTableName);
+            assertIntKeys(rs, 1, 2, 3, 4);
+        }
+    }
+
+    /** Asserts that column 1 of {@code rs} yields exactly the given ints, in order. */
+    private static void assertIntKeys(ResultSet rs, int... expectedKeys) throws SQLException {
+        for (int expected : expectedKeys) {
+            assertTrue(rs.next());
+            assertEquals(expected, rs.getInt(1));
+        }
+        assertFalse(rs.next());
+    }
+    
+    /**
+     * (Currently ignored.) Disables SYSTEM.CATALOG, attempts to switch a table to transactional,
+     * and expects the ALTER to fail with a "is disabled" message. Afterwards it checks that the
+     * row is still readable, that the transactional coprocessor is absent, and that the default
+     * column family's max versions is 1.
+     */
+    @Ignore
+    @Test
+    public void testNonTxToTxTableFailure() throws Exception {
+        String nonTxTableName = generateUniqueName();
+        String fullTableName = "SYSTEM." + nonTxTableName;
+
+        // try-with-resources: the connection used to be left open
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+            // Put table in SYSTEM schema to prevent attempts to update the cache after we
+            // disable SYSTEM.CATALOG
+            conn.createStatement().execute("CREATE TABLE " + fullTableName
+                    + "(k INTEGER PRIMARY KEY, v VARCHAR)" + tableDDLOptions);
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES (1)");
+            conn.commit();
+            // Reset empty column value to an empty value like it is pre-transactions
+            HTableInterface htable =
+                    pconn.getQueryServices().getTable(Bytes.toBytes(fullTableName));
+            Put put = new Put(PInteger.INSTANCE.toBytes(1));
+            put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                    QueryConstants.EMPTY_COLUMN_BYTES, ByteUtil.EMPTY_BYTE_ARRAY);
+            htable.put(put);
+
+            HBaseAdmin admin = pconn.getQueryServices().getAdmin();
+            try {
+                admin.disableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME);
+                try {
+                    // This will succeed initially in updating the HBase metadata, but then will
+                    // fail when the SYSTEM.CATALOG table is attempted to be updated, exercising
+                    // the code to restore the coprocessors back to the non transactional ones.
+                    conn.createStatement().execute(
+                            "ALTER TABLE " + fullTableName + " SET TRANSACTIONAL=true");
+                    fail();
+                } catch (SQLException e) {
+                    assertTrue(e.getMessage().contains(
+                            PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " is disabled"));
+                } finally {
+                    admin.enableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME);
+                }
+            } finally {
+                // close the admin even if re-enabling SYSTEM.CATALOG throws
+                admin.close();
+            }
+
+            ResultSet rs = conn.createStatement().executeQuery(
+                    "SELECT k FROM " + fullTableName + " WHERE v IS NULL");
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            assertFalse(rs.next());
+
+            htable = pconn.getQueryServices().getTable(Bytes.toBytes(fullTableName));
+            assertFalse(htable.getTableDescriptor().getCoprocessors()
+                    .contains(PhoenixTransactionalProcessor.class.getName()));
+            assertEquals(1, pconn.getQueryServices()
+                    .getTableDescriptor(Bytes.toBytes(fullTableName))
+                    .getFamily(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES).getMaxVersions());
+        }
+    }
+    
+    /**
+     * Covers creating transactional tables:
+     * <ul>
+     * <li>a table created with the parameterized options is transactional and has the
+     *     transactional coprocessor;</li>
+     * <li>altering it to transactional=false is rejected with TX_MAY_NOT_SWITCH_TO_NON_TX;</li>
+     * <li>creating a transactional Phoenix table over an existing HBase table sets
+     *     READ_NON_TX_DATA on the descriptor;</li>
+     * <li>CREATE TABLE IF NOT EXISTS without the transactional flag is rejected, whereas the
+     *     same statement with transactional=true succeeds.</li>
+     * </ul>
+     */
+    @Test
+    public void testCreateTableToBeTransactional() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String txCoprocessor = PhoenixTransactionalProcessor.class.getName();
+        // try-with-resources: the connection and the HBaseAdmin below used to be left open
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String t1 = generateUniqueName();
+            String t2 = generateUniqueName();
+            String ddl = "CREATE TABLE " + t1 + " (k varchar primary key) " + tableDDLOptions;
+            conn.createStatement().execute(ddl);
+            PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+            PTable table = pconn.getTable(new PTableKey(null, t1));
+            HTableInterface htable = pconn.getQueryServices().getTable(Bytes.toBytes(t1));
+            assertTrue(table.isTransactional());
+            assertTrue(htable.getTableDescriptor().getCoprocessors().contains(txCoprocessor));
+
+            // a transactional table may not be switched back to non-transactional
+            try {
+                ddl = "ALTER TABLE " + t1 + " SET transactional=false";
+                conn.createStatement().execute(ddl);
+                fail();
+            } catch (SQLException e) {
+                assertEquals(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX.getErrorCode(),
+                        e.getErrorCode());
+            }
+
+            // map a transactional Phoenix table onto a pre-existing HBase table
+            try (HBaseAdmin admin = pconn.getQueryServices().getAdmin()) {
+                HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(t2));
+                desc.addFamily(new HColumnDescriptor(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES));
+                admin.createTable(desc);
+                ddl = "CREATE TABLE " + t2 + " (k varchar primary key) transactional=true";
+                conn.createStatement().execute(ddl);
+                assertEquals(Boolean.TRUE.toString(),
+                        admin.getTableDescriptor(TableName.valueOf(t2))
+                                .getValue(TxConstants.READ_NON_TX_DATA));
+            }
+
+            // Without the transactional flag the statement is treated as a switch to
+            // non-transactional and must be rejected.
+            ddl = "CREATE TABLE IF NOT EXISTS " + t1 + " (k varchar primary key)";
+            try {
+                conn.createStatement().execute(ddl);
+                fail();
+            } catch (SQLException e) {
+                assertEquals(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX.getErrorCode(),
+                        e.getErrorCode());
+            }
+            // Should be ok, as HBase metadata should match existing metadata.
+            ddl += " transactional=true";
+            conn.createStatement().execute(ddl);
+            table = pconn.getTable(new PTableKey(null, t1));
+            htable = pconn.getQueryServices().getTable(Bytes.toBytes(t1));
+            assertTrue(table.isTransactional());
+            assertTrue(htable.getTableDescriptor().getCoprocessors().contains(txCoprocessor));
+        }
+    }
+
+    /**
+     * Checks that {@code current_date()} advances between two statements executed on the same
+     * connection: the query is run twice, one second apart, and the second value must be
+     * strictly later than the first.
+     */
+    @Test
+    public void testCurrentDate() throws Exception {
+        String tableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + generateUniqueName();
+        String dateQuery = "SELECT current_date() FROM " + tableName;
+        String createDdl = "create table " + tableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions;
+        String upsertSql = "UPSERT INTO " + tableName
+                + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
+        try (Connection connection = DriverManager.getConnection(getUrl())) {
+            connection.createStatement().execute(createDdl);
+            connection.setAutoCommit(false);
+            ResultSet results = connection.createStatement().executeQuery(dateQuery);
+            assertFalse(results.next());
+
+            // upsert a single row and commit it so the query has exactly one row to project
+            PreparedStatement upsert = connection.prepareStatement(upsertSql);
+            TestUtil.setRowKeyColumns(upsert, 1);
+            upsert.execute();
+            connection.commit();
+
+            results = connection.createStatement().executeQuery(dateQuery);
+            assertTrue(results.next());
+            Date firstDate = results.getDate(1);
+            assertFalse(results.next());
+
+            Thread.sleep(1000);
+
+            results = connection.createStatement().executeQuery(dateQuery);
+            assertTrue(results.next());
+            Date secondDate = results.getDate(1);
+            assertFalse(results.next());
+            long firstMillis = firstDate.getTime();
+            long secondMillis = secondDate.getTime();
+            assertTrue("current_date() should change while executing multiple statements",
+                    secondMillis > firstMillis);
+        }
+    }
+    
+    
+    /**
+     * Fills a salted table with 100 sequence-generated rows and copies them into a second table
+     * with an auto-committed UPSERT SELECT, using mutate-batch, scan-cache and result-chunk
+     * sizes of 3. The UPSERT SELECT must report 100 affected rows.
+     */
+    @Test
+    public void testParallelUpsertSelect() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3));
+        props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3));
+        props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3));
+        // try-with-resources: the connection used to be closed only when every step succeeded
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(false);
+            String fullTableName1 = generateUniqueName();
+            String fullTableName2 = generateUniqueName();
+            String sequenceName = "S_" + generateUniqueName();
+            conn.createStatement().execute("CREATE SEQUENCE " + sequenceName);
+            conn.createStatement().execute("CREATE TABLE " + fullTableName1
+                    + " (pk INTEGER PRIMARY KEY, val INTEGER) SALT_BUCKETS=4"
+                    + (!tableDDLOptions.isEmpty() ? "," : "") + tableDDLOptions);
+            conn.createStatement().execute("CREATE TABLE " + fullTableName2
+                    + " (pk INTEGER PRIMARY KEY, val INTEGER)" + tableDDLOptions);
+
+            for (int i = 0; i < 100; i++) {
+                conn.createStatement().execute("UPSERT INTO " + fullTableName1
+                        + " VALUES (NEXT VALUE FOR " + sequenceName + ", " + (i % 10) + ")");
+            }
+            conn.commit();
+            conn.setAutoCommit(true);
+            int upsertCount = conn.createStatement().executeUpdate(
+                    "UPSERT INTO " + fullTableName2 + " SELECT pk, val FROM " + fullTableName1);
+            assertEquals(100, upsertCount);
+        }
+    }
+
+    /**
+     * Two connections each upsert, without committing, a different v1 value for the same key.
+     * Each connection must keep seeing only its own version of the row, both in an unfiltered
+     * scan and under a filter of the form {@code v1 != <other's value> AND v2 = 'x'}.
+     */
+    @Test
+    public void testInflightPartialEval() throws SQLException {
+        try (Connection ddlConn = DriverManager.getConnection(getUrl())) {
+            String tableName = generateUniqueName();
+            Statement ddlStatement = ddlConn.createStatement();
+            ddlStatement.execute("CREATE TABLE " + tableName
+                    + " (k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + tableDDLOptions);
+            String selectAll = "SELECT * FROM " + tableName;
+
+            try (Connection first = DriverManager.getConnection(getUrl());
+                    Connection second = DriverManager.getConnection(getUrl())) {
+                first.createStatement().execute(
+                        "UPSERT INTO " + tableName + " VALUES ('a','b','x')");
+                // Select to force uncommitted data to be written
+                assertSingleRow(first.createStatement().executeQuery(selectAll), "a", "b");
+
+                second.createStatement().execute(
+                        "UPSERT INTO " + tableName + " VALUES ('a','c','x')");
+                // Select to force uncommitted data to be written
+                assertSingleRow(second.createStatement().executeQuery(selectAll), "a", "c");
+
+                // If the AndExpression were to see the uncommitted row from the second
+                // connection, the filter would filter the row out early and no longer continue
+                // to evaluate other cells due to the way partial evaluation holds state.
+                assertSingleRow(first.createStatement().executeQuery(
+                        selectAll + " WHERE v1 != 'c' AND v2 = 'x'"), "a", "b");
+
+                // Same as above, seen from the second connection
+                assertSingleRow(second.createStatement().executeQuery(
+                        selectAll + " WHERE v1 != 'b' AND v2 = 'x'"), "a", "c");
+            }
+        }
+    }
+
+    /** Asserts that {@code rs} holds exactly one row whose first two columns match. */
+    private static void assertSingleRow(ResultSet rs, String expectedKey, String expectedV1)
+            throws SQLException {
+        assertTrue(rs.next());
+        assertEquals(expectedKey, rs.getString(1));
+        assertEquals(expectedV1, rs.getString(2));
+        assertFalse(rs.next());
+    }
+    
+}

Reply via email to