Repository: phoenix Updated Branches: refs/heads/5.x-HBase-2.0 6cadbab92 -> aeb33b9fb
PHOENIX-4386 Calculate the estimatedSize of MutationState using Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations(Thomas D'Silva) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/aeb33b9f Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/aeb33b9f Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/aeb33b9f Branch: refs/heads/5.x-HBase-2.0 Commit: aeb33b9fbae9d19da199fed8e54d60939bdd57d8 Parents: 6cadbab Author: Rajeshbabu Chintaguntla <[email protected]> Authored: Tue Feb 20 16:56:43 2018 +0530 Committer: Rajeshbabu Chintaguntla <[email protected]> Committed: Tue Feb 20 16:56:43 2018 +0530 ---------------------------------------------------------------------- .../apache/phoenix/end2end/MutationStateIT.java | 144 +++++++++++++++++ .../org/apache/phoenix/end2end/QueryMoreIT.java | 42 ----- .../apache/phoenix/execute/PartialCommitIT.java | 3 +- .../apache/phoenix/compile/DeleteCompiler.java | 18 +-- .../apache/phoenix/compile/UpsertCompiler.java | 11 +- .../apache/phoenix/execute/MutationState.java | 159 +++++++++++++------ .../java/org/apache/phoenix/util/IndexUtil.java | 4 +- .../phoenix/util/PhoenixKeyValueUtil.java | 48 ++---- 8 files changed, 289 insertions(+), 140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/aeb33b9f/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java new file mode 100644 index 0000000..2d5f360 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java @@ -0,0 +1,144 @@ +package org.apache.phoenix.end2end; + +import static org.junit.Assert.assertEquals; +import static 
org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Properties; + +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.QueryServices; +import org.junit.Test; + +public class MutationStateIT extends ParallelStatsDisabledIT { + + private static final String DDL = + " (ORGANIZATION_ID CHAR(15) NOT NULL, SCORE DOUBLE, " + + "ENTITY_ID CHAR(15) NOT NULL, TAGS VARCHAR, CONSTRAINT PAGE_SNAPSHOT_PK " + + "PRIMARY KEY (ORGANIZATION_ID, ENTITY_ID DESC)) MULTI_TENANT=TRUE"; + + private void upsertRows(PhoenixConnection conn, String fullTableName) throws SQLException { + PreparedStatement stmt = + conn.prepareStatement("upsert into " + fullTableName + + " (organization_id, entity_id, score) values (?,?,?)"); + for (int i = 0; i < 10000; i++) { + stmt.setString(1, "AAAA" + i); + stmt.setString(2, "BBBB" + i); + stmt.setInt(3, 1); + stmt.execute(); + } + } + + @Test + public void testMaxMutationSize() throws Exception { + Properties connectionProperties = new Properties(); + connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "3"); + connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "1000000"); + PhoenixConnection connection = + (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties); + String fullTableName = generateUniqueName(); + try (Statement stmt = connection.createStatement()) { + stmt.execute( + "CREATE TABLE " + fullTableName + DDL); + } + try { + upsertRows(connection, fullTableName); + fail(); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED.getErrorCode(), + e.getErrorCode()); + } + + // set the max mutation size (bytes) to a low value + 
connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "1000"); + connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "4"); + connection = + (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties); + try { + upsertRows(connection, fullTableName); + fail(); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED.getErrorCode(), + e.getErrorCode()); + } + } + + @Test + public void testMutationEstimatedSize() throws Exception { + PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection(getUrl()); + conn.setAutoCommit(false); + String fullTableName = generateUniqueName(); + try (Statement stmt = conn.createStatement()) { + stmt.execute( + "CREATE TABLE " + fullTableName + DDL); + } + + // upserting rows should increase the mutation state size + MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState(); + long prevEstimatedSize = state.getEstimatedSize(); + upsertRows(conn, fullTableName); + assertTrue("Mutation state size should have increased", + state.getEstimatedSize() > prevEstimatedSize); + + + // after commit or rollback the size should be zero + conn.commit(); + assertEquals("Mutation state size should be zero after commit", 0, + state.getEstimatedSize()); + upsertRows(conn, fullTableName); + conn.rollback(); + assertEquals("Mutation state size should be zero after rollback", 0, + state.getEstimatedSize()); + + // upsert one row + PreparedStatement stmt = + conn.prepareStatement("upsert into " + fullTableName + + " (organization_id, entity_id, score) values (?,?,?)"); + stmt.setString(1, "ZZZZ"); + stmt.setString(2, "YYYY"); + stmt.setInt(3, 1); + stmt.execute(); + assertTrue("Mutation state size should be greater than zero ", state.getEstimatedSize()>0); + + prevEstimatedSize = state.getEstimatedSize(); + // upserting the same row twice should only increase the size by 4 bytes (the new statement index) + stmt.setString(1, "ZZZZ"); + stmt.setString(2, 
"YYYY"); + stmt.setInt(3, 1); + stmt.execute(); + assertEquals( + "Mutation state size should only increase 4 bytes (size of the new statement index)", + prevEstimatedSize + 4, state.getEstimatedSize()); + + prevEstimatedSize = state.getEstimatedSize(); + // changing the value of one column of a row to a larger value should increase the estimated size + stmt = + conn.prepareStatement("upsert into " + fullTableName + + " (organization_id, entity_id, score, tags) values (?,?,?,?)"); + stmt.setString(1, "ZZZZ"); + stmt.setString(2, "YYYY"); + stmt.setInt(3, 1); + stmt.setString(4, "random text string random text string random text string"); + stmt.execute(); + assertTrue("Mutation state size should increase", prevEstimatedSize+4 < state.getEstimatedSize()); + + prevEstimatedSize = state.getEstimatedSize(); + // changing the value of one column of a row to a smaller value should decrease the estimated size + stmt = + conn.prepareStatement("upsert into " + fullTableName + + " (organization_id, entity_id, score, tags) values (?,?,?,?)"); + stmt.setString(1, "ZZZZ"); + stmt.setString(2, "YYYY"); + stmt.setInt(3, 1); + stmt.setString(4, ""); + stmt.execute(); + assertTrue("Mutation state size should decrease", prevEstimatedSize+4 > state.getEstimatedSize()); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/aeb33b9f/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java index 77cb19f..9109c12 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java @@ -22,7 +22,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static 
org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.sql.Connection; import java.sql.Date; @@ -39,7 +38,6 @@ import java.util.Properties; import org.apache.hadoop.hbase.util.Base64; import org.apache.hadoop.hbase.util.Pair; -import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PhoenixRuntime; @@ -510,46 +508,6 @@ public class QueryMoreIT extends ParallelStatsDisabledIT { assertEquals(4L, connection.getMutationState().getBatchCount()); } - @Test - public void testMaxMutationSize() throws Exception { - Properties connectionProperties = new Properties(); - connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "3"); - connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "1000000"); - PhoenixConnection connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties); - String fullTableName = generateUniqueName(); - try (Statement stmt = connection.createStatement()) { - stmt.execute("CREATE TABLE " + fullTableName + "(\n" + - " ORGANIZATION_ID CHAR(15) NOT NULL,\n" + - " SCORE DOUBLE NOT NULL,\n" + - " ENTITY_ID CHAR(15) NOT NULL\n" + - " CONSTRAINT PAGE_SNAPSHOT_PK PRIMARY KEY (\n" + - " ORGANIZATION_ID,\n" + - " SCORE DESC,\n" + - " ENTITY_ID DESC\n" + - " )\n" + - ") MULTI_TENANT=TRUE"); - } - try { - upsertRows(connection, fullTableName); - fail(); - } - catch(SQLException e) { - assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED.getErrorCode(), e.getErrorCode()); - } - - // set the max mutation size (bytes) to a low value - connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "1000"); - connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "4"); - connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties); - try { - upsertRows(connection, fullTableName); - 
fail(); - } - catch(SQLException e) { - assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED.getErrorCode(), e.getErrorCode()); - } - } - private void upsertRows(PhoenixConnection conn, String fullTableName) throws SQLException { PreparedStatement stmt = conn.prepareStatement("upsert into " + fullTableName + " (organization_id, entity_id, score) values (?,?,?)"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/aeb33b9f/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java index 2ceac55..58dcceb 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.phoenix.end2end.BaseOwnClusterIT; +import org.apache.phoenix.execute.MutationState.MultiRowMutationState; import org.apache.phoenix.hbase.index.Indexer; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.jdbc.PhoenixConnection; @@ -284,7 +285,7 @@ public class PartialCommitIT extends BaseOwnClusterIT { private PhoenixConnection getConnectionWithTableOrderPreservingMutationState() throws SQLException { Connection con = driver.connect(url, new Properties()); PhoenixConnection phxCon = new PhoenixConnection(con.unwrap(PhoenixConnection.class)); - final Map<TableRef,Map<ImmutableBytesPtr,MutationState.RowMutationState>> mutations = Maps.newTreeMap(new TableRefComparator()); + final Map<TableRef, MultiRowMutationState> mutations = Maps.newTreeMap(new TableRefComparator()); // passing a null mutation state forces the 
connection.newMutationState() to be used to create the MutationState return new PhoenixConnection(phxCon, null) { @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/aeb33b9f/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index 53fc398..a635c69 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -44,6 +44,7 @@ import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.AggregatePlan; import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.execute.MutationState.MultiRowMutationState; import org.apache.phoenix.execute.MutationState.RowMutationState; import org.apache.phoenix.filter.SkipScanFilter; import org.apache.phoenix.hbase.index.ValueGetter; @@ -92,7 +93,6 @@ import org.apache.phoenix.util.ScanUtil; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.sun.istack.NotNull; public class DeleteCompiler { @@ -122,14 +122,14 @@ public class DeleteCompiler { final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); final int maxSizeBytes = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES); final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize); - Map<ImmutableBytesPtr,RowMutationState> mutations = Maps.newHashMapWithExpectedSize(batchSize); - List<Map<ImmutableBytesPtr,RowMutationState>> indexMutations = null; + MultiRowMutationState 
mutations = new MultiRowMutationState(batchSize); + List<MultiRowMutationState> indexMutations = null; // If indexTableRef is set, we're deleting the rows from both the index table and // the data table through a single query to save executing an additional one. if (!otherTableRefs.isEmpty()) { indexMutations = Lists.newArrayListWithExpectedSize(otherTableRefs.size()); for (int i = 0; i < otherTableRefs.size(); i++) { - indexMutations.add(Maps.<ImmutableBytesPtr,RowMutationState>newHashMapWithExpectedSize(batchSize)); + indexMutations.add(new MultiRowMutationState(batchSize)); } } List<PColumn> pkColumns = table.getPKColumns(); @@ -208,7 +208,7 @@ public class DeleteCompiler { // row key will already have its value. // Check for otherTableRefs being empty required when deleting directly from the index if (otherTableRefs.isEmpty() || table.getIndexType() != IndexType.LOCAL) { - mutations.put(rowKeyPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); + mutations.put(rowKeyPtr, new RowMutationState(PRow.DELETE_MARKER, 0, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); } for (int i = 0; i < otherTableRefs.size(); i++) { PTable otherTable = otherTableRefs.get(i).getTable(); @@ -222,7 +222,7 @@ public class DeleteCompiler { } else { indexPtr.set(maintainers[i].buildRowKey(getter, rowKeyPtr, null, null, HConstants.LATEST_TIMESTAMP)); } - indexMutations.get(i).put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); + indexMutations.get(i).put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, 0, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); } if (mutations.size() > maxSize) { throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize); @@ -239,7 
+239,7 @@ public class DeleteCompiler { connection.getMutationState().send(); mutations.clear(); if (indexMutations != null) { - for (Map<ImmutableBytesPtr, RowMutationState> multiRowMutationState : indexMutations) { + for (MultiRowMutationState multiRowMutationState : indexMutations) { multiRowMutationState.clear(); } } @@ -651,10 +651,10 @@ public class DeleteCompiler { // keys for our ranges ScanRanges ranges = context.getScanRanges(); Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator(); - Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount()); + MultiRowMutationState mutation = new MultiRowMutationState(ranges.getPointLookupCount()); while (iterator.hasNext()) { mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), - new RowMutationState(PRow.DELETE_MARKER, + new RowMutationState(PRow.DELETE_MARKER, 0, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); } return new MutationState(dataPlan.getTableRef(), mutation, 0, maxSize, maxSizeBytes, connection); http://git-wip-us.apache.org/repos/asf/phoenix/blob/aeb33b9f/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 7e83ad5..d827cbe 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -47,6 +47,7 @@ import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.AggregatePlan; import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.execute.MutationState.MultiRowMutationState; import 
org.apache.phoenix.execute.MutationState.RowMutationState; import org.apache.phoenix.execute.MutationState.RowTimestampColInfo; import org.apache.phoenix.expression.Determinism; @@ -116,9 +117,10 @@ import com.google.common.collect.Sets; public class UpsertCompiler { private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, - PTable table, Map<ImmutableBytesPtr, RowMutationState> mutation, + PTable table, MultiRowMutationState mutation, PhoenixStatement statement, boolean useServerTimestamp, IndexMaintainer maintainer, byte[][] viewConstants, byte[] onDupKeyBytes, int numSplColumns) throws SQLException { + long columnValueSize = 0; Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length); byte[][] pkValues = new byte[table.getPKColumns().size()][]; // If the table uses salting, the first byte is the salting byte, set to an empty array @@ -148,6 +150,7 @@ public class UpsertCompiler { } } else { columnValues.put(column, value); + columnValueSize += (column.getEstimatedSize() + value.length); } } ImmutableBytesPtr ptr = new ImmutableBytesPtr(); @@ -166,7 +169,7 @@ public class UpsertCompiler { regionPrefix.length)); } } - mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes)); + mutation.put(ptr, new RowMutationState(columnValues, columnValueSize, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes)); } public static MutationState upsertSelect(StatementContext childContext, TableRef tableRef, RowProjector projector, @@ -195,7 +198,7 @@ public class UpsertCompiler { } } int rowCount = 0; - Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize); + MultiRowMutationState mutation = new MultiRowMutationState(batchSize); PTable table = tableRef.getTable(); IndexMaintainer indexMaintainer = null; byte[][] viewConstants = null; @@ -1180,7 +1183,7 @@ public 
class UpsertCompiler { throw new IllegalStateException(); } } - Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(1); + MultiRowMutationState mutation = new MultiRowMutationState(1); IndexMaintainer indexMaintainer = null; byte[][] viewConstants = null; if (table.getIndexType() == IndexType.LOCAL) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/aeb33b9f/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index e9547f2..510e609 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -97,6 +97,7 @@ import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.SQLCloseables; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; +import org.apache.phoenix.util.SizedUtil; import org.apache.phoenix.util.TransactionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,7 +124,7 @@ public class MutationState implements SQLCloseable { private final long batchSize; private final long batchSizeBytes; private long batchCount = 0L; - private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations; + private final Map<TableRef, MultiRowMutationState> mutations; private final Set<String> uncommittedPhysicalNames = Sets.newHashSetWithExpectedSize(10); private long sizeOffset; @@ -131,7 +132,7 @@ public class MutationState implements SQLCloseable { private long estimatedSize = 0; private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY; private boolean isExternalTxContext = false; - private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap(); + private 
Map<TableRef, MultiRowMutationState> txMutations = Collections.emptyMap(); final PhoenixTransactionContext phoenixTransactionContext; @@ -159,12 +160,12 @@ public class MutationState implements SQLCloseable { } private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, boolean subTask, PhoenixTransactionContext txContext, long sizeOffset) { - this(maxSize, maxSizeBytes, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), subTask, txContext); + this(maxSize, maxSizeBytes, connection, Maps.<TableRef, MultiRowMutationState>newHashMapWithExpectedSize(5), subTask, txContext); this.sizeOffset = sizeOffset; } MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, - Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations, + Map<TableRef, MultiRowMutationState> mutations, boolean subTask, PhoenixTransactionContext txContext) { this.maxSize = maxSize; this.maxSizeBytes = maxSizeBytes; @@ -189,15 +190,19 @@ public class MutationState implements SQLCloseable { } } - public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection) throws SQLException { + public MutationState(TableRef table, MultiRowMutationState mutations, long sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection) throws SQLException { this(maxSize, maxSizeBytes, connection, false, null, sizeOffset); if (!mutations.isEmpty()) { this.mutations.put(table, mutations); } this.numRows = mutations.size(); - this.estimatedSize = PhoenixKeyValueUtil.getEstimatedRowSize(table, mutations); + this.estimatedSize = PhoenixKeyValueUtil.getEstimatedRowMutationSize(this.mutations); throwIfTooBig(); } + + public long getEstimatedSize() { + return estimatedSize; + } public long getMaxSize() { return maxSize; @@ -346,7 +351,7 @@ public class MutationState implements SQLCloseable { } public static 
MutationState emptyMutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection) { - MutationState state = new MutationState(maxSize, maxSizeBytes, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), false, null); + MutationState state = new MutationState(maxSize, maxSizeBytes, connection, Collections.<TableRef, MultiRowMutationState>emptyMap(), false, null); state.sizeOffset = 0; return state; } @@ -368,12 +373,12 @@ public class MutationState implements SQLCloseable { return sizeOffset + numRows; } - private void joinMutationState(TableRef tableRef, Map<ImmutableBytesPtr,RowMutationState> srcRows, - Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> dstMutations) { + private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows, + Map<TableRef, MultiRowMutationState> dstMutations) { PTable table = tableRef.getTable(); boolean isIndex = table.getType() == PTableType.INDEX; boolean incrementRowCount = dstMutations == this.mutations; - Map<ImmutableBytesPtr,RowMutationState> existingRows = dstMutations.put(tableRef, srcRows); + MultiRowMutationState existingRows = dstMutations.put(tableRef, srcRows); if (existingRows != null) { // Rows for that table already exist // Loop through new rows and replace existing with new for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : srcRows.entrySet()) { @@ -385,8 +390,12 @@ public class MutationState implements SQLCloseable { Map<PColumn,byte[]> newRow = rowEntry.getValue().getColumnValues(); // if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with existing row. 
if (newRow != PRow.DELETE_MARKER) { + // decrement estimated size by the size of the old row + estimatedSize-=existingRowMutationState.calculateEstimatedSize(); // Merge existing column values with new column values existingRowMutationState.join(rowEntry.getValue()); + // increment estimated size by the size of the new row + estimatedSize+=existingRowMutationState.calculateEstimatedSize(); // Now that the existing row has been merged with the new row, replace it back // again (since it was merged with the new one above). existingRows.put(rowEntry.getKey(), existingRowMutationState); @@ -395,6 +404,8 @@ public class MutationState implements SQLCloseable { } else { if (incrementRowCount && !isIndex) { // Don't count index rows in row count numRows++; + // increment estimated size by the size of the new row + estimatedSize += rowEntry.getValue().calculateEstimatedSize(); } } } @@ -402,22 +413,25 @@ public class MutationState implements SQLCloseable { dstMutations.put(tableRef, existingRows); } else { // Size new map at batch size as that's what it'll likely grow to. 
- Map<ImmutableBytesPtr,RowMutationState> newRows = Maps.newHashMapWithExpectedSize(connection.getMutateBatchSize()); + MultiRowMutationState newRows = new MultiRowMutationState(connection.getMutateBatchSize()); newRows.putAll(srcRows); dstMutations.put(tableRef, newRows); if (incrementRowCount && !isIndex) { numRows += srcRows.size(); + // if we added all the rows from newMutationState we can just increment the + // estimatedSize by newMutationState.estimatedSize + estimatedSize += srcRows.estimatedSize; } } } - private void joinMutationState(Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> srcMutations, - Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> dstMutations) { + private void joinMutationState(Map<TableRef, MultiRowMutationState> srcMutations, + Map<TableRef, MultiRowMutationState> dstMutations) { // Merge newMutation with this one, keeping state from newMutation for any overlaps - for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : srcMutations.entrySet()) { + for (Map.Entry<TableRef, MultiRowMutationState> entry : srcMutations.entrySet()) { // Replace existing entries for the table with new entries TableRef tableRef = entry.getKey(); - Map<ImmutableBytesPtr,RowMutationState> srcRows = entry.getValue(); + MultiRowMutationState srcRows = entry.getValue(); joinMutationState(tableRef, srcRows, dstMutations); } } @@ -435,12 +449,7 @@ public class MutationState implements SQLCloseable { phoenixTransactionContext.join(newMutationState.getPhoenixTransactionContext()); this.sizeOffset += newMutationState.sizeOffset; - int oldNumRows = this.numRows; joinMutationState(newMutationState.mutations, this.mutations); - // here we increment the estimated size by the fraction of new rows we added from the newMutationState - if (newMutationState.numRows>0) { - this.estimatedSize += ((double)(this.numRows-oldNumRows)/newMutationState.numRows) * newMutationState.estimatedSize; - } if (!newMutationState.txMutations.isEmpty()) { if 
(txMutations.isEmpty()) { txMutations = Maps.newHashMapWithExpectedSize(mutations.size()); @@ -478,7 +487,7 @@ public class MutationState implements SQLCloseable { return ptr; } - private Iterator<Pair<PName,List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, + private Iterator<Pair<PName,List<Mutation>>> addRowMutations(final TableRef tableRef, final MultiRowMutationState values, final long mutationTimestamp, final long serverTimestamp, boolean includeAllIndexes, final boolean sendAll) { final PTable table = tableRef.getTable(); final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism @@ -513,10 +522,10 @@ public class MutationState implements SQLCloseable { // we may also have to include delete mutations for immutable tables if we are not processing all the tables in the mutations map if (!sendAll) { TableRef key = new TableRef(index); - Map<ImmutableBytesPtr, RowMutationState> rowToColumnMap = mutations.remove(key); - if (rowToColumnMap!=null) { + MultiRowMutationState multiRowMutationState = mutations.remove(key); + if (multiRowMutationState!=null) { final List<Mutation> deleteMutations = Lists.newArrayList(); - generateMutations(tableRef, mutationTimestamp, serverTimestamp, rowToColumnMap, deleteMutations, null); + generateMutations(tableRef, mutationTimestamp, serverTimestamp, multiRowMutationState, deleteMutations, null); indexMutations.addAll(deleteMutations); } } @@ -535,14 +544,14 @@ public class MutationState implements SQLCloseable { } private void generateMutations(final TableRef tableRef, final long mutationTimestamp, - final long serverTimestamp, final Map<ImmutableBytesPtr, RowMutationState> values, + final long serverTimestamp, final MultiRowMutationState values, final List<Mutation> mutationList, final List<Mutation> mutationsPertainingToIndex) { final PTable table = tableRef.getTable(); boolean tableWithRowTimestampCol = 
table.getRowTimestampColPos() != -1; Iterator<Map.Entry<ImmutableBytesPtr, RowMutationState>> iterator = values.entrySet().iterator(); long timestampToUse = mutationTimestamp; - Map<ImmutableBytesPtr, RowMutationState> modifiedValues = Maps.newHashMap(); + MultiRowMutationState modifiedValues = new MultiRowMutationState(16); while (iterator.hasNext()) { Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry = iterator.next(); byte[] onDupKeyBytes = rowEntry.getValue().getOnDupKeyBytes(); @@ -617,7 +626,7 @@ public class MutationState implements SQLCloseable { } public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean includeMutableIndexes, final Long tableTimestamp) { - final Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator(); + final Iterator<Map.Entry<TableRef, MultiRowMutationState>> iterator = this.mutations.entrySet().iterator(); if (!iterator.hasNext()) { return Collections.emptyIterator(); } @@ -625,7 +634,7 @@ public class MutationState implements SQLCloseable { final long serverTimestamp = getTableTimestamp(tableTimestamp, scn); final long mutationTimestamp = getMutationTimestamp(scn); return new Iterator<Pair<byte[],List<Mutation>>>() { - private Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> current = iterator.next(); + private Map.Entry<TableRef, MultiRowMutationState> current = iterator.next(); private Iterator<Pair<byte[],List<Mutation>>> innerIterator = init(); private Iterator<Pair<byte[],List<Mutation>>> init() { @@ -689,14 +698,14 @@ public class MutationState implements SQLCloseable { private long[] validateAll() throws SQLException { int i = 0; long[] timeStamps = new long[this.mutations.size()]; - for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : mutations.entrySet()) { + for (Map.Entry<TableRef, MultiRowMutationState> entry : mutations.entrySet()) { TableRef tableRef = entry.getKey(); timeStamps[i++] = 
validateAndGetServerTimestamp(tableRef, entry.getValue()); } return timeStamps; } - private long validateAndGetServerTimestamp(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> rowKeyToColumnMap) throws SQLException { + private long validateAndGetServerTimestamp(TableRef tableRef, MultiRowMutationState rowKeyToColumnMap) throws SQLException { Long scn = connection.getSCN(); MetaDataClient client = new MetaDataClient(connection); long serverTimeStamp = tableRef.getTimeStamp(); @@ -907,7 +916,7 @@ public class MutationState implements SQLCloseable { sendAll = true; } - Map<ImmutableBytesPtr, RowMutationState> valuesMap; + MultiRowMutationState multiRowMutationState; Map<TableInfo,List<Mutation>> physicalTableMutationMap = Maps.newLinkedHashMap(); // add tracing for this operation try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) { @@ -916,16 +925,16 @@ public class MutationState implements SQLCloseable { while (tableRefIterator.hasNext()) { // at this point we are going through mutations for each table final TableRef tableRef = tableRefIterator.next(); - valuesMap = mutations.get(tableRef); - if (valuesMap == null || valuesMap.isEmpty()) { + multiRowMutationState = mutations.get(tableRef); + if (multiRowMutationState == null || multiRowMutationState.isEmpty()) { continue; } // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely) - long serverTimestamp = serverTimeStamps == null ? validateAndGetServerTimestamp(tableRef, valuesMap) : serverTimeStamps[i++]; + long serverTimestamp = serverTimeStamps == null ? validateAndGetServerTimestamp(tableRef, multiRowMutationState) : serverTimeStamps[i++]; Long scn = connection.getSCN(); long mutationTimestamp = scn == null ? 
HConstants.LATEST_TIMESTAMP : scn; final PTable table = tableRef.getTable(); - Iterator<Pair<PName,List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, mutationTimestamp, serverTimestamp, false, sendAll); + Iterator<Pair<PName,List<Mutation>>> mutationsIterator = addRowMutations(tableRef, multiRowMutationState, mutationTimestamp, serverTimestamp, false, sendAll); // build map from physical table to mutation list boolean isDataTable = true; while (mutationsIterator.hasNext()) { @@ -943,7 +952,7 @@ public class MutationState implements SQLCloseable { // involved in the transaction since none of them would have been // committed in the event of a failure. if (table.isTransactional()) { - addUncommittedStatementIndexes(valuesMap.values()); + addUncommittedStatementIndexes(multiRowMutationState.values()); if (txMutations.isEmpty()) { txMutations = Maps.newHashMapWithExpectedSize(mutations.size()); } @@ -952,7 +961,7 @@ public class MutationState implements SQLCloseable { // in the event that we need to replay the commit. // Copy TableRef so we have the original PTable and know when the // indexes have changed. 
- joinMutationState(new TableRef(tableRef), valuesMap, txMutations); + joinMutationState(new TableRef(tableRef), multiRowMutationState, txMutations); } } long serverTimestamp = HConstants.LATEST_TIMESTAMP; @@ -974,8 +983,6 @@ public class MutationState implements SQLCloseable { long mutationCommitTime = 0; long numFailedMutations = 0;; long startTime = 0; - long startNumRows = numRows; - long startEstimatedSize = estimatedSize; do { TableRef origTableRef = tableInfo.getOrigTableRef(); PTable table = origTableRef.getTable(); @@ -1022,13 +1029,13 @@ public class MutationState implements SQLCloseable { GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime); numFailedMutations = 0; + // Remove batches as we process them + mutations.remove(origTableRef); if (tableInfo.isDataTable()) { numRows -= numMutations; - // decrement estimated size by the fraction of rows we sent to hbase - estimatedSize -= ((double)numMutations/startNumRows)*startEstimatedSize; + // recalculate the estimated size + estimatedSize = PhoenixKeyValueUtil.getEstimatedRowMutationSize(mutations); } - // Remove batches as we process them - mutations.remove(origTableRef); } catch (Exception e) { mutationCommitTime = System.currentTimeMillis() - startTime; serverTimestamp = ServerUtil.parseServerTimestamp(e); @@ -1179,7 +1186,7 @@ public class MutationState implements SQLCloseable { } private int[] getUncommittedStatementIndexes() { - for (Map<ImmutableBytesPtr, RowMutationState> rowMutationMap : mutations.values()) { + for (MultiRowMutationState rowMutationMap : mutations.values()) { addUncommittedStatementIndexes(rowMutationMap.values()); } return uncommittedStatementIndexes; @@ -1212,7 +1219,7 @@ public class MutationState implements SQLCloseable { } public void commit() throws SQLException { - Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap(); + Map<TableRef, MultiRowMutationState> txMutations = Collections.emptyMap(); int retryCount = 0; do { boolean 
sendSuccessful=false; @@ -1422,13 +1429,54 @@ public class MutationState implements SQLCloseable { } } + public static class MultiRowMutationState { + private Map<ImmutableBytesPtr,RowMutationState> rowKeyToRowMutationState; + private long estimatedSize; + + public MultiRowMutationState(int size) { + this.rowKeyToRowMutationState = Maps.newHashMapWithExpectedSize(size); + this.estimatedSize = 0; + } + + public RowMutationState put(ImmutableBytesPtr ptr, RowMutationState rowMutationState) { + estimatedSize += rowMutationState.calculateEstimatedSize(); + return rowKeyToRowMutationState.put(ptr, rowMutationState); + } + + public void putAll(MultiRowMutationState other) { + estimatedSize += other.estimatedSize; + rowKeyToRowMutationState.putAll(other.rowKeyToRowMutationState); + } + + public boolean isEmpty() { + return rowKeyToRowMutationState.isEmpty(); + } + + public int size() { + return rowKeyToRowMutationState.size(); + } + + public Set<Entry<ImmutableBytesPtr, RowMutationState>> entrySet() { + return rowKeyToRowMutationState.entrySet(); + } + + public void clear(){ + rowKeyToRowMutationState.clear(); + } + + public Collection<RowMutationState> values() { + return rowKeyToRowMutationState.values(); + } + } + public static class RowMutationState { @Nonnull private Map<PColumn,byte[]> columnValues; private int[] statementIndexes; @Nonnull private final RowTimestampColInfo rowTsColInfo; private byte[] onDupKeyBytes; + private long colValuesSize; - public RowMutationState(@Nonnull Map<PColumn,byte[]> columnValues, int statementIndex, @Nonnull RowTimestampColInfo rowTsColInfo, + public RowMutationState(@Nonnull Map<PColumn,byte[]> columnValues, long colValuesSize, int statementIndex, @Nonnull RowTimestampColInfo rowTsColInfo, byte[] onDupKeyBytes) { checkNotNull(columnValues); checkNotNull(rowTsColInfo); @@ -1436,6 +1484,12 @@ public class MutationState implements SQLCloseable { this.statementIndexes = new int[] {statementIndex}; this.rowTsColInfo = rowTsColInfo; 
this.onDupKeyBytes = onDupKeyBytes; + this.colValuesSize = colValuesSize; + } + + public long calculateEstimatedSize() { + return colValuesSize + statementIndexes.length * SizedUtil.INT_SIZE + SizedUtil.LONG_SIZE + + (onDupKeyBytes != null ? onDupKeyBytes.length : 0); } byte[] getOnDupKeyBytes() { @@ -1454,7 +1508,16 @@ public class MutationState implements SQLCloseable { // If we already have a row and the new row has an ON DUPLICATE KEY clause // ignore the new values (as that's what the server will do). if (newRow.onDupKeyBytes == null) { - getColumnValues().putAll(newRow.getColumnValues()); + // increment the column value size by the new row column value size + colValuesSize+=newRow.colValuesSize; + for (Map.Entry<PColumn,byte[]> entry : newRow.columnValues.entrySet()) { + PColumn col = entry.getKey(); + byte[] oldValue = columnValues.put(col, entry.getValue()); + if (oldValue!=null) { + // decrement column value size by the size of all column values that were replaced + colValuesSize-=(col.getEstimatedSize() + oldValue.length); + } + } } // Concatenate ON DUPLICATE KEY bytes to allow multiple // increments of the same row in the same commit batch. 
@@ -1466,7 +1529,7 @@ public class MutationState implements SQLCloseable { RowTimestampColInfo getRowTimestampColInfo() { return rowTsColInfo; } - + } public ReadMetricQueue getReadMetricQueue() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/aeb33b9f/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index 5e97ce6..c6cbe3e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -73,7 +73,7 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; -import org.apache.phoenix.execute.MutationState.RowMutationState; +import org.apache.phoenix.execute.MutationState.MultiRowMutationState; import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.KeyValueColumnExpression; @@ -296,7 +296,7 @@ public class IndexUtil { } public static List<Mutation> generateIndexData(final PTable table, PTable index, - final Map<ImmutableBytesPtr, RowMutationState> valuesMap, List<Mutation> dataMutations, final KeyValueBuilder kvBuilder, PhoenixConnection connection) + final MultiRowMutationState multiRowMutationState, List<Mutation> dataMutations, final KeyValueBuilder kvBuilder, PhoenixConnection connection) throws SQLException { try { final ImmutableBytesPtr ptr = new ImmutableBytesPtr(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/aeb33b9f/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java 
---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java index 84525fd..ce5cb55 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.execute.MutationState.MultiRowMutationState; import org.apache.phoenix.execute.MutationState.RowMutationState; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; @@ -188,42 +189,15 @@ public class PhoenixKeyValueUtil { * @param mutations map from table to row to RowMutationState * @return estimated row size */ - public static long - getEstimatedRowSize(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> mutations) { + public static long getEstimatedRowMutationSize( + Map<TableRef, MultiRowMutationState> tableMutationMap) { long size = 0; - PTable table = tableRef.getTable(); - // iterate over rows - for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : mutations.entrySet()) { - int rowLength = rowEntry.getKey().getLength(); - Map<PColumn, byte[]> colValueMap = rowEntry.getValue().getColumnValues(); - switch (table.getImmutableStorageScheme()) { - case ONE_CELL_PER_COLUMN: - // iterate over columns - for (Entry<PColumn, byte[]> colValueEntry : colValueMap.entrySet()) { - PColumn pColumn = colValueEntry.getKey(); - size += - KeyValue.getKeyValueDataStructureSize(rowLength, - pColumn.getFamilyName().getBytes().length, - pColumn.getColumnQualifierBytes().length, - colValueEntry.getValue().length); - } - break; - case 
SINGLE_CELL_ARRAY_WITH_OFFSETS: - // we store all the column values in a single key value that contains all the - // column values followed by an offset array - size += - PArrayDataTypeEncoder.getEstimatedByteSize(table, rowLength, - colValueMap); - break; + // iterate over table + for (Entry<TableRef, MultiRowMutationState> tableEntry : tableMutationMap.entrySet()) { + // iterate over rows + for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : tableEntry.getValue().entrySet()) { + size += calculateRowMutationSize(rowEntry); } - // count the empty key value - Pair<byte[], byte[]> emptyKeyValueInfo = - EncodedColumnsUtil.getEmptyKeyValueInfo(table); - size += - KeyValue.getKeyValueDataStructureSize(rowLength, - SchemaUtil.getEmptyColumnFamilyPtr(table).getLength(), - emptyKeyValueInfo.getFirst().length, - emptyKeyValueInfo.getSecond().length); } return size; } @@ -237,4 +211,10 @@ public class PhoenixKeyValueUtil { } return KeyValueUtil.copyToNewKeyValue(c); } + + private static long calculateRowMutationSize(Entry<ImmutableBytesPtr, RowMutationState> rowEntry) { + int rowLength = rowEntry.getKey().getLength(); + long colValuesLength = rowEntry.getValue().calculateEstimatedSize(); + return (rowLength + colValuesLength); + } }
