Revert "PHOENIX-4386 Calculate the estimatedSize of MutationState using Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations (addendum)"
This reverts commit 4e0c0a33ed8b401f7785dde8979041dd5ab9a1f4. Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/355ee522 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/355ee522 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/355ee522 Branch: refs/heads/system-catalog Commit: 355ee522c1d4ff07cf9fbb0a9a01e43e3f702730 Parents: d46d4e5 Author: Thomas D'Silva <tdsi...@apache.org> Authored: Tue Nov 28 18:37:55 2017 -0800 Committer: Thomas D'Silva <tdsi...@apache.org> Committed: Tue Nov 28 18:37:55 2017 -0800 ---------------------------------------------------------------------- .../apache/phoenix/execute/PartialCommitIT.java | 5 +- .../apache/phoenix/compile/DeleteCompiler.java | 11 +- .../apache/phoenix/compile/UpsertCompiler.java | 7 +- .../apache/phoenix/execute/MutationState.java | 127 +++++++------------ .../java/org/apache/phoenix/util/IndexUtil.java | 4 +- .../org/apache/phoenix/util/KeyValueUtil.java | 5 +- 6 files changed, 61 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/355ee522/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java index e5b57e3..10fd7f8 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java @@ -33,6 +33,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -51,8 +52,8 @@ import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.end2end.BaseOwnClusterIT; -import org.apache.phoenix.execute.MutationState.MultiRowMutationState; import org.apache.phoenix.hbase.index.Indexer; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.monitoring.GlobalMetric; import org.apache.phoenix.monitoring.MetricType; @@ -284,7 +285,7 @@ public class PartialCommitIT extends BaseOwnClusterIT { private PhoenixConnection getConnectionWithTableOrderPreservingMutationState() throws SQLException { Connection con = driver.connect(url, new Properties()); PhoenixConnection phxCon = new PhoenixConnection(con.unwrap(PhoenixConnection.class)); - final Map<TableRef, MultiRowMutationState> mutations = Maps.newTreeMap(new TableRefComparator()); + final Map<TableRef,Map<ImmutableBytesPtr,MutationState.RowMutationState>> mutations = Maps.newTreeMap(new TableRefComparator()); // passing a null mutation state forces the connection.newMutationState() to be used to create the MutationState return new PhoenixConnection(phxCon, null) { @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/355ee522/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index a06e2ca..f9ca300 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.hadoop.hbase.Cell; @@ -42,7 +43,6 @@ import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.AggregatePlan; import org.apache.phoenix.execute.MutationState; -import org.apache.phoenix.execute.MutationState.MultiRowMutationState; import org.apache.phoenix.execute.MutationState.RowMutationState; import org.apache.phoenix.filter.SkipScanFilter; import org.apache.phoenix.hbase.index.ValueGetter; @@ -91,6 +91,7 @@ import org.apache.phoenix.util.ScanUtil; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.sun.istack.NotNull; public class DeleteCompiler { @@ -120,14 +121,14 @@ public class DeleteCompiler { final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); final int maxSizeBytes = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES); final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize); - MultiRowMutationState mutations = new MultiRowMutationState(batchSize); - List<MultiRowMutationState> indexMutations = null; + Map<ImmutableBytesPtr,RowMutationState> mutations = Maps.newHashMapWithExpectedSize(batchSize); + List<Map<ImmutableBytesPtr,RowMutationState>> indexMutations = null; // If indexTableRef is set, we're deleting the rows from both the index table and // the data table through a single query to save executing an additional one. if (!otherTableRefs.isEmpty()) { indexMutations = Lists.newArrayListWithExpectedSize(otherTableRefs.size()); for (int i = 0; i < otherTableRefs.size(); i++) { - indexMutations.add(new MultiRowMutationState(batchSize)); + indexMutations.add(Maps.<ImmutableBytesPtr,RowMutationState>newHashMapWithExpectedSize(batchSize)); } } List<PColumn> pkColumns = table.getPKColumns(); @@ -643,7 +644,7 @@ public class DeleteCompiler { // keys for our ranges ScanRanges ranges = context.getScanRanges(); Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator(); - MultiRowMutationState mutation = new MultiRowMutationState(ranges.getPointLookupCount()); + Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount()); while (iterator.hasNext()) { mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), new RowMutationState(PRow.DELETE_MARKER, 0, http://git-wip-us.apache.org/repos/asf/phoenix/blob/355ee522/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index a81a427..a51fd4c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -47,7 +47,6 @@ import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.AggregatePlan; import org.apache.phoenix.execute.MutationState; -import org.apache.phoenix.execute.MutationState.MultiRowMutationState; import org.apache.phoenix.execute.MutationState.RowMutationState; import org.apache.phoenix.execute.MutationState.RowTimestampColInfo; import org.apache.phoenix.expression.Determinism; @@ -117,7 +116,7 @@ import com.google.common.collect.Sets; public class UpsertCompiler { private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, - PTable table, MultiRowMutationState mutation, + PTable table, Map<ImmutableBytesPtr, RowMutationState> mutation, PhoenixStatement statement, boolean useServerTimestamp, IndexMaintainer maintainer, byte[][] viewConstants, byte[] onDupKeyBytes, int numSplColumns) throws SQLException { long columnValueSize = 0; @@ -198,7 +197,7 @@ public class UpsertCompiler { } } int rowCount = 0; - MultiRowMutationState mutation = new MultiRowMutationState(batchSize); + Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize); PTable table = tableRef.getTable(); IndexMaintainer indexMaintainer = null; byte[][] viewConstants = null; @@ -1178,7 +1177,7 @@ public class UpsertCompiler { throw new IllegalStateException(); } } - MultiRowMutationState mutation = new MultiRowMutationState(1); + Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(1); IndexMaintainer indexMaintainer = null; byte[][] viewConstants = null; if (table.getIndexType() == IndexType.LOCAL) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/355ee522/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 7462baa..b5a55b8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -123,7 +123,7 @@ public class MutationState implements SQLCloseable { private final long batchSize; private final long batchSizeBytes; private long batchCount = 0L; - private final Map<TableRef, MultiRowMutationState> mutations; + private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations; private final Set<String> uncommittedPhysicalNames = Sets.newHashSetWithExpectedSize(10); private long sizeOffset; @@ -131,7 +131,7 @@ public class MutationState implements SQLCloseable { private long estimatedSize = 0; private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY; private boolean isExternalTxContext = false; - private Map<TableRef, MultiRowMutationState> txMutations = Collections.emptyMap(); + private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap(); final PhoenixTransactionContext phoenixTransactionContext; @@ -159,12 +159,12 @@ public class MutationState implements SQLCloseable { } private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, boolean subTask, PhoenixTransactionContext txContext, long sizeOffset) { - this(maxSize, maxSizeBytes, connection, Maps.<TableRef, MultiRowMutationState>newHashMapWithExpectedSize(5), subTask, txContext); + this(maxSize, maxSizeBytes, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), subTask, txContext); this.sizeOffset = sizeOffset; } MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, - Map<TableRef, MultiRowMutationState> mutations, + Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations, boolean subTask, PhoenixTransactionContext txContext) { this.maxSize = maxSize; this.maxSizeBytes = maxSizeBytes; @@ -189,7 +189,7 @@ public class MutationState implements SQLCloseable { } } - public MutationState(TableRef table, MultiRowMutationState mutations, long sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection) throws SQLException { + public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection) throws SQLException { this(maxSize, maxSizeBytes, connection, false, null, sizeOffset); if (!mutations.isEmpty()) { this.mutations.put(table, mutations); @@ -350,7 +350,7 @@ public class MutationState implements SQLCloseable { } public static MutationState emptyMutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection) { - MutationState state = new MutationState(maxSize, maxSizeBytes, connection, Collections.<TableRef, MultiRowMutationState>emptyMap(), false, null); + MutationState state = new MutationState(maxSize, maxSizeBytes, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), false, null); state.sizeOffset = 0; return state; } @@ -372,12 +372,12 @@ public class MutationState implements SQLCloseable { return sizeOffset + numRows; } - private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows, - Map<TableRef, MultiRowMutationState> dstMutations) { + private void joinMutationState(TableRef tableRef, Map<ImmutableBytesPtr,RowMutationState> srcRows, + Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> dstMutations) { PTable table = tableRef.getTable(); boolean isIndex = table.getType() == PTableType.INDEX; boolean incrementRowCount = dstMutations == this.mutations; - MultiRowMutationState existingRows = dstMutations.put(tableRef, srcRows); + Map<ImmutableBytesPtr,RowMutationState> existingRows = dstMutations.put(tableRef, srcRows); if (existingRows != null) { // Rows for that table already exist // Loop through new rows and replace existing with new for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : srcRows.entrySet()) { @@ -389,12 +389,8 @@ public class MutationState implements SQLCloseable { Map<PColumn,byte[]> newRow = rowEntry.getValue().getColumnValues(); // if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with existing row. if (newRow != PRow.DELETE_MARKER) { - // decrement estimated size by the size of the old row - estimatedSize-=existingRowMutationState.calculateEstimatedSize(); // Merge existing column values with new column values existingRowMutationState.join(rowEntry.getValue()); - // increment estimated size by the size of the new row - estimatedSize+=existingRowMutationState.calculateEstimatedSize(); // Now that the existing row has been merged with the new row, replace it back // again (since it was merged with the new one above). existingRows.put(rowEntry.getKey(), existingRowMutationState); @@ -403,8 +399,6 @@ public class MutationState implements SQLCloseable { } else { if (incrementRowCount && !isIndex) { // Don't count index rows in row count numRows++; - // increment estimated size by the size of the new row - estimatedSize += rowEntry.getValue().calculateEstimatedSize(); } } } @@ -412,25 +406,22 @@ public class MutationState implements SQLCloseable { dstMutations.put(tableRef, existingRows); } else { // Size new map at batch size as that's what it'll likely grow to. - MultiRowMutationState newRows = new MultiRowMutationState(connection.getMutateBatchSize()); + Map<ImmutableBytesPtr,RowMutationState> newRows = Maps.newHashMapWithExpectedSize(connection.getMutateBatchSize()); newRows.putAll(srcRows); dstMutations.put(tableRef, newRows); if (incrementRowCount && !isIndex) { numRows += srcRows.size(); - // if we added all the rows from newMutationState we can just increment the - // estimatedSize by newMutationState.estimatedSize - estimatedSize += srcRows.estimatedSize; } } } - private void joinMutationState(Map<TableRef, MultiRowMutationState> srcMutations, - Map<TableRef, MultiRowMutationState> dstMutations) { + private void joinMutationState(Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> srcMutations, + Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> dstMutations) { // Merge newMutation with this one, keeping state from newMutation for any overlaps - for (Map.Entry<TableRef, MultiRowMutationState> entry : srcMutations.entrySet()) { + for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : srcMutations.entrySet()) { // Replace existing entries for the table with new entries TableRef tableRef = entry.getKey(); - MultiRowMutationState srcRows = entry.getValue(); + Map<ImmutableBytesPtr,RowMutationState> srcRows = entry.getValue(); joinMutationState(tableRef, srcRows, dstMutations); } } @@ -448,7 +439,19 @@ public class MutationState implements SQLCloseable { phoenixTransactionContext.join(newMutationState.getPhoenixTransactionContext()); this.sizeOffset += newMutationState.sizeOffset; + int oldNumRows = this.numRows; joinMutationState(newMutationState.mutations, this.mutations); + if (newMutationState.numRows>0) { + // if we added all the rows from newMutationState we can just increment the + // estimatedSize by newMutationState.estimatedSize + if (newMutationState.numRows == this.numRows-oldNumRows) { + this.estimatedSize += newMutationState.estimatedSize; + } + // we merged the two mutation states so we need to recalculate the size + else { + this.estimatedSize = KeyValueUtil.getEstimatedRowMutationSize(this.mutations); + } + } if (!newMutationState.txMutations.isEmpty()) { if (txMutations.isEmpty()) { txMutations = Maps.newHashMapWithExpectedSize(mutations.size()); @@ -486,7 +489,7 @@ public class MutationState implements SQLCloseable { return ptr; } - private Iterator<Pair<PName,List<Mutation>>> addRowMutations(final TableRef tableRef, final MultiRowMutationState values, + private Iterator<Pair<PName,List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, final long mutationTimestamp, final long serverTimestamp, boolean includeAllIndexes, final boolean sendAll) { final PTable table = tableRef.getTable(); final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism @@ -521,10 +524,10 @@ public class MutationState implements SQLCloseable { // we may also have to include delete mutations for immutable tables if we are not processing all the tables in the mutations map if (!sendAll) { TableRef key = new TableRef(index); - MultiRowMutationState multiRowMutationState = mutations.remove(key); - if (multiRowMutationState!=null) { + Map<ImmutableBytesPtr, RowMutationState> rowToColumnMap = mutations.remove(key); + if (rowToColumnMap!=null) { final List<Mutation> deleteMutations = Lists.newArrayList(); - generateMutations(tableRef, mutationTimestamp, serverTimestamp, multiRowMutationState, deleteMutations, null); + generateMutations(tableRef, mutationTimestamp, serverTimestamp, rowToColumnMap, deleteMutations, null); indexMutations.addAll(deleteMutations); } } @@ -543,14 +546,14 @@ public class MutationState implements SQLCloseable { } private void generateMutations(final TableRef tableRef, final long mutationTimestamp, - final long serverTimestamp, final MultiRowMutationState values, + final long serverTimestamp, final Map<ImmutableBytesPtr, RowMutationState> values, final List<Mutation> mutationList, final List<Mutation> mutationsPertainingToIndex) { final PTable table = tableRef.getTable(); boolean tableWithRowTimestampCol = table.getRowTimestampColPos() != -1; Iterator<Map.Entry<ImmutableBytesPtr, RowMutationState>> iterator = values.entrySet().iterator(); long timestampToUse = mutationTimestamp; - MultiRowMutationState modifiedValues = new MultiRowMutationState(16); + Map<ImmutableBytesPtr, RowMutationState> modifiedValues = Maps.newHashMap(); while (iterator.hasNext()) { Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry = iterator.next(); byte[] onDupKeyBytes = rowEntry.getValue().getOnDupKeyBytes(); @@ -625,7 +628,7 @@ public class MutationState implements SQLCloseable { } public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean includeMutableIndexes, final Long tableTimestamp) { - final Iterator<Map.Entry<TableRef, MultiRowMutationState>> iterator = this.mutations.entrySet().iterator(); + final Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator(); if (!iterator.hasNext()) { return Collections.emptyIterator(); } @@ -633,7 +636,7 @@ public class MutationState implements SQLCloseable { final long serverTimestamp = getTableTimestamp(tableTimestamp, scn); final long mutationTimestamp = getMutationTimestamp(scn); return new Iterator<Pair<byte[],List<Mutation>>>() { - private Map.Entry<TableRef, MultiRowMutationState> current = iterator.next(); + private Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> current = iterator.next(); private Iterator<Pair<byte[],List<Mutation>>> innerIterator = init(); private Iterator<Pair<byte[],List<Mutation>>> init() { @@ -697,14 +700,14 @@ public class MutationState implements SQLCloseable { private long[] validateAll() throws SQLException { int i = 0; long[] timeStamps = new long[this.mutations.size()]; - for (Map.Entry<TableRef, MultiRowMutationState> entry : mutations.entrySet()) { + for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : mutations.entrySet()) { TableRef tableRef = entry.getKey(); timeStamps[i++] = validateAndGetServerTimestamp(tableRef, entry.getValue()); } return timeStamps; } - private long validateAndGetServerTimestamp(TableRef tableRef, MultiRowMutationState rowKeyToColumnMap) throws SQLException { + private long validateAndGetServerTimestamp(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> rowKeyToColumnMap) throws SQLException { Long scn = connection.getSCN(); MetaDataClient client = new MetaDataClient(connection); long serverTimeStamp = tableRef.getTimeStamp(); @@ -916,7 +919,7 @@ public class MutationState implements SQLCloseable { sendAll = true; } - MultiRowMutationState multiRowMutationState; + Map<ImmutableBytesPtr, RowMutationState> valuesMap; Map<TableInfo,List<Mutation>> physicalTableMutationMap = Maps.newLinkedHashMap(); // add tracing for this operation try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) { @@ -925,16 +928,16 @@ public class MutationState implements SQLCloseable { while (tableRefIterator.hasNext()) { // at this point we are going through mutations for each table final TableRef tableRef = tableRefIterator.next(); - multiRowMutationState = mutations.get(tableRef); - if (multiRowMutationState == null || multiRowMutationState.isEmpty()) { + valuesMap = mutations.get(tableRef); + if (valuesMap == null || valuesMap.isEmpty()) { continue; } // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely) - long serverTimestamp = serverTimeStamps == null ? validateAndGetServerTimestamp(tableRef, multiRowMutationState) : serverTimeStamps[i++]; + long serverTimestamp = serverTimeStamps == null ? validateAndGetServerTimestamp(tableRef, valuesMap) : serverTimeStamps[i++]; Long scn = connection.getSCN(); long mutationTimestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn; final PTable table = tableRef.getTable(); - Iterator<Pair<PName,List<Mutation>>> mutationsIterator = addRowMutations(tableRef, multiRowMutationState, mutationTimestamp, serverTimestamp, false, sendAll); + Iterator<Pair<PName,List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, mutationTimestamp, serverTimestamp, false, sendAll); // build map from physical table to mutation list boolean isDataTable = true; while (mutationsIterator.hasNext()) { @@ -952,7 +955,7 @@ public class MutationState implements SQLCloseable { // involved in the transaction since none of them would have been // committed in the event of a failure. if (table.isTransactional()) { - addUncommittedStatementIndexes(multiRowMutationState.values()); + addUncommittedStatementIndexes(valuesMap.values()); if (txMutations.isEmpty()) { txMutations = Maps.newHashMapWithExpectedSize(mutations.size()); } @@ -961,7 +964,7 @@ public class MutationState implements SQLCloseable { // in the event that we need to replay the commit. // Copy TableRef so we have the original PTable and know when the // indexes have changed. - joinMutationState(new TableRef(tableRef), multiRowMutationState, txMutations); + joinMutationState(new TableRef(tableRef), valuesMap, txMutations); } } long serverTimestamp = HConstants.LATEST_TIMESTAMP; @@ -1185,7 +1188,7 @@ public class MutationState implements SQLCloseable { } private int[] getUncommittedStatementIndexes() { - for (MultiRowMutationState rowMutationMap : mutations.values()) { + for (Map<ImmutableBytesPtr, RowMutationState> rowMutationMap : mutations.values()) { addUncommittedStatementIndexes(rowMutationMap.values()); } return uncommittedStatementIndexes; @@ -1218,7 +1221,7 @@ public class MutationState implements SQLCloseable { } public void commit() throws SQLException { - Map<TableRef, MultiRowMutationState> txMutations = Collections.emptyMap(); + Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap(); int retryCount = 0; do { boolean sendSuccessful=false; @@ -1428,46 +1431,6 @@ public class MutationState implements SQLCloseable { } } - public static class MultiRowMutationState { - private Map<ImmutableBytesPtr,RowMutationState> rowKeyToRowMutationState; - private long estimatedSize; - - public MultiRowMutationState(int size) { - this.rowKeyToRowMutationState = Maps.newHashMapWithExpectedSize(size); - this.estimatedSize = 0; - } - - public RowMutationState put(ImmutableBytesPtr ptr, RowMutationState rowMutationState) { - estimatedSize += rowMutationState.calculateEstimatedSize(); - return rowKeyToRowMutationState.put(ptr, rowMutationState); - } - - public void putAll(MultiRowMutationState other) { - estimatedSize += other.estimatedSize; - rowKeyToRowMutationState.putAll(other.rowKeyToRowMutationState); - } - - public boolean isEmpty() { - return rowKeyToRowMutationState.isEmpty(); - } - - public int size() { - return rowKeyToRowMutationState.size(); - } - - public Set<Entry<ImmutableBytesPtr, RowMutationState>> entrySet() { - return rowKeyToRowMutationState.entrySet(); - } - - public void clear(){ - rowKeyToRowMutationState.clear(); - } - - public Collection<RowMutationState> values() { - return rowKeyToRowMutationState.values(); - } - } - public static class RowMutationState { @Nonnull private Map<PColumn,byte[]> columnValues; private int[] statementIndexes; http://git-wip-us.apache.org/repos/asf/phoenix/blob/355ee522/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index 74f91b4..b23ea1b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -73,7 +73,7 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; -import org.apache.phoenix.execute.MutationState.MultiRowMutationState; +import org.apache.phoenix.execute.MutationState.RowMutationState; import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.KeyValueColumnExpression; @@ -296,7 +296,7 @@ public class IndexUtil { } public static List<Mutation> generateIndexData(final PTable table, PTable index, - final MultiRowMutationState multiRowMutationState, List<Mutation> dataMutations, final KeyValueBuilder kvBuilder, PhoenixConnection connection) + final Map<ImmutableBytesPtr, RowMutationState> valuesMap, List<Mutation> dataMutations, final KeyValueBuilder kvBuilder, PhoenixConnection connection) throws SQLException { try { final ImmutableBytesPtr ptr = new ImmutableBytesPtr(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/355ee522/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java index df6a349..318c9d6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.phoenix.execute.MutationState.MultiRowMutationState; import org.apache.phoenix.execute.MutationState.RowMutationState; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; @@ -189,10 +188,10 @@ public class KeyValueUtil { * @return estimated row size */ public static long - getEstimatedRowMutationSize(Map<TableRef, MultiRowMutationState> tableMutationMap) { + getEstimatedRowMutationSize(Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> tableMutationMap) { long size = 0; // iterate over table - for (Entry<TableRef, MultiRowMutationState> tableEntry : tableMutationMap.entrySet()) { + for (Entry<TableRef, Map<ImmutableBytesPtr, RowMutationState>> tableEntry : tableMutationMap.entrySet()) { // iterate over rows for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : tableEntry.getValue().entrySet()) { size += calculateRowMutationSize(rowEntry);