Repository: phoenix
Updated Branches:
  refs/heads/4.13-HBase-0.98 92546891e -> a561da9d0


PHOENIX-4386 Calculate the estimatedSize of MutationState using Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a561da9d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a561da9d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a561da9d

Branch: refs/heads/4.13-HBase-0.98
Commit: a561da9d0f59a2c5e956fb687f51498a2d374a5d
Parents: 9254689
Author: Thomas D'Silva <tdsi...@apache.org>
Authored: Fri Nov 17 11:11:43 2017 -0800
Committer: Thomas D'Silva <tdsi...@apache.org>
Committed: Fri Nov 17 15:53:28 2017 -0800

----------------------------------------------------------------------
 .../apache/phoenix/end2end/MutationStateIT.java | 144 +++++++++++++++++++
 .../org/apache/phoenix/end2end/QueryMoreIT.java |  42 ------
 .../apache/phoenix/compile/DeleteCompiler.java  |   6 +-
 .../apache/phoenix/compile/UpsertCompiler.java  |   4 +-
 .../apache/phoenix/execute/MutationState.java   |  50 +++++--
 .../org/apache/phoenix/util/KeyValueUtil.java   |  51 ++-----
 6 files changed, 201 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a561da9d/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
new file mode 100644
index 0000000..2d5f360
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
@@ -0,0 +1,144 @@
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.junit.Test;
+
+public class MutationStateIT extends ParallelStatsDisabledIT {
+
+    private static final String DDL =
+            " (ORGANIZATION_ID CHAR(15) NOT NULL, SCORE DOUBLE, "
+            + "ENTITY_ID CHAR(15) NOT NULL, TAGS VARCHAR, CONSTRAINT 
PAGE_SNAPSHOT_PK "
+            + "PRIMARY KEY (ORGANIZATION_ID, ENTITY_ID DESC)) 
MULTI_TENANT=TRUE";
+
+    private void upsertRows(PhoenixConnection conn, String fullTableName) 
throws SQLException {
+        PreparedStatement stmt =
+                conn.prepareStatement("upsert into " + fullTableName
+                        + " (organization_id, entity_id, score) values 
(?,?,?)");
+        for (int i = 0; i < 10000; i++) {
+            stmt.setString(1, "AAAA" + i);
+            stmt.setString(2, "BBBB" + i);
+            stmt.setInt(3, 1);
+            stmt.execute();
+        }
+    }
+
+    @Test
+    public void testMaxMutationSize() throws Exception {
+        Properties connectionProperties = new Properties();
+        
connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "3");
+        
connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, 
"1000000");
+        PhoenixConnection connection =
+                (PhoenixConnection) DriverManager.getConnection(getUrl(), 
connectionProperties);
+        String fullTableName = generateUniqueName();
+        try (Statement stmt = connection.createStatement()) {
+            stmt.execute(
+                "CREATE TABLE " + fullTableName + DDL);
+        }
+        try {
+            upsertRows(connection, fullTableName);
+            fail();
+        } catch (SQLException e) {
+            
assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED.getErrorCode(),
+                e.getErrorCode());
+        }
+
+        // set the max mutation size (bytes) to a low value
+        
connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, 
"1000");
+        
connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, 
"4");
+        connection =
+                (PhoenixConnection) DriverManager.getConnection(getUrl(), 
connectionProperties);
+        try {
+            upsertRows(connection, fullTableName);
+            fail();
+        } catch (SQLException e) {
+            
assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED.getErrorCode(),
+                e.getErrorCode());
+        }
+    }
+
+    @Test
+    public void testMutationEstimatedSize() throws Exception {
+        PhoenixConnection conn = (PhoenixConnection) 
DriverManager.getConnection(getUrl());
+        conn.setAutoCommit(false);
+        String fullTableName = generateUniqueName();
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(
+                "CREATE TABLE " + fullTableName + DDL);
+        }
+
+        // upserting rows should increase the mutation state size
+        MutationState state = 
conn.unwrap(PhoenixConnection.class).getMutationState();
+        long prevEstimatedSize = state.getEstimatedSize();
+        upsertRows(conn, fullTableName);
+        assertTrue("Mutation state size should have increased",
+            state.getEstimatedSize() > prevEstimatedSize);
+        
+        
+        // after commit or rollback the size should be zero
+        conn.commit();
+        assertEquals("Mutation state size should be zero after commit", 0,
+            state.getEstimatedSize());
+        upsertRows(conn, fullTableName);
+        conn.rollback();
+        assertEquals("Mutation state size should be zero after rollback", 0,
+            state.getEstimatedSize());
+
+        // upsert one row
+        PreparedStatement stmt =
+                conn.prepareStatement("upsert into " + fullTableName
+                        + " (organization_id, entity_id, score) values 
(?,?,?)");
+        stmt.setString(1, "ZZZZ");
+        stmt.setString(2, "YYYY");
+        stmt.setInt(3, 1);
+        stmt.execute();
+        assertTrue("Mutation state size should be greater than zero ", 
state.getEstimatedSize()>0);
+
+        prevEstimatedSize = state.getEstimatedSize();
+        // upserting the same row again should only add the size of the new statement index
+        stmt.setString(1, "ZZZZ");
+        stmt.setString(2, "YYYY");
+        stmt.setInt(3, 1);
+        stmt.execute();
+        assertEquals(
+            "Mutation state size should only increase 4 bytes (size of the new 
statement index)",
+            prevEstimatedSize + 4, state.getEstimatedSize());
+        
+        prevEstimatedSize = state.getEstimatedSize();
+        // changing the value of one column of a row to a larger value should 
increase the estimated size 
+        stmt =
+                conn.prepareStatement("upsert into " + fullTableName
+                        + " (organization_id, entity_id, score, tags) values 
(?,?,?,?)");
+        stmt.setString(1, "ZZZZ");
+        stmt.setString(2, "YYYY");
+        stmt.setInt(3, 1);
+        stmt.setString(4, "random text string random text string random text 
string");
+        stmt.execute();
+        assertTrue("Mutation state size should increase", prevEstimatedSize+4 
< state.getEstimatedSize());
+        
+        prevEstimatedSize = state.getEstimatedSize();
+        // changing the value of one column of a row to a smaller value should 
decrease the estimated size 
+        stmt =
+                conn.prepareStatement("upsert into " + fullTableName
+                        + " (organization_id, entity_id, score, tags) values 
(?,?,?,?)");
+        stmt.setString(1, "ZZZZ");
+        stmt.setString(2, "YYYY");
+        stmt.setInt(3, 1);
+        stmt.setString(4, "");
+        stmt.execute();
+        assertTrue("Mutation state size should decrease", prevEstimatedSize+4 
> state.getEstimatedSize());
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a561da9d/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
index 77cb19f..9109c12 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.sql.Connection;
 import java.sql.Date;
@@ -39,7 +38,6 @@ import java.util.Properties;
 
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -510,46 +508,6 @@ public class QueryMoreIT extends ParallelStatsDisabledIT {
         assertEquals(4L, connection.getMutationState().getBatchCount());
     }
     
-    @Test
-    public void testMaxMutationSize() throws Exception {
-        Properties connectionProperties = new Properties();
-        
connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "3");
-        
connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, 
"1000000");
-        PhoenixConnection connection = (PhoenixConnection) 
DriverManager.getConnection(getUrl(), connectionProperties);
-        String fullTableName = generateUniqueName();
-        try (Statement stmt = connection.createStatement()) {
-            stmt.execute("CREATE TABLE " + fullTableName + "(\n" +
-                "    ORGANIZATION_ID CHAR(15) NOT NULL,\n" +
-                "    SCORE DOUBLE NOT NULL,\n" +
-                "    ENTITY_ID CHAR(15) NOT NULL\n" +
-                "    CONSTRAINT PAGE_SNAPSHOT_PK PRIMARY KEY (\n" +
-                "        ORGANIZATION_ID,\n" +
-                "        SCORE DESC,\n" +
-                "        ENTITY_ID DESC\n" +
-                "    )\n" +
-                ") MULTI_TENANT=TRUE");
-        }
-        try {
-            upsertRows(connection, fullTableName);
-            fail();
-        }
-        catch(SQLException e) {
-            
assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED.getErrorCode(), 
e.getErrorCode());
-        }
-        
-        // set the max mutation size (bytes) to a low value
-        
connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, 
"1000");
-        
connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, 
"4");
-        connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), 
connectionProperties);
-        try {
-            upsertRows(connection, fullTableName);
-            fail();
-        }
-        catch(SQLException e) {
-            
assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED.getErrorCode(), 
e.getErrorCode());
-        }
-    }
-
     private void upsertRows(PhoenixConnection conn, String fullTableName) 
throws SQLException {
         PreparedStatement stmt = conn.prepareStatement("upsert into " + 
fullTableName +
                 " (organization_id, entity_id, score) values (?,?,?)");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a561da9d/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index f038cda..4218c59 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -207,7 +207,7 @@ public class DeleteCompiler {
                 // row key will already have its value.
                 // Check for otherTableRefs being empty required when deleting 
directly from the index
                 if (otherTableRefs.isEmpty() || table.getIndexType() != 
IndexType.LOCAL) {
-                    mutations.put(rowKeyPtr, new 
RowMutationState(PRow.DELETE_MARKER, 
statement.getConnection().getStatementExecutionCounter(), 
NULL_ROWTIMESTAMP_INFO, null));
+                    mutations.put(rowKeyPtr, new 
RowMutationState(PRow.DELETE_MARKER, 0, 
statement.getConnection().getStatementExecutionCounter(), 
NULL_ROWTIMESTAMP_INFO, null));
                 }
                 for (int i = 0; i < otherTableRefs.size(); i++) {
                     PTable otherTable = otherTableRefs.get(i).getTable();
@@ -221,7 +221,7 @@ public class DeleteCompiler {
                     } else {
                         indexPtr.set(maintainers[i].buildRowKey(getter, 
rowKeyPtr, null, null, HConstants.LATEST_TIMESTAMP));
                     }
-                    indexMutations.get(i).put(indexPtr, new 
RowMutationState(PRow.DELETE_MARKER, 
statement.getConnection().getStatementExecutionCounter(), 
NULL_ROWTIMESTAMP_INFO, null));
+                    indexMutations.get(i).put(indexPtr, new 
RowMutationState(PRow.DELETE_MARKER, 0, 
statement.getConnection().getStatementExecutionCounter(), 
NULL_ROWTIMESTAMP_INFO, null));
                 }
                 if (mutations.size() > maxSize) {
                     throw new IllegalArgumentException("MutationState size of 
" + mutations.size() + " is bigger than max allowed size of " + maxSize);
@@ -835,4 +835,4 @@ public class DeleteCompiler {
             };
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a561da9d/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index f5e2ae0..dcae16f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -119,6 +119,7 @@ public class UpsertCompiler {
             PTable table, Map<ImmutableBytesPtr, RowMutationState> mutation,
             PhoenixStatement statement, boolean useServerTimestamp, 
IndexMaintainer maintainer,
             byte[][] viewConstants, byte[] onDupKeyBytes, int numSplColumns) 
throws SQLException {
+        long columnValueSize = 0;
         Map<PColumn,byte[]> columnValues = 
Maps.newHashMapWithExpectedSize(columnIndexes.length);
         byte[][] pkValues = new byte[table.getPKColumns().size()][];
         // If the table uses salting, the first byte is the salting byte, set 
to an empty array
@@ -148,6 +149,7 @@ public class UpsertCompiler {
                 }
             } else {
                 columnValues.put(column, value);
+                columnValueSize += (column.getEstimatedSize() + value.length);
             }
         }
         ImmutableBytesPtr ptr = new ImmutableBytesPtr();
@@ -165,7 +167,7 @@ public class UpsertCompiler {
                 ptr.set(ScanRanges.prefixKey(ptr.get(), 0, regionPrefix, 
regionPrefix.length));
             }
         } 
-        mutation.put(ptr, new RowMutationState(columnValues, 
statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, 
onDupKeyBytes));
+        mutation.put(ptr, new RowMutationState(columnValues, columnValueSize, 
statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, 
onDupKeyBytes));
     }
     
     public static MutationState upsertSelect(StatementContext childContext, 
TableRef tableRef, RowProjector projector,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a561da9d/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 1e5a8ad..c342e4a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -94,6 +94,7 @@ import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.SizedUtil;
 import org.apache.phoenix.util.TransactionUtil;
 import org.cloudera.htrace.Span;
 import org.cloudera.htrace.TraceScope;
@@ -195,9 +196,13 @@ public class MutationState implements SQLCloseable {
             this.mutations.put(table, mutations);
         }
         this.numRows = mutations.size();
-        this.estimatedSize = KeyValueUtil.getEstimatedRowSize(table, 
mutations);
+        this.estimatedSize = 
KeyValueUtil.getEstimatedRowMutationSize(this.mutations);
         throwIfTooBig();
     }
+    
+    public long getEstimatedSize() {
+        return estimatedSize;
+    }
 
     public long getMaxSize() {
         return maxSize;
@@ -437,9 +442,16 @@ public class MutationState implements SQLCloseable {
         this.sizeOffset += newMutationState.sizeOffset;
         int oldNumRows = this.numRows;
         joinMutationState(newMutationState.mutations, this.mutations);
-        // here we increment the estimated size by the fraction of new rows we 
added from the newMutationState 
         if (newMutationState.numRows>0) {
-            this.estimatedSize += 
((double)(this.numRows-oldNumRows)/newMutationState.numRows) * 
newMutationState.estimatedSize;
+            // if we added all the rows from newMutationState we can just 
increment the
+            // estimatedSize by newMutationState.estimatedSize
+            if (newMutationState.numRows == this.numRows-oldNumRows) {
+                this.estimatedSize +=  newMutationState.estimatedSize;
+            }
+            // we merged the two mutation states so we need to recalculate the 
size
+            else {
+                this.estimatedSize = 
KeyValueUtil.getEstimatedRowMutationSize(this.mutations);
+            }
         }
         if (!newMutationState.txMutations.isEmpty()) {
             if (txMutations.isEmpty()) {
@@ -975,8 +987,6 @@ public class MutationState implements SQLCloseable {
                 long mutationCommitTime = 0;
                 long numFailedMutations = 0;;
                 long startTime = 0;
-                long startNumRows = numRows;
-                long startEstimatedSize = estimatedSize;
                 do {
                     TableRef origTableRef = tableInfo.getOrigTableRef();
                     PTable table = origTableRef.getTable();
@@ -1022,13 +1032,13 @@ public class MutationState implements SQLCloseable {
                         GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
                         numFailedMutations = 0;
                         
+                        // Remove batches as we process them
+                        mutations.remove(origTableRef);
                         if (tableInfo.isDataTable()) {
                             numRows -= numMutations;
-                            // decrement estimated size by the fraction of 
rows we sent to hbase
-                            estimatedSize -= 
((double)numMutations/startNumRows)*startEstimatedSize;
+                            // recalculate the estimated size
+                            estimatedSize = 
KeyValueUtil.getEstimatedRowMutationSize(mutations);
                         }
-                        // Remove batches as we process them
-                        mutations.remove(origTableRef);
                     } catch (Exception e) {
                        mutationCommitTime = System.currentTimeMillis() - 
startTime;
                         serverTimestamp = ServerUtil.parseServerTimestamp(e);
@@ -1427,8 +1437,9 @@ public class MutationState implements SQLCloseable {
         private int[] statementIndexes;
         @Nonnull private final RowTimestampColInfo rowTsColInfo;
         private byte[] onDupKeyBytes;
+        private long colValuesSize;
         
-        public RowMutationState(@Nonnull Map<PColumn,byte[]> columnValues, int 
statementIndex, @Nonnull RowTimestampColInfo rowTsColInfo,
+        public RowMutationState(@Nonnull Map<PColumn,byte[]> columnValues, 
long colValuesSize, int statementIndex, @Nonnull RowTimestampColInfo 
rowTsColInfo,
                 byte[] onDupKeyBytes) {
             checkNotNull(columnValues);
             checkNotNull(rowTsColInfo);
@@ -1436,6 +1447,12 @@ public class MutationState implements SQLCloseable {
             this.statementIndexes = new int[] {statementIndex};
             this.rowTsColInfo = rowTsColInfo;
             this.onDupKeyBytes = onDupKeyBytes;
+            this.colValuesSize = colValuesSize;
+        }
+
+        public long calculateEstimatedSize() {
+            return colValuesSize + statementIndexes.length * 
SizedUtil.INT_SIZE + SizedUtil.LONG_SIZE
+                    + (onDupKeyBytes != null ? onDupKeyBytes.length : 0);
         }
 
         byte[] getOnDupKeyBytes() {
@@ -1454,7 +1471,16 @@ public class MutationState implements SQLCloseable {
             // If we already have a row and the new row has an ON DUPLICATE 
KEY clause
             // ignore the new values (as that's what the server will do).
             if (newRow.onDupKeyBytes == null) {
-                getColumnValues().putAll(newRow.getColumnValues());
+                // increment the column value size by the new row column value 
size
+                colValuesSize+=newRow.colValuesSize;
+                for (Map.Entry<PColumn,byte[]> entry : 
newRow.columnValues.entrySet()) {
+                    PColumn col = entry.getKey();
+                    byte[] oldValue = columnValues.put(col, entry.getValue());
+                    if (oldValue!=null) {
+                        // decrement column value size by the size of all 
column values that were replaced
+                        colValuesSize-=(col.getEstimatedSize() + 
oldValue.length);
+                    }
+                }
             }
             // Concatenate ON DUPLICATE KEY bytes to allow multiple
             // increments of the same row in the same commit batch.
@@ -1466,7 +1492,7 @@ public class MutationState implements SQLCloseable {
         RowTimestampColInfo getRowTimestampColInfo() {
             return rowTsColInfo;
         }
-       
+
     }
     
     public ReadMetricQueue getReadMetricQueue() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a561da9d/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
index 2dfe1b9..318c9d6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
@@ -30,14 +30,10 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.execute.MutationState.RowMutationState;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
-import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.schema.types.PArrayDataTypeEncoder;
 
 /**
  * 
@@ -187,47 +183,26 @@ public class KeyValueUtil {
     }
 
     /**
-     * Estimates the storage size of a row
+     * Estimates the size of rows stored in RowMutationState (in memory)
      * @param mutations map from table to row to RowMutationState
      * @return estimated row size
      */
     public static long
-            getEstimatedRowSize(TableRef tableRef, Map<ImmutableBytesPtr, 
RowMutationState> mutations) {
+            getEstimatedRowMutationSize(Map<TableRef, Map<ImmutableBytesPtr, 
RowMutationState>> tableMutationMap) {
         long size = 0;
-        PTable table = tableRef.getTable();
-        // iterate over rows
-        for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : 
mutations.entrySet()) {
-            int rowLength = rowEntry.getKey().getLength();
-            Map<PColumn, byte[]> colValueMap = 
rowEntry.getValue().getColumnValues();
-            switch (table.getImmutableStorageScheme()) {
-            case ONE_CELL_PER_COLUMN:
-                // iterate over columns
-                for (Entry<PColumn, byte[]> colValueEntry : 
colValueMap.entrySet()) {
-                    PColumn pColumn = colValueEntry.getKey();
-                    size +=
-                            KeyValue.getKeyValueDataStructureSize(rowLength,
-                                pColumn.getFamilyName().getBytes().length,
-                                pColumn.getColumnQualifierBytes().length,
-                                colValueEntry.getValue().length);
-                }
-                break;
-            case SINGLE_CELL_ARRAY_WITH_OFFSETS:
-                // we store all the column values in a single key value that 
contains all the
-                // column values followed by an offset array
-                size +=
-                        PArrayDataTypeEncoder.getEstimatedByteSize(table, 
rowLength,
-                            colValueMap);
-                break;
+        // iterate over table
+        for (Entry<TableRef, Map<ImmutableBytesPtr, RowMutationState>> 
tableEntry : tableMutationMap.entrySet()) {
+            // iterate over rows
+            for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : 
tableEntry.getValue().entrySet()) {
+                size += calculateRowMutationSize(rowEntry);
             }
-            // count the empty key value
-            Pair<byte[], byte[]> emptyKeyValueInfo =
-                    EncodedColumnsUtil.getEmptyKeyValueInfo(table);
-            size +=
-                    KeyValue.getKeyValueDataStructureSize(rowLength,
-                        SchemaUtil.getEmptyColumnFamilyPtr(table).getLength(),
-                        emptyKeyValueInfo.getFirst().length,
-                        emptyKeyValueInfo.getSecond().length);
         }
         return size;
     }
+
+    private static long calculateRowMutationSize(Entry<ImmutableBytesPtr, 
RowMutationState> rowEntry) {
+        int rowLength = rowEntry.getKey().getLength();
+        long colValuesLength = rowEntry.getValue().calculateEstimatedSize();
+        return (rowLength + colValuesLength);
+    }
 }

Reply via email to