Repository: phoenix Updated Branches: refs/heads/4.x-HBase-0.98 a7f9607b8 -> 774a31e3d
PHOENIX-3612 Make tracking of max allowed number of mutations bytes based instead of row based Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/774a31e3 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/774a31e3 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/774a31e3 Branch: refs/heads/4.x-HBase-0.98 Commit: 774a31e3da4e517c11f047495dcc2f71259c7848 Parents: a7f9607 Author: Thomas <[email protected]> Authored: Fri Jun 2 10:52:39 2017 -0700 Committer: Thomas <[email protected]> Committed: Fri Jun 2 11:37:25 2017 -0700 ---------------------------------------------------------------------- .../org/apache/phoenix/end2end/QueryMoreIT.java | 46 +++++++++++++- .../apache/phoenix/execute/PartialCommitIT.java | 4 +- .../apache/phoenix/compile/DeleteCompiler.java | 16 ++--- .../apache/phoenix/compile/PostDDLCompiler.java | 4 +- .../compile/PostLocalIndexDDLCompiler.java | 2 +- .../apache/phoenix/compile/UpsertCompiler.java | 14 +++-- .../phoenix/exception/SQLExceptionCode.java | 5 +- .../apache/phoenix/execute/MutationState.java | 59 +++++++++++------- .../apache/phoenix/jdbc/PhoenixConnection.java | 9 +-- .../apache/phoenix/jdbc/PhoenixStatement.java | 6 +- .../query/ConnectionlessQueryServicesImpl.java | 2 +- .../org/apache/phoenix/query/QueryServices.java | 1 + .../phoenix/query/QueryServicesOptions.java | 1 + .../apache/phoenix/schema/MetaDataClient.java | 50 +++++++-------- .../schema/types/PArrayDataTypeEncoder.java | 65 ++++++++++++++++++++ .../org/apache/phoenix/util/KeyValueUtil.java | 58 +++++++++++++++++ .../apache/phoenix/jdbc/PhoenixDriverTest.java | 6 +- 17 files changed, 270 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java index bfccb63..8397e4d 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.sql.Connection; import java.sql.Date; @@ -38,6 +39,7 @@ import java.util.Properties; import org.apache.hadoop.hbase.util.Base64; import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PhoenixRuntime; @@ -479,7 +481,7 @@ public class QueryMoreIT extends ParallelStatsDisabledIT { public void testMutationBatch() throws Exception { Properties connectionProperties = new Properties(); connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, "10"); - connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, "1024"); + connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, "128"); PhoenixConnection connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties); String fullTableName = generateUniqueName(); try (Statement stmt = connection.createStatement()) { @@ -500,13 +502,53 @@ public class QueryMoreIT extends ParallelStatsDisabledIT { // set the batch size (rows) to 1 connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, "1"); - connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, "1024"); + connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, "128"); connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties); upsertRows(connection, fullTableName); connection.commit(); // each row should be in its own batch assertEquals(4L, connection.getMutationState().getBatchCount()); } + + @Test + public void testMaxMutationSize() throws Exception { + Properties connectionProperties = new Properties(); + connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "3"); + connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "1000000"); + PhoenixConnection connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties); + String fullTableName = generateUniqueName(); + try (Statement stmt = connection.createStatement()) { + stmt.execute("CREATE TABLE " + fullTableName + "(\n" + + " ORGANIZATION_ID CHAR(15) NOT NULL,\n" + + " SCORE DOUBLE NOT NULL,\n" + + " ENTITY_ID CHAR(15) NOT NULL\n" + + " CONSTRAINT PAGE_SNAPSHOT_PK PRIMARY KEY (\n" + + " ORGANIZATION_ID,\n" + + " SCORE DESC,\n" + + " ENTITY_ID DESC\n" + + " )\n" + + ") MULTI_TENANT=TRUE"); + } + try { + upsertRows(connection, fullTableName); + fail(); + } + catch(SQLException e) { + assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED.getErrorCode(), e.getErrorCode()); + } + + // set the max mutation size (bytes) to a low value + connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "1000"); + connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "4"); + connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties); + try { + upsertRows(connection, fullTableName); + fail(); + } + catch(SQLException e) { + assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED.getErrorCode(), e.getErrorCode()); + } + } private void upsertRows(PhoenixConnection conn, String fullTableName) throws SQLException { PreparedStatement stmt = conn.prepareStatement("upsert into " + fullTableName + http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java index a5555f3..cd0c371 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java @@ -270,8 +270,8 @@ public class PartialCommitIT extends BaseOwnClusterIT { // passing a null mutation state forces the connection.newMutationState() to be used to create the MutationState return new PhoenixConnection(phxCon, null) { @Override - protected MutationState newMutationState(int maxSize) { - return new MutationState(maxSize, this, mutations, null, null); + protected MutationState newMutationState(int maxSize, int maxSizeBytes) { + return new MutationState(maxSize, maxSizeBytes, this, mutations, null, null); }; }; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index 71dc76a..de8b2ce 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -116,6 +116,7 @@ public class DeleteCompiler { final boolean isAutoCommit = connection.getAutoCommit(); ConnectionQueryServices services = connection.getQueryServices(); final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); + final int maxSizeBytes = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES); final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize); Map<ImmutableBytesPtr,RowMutationState> mutations = Maps.newHashMapWithExpectedSize(batchSize); List<Map<ImmutableBytesPtr,RowMutationState>> indexMutations = null; @@ -174,10 +175,10 @@ public class DeleteCompiler { rowCount++; // Commit a batch if auto commit is true and we're at our batch size if (isAutoCommit && rowCount % batchSize == 0) { - MutationState state = new MutationState(targetTableRef, mutations, 0, maxSize, connection); + MutationState state = new MutationState(targetTableRef, mutations, 0, maxSize, maxSizeBytes, connection); connection.getMutationState().join(state); for (int i = 0; i < indexTableRefs.size(); i++) { - MutationState indexState = new MutationState(indexTableRefs.get(i), indexMutations.get(i), 0, maxSize, connection); + MutationState indexState = new MutationState(indexTableRefs.get(i), indexMutations.get(i), 0, maxSize, maxSizeBytes, connection); connection.getMutationState().join(indexState); } connection.getMutationState().send(); @@ -190,10 +191,10 @@ public class DeleteCompiler { // If auto commit is true, this last batch will be committed upon return int nCommittedRows = isAutoCommit ? (rowCount / batchSize * batchSize) : 0; - MutationState state = new MutationState(targetTableRef, mutations, nCommittedRows, maxSize, connection); + MutationState state = new MutationState(targetTableRef, mutations, nCommittedRows, maxSize, maxSizeBytes, connection); for (int i = 0; i < indexTableRefs.size(); i++) { // To prevent the counting of these index rows, we have a negative for remainingRows. - MutationState indexState = new MutationState(indexTableRefs.get(i), indexMutations.get(i), 0, maxSize, connection); + MutationState indexState = new MutationState(indexTableRefs.get(i), indexMutations.get(i), 0, maxSize, maxSizeBytes, connection); state.join(indexState); } return state; @@ -496,6 +497,7 @@ public class DeleteCompiler { } final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); + final int maxSizeBytes = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES); final StatementContext context = plan.getContext(); // If we're doing a query for a set of rows with no where clause, then we don't need to contact the server at all. @@ -522,7 +524,7 @@ public class DeleteCompiler { while (iterator.hasNext()) { mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); } - return new MutationState(tableRef, mutation, 0, maxSize, connection); + return new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes, connection); } @Override @@ -627,7 +629,7 @@ public class DeleteCompiler { try { Tuple row = iterator.next(); final long mutationCount = (Long)projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr); - return new MutationState(maxSize, connection) { + return new MutationState(maxSize, maxSizeBytes, connection) { @Override public long getUpdateCount() { return mutationCount; @@ -716,7 +718,7 @@ public class DeleteCompiler { } // Return total number of rows that have been delete. In the case of auto commit being off // the mutations will all be in the mutation state of the current connection. - MutationState state = new MutationState(maxSize, connection, totalRowCount); + MutationState state = new MutationState(maxSize, maxSizeBytes, connection, totalRowCount); // set the read metrics accumulated in the parent context so that it can be published when the mutations are committed. state.setReadMetricQueue(plan.getContext().getReadMetricsQueue()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java index 0b3de6e..e5ed6a5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java @@ -140,7 +140,7 @@ public class PostDDLCompiler { @Override public MutationState execute() throws SQLException { if (tableRefs.isEmpty()) { - return new MutationState(0, connection); + return new MutationState(0, 1000, connection); } boolean wasAutoCommit = connection.getAutoCommit(); try { @@ -319,7 +319,7 @@ public class PostDDLCompiler { } final long count = totalMutationCount; - return new MutationState(1, connection) { + return new MutationState(1, 1000, connection) { @Override public long getUpdateCount() { return count; http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java index 7e3c3b2..f34c5a3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java @@ -108,7 +108,7 @@ public class PostLocalIndexDDLCompiler { // The contract is to return a MutationState that contains the number of rows modified. In this // case, it's the number of rows in the data table which corresponds to the number of index // rows that were added. - return new MutationState(0, connection, rowCount); + return new MutationState(0, 0, connection, rowCount); } }; http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 69dab66..ca15e4f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -180,6 +180,9 @@ public class UpsertCompiler { ConnectionQueryServices services = connection.getQueryServices(); int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); + int maxSizeBytes = + services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, + QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES); int batchSize = Math.min(connection.getMutateBatchSize(), maxSize); boolean isAutoCommit = connection.getAutoCommit(); int numSplColumns = @@ -240,14 +243,14 @@ public class UpsertCompiler { rowCount++; // Commit a batch if auto commit is true and we're at our batch size if (isAutoCommit && rowCount % batchSize == 0) { - MutationState state = new MutationState(tableRef, mutation, 0, maxSize, connection); + MutationState state = new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes, connection); connection.getMutationState().join(state); connection.getMutationState().send(); mutation.clear(); } } // If auto commit is true, this last batch will be committed upon return - return new MutationState(tableRef, mutation, rowCount / batchSize * batchSize, maxSize, connection); + return new MutationState(tableRef, mutation, rowCount / batchSize * batchSize, maxSize, maxSizeBytes, connection); } } @@ -316,6 +319,7 @@ public class UpsertCompiler { final PhoenixConnection connection = statement.getConnection(); ConnectionQueryServices services = connection.getQueryServices(); final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); + final int maxSizeBytes = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES); List<ColumnName> columnNodes = upsert.getColumns(); TableRef tableRefToBe = null; PTable table = null; @@ -751,7 +755,7 @@ public class UpsertCompiler { new MetaDataClient(connection).buildIndex(index, tableRef, scan.getTimeRange().getMax(), scan.getTimeRange().getMax() + 1); } - return new MutationState(maxSize, connection) { + return new MutationState(maxSize, maxSizeBytes, connection) { @Override public long getUpdateCount() { return mutationCount; @@ -844,7 +848,7 @@ public class UpsertCompiler { } // Return total number of rows that have been updated. In the case of auto commit being off // the mutations will all be in the mutation state of the current connection. - MutationState mutationState = new MutationState(maxSize, statement.getConnection(), totalRowCount); + MutationState mutationState = new MutationState(maxSize, maxSizeBytes, statement.getConnection(), totalRowCount); /* * All the metrics collected for measuring the reads done by the parallel mutating iterators * is included in the ReadMetricHolder of the statement context. Include these metrics in the @@ -1085,7 +1089,7 @@ public class UpsertCompiler { viewConstants = IndexUtil.getViewConstants(parentTable); } setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes, 0); - return new MutationState(tableRef, mutation, 0, maxSize, connection); + return new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes, connection); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index ecbb285..841fd5d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -432,7 +432,10 @@ public enum SQLExceptionCode { 727, "43M11", " ASYNC option is not allowed.. "), NEW_CONNECTION_THROTTLED(728, "410M1", "Could not create connection " + "because this client already has the maximum number" + - " of connections to the target cluster."); + " of connections to the target cluster."), + + MAX_MUTATION_SIZE_EXCEEDED(729, "LIM01", "MutationState size is bigger than maximum allowed number of rows"), + MAX_MUTATION_SIZE_BYTES_EXCEEDED(730, "LIM02", "MutationState size is bigger than maximum allowed number of bytes"); private final int errorCode; private final String sqlState; http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 9d2770d..87767cb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -128,6 +128,7 @@ public class MutationState implements SQLCloseable { private final PhoenixConnection connection; private final long maxSize; + private final long maxSizeBytes; private final long batchSize; private final long batchSizeBytes; private long batchCount = 0L; @@ -146,35 +147,36 @@ public class MutationState implements SQLCloseable { private final MutationMetricQueue mutationMetricQueue; private ReadMetricQueue readMetricQueue; - public MutationState(long maxSize, PhoenixConnection connection) { - this(maxSize,connection, null, null); + public MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection) { + this(maxSize, maxSizeBytes, connection, null, null); } - public MutationState(long maxSize, PhoenixConnection connection, TransactionContext txContext) { - this(maxSize,connection, null, txContext); + public MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, TransactionContext txContext) { + this(maxSize, maxSizeBytes, connection, null, txContext); } public MutationState(MutationState mutationState) { - this(mutationState.maxSize, mutationState.connection, mutationState.getTransaction(), null); + this(mutationState.maxSize, mutationState.maxSizeBytes, mutationState.connection, mutationState.getTransaction(), null); } - public MutationState(long maxSize, PhoenixConnection connection, long sizeOffset) { - this(maxSize, connection, null, null, sizeOffset); + public MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, long sizeOffset) { + this(maxSize, maxSizeBytes, connection, null, null, sizeOffset); } - private MutationState(long maxSize, PhoenixConnection connection, Transaction tx, TransactionContext txContext) { - this(maxSize,connection, tx, txContext, 0); + private MutationState(long maxSize, long maxSizeBytes,PhoenixConnection connection, Transaction tx, TransactionContext txContext) { + this(maxSize, maxSizeBytes, connection, tx, txContext, 0); } - private MutationState(long maxSize, PhoenixConnection connection, Transaction tx, TransactionContext txContext, long sizeOffset) { - this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), tx, txContext); + private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, Transaction tx, TransactionContext txContext, long sizeOffset) { + this(maxSize, maxSizeBytes, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), tx, txContext); this.sizeOffset = sizeOffset; } - MutationState(long maxSize, PhoenixConnection connection, - Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations, - Transaction tx, TransactionContext txContext) { + MutationState(long maxSize, long maxSizeBytes, + PhoenixConnection connection, + Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations, Transaction tx, TransactionContext txContext) { this.maxSize = maxSize; + this.maxSizeBytes = maxSizeBytes; this.connection = connection; this.batchSize = connection.getMutateBatchSize(); this.batchSizeBytes = connection.getMutateBatchSizeBytes(); @@ -201,8 +203,8 @@ public class MutationState implements SQLCloseable { } } - public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) { - this(maxSize, connection, null, null, sizeOffset); + public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection) throws SQLException { + this(maxSize, maxSizeBytes, connection, null, null, sizeOffset); this.mutations.put(table, mutations); this.numRows = mutations.size(); this.tx = connection.getMutationState().getTransaction(); @@ -213,6 +215,10 @@ public class MutationState implements SQLCloseable { return maxSize; } + public long getMaxSizeBytes() { + return maxSizeBytes; + } + /** * Commit a write fence when creating an index so that we can detect * when a data table transaction is started before the create index @@ -436,16 +442,23 @@ public class MutationState implements SQLCloseable { return false; } - public static MutationState emptyMutationState(long maxSize, PhoenixConnection connection) { - MutationState state = new MutationState(maxSize, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), null, null); + public static MutationState emptyMutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection) { + MutationState state = new MutationState(maxSize, maxSizeBytes, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), null, null); state.sizeOffset = 0; return state; } - private void throwIfTooBig() { + private void throwIfTooBig() throws SQLException { if (numRows > maxSize) { - // TODO: throw SQLException ? - throw new IllegalArgumentException("MutationState size of " + numRows + " is bigger than max allowed size of " + maxSize); + resetState(); + throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED).build() + .buildException(); + } + long estimatedSize = KeyValueUtil.getEstimatedRowSize(mutations); + if (estimatedSize > maxSizeBytes) { + resetState(); + throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED) + .build().buildException(); } } @@ -512,7 +525,7 @@ public class MutationState implements SQLCloseable { * * @param newMutationState the newer mutation state */ - public void join(MutationState newMutationState) { + public void join(MutationState newMutationState) throws SQLException { if (this == newMutationState) { // Doesn't make sense return; } @@ -1177,7 +1190,7 @@ public class MutationState implements SQLCloseable { List<Mutation> currentList = Lists.newArrayList(); long currentBatchSizeBytes = 0L; for (Mutation mutation : allMutationList) { - long mutationSizeBytes = mutation.heapSize(); + long mutationSizeBytes = KeyValueUtil.calculateMutationDiskSize(mutation); if (currentList.size() == batchSize || currentBatchSizeBytes + mutationSizeBytes > batchSizeBytes) { if (currentList.size() > 0) { mutationBatchList.add(currentList); http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index 5af418d..09796aa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -264,6 +264,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea timestampPattern = this.services.getProps().get(QueryServices.TIMESTAMP_FORMAT_ATTRIB, DateUtil.DEFAULT_TIMESTAMP_FORMAT); String numberPattern = this.services.getProps().get(QueryServices.NUMBER_FORMAT_ATTRIB, NumberUtil.DEFAULT_NUMBER_FORMAT); int maxSize = this.services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); + int maxSizeBytes = this.services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES); Format dateFormat = DateUtil.getDateFormatter(datePattern); Format timeFormat = DateUtil.getDateFormatter(timePattern); Format timestampFormat = DateUtil.getDateFormatter(timestampPattern); @@ -294,7 +295,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } }; this.isRequestLevelMetricsEnabled = JDBCUtil.isCollectingRequestLevelMetricsEnabled(url, info, this.services.getProps()); - this.mutationState = mutationState == null ? newMutationState(maxSize) : new MutationState(mutationState); + this.mutationState = mutationState == null ? newMutationState(maxSize, maxSizeBytes) : new MutationState(mutationState); this.metaData = metaData; this.metaData.pruneTables(pruner); this.metaData.pruneFunctions(pruner); @@ -480,8 +481,8 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea return metaData.getTableRef(key); } - protected MutationState newMutationState(int maxSize) { - return new MutationState(maxSize, this); + protected MutationState newMutationState(int maxSize, int maxSizeBytes) { + return new MutationState(maxSize, maxSizeBytes, this); } public MutationState getMutationState() { @@ -657,7 +658,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea .build().buildException(); } this.mutationState.rollback(); - this.mutationState = new MutationState(this.mutationState.getMaxSize(), this, txContext); + this.mutationState = new MutationState(this.mutationState.getMaxSize(), this.mutationState.getMaxSizeBytes(), this, txContext); // Write data to HBase after each statement execution as the commit may not // come through Phoenix APIs. http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 8b00113..eadb108 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -873,7 +873,7 @@ public class PhoenixStatement implements Statement, SQLCloseable { } catch(IOException e) { throw new SQLException(e); } - return new MutationState(0, context.getConnection()); + return new MutationState(0, 0, context.getConnection()); } }; @@ -977,7 +977,7 @@ public class PhoenixStatement implements Statement, SQLCloseable { } catch(IOException e) { throw new SQLException(e); } - return new MutationState(0, context.getConnection()); + return new MutationState(0, 0, context.getConnection()); } }; @@ -1258,7 +1258,7 @@ public class PhoenixStatement implements Statement, SQLCloseable { PhoenixConnection phxConn = stmt.getConnection(); Properties props = new Properties(); phxConn.getQueryServices().upgradeSystemTables(phxConn.getURL(), props); - return MutationState.emptyMutationState(-1, phxConn); + return MutationState.emptyMutationState(-1, -1, phxConn); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index 76b69fb..9d6712a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -352,7 +352,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple @Override public MutationState updateData(MutationPlan plan) throws SQLException { - return new MutationState(0, plan.getContext().getConnection()); + return new MutationState(0, 0, plan.getContext().getConnection()); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 4b871d5..7c37930 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -85,6 +85,7 @@ public interface QueryServices extends SQLCloseable { public static final String CALL_QUEUE_ROUND_ROBIN_ATTRIB = "ipc.server.callqueue.roundrobin"; public static final String SCAN_CACHE_SIZE_ATTRIB = "hbase.client.scanner.caching"; public static final String MAX_MUTATION_SIZE_ATTRIB = "phoenix.mutate.maxSize"; + public static final String MAX_MUTATION_SIZE_BYTES_ATTRIB = "phoenix.mutate.maxSizeBytes"; public static final String MUTATE_BATCH_SIZE_ATTRIB = "phoenix.mutate.batchSize"; public static final String MUTATE_BATCH_SIZE_BYTES_ATTRIB = "phoenix.mutate.batchSizeBytes"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index c01e454..b8e92a7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -126,6 +126,7 @@ public class QueryServicesOptions { public static final String DEFAULT_DATE_FORMAT_TIMEZONE = DateUtil.DEFAULT_TIME_ZONE_ID; public static final boolean DEFAULT_CALL_QUEUE_ROUND_ROBIN = true; public static final int DEFAULT_MAX_MUTATION_SIZE = 500000; + public static final int DEFAULT_MAX_MUTATION_SIZE_BYTES = 104857600; // 100 Mb public static final boolean DEFAULT_USE_INDEXES = true; // Use indexes public static final boolean DEFAULT_IMMUTABLE_ROWS = false; // Tables rows may be updated public static final boolean DEFAULT_DROP_METADATA = true; // Drop meta data also. http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 32597a1..d76f2c8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -1041,7 +1041,7 @@ public class MetaDataClient { table = createTableInternal(statement, splits, parent, viewStatement, viewType, viewColumnConstants, isViewColumnReferenced, false, null, null, tableProps, commonFamilyProps); if (table == null || table.getType() == PTableType.VIEW /*|| table.isTransactional()*/) { - return new MutationState(0,connection); + return new MutationState(0, 0, connection); } // Hack to get around the case when an SCN is specified on the connection. // In this case, we won't see the table we just created yet, so we hack @@ -1115,7 +1115,7 @@ public class MetaDataClient { } } final long count = rowCount; - return new MutationState(1, connection) { + return new MutationState(1, 1000, connection) { @Override public long getUpdateCount() { return count; @@ -1374,17 +1374,17 @@ public class MetaDataClient { public MutationState declareCursor(DeclareCursorStatement statement, QueryPlan queryPlan) throws SQLException { CursorUtil.declareCursor(statement, queryPlan); - return new MutationState(0,connection); + return new MutationState(0, 0, connection); } public MutationState open(OpenStatement statement) throws SQLException { CursorUtil.openCursor(statement, connection); - return new MutationState(0,connection); + return new MutationState(0, 0, connection); } public MutationState close(CloseStatement statement) throws SQLException { CursorUtil.closeCursor(statement); - return new MutationState(0,connection); + return new MutationState(0, 0, connection); } /** @@ -1596,7 +1596,7 @@ public class MetaDataClient { } } if (table == null) { - return new MutationState(0,connection); + return new MutationState(0, 0, connection); } if (logger.isInfoEnabled()) logger.info("Created index " + table.getName().getString() + " at " + table.getTimeStamp()); @@ -1605,7 +1605,7 @@ public class MetaDataClient { QueryServicesOptions.DEFAULT_INDEX_ASYNC_BUILD_ENABLED); // In async process, we return immediately as the MR job needs to be triggered . if(statement.isAsync() && asyncIndexBuildEnabled) { - return new MutationState(0, connection); + return new MutationState(0, 0, connection); } // If our connection is at a fixed point-in-time, we need to open a new @@ -1627,11 +1627,11 @@ public class MetaDataClient { connection.getQueryServices().dropSequence(tenantId, schemaName, sequenceName, timestamp); } catch (SequenceNotFoundException e) { if (statement.ifExists()) { - return new MutationState(0, connection); + return new MutationState(0, 0, connection); } throw e; } - return new MutationState(1, connection); + return new MutationState(1, 1000, connection); } public MutationState createSequence(CreateSequenceStatement statement, long startWith, @@ -1662,11 +1662,11 @@ public class MetaDataClient { startWith, incrementBy, cacheSize, minValue, maxValue, cycle, timestamp); } catch (SequenceAlreadyExistsException e) { if (ifNotExists) { - return new MutationState(0, connection); + return new MutationState(0, 0, connection); } throw e; } - return new MutationState(1, connection); + return new MutationState(1, 1000, connection); } public MutationState createFunction(CreateFunctionStatement stmt) throws SQLException { @@ -1728,7 +1728,7 @@ public class MetaDataClient { } finally { connection.setAutoCommit(wasAutoCommit); } - return new MutationState(1, connection); + return new MutationState(1, 1000, connection); } private static ColumnDef findColumnDefOrNull(List<ColumnDef> colDefs, ColumnName colName) { @@ -2787,7 +2787,7 @@ public class MetaDataClient { PFunction function = connection.getMetaDataCache().getFunction(new PTableKey(tenantId, functionName)); if (function.isTemporaryFunction()) { connection.removeFunction(tenantId, functionName, clientTimeStamp); - return new MutationState(0, connection); + return new MutationState(0, 0, connection); } } catch(FunctionNotFoundException e) { @@ -2807,7 +2807,7 @@ public class MetaDataClient { connection.removeFunction(tenantId, functionName, result.getMutationTime()); break; } - return new MutationState(0, connection); + return new MutationState(0, 0, connection); } finally { connection.setAutoCommit(wasAutoCommit); } @@ -2905,7 +2905,7 @@ public class MetaDataClient { } break; } - return new MutationState(0, connection); + return new MutationState(0, 0, connection); } finally { connection.setAutoCommit(wasAutoCommit); } @@ -3557,7 +3557,7 @@ public class MetaDataClient { if (!ifNotExists) { throw new ColumnAlreadyExistsException(schemaName, tableName, SchemaUtil.findExistingColumn(result.getTable(), columns)); } - return new MutationState(0,connection); + return new MutationState(0, 0, connection); } // Only update client side cache if we aren't adding a PK column to a table with indexes or // transitioning a table from non transactional to transactional. @@ -3606,7 +3606,7 @@ public class MetaDataClient { MutationPlan plan = new PostDDLCompiler(connection).compile(Collections.singletonList(new TableRef(null, table, ts, false)), emptyCF, projectCF == null ? null : Collections.singletonList(projectCF), null, ts); return connection.getQueryServices().updateData(plan); } - return new MutationState(0,connection); + return new MutationState(0, 0, connection); } catch (ConcurrentTableMutationException e) { if (retried) { throw e; @@ -3721,7 +3721,7 @@ public class MetaDataClient { columnRef = resolver.resolveColumn(null, column.getFamilyName(), column.getColumnName()); } catch (ColumnNotFoundException e) { if (statement.ifExists()) { - return new MutationState(0,connection); + return new MutationState(0, 0, connection); } throw e; } @@ -3825,7 +3825,7 @@ public class MetaDataClient { if (!statement.ifExists()) { throw new ColumnNotFoundException(schemaName, tableName, Bytes.toString(result.getFamilyName()), Bytes.toString(result.getColumnName())); } - return new MutationState(0, connection); + return new MutationState(0, 0, connection); } // If we've done any index metadata updates, don't bother trying to update // client-side cache as it would be too painful. Just let it pull it over from @@ -3918,7 +3918,7 @@ public class MetaDataClient { // Return the last MutationState return state; } - return new MutationState(0, connection); + return new MutationState(0, 0, connection); } catch (ConcurrentTableMutationException e) { if (retried) { throw e; @@ -4026,12 +4026,12 @@ public class MetaDataClient { TableRef dataTableRef = FromCompiler.getResolver(dataTableNode, connection).getTables().get(0); return buildIndex(index, dataTableRef); } - return new MutationState(1, connection); + return new MutationState(1, 1000, connection); } catch (TableNotFoundException e) { if (!statement.ifExists()) { throw e; } - return new MutationState(0, connection); + return new MutationState(0, 0, connection); } finally { connection.setAutoCommit(wasAutoCommit); } @@ -4117,7 +4117,7 @@ public class MetaDataClient { } finally { connection.setAutoCommit(wasAutoCommit); } - return new MutationState(0, connection); + return new MutationState(0, 0, connection); } private void validateSchema(String schemaName) throws SQLException { @@ -4156,7 +4156,7 @@ public class MetaDataClient { connection.removeSchema(schema, result.getMutationTime()); break; } - return new MutationState(0, connection); + return new MutationState(0, 0, connection); } finally { connection.setAutoCommit(wasAutoCommit); } @@ -4171,6 +4171,6 @@ public class MetaDataClient { .resolveSchema(useSchemaStatement.getSchemaName()); connection.setSchema(useSchemaStatement.getSchemaName()); } - return new MutationState(0, connection); + return new MutationState(0, 0, connection); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeEncoder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeEncoder.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeEncoder.java index bb293bb..b5a7e04 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeEncoder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeEncoder.java @@ -20,11 +20,19 @@ package org.apache.phoenix.schema.types; import java.io.DataOutputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.ColumnValueEncoder; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PColumnFamily; +import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.TrustedByteArrayOutputStream; @@ -167,4 +175,61 @@ public class PArrayDataTypeEncoder implements ColumnValueEncoder { return null; } + /** + * @param colValueMap map from column to value + * @return estimated encoded size + */ + public static int getEstimatedByteSize(PTable table, int rowLength, + Map<PColumn, byte[]> colValueMap) { + // iterate over column familiies + int rowSize = 0; + for (PColumnFamily family : table.getColumnFamilies()) { + Collection<PColumn> columns = family.getColumns(); + // we add a non null value to the start so that we can represent absent values in the array with negative offsets + int numColumns = columns.size() + 1; + int cellSize = 1; + int nulls = 0; + int maxOffset = 0; + // iterate over columns + for (PColumn column : columns) { + if (colValueMap.containsKey(column)) { + byte[] colValue = colValueMap.get(column); + // the column value is null + if (colValue == null || colValue.length == 0) { + ++nulls; + maxOffset = cellSize; + } else { + // count the bytes written to serialize nulls + if (nulls > 0) { + cellSize += (1 + Math.ceil(nulls / 255)); + nulls = 0; + } + maxOffset = cellSize; + cellSize += colValue.length; + } + } + // the column value is absent + else { + ++nulls; + maxOffset = cellSize; + } + } + // count the bytes used for the offset array + cellSize += + PArrayDataType.useShortForOffsetArray(maxOffset, + PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION) + ? numColumns * Bytes.SIZEOF_SHORT + : numColumns * Bytes.SIZEOF_INT; + cellSize += 4; + // count the bytes used for header information + cellSize += 5; + // add the size of the single cell containing all column values + rowSize += + KeyValue.getKeyValueDataStructureSize(rowLength, + family.getName().getBytes().length, + QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES.length, cellSize); + } + return rowSize; + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java index d16521b..4234df5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import org.apache.hadoop.hbase.Cell; @@ -29,7 +30,14 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.execute.MutationState.RowMutationState; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.types.PArrayDataTypeEncoder; /** * @@ -177,4 +185,54 @@ public class KeyValueUtil { } return size; } + + /** + * Estimates the storage size of a row + * @param mutations map from table to row to RowMutationState + * @return estimated row size + */ + public static long + getEstimatedRowSize(Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations) { + long size = 0; + // iterate over tables + for (Entry<TableRef, Map<ImmutableBytesPtr, RowMutationState>> tableEntry : mutations + .entrySet()) { + PTable table = tableEntry.getKey().getTable(); + // iterate over rows + for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : tableEntry.getValue() + .entrySet()) { + int rowLength = rowEntry.getKey().getLength(); + Map<PColumn, byte[]> colValueMap = rowEntry.getValue().getColumnValues(); + switch (table.getImmutableStorageScheme()) { + case ONE_CELL_PER_COLUMN: + // iterate over columns + for (Entry<PColumn, byte[]> colValueEntry : colValueMap.entrySet()) { + PColumn pColumn = colValueEntry.getKey(); + size += + KeyValue.getKeyValueDataStructureSize(rowLength, + pColumn.getFamilyName().getBytes().length, + pColumn.getColumnQualifierBytes().length, + colValueEntry.getValue().length); + } + break; + case SINGLE_CELL_ARRAY_WITH_OFFSETS: + // we store all the column values in a single key value that contains all the + // column values followed by an offset array + size += + PArrayDataTypeEncoder.getEstimatedByteSize(table, rowLength, + colValueMap); + break; + } + // count the empty key value + Pair<byte[], byte[]> emptyKeyValueInfo = + EncodedColumnsUtil.getEmptyKeyValueInfo(table); + size += + KeyValue.getKeyValueDataStructureSize(rowLength, + SchemaUtil.getEmptyColumnFamilyPtr(table).getLength(), + emptyKeyValueInfo.getFirst().length, + emptyKeyValueInfo.getSecond().length); + } + } + return size; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java index d8f9df6..e7afb30 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java @@ -60,7 +60,7 @@ public class PhoenixDriverTest extends BaseConnectionlessQueryTest { } @Test - public void testMaxMutationSizeSetCorrectly() throws Exception { + public void testMaxMutationSizeSetCorrectly() throws SQLException { Properties connectionProperties = new Properties(); connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB,"100"); connectionProperties.setProperty(QueryServices.IMMUTABLE_ROWS_ATTRIB,"100"); @@ -75,7 +75,9 @@ public class PhoenixDriverTest extends BaseConnectionlessQueryTest { stmt.execute(); } fail("Upsert should have failed since the number of upserts (200) is greater than the MAX_MUTATION_SIZE_ATTRIB (100)"); - } catch (IllegalArgumentException expected) {} + } catch (SQLException e) { + assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED.getErrorCode(), e.getErrorCode()); + } } @Test
