PHOENIX-4386 Calculate the estimatedSize of MutationState using Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a6db0392 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a6db0392 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a6db0392 Branch: refs/heads/4.x-HBase-1.2 Commit: a6db039268964534715eab31fcc985861de4d0a9 Parents: bab1e46 Author: Thomas D'Silva <tdsi...@apache.org> Authored: Fri Nov 17 19:11:43 2017 +0000 Committer: James Taylor <jtay...@salesforce.com> Committed: Sat Dec 16 16:42:54 2017 -0800 ---------------------------------------------------------------------- .../apache/phoenix/end2end/MutationStateIT.java | 144 +++++++++++++++++++ .../org/apache/phoenix/end2end/QueryMoreIT.java | 42 ------ .../apache/phoenix/compile/DeleteCompiler.java | 6 +- .../apache/phoenix/compile/UpsertCompiler.java | 4 +- .../apache/phoenix/execute/MutationState.java | 50 +++++-- .../org/apache/phoenix/util/KeyValueUtil.java | 51 ++----- 6 files changed, 201 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6db0392/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java new file mode 100644 index 0000000..2d5f360 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java @@ -0,0 +1,144 @@ +package org.apache.phoenix.end2end; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Properties; + +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.QueryServices; +import org.junit.Test; + +public class MutationStateIT extends ParallelStatsDisabledIT { + + private static final String DDL = + " (ORGANIZATION_ID CHAR(15) NOT NULL, SCORE DOUBLE, " + + "ENTITY_ID CHAR(15) NOT NULL, TAGS VARCHAR, CONSTRAINT PAGE_SNAPSHOT_PK " + + "PRIMARY KEY (ORGANIZATION_ID, ENTITY_ID DESC)) MULTI_TENANT=TRUE"; + + private void upsertRows(PhoenixConnection conn, String fullTableName) throws SQLException { + PreparedStatement stmt = + conn.prepareStatement("upsert into " + fullTableName + + " (organization_id, entity_id, score) values (?,?,?)"); + for (int i = 0; i < 10000; i++) { + stmt.setString(1, "AAAA" + i); + stmt.setString(2, "BBBB" + i); + stmt.setInt(3, 1); + stmt.execute(); + } + } + + @Test + public void testMaxMutationSize() throws Exception { + Properties connectionProperties = new Properties(); + connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "3"); + connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "1000000"); + PhoenixConnection connection = + (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties); + String fullTableName = generateUniqueName(); + try (Statement stmt = connection.createStatement()) { + stmt.execute( + "CREATE TABLE " + fullTableName + DDL); + } + try { + upsertRows(connection, fullTableName); + fail(); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED.getErrorCode(), + e.getErrorCode()); + } + + // set the max mutation size (bytes) to a low value + connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "1000"); + connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "4"); + connection = + (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties); + try { + upsertRows(connection, fullTableName); + fail(); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED.getErrorCode(), + e.getErrorCode()); + } + } + + @Test + public void testMutationEstimatedSize() throws Exception { + PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection(getUrl()); + conn.setAutoCommit(false); + String fullTableName = generateUniqueName(); + try (Statement stmt = conn.createStatement()) { + stmt.execute( + "CREATE TABLE " + fullTableName + DDL); + } + + // upserting rows should increase the mutation state size + MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState(); + long prevEstimatedSize = state.getEstimatedSize(); + upsertRows(conn, fullTableName); + assertTrue("Mutation state size should have increased", + state.getEstimatedSize() > prevEstimatedSize); + + + // after commit or rollback the size should be zero + conn.commit(); + assertEquals("Mutation state size should be zero after commit", 0, + state.getEstimatedSize()); + upsertRows(conn, fullTableName); + conn.rollback(); + assertEquals("Mutation state size should be zero after rollback", 0, + state.getEstimatedSize()); + + // upsert one row + PreparedStatement stmt = + conn.prepareStatement("upsert into " + fullTableName + + " (organization_id, entity_id, score) values (?,?,?)"); + stmt.setString(1, "ZZZZ"); + stmt.setString(2, "YYYY"); + stmt.setInt(3, 1); + stmt.execute(); + assertTrue("Mutation state size should be greater than zero ", state.getEstimatedSize()>0); + + prevEstimatedSize = state.getEstimatedSize(); + // upserting the same row twice should not increase the size + stmt.setString(1, "ZZZZ"); + stmt.setString(2, "YYYY"); + stmt.setInt(3, 1); + stmt.execute(); + assertEquals( + "Mutation state size should only increase 4 bytes (size of the new statement index)", + prevEstimatedSize + 4, state.getEstimatedSize()); + + prevEstimatedSize = state.getEstimatedSize(); + // changing the value of one column of a row to a larger value should increase the estimated size + stmt = + conn.prepareStatement("upsert into " + fullTableName + + " (organization_id, entity_id, score, tags) values (?,?,?,?)"); + stmt.setString(1, "ZZZZ"); + stmt.setString(2, "YYYY"); + stmt.setInt(3, 1); + stmt.setString(4, "random text string random text string random text string"); + stmt.execute(); + assertTrue("Mutation state size should increase", prevEstimatedSize+4 < state.getEstimatedSize()); + + prevEstimatedSize = state.getEstimatedSize(); + // changing the value of one column of a row to a smaller value should decrease the estimated size + stmt = + conn.prepareStatement("upsert into " + fullTableName + + " (organization_id, entity_id, score, tags) values (?,?,?,?)"); + stmt.setString(1, "ZZZZ"); + stmt.setString(2, "YYYY"); + stmt.setInt(3, 1); + stmt.setString(4, ""); + stmt.execute(); + assertTrue("Mutation state size should decrease", prevEstimatedSize+4 > state.getEstimatedSize()); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6db0392/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java index 77cb19f..9109c12 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java @@ -22,7 +22,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.sql.Connection; import java.sql.Date; @@ -39,7 +38,6 @@ import java.util.Properties; import org.apache.hadoop.hbase.util.Base64; import org.apache.hadoop.hbase.util.Pair; -import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PhoenixRuntime; @@ -510,46 +508,6 @@ public class QueryMoreIT extends ParallelStatsDisabledIT { assertEquals(4L, connection.getMutationState().getBatchCount()); } - @Test - public void testMaxMutationSize() throws Exception { - Properties connectionProperties = new Properties(); - connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "3"); - connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "1000000"); - PhoenixConnection connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties); - String fullTableName = generateUniqueName(); - try (Statement stmt = connection.createStatement()) { - stmt.execute("CREATE TABLE " + fullTableName + "(\n" + - " ORGANIZATION_ID CHAR(15) NOT NULL,\n" + - " SCORE DOUBLE NOT NULL,\n" + - " ENTITY_ID CHAR(15) NOT NULL\n" + - " CONSTRAINT PAGE_SNAPSHOT_PK PRIMARY KEY (\n" + - " ORGANIZATION_ID,\n" + - " SCORE DESC,\n" + - " ENTITY_ID DESC\n" + - " )\n" + - ") MULTI_TENANT=TRUE"); - } - try { - upsertRows(connection, fullTableName); - fail(); - } - catch(SQLException e) { - assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED.getErrorCode(), e.getErrorCode()); - } - - // set the max mutation size (bytes) to a low value - connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "1000"); - connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "4"); - connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties); - try { - upsertRows(connection, fullTableName); - fail(); - } - catch(SQLException e) { - assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED.getErrorCode(), e.getErrorCode()); - } - } - private void upsertRows(PhoenixConnection conn, String fullTableName) throws SQLException { PreparedStatement stmt = conn.prepareStatement("upsert into " + fullTableName + " (organization_id, entity_id, score) values (?,?,?)"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6db0392/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index 8d9a5b6..f9ca300 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -207,7 +207,7 @@ public class DeleteCompiler { // row key will already have its value. // Check for otherTableRefs being empty required when deleting directly from the index if (otherTableRefs.isEmpty() || table.getIndexType() != IndexType.LOCAL) { - mutations.put(rowKeyPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); + mutations.put(rowKeyPtr, new RowMutationState(PRow.DELETE_MARKER, 0, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); } for (int i = 0; i < otherTableRefs.size(); i++) { PTable otherTable = otherTableRefs.get(i).getTable(); @@ -221,7 +221,7 @@ public class DeleteCompiler { } else { indexPtr.set(maintainers[i].buildRowKey(getter, rowKeyPtr, null, null, HConstants.LATEST_TIMESTAMP)); } - indexMutations.get(i).put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); + indexMutations.get(i).put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, 0, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); } if (mutations.size() > maxSize) { throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize); @@ -647,7 +647,7 @@ public class DeleteCompiler { Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount()); while (iterator.hasNext()) { mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), - new RowMutationState(PRow.DELETE_MARKER, + new RowMutationState(PRow.DELETE_MARKER, 0, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); } return new MutationState(dataPlan.getTableRef(), mutation, 0, maxSize, maxSizeBytes, connection); http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6db0392/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index bc3e289..a51fd4c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -119,6 +119,7 @@ public class UpsertCompiler { PTable table, Map<ImmutableBytesPtr, RowMutationState> mutation, PhoenixStatement statement, boolean useServerTimestamp, IndexMaintainer maintainer, byte[][] viewConstants, byte[] onDupKeyBytes, int numSplColumns) throws SQLException { + long columnValueSize = 0; Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length); byte[][] pkValues = new byte[table.getPKColumns().size()][]; // If the table uses salting, the first byte is the salting byte, set to an empty array @@ -148,6 +149,7 @@ public class UpsertCompiler { } } else { columnValues.put(column, value); + columnValueSize += (column.getEstimatedSize() + value.length); } } ImmutableBytesPtr ptr = new ImmutableBytesPtr(); @@ -166,7 +168,7 @@ public class UpsertCompiler { regionPrefix.length)); } } - mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes)); + mutation.put(ptr, new RowMutationState(columnValues, columnValueSize, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes)); } public static MutationState upsertSelect(StatementContext childContext, TableRef tableRef, RowProjector projector, http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6db0392/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 0faa20c..15e905a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -96,6 +96,7 @@ import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.SQLCloseables; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; +import org.apache.phoenix.util.SizedUtil; import org.apache.phoenix.util.TransactionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -194,9 +195,13 @@ public class MutationState implements SQLCloseable { this.mutations.put(table, mutations); } this.numRows = mutations.size(); - this.estimatedSize = KeyValueUtil.getEstimatedRowSize(table, mutations); + this.estimatedSize = KeyValueUtil.getEstimatedRowMutationSize(this.mutations); throwIfTooBig(); } + + public long getEstimatedSize() { + return estimatedSize; + } public long getMaxSize() { return maxSize; @@ -436,9 +441,16 @@ public class MutationState implements SQLCloseable { this.sizeOffset += newMutationState.sizeOffset; int oldNumRows = this.numRows; joinMutationState(newMutationState.mutations, this.mutations); - // here we increment the estimated size by the fraction of new rows we added from the newMutationState if (newMutationState.numRows>0) { - this.estimatedSize += ((double)(this.numRows-oldNumRows)/newMutationState.numRows) * newMutationState.estimatedSize; + // if we added all the rows from newMutationState we can just increment the + // estimatedSize by newMutationState.estimatedSize + if (newMutationState.numRows == this.numRows-oldNumRows) { + this.estimatedSize += newMutationState.estimatedSize; + } + // we merged the two mutation states so we need to recalculate the size + else { + this.estimatedSize = KeyValueUtil.getEstimatedRowMutationSize(this.mutations); + } } if (!newMutationState.txMutations.isEmpty()) { if (txMutations.isEmpty()) { @@ -974,8 +986,6 @@ public class MutationState implements SQLCloseable { long mutationCommitTime = 0; long numFailedMutations = 0;; long startTime = 0; - long startNumRows = numRows; - long startEstimatedSize = estimatedSize; do { TableRef origTableRef = tableInfo.getOrigTableRef(); PTable table = origTableRef.getTable(); @@ -1021,13 +1031,13 @@ public class MutationState implements SQLCloseable { GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime); numFailedMutations = 0; + // Remove batches as we process them + mutations.remove(origTableRef); if (tableInfo.isDataTable()) { numRows -= numMutations; - // decrement estimated size by the fraction of rows we sent to hbase - estimatedSize -= ((double)numMutations/startNumRows)*startEstimatedSize; + // recalculate the estimated size + estimatedSize = KeyValueUtil.getEstimatedRowMutationSize(mutations); } - // Remove batches as we process them - mutations.remove(origTableRef); } catch (Exception e) { mutationCommitTime = System.currentTimeMillis() - startTime; serverTimestamp = ServerUtil.parseServerTimestamp(e); @@ -1426,8 +1436,9 @@ public class MutationState implements SQLCloseable { private int[] statementIndexes; @Nonnull private final RowTimestampColInfo rowTsColInfo; private byte[] onDupKeyBytes; + private long colValuesSize; - public RowMutationState(@Nonnull Map<PColumn,byte[]> columnValues, int statementIndex, @Nonnull RowTimestampColInfo rowTsColInfo, + public RowMutationState(@Nonnull Map<PColumn,byte[]> columnValues, long colValuesSize, int statementIndex, @Nonnull RowTimestampColInfo rowTsColInfo, byte[] onDupKeyBytes) { checkNotNull(columnValues); checkNotNull(rowTsColInfo); @@ -1435,6 +1446,12 @@ public class MutationState implements SQLCloseable { this.statementIndexes = new int[] {statementIndex}; this.rowTsColInfo = rowTsColInfo; this.onDupKeyBytes = onDupKeyBytes; + this.colValuesSize = colValuesSize; + } + + public long calculateEstimatedSize() { + return colValuesSize + statementIndexes.length * SizedUtil.INT_SIZE + SizedUtil.LONG_SIZE + + (onDupKeyBytes != null ? onDupKeyBytes.length : 0); } byte[] getOnDupKeyBytes() { @@ -1453,7 +1470,16 @@ public class MutationState implements SQLCloseable { // If we already have a row and the new row has an ON DUPLICATE KEY clause // ignore the new values (as that's what the server will do). if (newRow.onDupKeyBytes == null) { - getColumnValues().putAll(newRow.getColumnValues()); + // increment the column value size by the new row column value size + colValuesSize+=newRow.colValuesSize; + for (Map.Entry<PColumn,byte[]> entry : newRow.columnValues.entrySet()) { + PColumn col = entry.getKey(); + byte[] oldValue = columnValues.put(col, entry.getValue()); + if (oldValue!=null) { + // decrement column value size by the size of all column values that were replaced + colValuesSize-=(col.getEstimatedSize() + oldValue.length); + } + } } // Concatenate ON DUPLICATE KEY bytes to allow multiple // increments of the same row in the same commit batch. @@ -1465,7 +1491,7 @@ public class MutationState implements SQLCloseable { RowTimestampColInfo getRowTimestampColInfo() { return rowTsColInfo; } - + } public ReadMetricQueue getReadMetricQueue() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6db0392/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java index 2dfe1b9..318c9d6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java @@ -30,14 +30,10 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.execute.MutationState.RowMutationState; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; -import org.apache.phoenix.schema.PColumn; -import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableRef; -import org.apache.phoenix.schema.types.PArrayDataTypeEncoder; /** * @@ -187,47 +183,26 @@ public class KeyValueUtil { } /** - * Estimates the storage size of a row + * Estimates the size of rows stored in RowMutationState (in memory) * @param mutations map from table to row to RowMutationState * @return estimated row size */ public static long - getEstimatedRowSize(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> mutations) { + getEstimatedRowMutationSize(Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> tableMutationMap) { long size = 0; - PTable table = tableRef.getTable(); - // iterate over rows - for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : mutations.entrySet()) { - int rowLength = rowEntry.getKey().getLength(); - Map<PColumn, byte[]> colValueMap = rowEntry.getValue().getColumnValues(); - switch (table.getImmutableStorageScheme()) { - case ONE_CELL_PER_COLUMN: - // iterate over columns - for (Entry<PColumn, byte[]> colValueEntry : colValueMap.entrySet()) { - PColumn pColumn = colValueEntry.getKey(); - size += - KeyValue.getKeyValueDataStructureSize(rowLength, - pColumn.getFamilyName().getBytes().length, - pColumn.getColumnQualifierBytes().length, - colValueEntry.getValue().length); - } - break; - case SINGLE_CELL_ARRAY_WITH_OFFSETS: - // we store all the column values in a single key value that contains all the - // column values followed by an offset array - size += - PArrayDataTypeEncoder.getEstimatedByteSize(table, rowLength, - colValueMap); - break; + // iterate over table + for (Entry<TableRef, Map<ImmutableBytesPtr, RowMutationState>> tableEntry : tableMutationMap.entrySet()) { + // iterate over rows + for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : tableEntry.getValue().entrySet()) { + size += calculateRowMutationSize(rowEntry); } - // count the empty key value - Pair<byte[], byte[]> emptyKeyValueInfo = - EncodedColumnsUtil.getEmptyKeyValueInfo(table); - size += - KeyValue.getKeyValueDataStructureSize(rowLength, - SchemaUtil.getEmptyColumnFamilyPtr(table).getLength(), - emptyKeyValueInfo.getFirst().length, - emptyKeyValueInfo.getSecond().length); } return size; } + + private static long calculateRowMutationSize(Entry<ImmutableBytesPtr, RowMutationState> rowEntry) { + int rowLength = rowEntry.getKey().getLength(); + long colValuesLength = rowEntry.getValue().calculateEstimatedSize(); + return (rowLength + colValuesLength); + } }