Repository: phoenix
Updated Branches:
  refs/heads/encodecolumns2 c004c6ea2 -> 61d9035cd


http://git-wip-us.apache.org/repos/asf/phoenix/blob/61d9035c/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
index 9a5e412..752ace0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
@@ -40,6 +40,10 @@ import java.util.Random;
 
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
@@ -58,6 +62,7 @@ import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -68,20 +73,46 @@ import com.google.common.collect.Maps;
 @RunWith(Parameterized.class)
 public class StatsCollectorIT extends BaseUniqueNamesOwnClusterIT {
     private final String tableDDLOptions;
+    private final boolean columnEncoded;
     private String tableName;
     private String schemaName;
     private String fullTableName;
     private String physicalTableName;
     private final boolean userTableNamespaceMapped;
+    private final boolean mutable;
     
-    public StatsCollectorIT(boolean transactional, boolean 
userTableNamespaceMapped) {
-        this.tableDDLOptions= transactional ? " TRANSACTIONAL=true" : "";
+    public StatsCollectorIT(boolean mutable, boolean transactional, boolean 
userTableNamespaceMapped, boolean columnEncoded) {
+        StringBuilder sb = new StringBuilder();
+        if (transactional) {
+            sb.append("TRANSACTIONAL=true");
+        }
+        if (columnEncoded) {
+            if (sb.length()>0) {
+                sb.append(",");
+            }
+            sb.append("COLUMN_ENCODED_BYTES=4");
+        }
+        if (!mutable) {
+            if (sb.length()>0) {
+                sb.append(",");
+            }
+            sb.append("IMMUTABLE_ROWS=true");
+        }
+        this.tableDDLOptions = sb.toString();
         this.userTableNamespaceMapped = userTableNamespaceMapped;
+        this.columnEncoded = columnEncoded;
+        this.mutable = mutable;
     }
     
-    @Parameters(name="transactional = {0}, isUserTableNamespaceMapped = {1}")
+    @Parameters(name="columnEncoded = {0}, mutable = {1}, transactional = {2}, 
isUserTableNamespaceMapped = {3}")
     public static Collection<Boolean[]> data() {
-        return Arrays.asList(new Boolean[][] {{false,true}, {false, false}, 
{true, false}, {true, true}});
+        return Arrays.asList(new Boolean[][] {     
+                { false, false, false, false }, { false, false, false, true }, 
{ false, false, true, false }, { false, false, true, true },
+                // no need to test non column encoded mutable case and this is 
the same as non column encoded immutable 
+                //{ false, true, false, false }, { false, true, false, true }, 
{ false, true, true, false }, { false, true, true, true }, 
+                { true, false, false, false }, { true, false, false, true }, { 
true, false, true, false }, { true, false, true, true }, 
+                { true, true, false, false }, { true, true, false, true }, { 
true, true, true, false }, { true, true, true, true } 
+          });
     }
     
     @BeforeClass
@@ -147,25 +178,28 @@ public class StatsCollectorIT extends 
BaseUniqueNamesOwnClusterIT {
                 "CREATE TABLE " + fullTableName +" ( k VARCHAR PRIMARY KEY, 
a.v1 VARCHAR, b.v2 VARCHAR ) " + tableDDLOptions + (tableDDLOptions.isEmpty() ? 
"" : ",") + "SALT_BUCKETS = 3");
         conn.createStatement().execute("UPSERT INTO " + fullTableName + 
"(k,v1) VALUES('a','123456789')");
         conn.createStatement().execute("UPDATE STATISTICS " + fullTableName);
+                
         ResultSet rs;
         String explainPlan;
         rs = conn.createStatement().executeQuery("EXPLAIN SELECT v2 FROM " + 
fullTableName + " WHERE v2='foo'");
         explainPlan = QueryUtil.getExplainPlan(rs);
+        // if we are using the ONE_CELL_PER_COLUMN_FAMILY storage scheme, we 
will have the single kv even though there are no values for col family v2 
+        String stats = columnEncoded && !mutable  ? "4-CHUNK 1 ROWS 58 BYTES" 
: "3-CHUNK 0 ROWS 0 BYTES";
         assertEquals(
-                "CLIENT 3-CHUNK 0 ROWS 0 BYTES PARALLEL 3-WAY FULL SCAN OVER " 
+ physicalTableName + "\n" +
+                "CLIENT " + stats + " PARALLEL 3-WAY FULL SCAN OVER " + 
physicalTableName + "\n" +
                 "    SERVER FILTER BY B.V2 = 'foo'\n" + 
                 "CLIENT MERGE SORT",
                 explainPlan);
         rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + 
fullTableName);
         explainPlan = QueryUtil.getExplainPlan(rs);
         assertEquals(
-                "CLIENT 4-CHUNK 1 ROWS 28 BYTES PARALLEL 3-WAY FULL SCAN OVER 
" + physicalTableName + "\n" +
+                "CLIENT 4-CHUNK 1 ROWS " + (columnEncoded ? "28" : "34") + " 
BYTES PARALLEL 3-WAY FULL SCAN OVER " + physicalTableName + "\n" +
                 "CLIENT MERGE SORT",
                 explainPlan);
         rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + 
fullTableName + " WHERE k = 'a'");
         explainPlan = QueryUtil.getExplainPlan(rs);
         assertEquals(
-                "CLIENT 1-CHUNK 1 ROWS 204 BYTES PARALLEL 1-WAY POINT LOOKUP 
ON 1 KEY OVER " + physicalTableName + "\n" +
+                "CLIENT 1-CHUNK 1 ROWS " + (columnEncoded ? "204" : "202") + " 
BYTES PARALLEL 1-WAY POINT LOOKUP ON 1 KEY OVER " + physicalTableName + "\n" +
                 "CLIENT MERGE SORT",
                 explainPlan);
         
@@ -368,11 +402,13 @@ public class StatsCollectorIT extends 
BaseUniqueNamesOwnClusterIT {
     }
     
     @Test
+    @Ignore //TODO remove this once  
https://issues.apache.org/jira/browse/TEPHRA-208 is fixed
     public void testCompactUpdatesStats() throws Exception {
         testCompactUpdatesStats(0, fullTableName);
     }
     
     @Test
+    @Ignore //TODO remove this once  
https://issues.apache.org/jira/browse/TEPHRA-208 is fixed
     public void testCompactUpdatesStatsWithMinStatsUpdateFreq() throws 
Exception {
         
testCompactUpdatesStats(QueryServicesOptions.DEFAULT_STATS_UPDATE_FREQ_MS, 
fullTableName);
     }
@@ -390,6 +426,7 @@ public class StatsCollectorIT extends 
BaseUniqueNamesOwnClusterIT {
         Connection conn = getConnection(statsUpdateFreq);
         PreparedStatement stmt;
         conn.createStatement().execute("CREATE TABLE " + tableName + "(k 
CHAR(1) PRIMARY KEY, v INTEGER, w INTEGER) "
+                + (!tableDDLOptions.isEmpty() ? tableDDLOptions + "," : "") 
                 + HColumnDescriptor.KEEP_DELETED_CELLS + "=" + Boolean.FALSE);
         stmt = conn.prepareStatement("UPSERT INTO " + tableName + " 
VALUES(?,?,?)");
         for (int i = 0; i < nRows; i++) {
@@ -399,11 +436,13 @@ public class StatsCollectorIT extends 
BaseUniqueNamesOwnClusterIT {
             stmt.executeUpdate();
         }
         conn.commit();
+        
         compactTable(conn, physicalTableName);
-        if (statsUpdateFreq == null) {
+        
+        if (statsUpdateFreq != 0) {
             invalidateStats(conn, tableName);
         } else {
-            // Confirm that when we have a non zero 
MIN_STATS_UPDATE_FREQ_MS_ATTRIB, after we run
+            // Confirm that when we have a non zero 
STATS_UPDATE_FREQ_MS_ATTRIB, after we run
             // UPDATATE STATISTICS, the new statistics are faulted in as 
expected.
             List<KeyRange>keyRanges = getAllSplits(conn, tableName);
             assertNotEquals(nRows+1, keyRanges.size());
@@ -419,20 +458,40 @@ public class StatsCollectorIT extends 
BaseUniqueNamesOwnClusterIT {
         conn.commit();
         assertEquals(5, nDeletedRows);
         
+        Scan scan = new Scan();
+        scan.setRaw(true);
+        PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class);
+        try (HTableInterface htable = 
phxConn.getQueryServices().getTable(Bytes.toBytes(tableName))) {
+            ResultScanner scanner = htable.getScanner(scan);
+            Result result;
+            while ((result = scanner.next())!=null) {
+                System.out.println(result);
+            }
+        }
+        
         compactTable(conn, physicalTableName);
-        if (statsUpdateFreq == null) {
-            invalidateStats(conn, tableName);
+        
+        scan = new Scan();
+        scan.setRaw(true);
+        phxConn = conn.unwrap(PhoenixConnection.class);
+        try (HTableInterface htable = 
phxConn.getQueryServices().getTable(Bytes.toBytes(tableName))) {
+            ResultScanner scanner = htable.getScanner(scan);
+            Result result;
+            while ((result = scanner.next())!=null) {
+                System.out.println(result);
+            }
         }
         
-        keyRanges = getAllSplits(conn, tableName);
-        if (statsUpdateFreq != null) {
+        if (statsUpdateFreq != 0) {
+            invalidateStats(conn, tableName);
+        } else {
             assertEquals(nRows+1, keyRanges.size());
-            // If we've set MIN_STATS_UPDATE_FREQ_MS_ATTRIB, an UPDATE 
STATISTICS will invalidate the cache
+            // If we've set STATS_UPDATE_FREQ_MS_ATTRIB, an UPDATE STATISTICS 
will invalidate the cache
             // and force us to pull over the new stats
             int rowCount = conn.createStatement().executeUpdate("UPDATE 
STATISTICS " + tableName);
             assertEquals(5, rowCount);
-            keyRanges = getAllSplits(conn, tableName);
         }
+        keyRanges = getAllSplits(conn, tableName);
         assertEquals(nRows/2+1, keyRanges.size());
         ResultSet rs = conn.createStatement().executeQuery("SELECT 
SUM(GUIDE_POSTS_ROW_COUNT) FROM "
                 + PhoenixDatabaseMetaData.SYSTEM_STATS_NAME + " WHERE 
PHYSICAL_NAME='" + physicalTableName + "'");
@@ -447,7 +506,8 @@ public class StatsCollectorIT extends 
BaseUniqueNamesOwnClusterIT {
         PreparedStatement stmt;
         conn.createStatement().execute(
                 "CREATE TABLE " + fullTableName
-                        + "(k VARCHAR PRIMARY KEY, a.v INTEGER, b.v INTEGER, 
c.v INTEGER NULL, d.v INTEGER NULL) ");
+                        + "(k VARCHAR PRIMARY KEY, a.v INTEGER, b.v INTEGER, 
c.v INTEGER NULL, d.v INTEGER NULL) "
+                        + tableDDLOptions );
         stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " 
VALUES(?,?, ?, ?, ?)");
         byte[] val = new byte[250];
         for (int i = 0; i < nRows; i++) {
@@ -473,7 +533,7 @@ public class StatsCollectorIT extends 
BaseUniqueNamesOwnClusterIT {
         List<KeyRange> keyRanges = getAllSplits(conn, fullTableName);
         assertEquals(26, keyRanges.size());
         rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + 
fullTableName);
-        assertEquals("CLIENT 26-CHUNK 25 ROWS 12530 BYTES PARALLEL 1-WAY FULL 
SCAN OVER " + physicalTableName,
+        assertEquals("CLIENT 26-CHUNK 25 ROWS " + (columnEncoded ? ( mutable ? 
"12530" : "14422" ) : "12420") + " BYTES PARALLEL 1-WAY FULL SCAN OVER " + 
physicalTableName,
                 QueryUtil.getExplainPlan(rs));
 
         ConnectionQueryServices services = 
conn.unwrap(PhoenixConnection.class).getQueryServices();
@@ -485,7 +545,8 @@ public class StatsCollectorIT extends 
BaseUniqueNamesOwnClusterIT {
                 + QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB + "\"=" + 
Long.toString(1000);
         conn.createStatement().execute(query);
         keyRanges = getAllSplits(conn, fullTableName);
-        assertEquals(12, keyRanges.size());
+        boolean oneCellPerColFamliyStorageScheme = !mutable && columnEncoded;
+        assertEquals(oneCellPerColFamliyStorageScheme ? 13 : 12, 
keyRanges.size());
 
         rs = conn
                 .createStatement()
@@ -496,25 +557,25 @@ public class StatsCollectorIT extends 
BaseUniqueNamesOwnClusterIT {
         assertTrue(rs.next());
         assertEquals("A", rs.getString(1));
         assertEquals(24, rs.getInt(2));
-        assertEquals(12252, rs.getInt(3));
-        assertEquals(11, rs.getInt(4));
+        assertEquals(columnEncoded ? ( mutable ? 12252 : 14144 ) : 12144, 
rs.getInt(3));
+        assertEquals(oneCellPerColFamliyStorageScheme ? 12 : 11, rs.getInt(4));
 
         assertTrue(rs.next());
         assertEquals("B", rs.getString(1));
-        assertEquals(20, rs.getInt(2));
-        assertEquals(5600, rs.getInt(3));
-        assertEquals(5, rs.getInt(4));
+        assertEquals(oneCellPerColFamliyStorageScheme ? 24 : 20, rs.getInt(2));
+        assertEquals(columnEncoded ? ( mutable ? 5600 : 7492 ) : 5540, 
rs.getInt(3));
+        assertEquals(oneCellPerColFamliyStorageScheme ? 6 : 5, rs.getInt(4));
 
         assertTrue(rs.next());
         assertEquals("C", rs.getString(1));
         assertEquals(24, rs.getInt(2));
-        assertEquals(6724, rs.getInt(3));
+        assertEquals(columnEncoded ? ( mutable ? 6724 : 7516 ) : 6652, 
rs.getInt(3));
         assertEquals(6, rs.getInt(4));
 
         assertTrue(rs.next());
         assertEquals("D", rs.getString(1));
         assertEquals(24, rs.getInt(2));
-        assertEquals(6724, rs.getInt(3));
+        assertEquals(columnEncoded ? ( mutable ? 6724 : 7516 ) : 6652, 
rs.getInt(3));
         assertEquals(6, rs.getInt(4));
 
         assertFalse(rs.next());
@@ -539,7 +600,7 @@ public class StatsCollectorIT extends 
BaseUniqueNamesOwnClusterIT {
         Connection conn = getConnection();
         String ddl = "CREATE TABLE " + fullTableName + " (t_id VARCHAR NOT 
NULL,\n" + "k1 INTEGER NOT NULL,\n"
                 + "k2 INTEGER NOT NULL,\n" + "C3.k3 INTEGER,\n" + "C2.v1 
VARCHAR,\n"
-                + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2)) split on 
('e','j','o')";
+                + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2)) " + 
tableDDLOptions + " split on ('e','j','o')";
         conn.createStatement().execute(ddl);
         String[] strings = { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", 
"k", "l", "m", "n", "o", "p", "q", "r",
                 "s", "t", "u", "v", "w", "x", "y", "z" };
@@ -559,7 +620,7 @@ public class StatsCollectorIT extends 
BaseUniqueNamesOwnClusterIT {
             int startIndex = r.nextInt(strings.length);
             int endIndex = r.nextInt(strings.length - startIndex) + startIndex;
             long rows = endIndex - startIndex;
-            long c2Bytes = rows * 37;
+            long c2Bytes = rows * (columnEncoded ? ( mutable ? 37 : 70 ) : 35);
             String physicalTableName = 
SchemaUtil.getPhysicalHBaseTableName(fullTableName, userTableNamespaceMapped, 
PTableType.TABLE).getString();
             rs = conn.createStatement().executeQuery(
                     "SELECT 
COLUMN_FAMILY,SUM(GUIDE_POSTS_ROW_COUNT),SUM(GUIDE_POSTS_WIDTH) from 
SYSTEM.STATS where PHYSICAL_NAME = '"

http://git-wip-us.apache.org/repos/asf/phoenix/blob/61d9035c/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java
index 8e1db17..ed294bf 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java
@@ -25,31 +25,34 @@ import static org.junit.Assert.assertTrue;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
-import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Properties;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.expression.ArrayColumnExpression;
+import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.StorageScheme;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.TestUtil;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 /**
  * Tests to demonstrate and verify the STORE_NULLS option on a table,
@@ -57,82 +60,101 @@ import org.junit.Test;
  * functionality allows having row-level versioning (similar to how 
KEEP_DELETED_CELLS works), but
  * also allows permanently deleting a row.
  */
-
-//TODO: samarth  parameterize this test once the storage scheme is optional
+@RunWith(Parameterized.class)
 public class StoreNullsIT extends ParallelStatsDisabledIT {
-    private static final Log LOG = LogFactory.getLog(StoreNullsIT.class);
     
-    private String WITH_NULLS;
-    private String WITHOUT_NULLS;
-    private String IMMUTABLE_WITH_NULLS;
-    private String IMMUTABLE_WITHOUT_NULLS;
-    private Connection conn;
-    private Statement stmt;
-
-    @Before
-    public void setUp() throws SQLException {
-        WITH_NULLS = generateUniqueName();
-        WITHOUT_NULLS = generateUniqueName();
-        IMMUTABLE_WITH_NULLS = generateUniqueName();
-        IMMUTABLE_WITHOUT_NULLS = generateUniqueName();
-        conn = DriverManager.getConnection(getUrl());
-        conn.setAutoCommit(true);
-
-        stmt = conn.createStatement();
-        stmt.execute("CREATE TABLE " + WITH_NULLS + " (" +
-                        "id SMALLINT NOT NULL PRIMARY KEY, " +
-                        "name VARCHAR) " +
-                "STORE_NULLS = true, VERSIONS = 1000, KEEP_DELETED_CELLS = 
false");
-        stmt.execute("CREATE TABLE " + WITHOUT_NULLS + " (" +
-                        "id SMALLINT NOT NULL PRIMARY KEY, " +
-                        "name VARCHAR) " +
-                "VERSIONS = 1000, KEEP_DELETED_CELLS = false");
-        stmt.execute("CREATE TABLE " + IMMUTABLE_WITH_NULLS + " ("
-                + "id SMALLINT NOT NULL PRIMARY KEY, name VARCHAR) "
-                + "STORE_NULLS = true, VERSIONS = 1, KEEP_DELETED_CELLS = 
false, IMMUTABLE_ROWS=true");
-        stmt.execute("CREATE TABLE " + IMMUTABLE_WITHOUT_NULLS + " ("
-                + "id SMALLINT NOT NULL PRIMARY KEY, name VARCHAR) "
-                + "VERSIONS = 1, KEEP_DELETED_CELLS = false, 
IMMUTABLE_ROWS=true");
+    private final boolean mutable;
+    private final boolean columnEncoded;
+    private final boolean storeNulls;
+    private final String ddlFormat;
+    
+    private String dataTableName;
+    
+    public StoreNullsIT(boolean mutable, boolean columnEncoded, boolean 
storeNulls) {
+        this.mutable = mutable;
+        this.columnEncoded = columnEncoded;
+        this.storeNulls = storeNulls;
+        
+        StringBuilder sb = new StringBuilder("CREATE TABLE %s (id SMALLINT NOT 
NULL PRIMARY KEY, name VARCHAR) VERSIONS = 1000, KEEP_DELETED_CELLS = false ");
+        if (!mutable) {
+            sb.append(",").append("IMMUTABLE_ROWS=true");
+        }
+        if (columnEncoded) {
+            sb.append(",").append("COLUMN_ENCODED_BYTES=4");
+        }
+        if (storeNulls) {
+            sb.append(",").append("STORE_NULLS=true");
+        }
+        this.ddlFormat = sb.toString();
     }
-
-    @After
-    public void tearDown() throws SQLException {
-        stmt.close();
-        conn.close();
+    
+    @Parameters(name="StoreNullsIT_mutable={0}, columnEncoded={1}, 
storeNulls={2}") // name is used by failsafe as file name in reports
+    public static Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] { 
+                { false, false, false }, { false, false, true },
+                { false, true, false }, { false, true, true },
+                { true, false, false }, { true, false, true },
+                { true, true, false }, { true, true, true }});
+    }
+    
+    
+    @Before
+    public void setupTableNames() throws Exception {
+        dataTableName = generateUniqueName();
     }
 
     @Test
     public void testStoringNullsForImmutableTables() throws Exception {
-        stmt.executeUpdate("UPSERT INTO " + IMMUTABLE_WITH_NULLS + " VALUES 
(1, 'v1')");
-        stmt.executeUpdate("UPSERT INTO " + IMMUTABLE_WITHOUT_NULLS + " VALUES 
(1, 'v1')");
-        stmt.executeUpdate("UPSERT INTO " + IMMUTABLE_WITH_NULLS + " VALUES 
(2, null)");
-        stmt.executeUpdate("UPSERT INTO " + IMMUTABLE_WITHOUT_NULLS + " VALUES 
(2, null)");
-
-        ensureNullsStoredAsEmptyByteArrays(IMMUTABLE_WITH_NULLS);
-        ensureNullsStoredAsEmptyByteArrays(IMMUTABLE_WITHOUT_NULLS);
+        try (Connection conn = DriverManager.getConnection(getUrl());
+                Statement stmt = conn.createStatement()) {
+            conn.setAutoCommit(true);
+            stmt.execute(String.format(ddlFormat, dataTableName));
+            stmt.executeUpdate("UPSERT INTO " + dataTableName + " VALUES (1, 
'v1')");
+            stmt.executeUpdate("UPSERT INTO " + dataTableName + " VALUES (2, 
null)");
+            TestUtil.doMajorCompaction(conn, dataTableName);
+            ensureNullsStoredCorrectly(conn);
+        }
     }
 
-    private void ensureNullsStoredAsEmptyByteArrays(String tableName) throws 
Exception {
-        HTable htable = new HTable(getUtility().getConfiguration(), tableName);
+    private void ensureNullsStoredCorrectly(Connection conn) throws Exception {
+        ResultSet rs1 = conn.createStatement().executeQuery("SELECT NAME FROM 
"+dataTableName);
+        rs1.next();
+        assertEquals("v1", rs1.getString(1));
+        rs1.next();
+        assertNull(rs1.getString(1));
+        rs1.next();
+        
+        HTable htable = new HTable(getUtility().getConfiguration(), 
dataTableName);
         Scan s = new Scan();
         s.setRaw(true);
         ResultScanner scanner = htable.getScanner(s);
         // first row has a value for name
         Result rs = scanner.next();
+        PTable table = conn.unwrap(PhoenixConnection.class).getTable(new 
PTableKey(null, dataTableName));
+        PColumn nameColumn = table.getPColumnForColumnName("NAME");
+        byte[] qualifier = table.getStorageScheme()== 
StorageScheme.ONE_CELL_PER_COLUMN_FAMILY ? 
QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES : 
nameColumn.getColumnQualifierBytes();
+        
assertTrue(rs.containsColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
qualifier));
         assertTrue(rs.size() == 2); // 2 because it also includes the empty 
key value column
-        PColumn nameColumn = conn.unwrap(PhoenixConnection.class).getTable(new 
PTableKey(null, tableName)).getPColumnForColumnName("NAME");
-        ArrayColumnExpression colExpression = new 
ArrayColumnExpression(nameColumn, "NAME", 
QualifierEncodingScheme.FOUR_BYTE_QUALIFIERS);
+        KeyValueColumnExpression colExpression = table.getStorageScheme() == 
StorageScheme.ONE_CELL_PER_COLUMN_FAMILY ? new 
ArrayColumnExpression(nameColumn, "NAME", table.getEncodingScheme()) : new 
KeyValueColumnExpression(nameColumn);
         ImmutableBytesPtr ptr = new ImmutableBytesPtr();
         colExpression.evaluate(new ResultTuple(rs), ptr);
         assertEquals(new ImmutableBytesPtr(PVarchar.INSTANCE.toBytes("v1")), 
ptr);
-        
         rs = scanner.next();
-        assertTrue(rs.size() == 2); // 2 because it also includes the empty 
key value column
         
-        // assert null stored as empty 
+        if ( !mutable && !columnEncoded // we don't issue a put with empty 
value for immutable tables with cols stored per key value
+                || (mutable && !storeNulls)) { // for this case we use a 
delete to represent the null
+            
assertFalse(rs.containsColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
qualifier));
+            assertEquals(1, rs.size());
+        }
+        else { 
+            
assertTrue(rs.containsColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
qualifier));
+            assertEquals(2, rs.size()); 
+        }
+        // assert null stored correctly 
         ptr = new ImmutableBytesPtr();
-        colExpression.evaluate(new ResultTuple(rs), ptr);
-        assertEquals(new ImmutableBytesPtr(ByteUtil.EMPTY_BYTE_ARRAY), ptr);
+        if (colExpression.evaluate(new ResultTuple(rs), ptr)) {
+            assertEquals(new ImmutableBytesPtr(ByteUtil.EMPTY_BYTE_ARRAY), 
ptr);
+        }
         assertNull(scanner.next());
         scanner.close();
         htable.close();
@@ -140,93 +162,80 @@ public class StoreNullsIT extends ParallelStatsDisabledIT 
{
 
     @Test
     public void testQueryingHistory() throws Exception {
-        stmt.executeUpdate("UPSERT INTO " + WITH_NULLS + " VALUES (1, 'v1')");
-        stmt.executeUpdate("UPSERT INTO " + WITHOUT_NULLS + " VALUES (1, 
'v1')");
-
-        Thread.sleep(10L);
-        long afterFirstInsert = System.currentTimeMillis();
-        Thread.sleep(10L);
-
-        stmt.executeUpdate("UPSERT INTO " + WITH_NULLS + " VALUES (1, null)");
-        stmt.executeUpdate("UPSERT INTO " + WITHOUT_NULLS + " VALUES (1, 
null)");
-        Thread.sleep(10L);
-
-        TestUtil.doMajorCompaction(conn, WITH_NULLS);
-        TestUtil.doMajorCompaction(conn, WITHOUT_NULLS);
-
-        Properties historicalProps = new Properties();
-        historicalProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
+        try (Connection conn = DriverManager.getConnection(getUrl());
+                Statement stmt = conn.createStatement()) {
+            conn.setAutoCommit(true);
+            stmt.execute(String.format(ddlFormat, dataTableName));
+            stmt.executeUpdate("UPSERT INTO " + dataTableName + " VALUES (1, 
'v1')");
+            Thread.sleep(10L);
+            long afterFirstInsert = System.currentTimeMillis();
+            Thread.sleep(10L);
+            
+            stmt.executeUpdate("UPSERT INTO " + dataTableName + " VALUES (1, 
null)");
+            Thread.sleep(10L);
+            
+            TestUtil.doMajorCompaction(conn, dataTableName);
+            
+            Properties historicalProps = new Properties();
+            historicalProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
                 Long.toString(afterFirstInsert));
-        Connection historicalConn = DriverManager.getConnection(getUrl(), 
historicalProps);
-        Statement historicalStmt = historicalConn.createStatement();
-
-        ResultSet rs = historicalStmt.executeQuery(
-            "SELECT name FROM " + WITH_NULLS + " WHERE id = 1");
-        assertTrue(rs.next());
-        assertEquals("v1", rs.getString(1));
-        rs.close();
+            Connection historicalConn = DriverManager.getConnection(getUrl(), 
historicalProps);
+            Statement historicalStmt = historicalConn.createStatement();
+            ResultSet rs = historicalStmt.executeQuery( "SELECT name FROM " + 
dataTableName + " WHERE id = 1");
+            
+            if (storeNulls || !mutable) { // store nulls is set to true if the 
table is immutable
+                assertTrue(rs.next());
+                assertEquals("v1", rs.getString(1));
+                rs.close();
+            } 
+            else {
+                // The single null wipes out all history for a field if 
STORE_NULLS is not enabled
+                assertTrue(rs.next());
+                assertNull(rs.getString(1));
+            }
+            
+            rs.close();
+            historicalStmt.close();
+            historicalConn.close();
+        }
 
-        // The single null wipes out all history for a field if STORE_NULLS is 
not enabled
-        rs = historicalStmt.executeQuery("SELECT name FROM " + WITHOUT_NULLS + 
" WHERE id = 1");
-        assertTrue(rs.next());
-        assertNull(rs.getString(1));
-        rs.close();
-
-        historicalStmt.close();
-        historicalConn.close();
     }
 
     // Row deletes should work in the same way regardless of what STORE_NULLS 
is set to
     @Test
     public void testDeletes() throws Exception {
-        stmt.executeUpdate("UPSERT INTO " + WITH_NULLS + " VALUES (1, 'v1')");
-        stmt.executeUpdate("UPSERT INTO " + WITHOUT_NULLS + " VALUES (1, 
'v1')");
-
-        Thread.sleep(10L);
-        long afterFirstInsert = System.currentTimeMillis();
-        Thread.sleep(10L);
-
-        stmt.executeUpdate("DELETE FROM " + WITH_NULLS + " WHERE id = 1");
-        stmt.executeUpdate("DELETE FROM " + WITHOUT_NULLS + " WHERE id = 1");
-        Thread.sleep(10L);
-
-        TestUtil.doMajorCompaction(conn, WITH_NULLS);
-        TestUtil.doMajorCompaction(conn, WITHOUT_NULLS);
-
-        Properties historicalProps = new Properties();
-        historicalProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
-                Long.toString(afterFirstInsert));
-        Connection historicalConn = DriverManager.getConnection(getUrl(), 
historicalProps);
-        Statement historicalStmt = historicalConn.createStatement();
-
-        // The row should be completely gone for both tables now
-
-        ResultSet rs = historicalStmt.executeQuery(
-            "SELECT name FROM " + WITH_NULLS + " WHERE id = 1");
-        assertFalse(rs.next());
-        rs.close();
-
-        rs = historicalStmt.executeQuery("SELECT name FROM " + WITHOUT_NULLS + 
" WHERE id = 1");
-        assertFalse(rs.next());
-        rs.close();
-    }
-
-    @Test
-    public void testSetStoreNullsDefaultViaConfig() throws SQLException {
-        Properties props = new Properties();
-        props.setProperty(QueryServices.DEFAULT_STORE_NULLS_ATTRIB, "true");
-        Connection storeNullsConn = DriverManager.getConnection(getUrl(), 
props);
-
-        Statement stmt = storeNullsConn.createStatement();
-        stmt.execute("CREATE TABLE with_nulls_default (" +
-                "id smallint primary key," +
-                "name varchar)");
-
-        ResultSet rs = stmt.executeQuery("SELECT store_nulls FROM 
SYSTEM.CATALOG " +
-                "WHERE table_name = 'WITH_NULLS_DEFAULT' AND store_nulls is 
not null");
-        assertTrue(rs.next());
-        assertTrue(rs.getBoolean(1));
+        try (Connection conn = DriverManager.getConnection(getUrl());
+                Statement stmt = conn.createStatement()) {
+            conn.setAutoCommit(true);
+            stmt.execute(String.format(ddlFormat, dataTableName));
+            stmt.executeUpdate("UPSERT INTO " + dataTableName + " VALUES (1, 
'v1')");
+    
+            Thread.sleep(10L);
+            long afterFirstInsert = System.currentTimeMillis();
+            Thread.sleep(10L);
+    
+            stmt.executeUpdate("DELETE FROM " + dataTableName + " WHERE id = 
1");
+            Thread.sleep(10L);
+    
+            TestUtil.doMajorCompaction(conn, dataTableName);
+    
+            Properties historicalProps = new Properties();
+            historicalProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
+                    Long.toString(afterFirstInsert));
+            Connection historicalConn = DriverManager.getConnection(getUrl(), 
historicalProps);
+            Statement historicalStmt = historicalConn.createStatement();
+    
+            // The row should be completely gone for both tables now
+    
+            ResultSet rs = historicalStmt.executeQuery(
+                "SELECT name FROM " + dataTableName + " WHERE id = 1");
+            assertFalse(rs.next());
+            rs.close();
+    
+            rs = historicalStmt.executeQuery("SELECT name FROM " + 
dataTableName + " WHERE id = 1");
+            assertFalse(rs.next());
+            rs.close();
+        }
     }
 
-
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/61d9035c/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsPropIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsPropIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsPropIT.java
new file mode 100644
index 0000000..26ff629
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsPropIT.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+import org.apache.phoenix.query.QueryServices;
+import org.junit.Test;
+
+public class StoreNullsPropIT extends ParallelStatsDisabledIT {
+
+    @Test
+    public void testSetStoreNullsDefaultViaConfig() throws SQLException {
+        Properties props = new Properties();
+        props.setProperty(QueryServices.DEFAULT_STORE_NULLS_ATTRIB, "true");
+        Connection storeNullsConn = DriverManager.getConnection(getUrl(), 
props);
+
+        Statement stmt = storeNullsConn.createStatement();
+        stmt.execute("CREATE TABLE with_nulls_default (" +
+                "id smallint primary key," +
+                "name varchar)");
+
+        ResultSet rs = stmt.executeQuery("SELECT store_nulls FROM 
SYSTEM.CATALOG " +
+                "WHERE table_name = 'WITH_NULLS_DEFAULT' AND store_nulls is 
not null");
+        assertTrue(rs.next());
+        assertTrue(rs.getBoolean(1));
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/61d9035c/phoenix-core/src/it/java/org/apache/phoenix/end2end/SysTableNamespaceMappedStatsCollectorIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SysTableNamespaceMappedStatsCollectorIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SysTableNamespaceMappedStatsCollectorIT.java
index 6b394c1..f9ef0c2 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SysTableNamespaceMappedStatsCollectorIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SysTableNamespaceMappedStatsCollectorIT.java
@@ -27,8 +27,8 @@ import com.google.common.collect.Maps;
 
 public class SysTableNamespaceMappedStatsCollectorIT extends StatsCollectorIT {
     
-    public SysTableNamespaceMappedStatsCollectorIT(boolean transactional, 
boolean userTableNamespaceMapped) {
-        super(transactional, userTableNamespaceMapped);
+    public SysTableNamespaceMappedStatsCollectorIT(boolean mutable, boolean 
transactional, boolean userTableNamespaceMapped, boolean columnEncoded) {
+        super(mutable, transactional, userTableNamespaceMapped, columnEncoded);
     }
     
     @BeforeClass

http://git-wip-us.apache.org/repos/asf/phoenix/blob/61d9035c/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
index c8318ec..7615935 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
@@ -61,7 +61,6 @@ import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.NamedTableNode;
 import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.query.BaseTest;
-import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.PTableType;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/61d9035c/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
index 499f58c..d1ab61e 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
@@ -54,6 +54,7 @@ import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -614,11 +615,13 @@ public class MutableIndexIT extends 
ParallelStatsDisabledIT {
     }
 
     @Test
+    @Ignore //TODO remove after PHOENIX-3585 is fixed
     public void testSplitDuringIndexScan() throws Exception {
         testSplitDuringIndexScan(false);
     }
     
     @Test
+    @Ignore //TODO remove after PHOENIX-3585 is fixed
     public void testSplitDuringIndexReverseScan() throws Exception {
         testSplitDuringIndexScan(true);
     }
@@ -676,6 +679,7 @@ public class MutableIndexIT extends ParallelStatsDisabledIT 
{
     }
 
     @Test
+    @Ignore //TODO remove after PHOENIX-3585 is fixed
     public void testIndexHalfStoreFileReader() throws Exception {
         Connection conn1 = getConnection();
         HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), 
TestUtil.TEST_PROPERTIES).getAdmin();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/61d9035c/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index fb4e3c3..2c3c663 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -368,6 +368,8 @@ public enum SQLExceptionCode {
     CANNOT_ALTER_TABLE_PROPERTY_ON_VIEW(1134, "XCL34", "Altering this table 
property on a view is not allowed"),
     
     IMMUTABLE_TABLE_PROPERTY_INVALID(1135, "XCL35", "IMMUTABLE table property 
cannot be used with CREATE IMMUTABLE TABLE statement "),
+    
+    MAX_COLUMNS_EXCEEDED(1136, "XCL36", "The number of columns exceed the 
maximum supported by the table's qualifier encoding scheme"),
 
     /**
      * Implementation defined class. Phoenix internal error. (errorcode 20, 
sqlstate INT).

http://git-wip-us.apache.org/repos/asf/phoenix/blob/61d9035c/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index fd8ac8b..d345a07 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -306,6 +306,9 @@ public class PhoenixDatabaseMetaData implements 
DatabaseMetaData {
     public static final byte[] SYSTEM_MUTEX_NAME_BYTES = 
Bytes.toBytes(SYSTEM_MUTEX_NAME);
     public static final byte[] SYSTEM_MUTEX_FAMILY_NAME_BYTES = 
TABLE_FAMILY_BYTES;
     
+    public static final String COLUMN_ENCODED_BYTES = "COLUMN_ENCODED_BYTES";
+    public static final byte[] COLUMN_ENCODED_BYTES_BYTES = 
Bytes.toBytes(COLUMN_ENCODED_BYTES);
+    
     private final PhoenixConnection connection;
     private final ResultSet emptyResultSet;
     public static final int MAX_LOCAL_SI_VERSION_DISALLOW = 
VersionUtil.encodeVersion("0", "98", "8");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/61d9035c/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index eee4a5f..4085251 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -224,7 +224,7 @@ public interface QueryConstants {
     public static final ImmutableBytesPtr DEFAULT_COLUMN_FAMILY_BYTES_PTR = 
new ImmutableBytesPtr(
             DEFAULT_COLUMN_FAMILY_BYTES);
     // column qualifier of the single key value used to store all columns for 
the COLUMNS_STORED_IN_SINGLE_CELL storage scheme
-    public static final String SINGLE_KEYVALUE_COLUMN_QUALIFIER = "0";
+    public static final String SINGLE_KEYVALUE_COLUMN_QUALIFIER = "1";
     public final static byte[] SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES = 
Bytes.toBytes(SINGLE_KEYVALUE_COLUMN_QUALIFIER);
     public static final ImmutableBytesPtr 
SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES_PTR = new ImmutableBytesPtr(
             SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/61d9035c/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 65d385e..3789138 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -226,6 +226,8 @@ public interface QueryServices extends SQLCloseable {
     
     public static final String CLIENT_CACHE_ENCODING = 
"phoenix.table.client.cache.encoding";
     public static final String AUTO_UPGRADE_ENABLED = 
"phoenix.autoupgrade.enabled";
+    
+    public static final String DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB  = 
"phoenix.default.column.encoded.bytes.attrib";
     /**
      * Get executor service used for parallel scans
      */

http://git-wip-us.apache.org/repos/asf/phoenix/blob/61d9035c/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 3b4d9cc..60599f9 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -256,6 +256,8 @@ public class QueryServicesOptions {
     
     public static final String DEFAULT_CLIENT_CACHE_ENCODING = 
PTableRefFactory.Encoding.OBJECT.toString();
     public static final boolean DEFAULT_AUTO_UPGRADE_ENABLED = true;
+    
+    public static final int DEFAULT_COLUMN_ENCODED_BYTES = 0;
 
     @SuppressWarnings("serial")
     public static final Set<String> DEFAULT_QUERY_SERVER_SKIP_WORDS = new 
HashSet<String>() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/61d9035c/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 7c3876a..67b7663 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -93,7 +93,6 @@ import static 
org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_UPDATE_STATS_ASYNC;
 import static org.apache.phoenix.schema.PTable.EncodedCQCounter.NULL_COUNTER;
-import static 
org.apache.phoenix.schema.PTable.QualifierEncodingScheme.FOUR_BYTE_QUALIFIERS;
 import static 
org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
 import static 
org.apache.phoenix.schema.PTable.StorageScheme.ONE_CELL_PER_COLUMN_FAMILY;
 import static 
org.apache.phoenix.schema.PTable.StorageScheme.ONE_CELL_PER_KEYVALUE_COLUMN;
@@ -199,6 +198,7 @@ import org.apache.phoenix.schema.PTable.EncodedCQCounter;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTable.LinkType;
 import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
+import 
org.apache.phoenix.schema.PTable.QualifierEncodingScheme.QualifierOutOfRangeException;
 import org.apache.phoenix.schema.PTable.StorageScheme;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.stats.GuidePostsKey;
@@ -977,7 +977,7 @@ public class MetaDataClient {
         }
         table = createTableInternal(statement, splits, parent, viewStatement, 
viewType, viewColumnConstants, isViewColumnReferenced, false, null, null, 
tableProps, commonFamilyProps);
 
-        if (table == null || table.getType() == PTableType.VIEW || 
table.isTransactional()) {
+        if (table == null || table.getType() == PTableType.VIEW /*|| 
table.isTransactional()*/) {
             return new MutationState(0,connection);
         }
         // Hack to get around the case when an SCN is specified on the 
connection.
@@ -1695,6 +1695,7 @@ public class MetaDataClient {
                     ? SchemaUtil.isNamespaceMappingEnabled(tableType, 
connection.getQueryServices().getProps())
                     : parent.isNamespaceMapped();
             boolean isLocalIndex = indexType == IndexType.LOCAL;
+            QualifierEncodingScheme encodingScheme = 
QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
             if (parent != null && tableType == PTableType.INDEX) {
                 timestamp = TransactionUtil.getTableTimestamp(connection, 
transactional);
                 storeNulls = parent.getStoreNulls();
@@ -2033,7 +2034,6 @@ public class MetaDataClient {
             int pkPositionOffset = pkColumns.size();
             int position = positionOffset;
             StorageScheme storageScheme = ONE_CELL_PER_KEYVALUE_COLUMN;
-            QualifierEncodingScheme encodingScheme = NON_ENCODED_QUALIFIERS;
             EncodedCQCounter cqCounter = NULL_COUNTER;
             PTable viewPhysicalTable = null;
             if (tableType == PTableType.VIEW) {
@@ -2087,23 +2087,22 @@ public class MetaDataClient {
                     tableExists = false;
                 }
                 if (tableExists) {
-                    storageScheme = ONE_CELL_PER_KEYVALUE_COLUMN;
                     encodingScheme = NON_ENCODED_QUALIFIERS;
                 } else if (parent != null) {
-                    storageScheme = parent.getStorageScheme();
                     encodingScheme = parent.getEncodingScheme();
-                } else if (isImmutableRows) {
-                    storageScheme = ONE_CELL_PER_COLUMN_FAMILY;
-                    encodingScheme = FOUR_BYTE_QUALIFIERS;
-                    // since we are storing all columns of a column family in 
a single key value we can't use deletes to store nulls
-                    storeNulls = true;
                 } else {
-                    storageScheme = ONE_CELL_PER_KEYVALUE_COLUMN;
-                    encodingScheme = FOUR_BYTE_QUALIFIERS;
+                       Byte encodingSchemeSerializedByte = (Byte) 
TableProperty.COLUMN_ENCODED_BYTES.getValue(tableProps);
+                    if (encodingSchemeSerializedByte == null) {
+                       encodingSchemeSerializedByte = 
(byte)connection.getQueryServices().getProps().getInt(QueryServices.DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB,
 QueryServicesOptions.DEFAULT_COLUMN_ENCODED_BYTES);
+                    } 
+                    encodingScheme =  
QualifierEncodingScheme.fromSerializedValue(encodingSchemeSerializedByte);
                 }
+                if (isImmutableRows && encodingScheme != 
NON_ENCODED_QUALIFIERS) {
+                    storageScheme = ONE_CELL_PER_COLUMN_FAMILY;
+                } 
                 cqCounter = encodingScheme != NON_ENCODED_QUALIFIERS ? new 
EncodedCQCounter() : NULL_COUNTER;
             }
-            
+                        
             Map<String, Integer> changedCqCounters = new 
HashMap<>(colDefs.size());
             for (ColumnDef colDef : colDefs) {
                 rowTimeStampColumnAlreadyFound = 
checkAndValidateRowTimestampCol(colDef, pkConstraint, 
rowTimeStampColumnAlreadyFound, tableType);
@@ -2137,7 +2136,15 @@ public class MetaDataClient {
                     }
                 }
                 Integer encodedCQ =  isPkColumn ? null : 
cqCounter.getNextQualifier(cqCounterFamily);
-                byte[] columnQualifierBytes = 
EncodedColumnsUtil.getColumnQualifierBytes(columnDefName.getColumnName(), 
encodedCQ, encodingScheme);
+                byte[] columnQualifierBytes = null;
+                try {
+                    columnQualifierBytes = 
EncodedColumnsUtil.getColumnQualifierBytes(columnDefName.getColumnName(), 
encodedCQ, encodingScheme);
+                }
+                catch (QualifierOutOfRangeException e) {
+                    throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.MAX_COLUMNS_EXCEEDED)
+                    .setSchemaName(schemaName)
+                    .setTableName(tableName).build().buildException();
+                }
                 PColumn column = newColumn(position++, colDef, pkConstraint, 
defaultFamilyName, false, columnQualifierBytes);
                 if (cqCounter.increment(cqCounterFamily)) {
                     changedCqCounters.put(cqCounterFamily, 
cqCounter.getNextQualifier(cqCounterFamily));
@@ -2177,8 +2184,6 @@ public class MetaDataClient {
                 }
             }
             
-            
-            
             // We need a PK definition for a TABLE or mapped VIEW
             if (!isPK && pkColumnsNames.isEmpty() && tableType != 
PTableType.VIEW && viewType != ViewType.MAPPED) {
                 throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.PRIMARY_KEY_MISSING)
@@ -3182,7 +3187,15 @@ public class MetaDataClient {
                                         
cqCounterToUse.getNextQualifier(familyName));
                                 }
                             }
-                            byte[] columnQualifierBytes = 
EncodedColumnsUtil.getColumnQualifierBytes(colDef.getColumnDefName().getColumnName(),
 encodedCQ, table);
+                            byte[] columnQualifierBytes = null;
+                            try {
+                                columnQualifierBytes = 
EncodedColumnsUtil.getColumnQualifierBytes(colDef.getColumnDefName().getColumnName(),
 encodedCQ, table);
+                            }
+                            catch (QualifierOutOfRangeException e) {
+                                throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.MAX_COLUMNS_EXCEEDED)
+                                .setSchemaName(schemaName)
+                                
.setTableName(tableName).build().buildException();
+                            }
                             PColumn column = newColumn(position++, colDef, 
PrimaryKeyConstraint.EMPTY, table.getDefaultFamilyName() == null ? null : 
table.getDefaultFamilyName().getString(), true, columnQualifierBytes);
                             columns.add(column);
                             String pkName = null;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/61d9035c/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
index 9962859..36df961 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
@@ -145,6 +145,27 @@ public enum TableProperty {
         }       
            
        },
+       
+       COLUMN_ENCODED_BYTES(PhoenixDatabaseMetaData.COLUMN_ENCODED_BYTES, 
COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY, false, false, false) {
+           @Override
+        public Object getValue(Object value) {
+               if (value instanceof String) {
+                   String strValue = (String) value;
+                   if ("NONE".equalsIgnoreCase(strValue)) {
+                       return (byte)0;
+                   } 
+               } else {
+                   return value == null ? null : ((Number) value).byteValue();
+               }
+               return value;
+           }
+
+               @Override
+               public Object getPTableValue(PTable table) {
+                       return table.getEncodingScheme();
+               }       
+           
+       }
     ;
        
        private final String propertyName;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/61d9035c/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
index 67aae72..6c8ac48 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
@@ -762,7 +762,7 @@ public class QueryOptimizerTest extends 
BaseConnectionlessQueryTest {
     public void testMinMaxQualifierRangeWithOrderByOnKVColumn() throws 
Exception {
         Connection conn = DriverManager.getConnection(getUrl());
         String tableName = "testMintestMinMaxQualifierRange".toUpperCase();
-        conn.createStatement().execute("CREATE TABLE " + tableName + " (k 
INTEGER NOT NULL PRIMARY KEY, v1 INTEGER, v2 VARCHAR)");
+        conn.createStatement().execute("CREATE TABLE " + tableName + " (k 
INTEGER NOT NULL PRIMARY KEY, v1 INTEGER, v2 VARCHAR) COLUMN_ENCODED_BYTES=4");
         PhoenixStatement stmt = 
conn.createStatement().unwrap(PhoenixStatement.class);
         ResultSet rs = stmt.executeQuery("SELECT K from " + tableName + " 
ORDER BY (v1)");
         assertQualifierRanges(rs, ENCODED_EMPTY_COLUMN_NAME, 
ENCODED_CQ_COUNTER_INITIAL_VALUE);
@@ -778,7 +778,7 @@ public class QueryOptimizerTest extends 
BaseConnectionlessQueryTest {
     public void testMinMaxQualifierRangeWithNoOrderBy() throws Exception {
         Connection conn = DriverManager.getConnection(getUrl());
         String tableName = "testMintestMinMaxQualifierRange".toUpperCase();
-        conn.createStatement().execute("CREATE TABLE " + tableName + " (k 
INTEGER NOT NULL PRIMARY KEY, v1 INTEGER, v2 VARCHAR)");
+        conn.createStatement().execute("CREATE TABLE " + tableName + " (k 
INTEGER NOT NULL PRIMARY KEY, v1 INTEGER, v2 VARCHAR) COLUMN_ENCODED_BYTES=4");
         PhoenixStatement stmt = 
conn.createStatement().unwrap(PhoenixStatement.class);
         ResultSet rs = stmt.executeQuery("SELECT K from " + tableName);
         assertQualifierRanges(rs, ENCODED_CQ_COUNTER_INITIAL_VALUE, 
ENCODED_CQ_COUNTER_INITIAL_VALUE + 1);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/61d9035c/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
index 8553b73..276d946 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
@@ -127,11 +127,11 @@ public class MutationStateTest {
     private void assertTable(String tableName1,List<KeyValue> 
keyValues1,String tableName2,List<KeyValue> keyValues2) {
         assertTrue("MUTATION_TEST1".equals(tableName1));
         
assertTrue(Bytes.equals(PUnsignedInt.INSTANCE.toBytes(111),CellUtil.cloneRow(keyValues1.get(0))));
-        
assertTrue("app1".equals(PVarchar.INSTANCE.toObject(CellUtil.cloneValue(keyValues1.get(1)))));
+        
assertTrue("app1".equals(PVarchar.INSTANCE.toObject(CellUtil.cloneValue(keyValues1.get(0)))));
 
         assertTrue("MUTATION_TEST2".equals(tableName2));
         
assertTrue(Bytes.equals(PUnsignedInt.INSTANCE.toBytes(222),CellUtil.cloneRow(keyValues2.get(0))));
-        
assertTrue("app2".equals(PVarchar.INSTANCE.toObject(CellUtil.cloneValue(keyValues2.get(1)))));
+        
assertTrue("app2".equals(PVarchar.INSTANCE.toObject(CellUtil.cloneValue(keyValues2.get(0)))));
 
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/61d9035c/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 41d6e7b..a87b4f2 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -147,7 +147,8 @@ import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.distributed.TransactionService;
 import org.apache.tephra.metrics.TxMetricsCollector;
-import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.HDFSTransactionStateStorage;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
 import org.apache.twill.discovery.DiscoveryService;
 import org.apache.twill.discovery.ZKDiscoveryService;
 import org.apache.twill.internal.utils.Networks;
@@ -450,14 +451,18 @@ public abstract class BaseTest {
         
     }
     
-    protected static void setupTxManager() throws SQLException, IOException {
+    protected static void setTxnConfigs() throws IOException {
         config.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
         config.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, 
"n-times");
         config.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
         config.setInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, 
Networks.getRandomPort());
         config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, 
tmpFolder.newFolder().getAbsolutePath());
         config.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, 
DEFAULT_TXN_TIMEOUT_SECONDS);
-
+        config.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
+        config.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5L);
+    }
+    
+    protected static void setupTxManager() throws SQLException, IOException {
         ConnectionInfo connInfo = ConnectionInfo.create(getUrl());
         zkClient = ZKClientServices.delegate(
           ZKClients.reWatchOnExpire(
@@ -473,7 +478,7 @@ public abstract class BaseTest {
         zkClient.startAndWait();
 
         DiscoveryService discovery = new ZKDiscoveryService(zkClient);
-        txManager = new TransactionManager(config, new 
InMemoryTransactionStateStorage(), new TxMetricsCollector());
+        txManager = new TransactionManager(config, new 
HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new 
TxMetricsCollector()), new TxMetricsCollector());
         txService = new TransactionService(config, zkClient, discovery, 
Providers.of(txManager));
         txService.startAndWait();
     }
@@ -502,8 +507,9 @@ public abstract class BaseTest {
     /**
      * Set up the test hbase cluster.
      * @return url to be used by clients to connect to the cluster.
+     * @throws IOException 
      */
-    protected static String setUpTestCluster(@Nonnull Configuration conf, 
ReadOnlyProps overrideProps) {
+    protected static String setUpTestCluster(@Nonnull Configuration conf, 
ReadOnlyProps overrideProps) throws IOException {
         boolean isDistributedCluster = isDistributedClusterModeEnabled(conf);
         if (!isDistributedCluster) {
             return initMiniCluster(conf, overrideProps);
@@ -558,8 +564,9 @@ public abstract class BaseTest {
     }
     
     protected static void setUpTestDriver(ReadOnlyProps serverProps, 
ReadOnlyProps clientProps) throws Exception {
+        setTxnConfigs();
         String url = checkClusterInitialized(serverProps);
-        checkTxManagerInitialized(clientProps);
+        checkTxManagerInitialized(serverProps);
         if (driver == null) {
             driver = initAndRegisterTestDriver(url, clientProps);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/61d9035c/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionlessTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionlessTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionlessTest.java
index 1d71ec0..76479d6 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionlessTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionlessTest.java
@@ -95,8 +95,8 @@ public class ConnectionlessTest {
         "    entity_history_id char(12) not null,\n" + 
         "    created_by varchar,\n" + 
         "    created_date date\n" +
-        "    CONSTRAINT pk PRIMARY KEY (organization_id, key_prefix, 
entity_history_id) ) " +
-        (saltBuckets == null ? "" : (PhoenixDatabaseMetaData.SALT_BUCKETS + 
"=" + saltBuckets));
+        "    CONSTRAINT pk PRIMARY KEY (organization_id, key_prefix, 
entity_history_id) ) COLUMN_ENCODED_BYTES=4 " +
+        (saltBuckets == null ? "" : " , " + 
(PhoenixDatabaseMetaData.SALT_BUCKETS + "=" + saltBuckets));
         Properties props = new Properties();
         Connection conn = DriverManager.getConnection(getUrl(), props);
         PreparedStatement statement = conn.prepareStatement(dmlStmt);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/61d9035c/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java 
b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 3c9a1bc..ead712b 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -74,6 +74,7 @@ import org.apache.phoenix.compile.StatementContext;
 import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest;
 import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
+import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.expression.AndExpression;
 import org.apache.phoenix.expression.ByteBasedLikeExpression;
 import org.apache.phoenix.expression.ComparisonExpression;
@@ -108,6 +109,7 @@ import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PLongColumn;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.RowKeyValueAccessor;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
@@ -769,15 +771,26 @@ public class TestUtil {
     
         // We simply write a marker row, request a major compaction, and then 
wait until the marker
         // row is gone
+        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+        PTable table = pconn.getTable(new PTableKey(pconn.getTenantId(), 
tableName));
         ConnectionQueryServices services = 
conn.unwrap(PhoenixConnection.class).getQueryServices();
-        try (HTableInterface htable = 
services.getTable(Bytes.toBytes(tableName))) {
+        MutationState mutationState = pconn.getMutationState();
+        if (table.isTransactional()) {
+            mutationState.startTransaction();
+        }
+        try (HTableInterface htable = mutationState.getHTable(table)) {
             byte[] markerRowKey = Bytes.toBytes("TO_DELETE");
-        
+           
             Put put = new Put(markerRowKey);
-            put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
HConstants.EMPTY_BYTE_ARRAY,
-                    HConstants.EMPTY_BYTE_ARRAY);
+            put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
QueryConstants.EMPTY_COLUMN_VALUE_BYTES, 
QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
             htable.put(put);
-            htable.delete(new Delete(markerRowKey));
+            Delete delete = new Delete(markerRowKey);
+            delete.deleteColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+            htable.delete(delete);
+            htable.close();
+            if (table.isTransactional()) {
+                mutationState.commit();
+            }
         
             HBaseAdmin hbaseAdmin = services.getAdmin();
             hbaseAdmin.flush(tableName);
@@ -786,19 +799,28 @@ public class TestUtil {
         
             boolean compactionDone = false;
             while (!compactionDone) {
-                Thread.sleep(2000L);
+                Thread.sleep(6000L);
                 Scan scan = new Scan();
                 scan.setStartRow(markerRowKey);
                 scan.setStopRow(Bytes.add(markerRowKey, new byte[] { 0 }));
                 scan.setRaw(true);
         
-                ResultScanner scanner = htable.getScanner(scan);
-                List<Result> results = Lists.newArrayList(scanner);
-                LOG.info("Results: " + results);
-                compactionDone = results.isEmpty();
-                scanner.close();
-        
+                try (HTableInterface htableForRawScan = 
services.getTable(Bytes.toBytes(tableName))) {
+                    ResultScanner scanner = htableForRawScan.getScanner(scan);
+                    List<Result> results = Lists.newArrayList(scanner);
+                    LOG.info("Results: " + results);
+                    compactionDone = results.isEmpty();
+                    scanner.close();
+                }
                 LOG.info("Compaction done: " + compactionDone);
+                
+                // need to run compaction after the next txn snapshot has been 
written so that compaction can remove deleted rows
+                if (!compactionDone && table.isTransactional()) {
+                    hbaseAdmin = services.getAdmin();
+                    hbaseAdmin.flush(tableName);
+                    hbaseAdmin.majorCompact(tableName);
+                    hbaseAdmin.close();
+                }
             }
         }
     }

Reply via email to