Repository: hbase Updated Branches: refs/heads/branch-1.1 10b219b13 -> c5cfea4e3
HBASE-15031 Fix merge of MVCC and SequenceID performance regression in branch-1.0 Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c5cfea4e Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c5cfea4e Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c5cfea4e Branch: refs/heads/branch-1.1 Commit: c5cfea4e37a8c01f9d96fe6b6ae759f42f348b35 Parents: 10b219b Author: stack <[email protected]> Authored: Mon Dec 28 16:12:35 2015 -0800 Committer: stack <[email protected]> Committed: Mon Dec 28 16:31:07 2015 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/client/Scan.java | 1 + .../hadoop/hbase/client/TestIncrement.java | 2 +- .../main/java/org/apache/hadoop/hbase/Tag.java | 29 +- .../hadoop/hbase/regionserver/HRegion.java | 567 ++++++++++++------- .../MultiVersionConsistencyControl.java | 2 +- .../hadoop/hbase/regionserver/wal/FSHLog.java | 4 +- .../hadoop/hbase/client/TestFromClientSide.java | 264 +-------- .../hbase/client/TestFromClientSide3.java | 5 +- .../hbase/client/TestFromClientSideNoCodec.java | 2 +- .../TestFromClientSideWithCoprocessor.java | 2 +- .../hbase/regionserver/TestAtomicOperation.java | 77 ++- .../hadoop/hbase/regionserver/TestTags.java | 2 +- 12 files changed, 441 insertions(+), 516 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/c5cfea4e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index cf9dc33..f56d3f9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -269,6 +269,7 @@ public class Scan extends Query { this.familyMap = get.getFamilyMap(); this.getScan = true; this.consistency = get.getConsistency(); + this.setIsolationLevel(get.getIsolationLevel()); for (Map.Entry<String, byte[]> attr : get.getAttributesMap().entrySet()) { setAttribute(attr.getKey(), attr.getValue()); } http://git-wip-us.apache.org/repos/asf/hbase/blob/c5cfea4e/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java index 8a2c447..39cde45 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java @@ -29,7 +29,7 @@ import org.junit.experimental.categories.Category; @Category(SmallTests.class) public class TestIncrement { @Test - public void test() { + public void testIncrementInstance() { final long expected = 13; Increment inc = new Increment(new byte [] {'r'}); int total = 0; http://git-wip-us.apache.org/repos/asf/hbase/blob/c5cfea4e/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java index 2e7314d..d0719f0 100644 --- 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -180,6 +181,7 @@ public class Tag { * @return the serialized tag data as bytes */ public static byte[] fromList(List<Tag> tags) { + if (tags == null || tags.size() <= 0) return null; int length = 0; for (Tag tag: tags) { length += tag.length; @@ -226,4 +228,29 @@ public class Tag { int getOffset() { return this.offset; } -} + + + /** + * @return A List<Tag> of any Tags found in <code>cell</code> else null. + */ + public static List<Tag> carryForwardTags(final Cell cell) { + return carryForwardTags(null, cell); + } + + /** + * Add to <code>tagsOrNull</code> any Tags <code>cell</code> is carrying. + * @return The passed-in List with the Cell's Tags added (a new List<Tag> if + * <code>tagsOrNull</code> was null), or null if <code>cell</code> carries no Tags AND the + * passed-in <code>tagsOrNull</code> is null. + */ + public static List<Tag> carryForwardTags(final List<Tag> tagsOrNull, final Cell cell) { + List<Tag> tags = tagsOrNull; + if (cell.getTagsLength() <= 0) return tags; + Iterator<Tag> itr = + CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); + if (tags == null) tags = new ArrayList<Tag>(); + while (itr.hasNext()) { + tags.add(itr.next()); + } + return tags; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/c5cfea4e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 27b24b4..4d90875 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -215,6 +216,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private static final int DEFAULT_MAX_WAIT_FOR_SEQ_ID = 30000; /** + * Set region to take the fast increment path. The constraint is that the caller may only + * access the Cell via Increment; intermixing Increment with other Mutations will give + * indeterminate results. A Get with {@link IsolationLevel#READ_UNCOMMITTED} will return the + * latest increment value; an Increment of amount zero will do the same. + */ + public static final String INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY = + "hbase.increment.fast.but.narrow.consistency"; + private final boolean incrementFastButNarrowConsistency; + + /** * This is the global default value for durability. All tables/mutations not * defining a durability or using USE_DEFAULT will default to this value. */ @@ -756,6 +767,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi false : conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); + + // See #INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY for what this flag is about.
+ this.incrementFastButNarrowConsistency = + this.conf.getBoolean(INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY, false); } void setHTableSpecificConf() { @@ -3573,30 +3588,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi int listSize = cells.size(); for (int i = 0; i < listSize; i++) { Cell cell = cells.get(i); - List<Tag> newTags = new ArrayList<Tag>(); - Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(), - cell.getTagsOffset(), cell.getTagsLength()); - - // Carry forward existing tags - - while (tagIterator.hasNext()) { - - // Add any filters or tag specific rewrites here - - newTags.add(tagIterator.next()); - } - - // Cell TTL handling - - // Check again if we need to add a cell TTL because early out logic - // above may change when there are more tag based features in core. - if (m.getTTL() != Long.MAX_VALUE) { - // Add a cell TTL tag - newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL()))); - } + List<Tag> newTags = Tag.carryForwardTags(null, cell); + newTags = carryForwardTTLTag(newTags, m); // Rewrite the cell with the updated set of tags - cells.set(i, new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), @@ -6941,40 +6936,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi long ts = Math.max(now, oldCell.getTimestamp()); // Process cell tags - List<Tag> newTags = new ArrayList<Tag>(); - - // Make a union of the set of tags in the old and new KVs - - if (oldCell.getTagsLength() > 0) { - Iterator<Tag> i = CellUtil.tagsIterator(oldCell.getTagsArray(), - oldCell.getTagsOffset(), oldCell.getTagsLength()); - while (i.hasNext()) { - newTags.add(i.next()); - } - } - if (cell.getTagsLength() > 0) { - Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(), - cell.getTagsOffset(), cell.getTagsLength()); - while (i.hasNext()) { - newTags.add(i.next()); - } - } - - // Cell TTL handling - - if (append.getTTL() != Long.MAX_VALUE) { - // Add the new TTL tag - newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL()))); - } + List<Tag> tags = Tag.carryForwardTags(null, oldCell); + tags = Tag.carryForwardTags(tags, cell); + tags = carryForwardTTLTag(tags, append); // Rebuild tags - byte[] tagBytes = Tag.fromList(newTags); + byte[] tagBytes = Tag.fromList(tags); // allocate an empty cell once newCell = new KeyValue(row.length, cell.getFamilyLength(), cell.getQualifierLength(), ts, KeyValue.Type.Put, oldCell.getValueLength() + cell.getValueLength(), - tagBytes.length); + tagBytes == null? 
0: tagBytes.length); // copy in row, family, and qualifier System.arraycopy(cell.getRowArray(), cell.getRowOffset(), newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength()); @@ -6993,8 +6966,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi newCell.getValueOffset() + oldCell.getValueLength(), cell.getValueLength()); // Copy in tag data - System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(), - tagBytes.length); + if (tagBytes != null) { + System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(), + tagBytes.length); + } idx++; } else { // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP @@ -7003,8 +6978,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Cell TTL handling if (append.getTTL() != Long.MAX_VALUE) { - List<Tag> newTags = new ArrayList<Tag>(1); - newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL()))); // Add the new TTL tag newCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), @@ -7014,7 +6987,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi cell.getQualifierLength(), cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()), cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), - newTags); + carryForwardTTLTag(append)); } else { newCell = cell; } @@ -7130,185 +7103,217 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public Result increment(Increment increment, long nonceGroup, long nonce) throws IOException { - byte [] row = increment.getRow(); - checkRow(row, "increment"); - TimeRange tr = increment.getTimeRange(); - boolean flush = false; - Durability durability = getEffectiveDurability(increment.getDurability()); - boolean writeToWAL = durability != Durability.SKIP_WAL; - WALEdit walEdits = null; - List<Cell> allKVs = new ArrayList<Cell>(increment.size()); - Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>(); - - long size = 0; - long txid = 0; - checkReadOnly(); checkResources(); - // Lock row + checkRow(increment.getRow(), "increment"); startRegionOperation(Operation.INCREMENT); this.writeRequestsCount.increment(); + try { + // Which Increment is it? Fast, narrow, increment-only consistency, or the slow (default) + // general row-wide consistency. + + // So, the difference between fastAndNarrowConsistencyIncrement and slowButConsistentIncrement is + // that the former holds the row lock until the sync completes; this allows us to reason that + // there are no other writers afoot when we read the current increment value. The row lock + // means that we do not need to wait on mvcc reads to catch up to writes before we proceed + // with the read, the root of the slowdown seen in HBASE-14460. The fast-path also does not + // wait on mvcc to complete before returning to the client. We also reorder the write so that + // the update of memstore happens AFTER sync returns; i.e. the write pipeline does less + // zigzagging now. + // + // See the comment on INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY + // for the constraints that apply when you take this code path; it is correct only if + // an Increment Cell is mutated by Increments alone; mixing concurrent Put+Delete and Increment + // will yield indeterminate results. + return this.incrementFastButNarrowConsistency?
+ fastAndNarrowConsistencyIncrement(increment, nonceGroup, nonce): + slowButConsistentIncrement(increment, nonceGroup, nonce); + } finally { + if (this.metricsRegion != null) this.metricsRegion.updateIncrement(); + closeRegionOperation(Operation.INCREMENT); + } + } + + /** + * The bulk of this method is a copy-and-paste of slowButConsistentIncrement, but with some + * reordering to enable the fast increment (reordering allows us to also drop some state + * carrying Lists and variables so the flow here is more straightforward). We copy-and-paste + * because we cannot break the method down into smaller pieces; it carries too much state. Will redo + * in trunk and tip of branch-1 to undo duplication here and in append, checkAnd*, etc. For why + * this route is 'faster' than the alternative slowButConsistentIncrement path, see the comment + * in the calling method. + * @return Resulting increment + * @throws IOException + */ + private Result fastAndNarrowConsistencyIncrement(Increment increment, long nonceGroup, + long nonce) + throws IOException { + long accumulatedResultSize = 0; RowLock rowLock = null; - WriteEntry writeEntry = null; WALKey walKey = null; - long mvccNum = 0; - List<Cell> memstoreCells = new ArrayList<Cell>(); - boolean doRollBackMemstore = false; + // This is all kvs accumulated during this increment processing. Includes increments where the + // increment is zero: i.e. client just wants to get current state of the increment w/o + // changing it. These latter increments by zero are NOT added to the WAL. + List<Cell> allKVs = new ArrayList<Cell>(increment.size()); + Durability effectiveDurability = getEffectiveDurability(increment.getDurability()); + long txid = 0; + rowLock = getRowLock(increment.getRow()); try { - rowLock = getRowLock(row); + lock(this.updatesLock.readLock()); try { - lock(this.updatesLock.readLock()); - try { - // wait for all prior MVCC transactions to finish - while we hold the row lock - // (so that we are guaranteed to see the latest state) - mvcc.waitForPreviousTransactionsComplete(); - if (this.coprocessorHost != null) { - Result r = this.coprocessorHost.preIncrementAfterRowLock(increment); - if (r != null) { - return r; + if (this.coprocessorHost != null) { + Result r = this.coprocessorHost.preIncrementAfterRowLock(increment); + if (r != null) return r; + } + // Process increments a Store/family at a time. + long now = EnvironmentEdgeManager.currentTime(); + final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL; + WALEdit walEdits = null; + // Accumulate edits for memstore to add later after we've added to WAL. + Map<Store, List<Cell>> forMemStore = new HashMap<Store, List<Cell>>(); + for (Map.Entry<byte [], List<Cell>> entry: increment.getFamilyCellMap().entrySet()) { + byte [] columnFamilyName = entry.getKey(); + List<Cell> increments = entry.getValue(); + Store store = this.stores.get(columnFamilyName); + // Do the increment for this store; be sure to 'sort' the increments first so they + // match the order in which the current Cells come back from the Get.
+ List<Cell> results = applyIncrementsToColumnFamily(increment, columnFamilyName, + sort(increments, store.getComparator()), now, + MultiVersionConsistencyControl.NO_WRITE_NUMBER, allKVs, + IsolationLevel.READ_UNCOMMITTED); + if (!results.isEmpty()) { + forMemStore.put(store, results); + // Prepare WAL updates + if (writeToWAL) { + if (walEdits == null) walEdits = new WALEdit(); + walEdits.getCells().addAll(results); } } - // now start my own transaction - mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); - writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); - long now = EnvironmentEdgeManager.currentTime(); - // Process each family - for (Map.Entry<byte [], List<Cell>> family: - increment.getFamilyCellMap().entrySet()) { - - Store store = stores.get(family.getKey()); - List<Cell> kvs = new ArrayList<Cell>(family.getValue().size()); - - // Sort the cells so that they match the order that they - // appear in the Get results. Otherwise, we won't be able to - // find the existing values if the cells are not specified - // in order by the client since cells are in an array list. - Collections.sort(family.getValue(), store.getComparator()); - // Get previous values for all columns in this family - Get get = new Get(row); - for (Cell cell: family.getValue()) { - get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell)); - } - get.setTimeRange(tr.getMin(), tr.getMax()); - List<Cell> results = get(get, false); - - // Iterate the input columns and update existing values if they were - // found, otherwise add new column initialized to the increment amount - int idx = 0; - List<Cell> edits = family.getValue(); - for (int i = 0; i < edits.size(); i++) { - Cell cell = edits.get(i); - long amount = Bytes.toLong(CellUtil.cloneValue(cell)); - boolean noWriteBack = (amount == 0); - List<Tag> newTags = new ArrayList<Tag>(); - - // Carry forward any tags that might have been added by a coprocessor - if (cell.getTagsLength() > 0) { - Iterator<Tag> itr = CellUtil.tagsIterator(cell.getTagsArray(), - cell.getTagsOffset(), cell.getTagsLength()); - while (itr.hasNext()) { - newTags.add(itr.next()); - } - } - - Cell c = null; - long ts = now; - if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) { - c = results.get(idx); - ts = Math.max(now, c.getTimestamp()); - if(c.getValueLength() == Bytes.SIZEOF_LONG) { - amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG); - } else { - // throw DoNotRetryIOException instead of IllegalArgumentException - throw new org.apache.hadoop.hbase.DoNotRetryIOException( - "Attempted to increment field that isn't 64 bits wide"); - } - // Carry tags forward from previous version - if (c.getTagsLength() > 0) { - Iterator<Tag> itr = CellUtil.tagsIterator(c.getTagsArray(), - c.getTagsOffset(), c.getTagsLength()); - while (itr.hasNext()) { - newTags.add(itr.next()); - } - } - if (i < ( edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1))) - idx++; - } + } - // Append new incremented KeyValue to list - byte[] q = CellUtil.cloneQualifier(cell); - byte[] val = Bytes.toBytes(amount); + // Actually write to WAL now. If walEdits is non-empty, we write the WAL. + if (walEdits != null && !walEdits.isEmpty()) { + // Using default cluster id, as this can only happen in the originating cluster. + // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey + // here instead of WALKey directly to support legacy coprocessors. 
+ walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce); + txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), + walKey, walEdits, getSequenceId(), true, null/*walEdits has the List to apply*/); + } else { + // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned + walKey = this.appendEmptyEdit(this.wal, null/*walEdits has the List to apply*/); + } - // Add the TTL tag if the mutation carried one - if (increment.getTTL() != Long.MAX_VALUE) { - newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(increment.getTTL()))); - } + if (txid != 0) syncOrDefer(txid, effectiveDurability); - Cell newKV = new KeyValue(row, 0, row.length, - family.getKey(), 0, family.getKey().length, - q, 0, q.length, - ts, - KeyValue.Type.Put, - val, 0, val.length, - newTags); + // Tell MVCC about the new sequenceid. + WriteEntry we = mvcc.beginMemstoreInsertWithSeqNum(walKey.getSequenceId()); - CellUtil.setSequenceId(newKV, mvccNum); + // Now write to memstore. + for (Map.Entry<Store, List<Cell>> entry: forMemStore.entrySet()) { + Store store = entry.getKey(); + List<Cell> results = entry.getValue(); + if (store.getFamily().getMaxVersions() == 1) { + // Upsert if VERSIONS for this CF == 1. Use write sequence id rather than read point + // when doing fast increment. + accumulatedResultSize += store.upsert(results, walKey.getSequenceId()); + } else { + // Otherwise keep older versions around + for (Cell cell: results) { + Pair<Long, Cell> ret = store.add(cell); + accumulatedResultSize += ret.getFirst(); + } + } + } - // Give coprocessors a chance to update the new cell - if (coprocessorHost != null) { - newKV = coprocessorHost.postMutationBeforeWAL( - RegionObserver.MutationType.INCREMENT, increment, c, newKV); - } - allKVs.add(newKV); + // Tell mvcc this write is complete. + this.mvcc.advanceMemstore(we); + } finally { + this.updatesLock.readLock().unlock(); + } + } finally { + rowLock.release(); + } + // Request a cache flush. Do it outside update lock. 
+ if (isFlushSize(this.addAndGetGlobalMemstoreSize(accumulatedResultSize))) requestFlush(); + return Result.create(allKVs); + } - if (!noWriteBack) { - kvs.add(newKV); + private Result slowButConsistentIncrement(Increment increment, long nonceGroup, long nonce) + throws IOException { + RowLock rowLock = null; + WriteEntry writeEntry = null; + WALKey walKey = null; + boolean doRollBackMemstore = false; + long accumulatedResultSize = 0; + List<Cell> allKVs = new ArrayList<Cell>(increment.size()); + List<Cell> memstoreCells = new ArrayList<Cell>(); + Durability effectiveDurability = getEffectiveDurability(increment.getDurability()); + try { + rowLock = getRowLock(increment.getRow()); + long txid = 0; + try { + lock(this.updatesLock.readLock()); + try { + // Wait for all prior MVCC transactions to finish - while we hold the row lock + // (so that we are guaranteed to see the latest increment) + this.mvcc.waitForPreviousTransactionsComplete(); + if (this.coprocessorHost != null) { + Result r = this.coprocessorHost.preIncrementAfterRowLock(increment); + if (r != null) return r; + } + // Now start my own transaction + long mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); + writeEntry = this.mvcc.beginMemstoreInsertWithSeqNum(mvccNum); - // Prepare WAL updates - if (writeToWAL) { - if (walEdits == null) { - walEdits = new WALEdit(); - } - walEdits.add(newKV); + // Process increments a Store/family at a time. + long now = EnvironmentEdgeManager.currentTime(); + final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL; + WALEdit walEdits = null; + for (Map.Entry<byte [], List<Cell>> entry: increment.getFamilyCellMap().entrySet()) { + byte [] columnFamilyName = entry.getKey(); + List<Cell> increments = entry.getValue(); + Store store = this.stores.get(columnFamilyName); + // Do increment for this store; be sure to 'sort' the increments first so increments + // match order in which we get back current Cells when we get. + List<Cell> results = applyIncrementsToColumnFamily(increment, columnFamilyName, + sort(increments, store.getComparator()), now, mvccNum, allKVs, null); + if (!results.isEmpty()) { + // Prepare WAL updates + if (writeToWAL) { + // Handmade loop on arraylist is faster than enhanced for-loop. + // See http://developer.android.com/training/articles/perf-tips.html + int resultsSize = results.size(); + for (int i = 0; i < resultsSize; i++) { + if (walEdits == null) walEdits = new WALEdit(); + walEdits.add(results.get(i)); } } - } - - //store the kvs to the temporary memstore before writing WAL - if (!kvs.isEmpty()) { - tempMemstore.put(store, kvs); - } - } - - //Actually write to Memstore now - if (!tempMemstore.isEmpty()) { - for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) { - Store store = entry.getKey(); + // Now write to this Store's memstore. if (store.getFamily().getMaxVersions() == 1) { - // upsert if VERSIONS for this CF == 1 - size += store.upsert(entry.getValue(), getSmallestReadPoint()); - memstoreCells.addAll(entry.getValue()); + // Upsert if VERSIONS for this CF == 1 + accumulatedResultSize += store.upsert(results, getSmallestReadPoint()); + memstoreCells.addAll(results); + // TODO: St.Ack 20151222 Why no rollback in this case? 
} else { - // Otherwise keep older versions around + for (Cell cell: results) { Pair<Long, Cell> ret = store.add(cell); - size += ret.getFirst(); + accumulatedResultSize += ret.getFirst(); memstoreCells.add(ret.getSecond()); doRollBackMemstore = true; } } } - size = this.addAndGetGlobalMemstoreSize(size); - flush = isFlushSize(size); } // Actually write to WAL now if (walEdits != null && !walEdits.isEmpty()) { if (writeToWAL) { - // Using default cluster id, as this can only happen in the originating - // cluster. A slave cluster receives the final value (not the delta) - // as a Put. - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. + // Using default cluster id, as this can only happen in the originating cluster. + // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey + // here instead of WALKey directly to support legacy coprocessors. walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce); txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), @@ -7317,7 +7322,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi recordMutationWithoutWal(increment.getFamilyCellMap()); } } - if(walKey == null){ + if (walKey == null) { // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned walKey = this.appendEmptyEdit(this.wal, memstoreCells); } @@ -7329,32 +7334,154 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi rowLock = null; } // sync the transaction log outside the rowlock - if(txid != 0){ - syncOrDefer(txid, durability); - } + if (txid != 0) syncOrDefer(txid, effectiveDurability); doRollBackMemstore = false; } finally { - if (rowLock != null) { - rowLock.release(); - } + if (rowLock != null) rowLock.release(); // if the wal sync was unsuccessful, remove keys from memstore - if (doRollBackMemstore) { - rollbackMemstore(memstoreCells); - if (writeEntry != null) mvcc.cancelMemstoreInsert(writeEntry); - } else if (writeEntry != null) { - mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey); + if (doRollBackMemstore) rollbackMemstore(memstoreCells); + if (writeEntry != null) mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey); + } + // Request a cache flush. Do it outside update lock. + if (isFlushSize(this.addAndGetGlobalMemstoreSize(accumulatedResultSize))) requestFlush(); + return Result.create(allKVs); + } + + /** + * @return Sorted list of <code>cells</code> using <code>comparator</code> + */ + private static List<Cell> sort(List<Cell> cells, final Comparator<Cell> comparator) { + Collections.sort(cells, comparator); + return cells; + } + + /** + * Apply increments to a column family. + * @param sortedIncrements The passed in increments to apply MUST be sorted so that they match + * the order that they appear in the Get results (get results will be sorted on return). + * Otherwise, we won't be able to find the existing values if the cells are not specified in + * order by the client since cells are in an array list. + * @param isolation Isolation level to use when running the 'get'. Pass null for the default. + * @return Resulting increments after <code>sortedIncrements</code> have been applied to current + * values (if any -- else the passed increment is the final result).
+ * @throws IOException + */ + private List<Cell> applyIncrementsToColumnFamily(Increment increment, byte[] columnFamilyName, + List<Cell> sortedIncrements, long now, long mvccNum, List<Cell> allKVs, + final IsolationLevel isolation) + throws IOException { + List<Cell> results = new ArrayList<Cell>(sortedIncrements.size()); + byte [] row = increment.getRow(); + // Get previous values for all columns in this family + List<Cell> currentValues = + getIncrementCurrentValue(increment, columnFamilyName, sortedIncrements, isolation); + // Iterate the input columns and update existing values if they were found, otherwise + // add new column initialized to the increment amount + int idx = 0; + for (int i = 0; i < sortedIncrements.size(); i++) { + Cell inc = sortedIncrements.get(i); + long incrementAmount = getLongValue(inc); + // If increment amount == 0, then don't write this Increment to the WAL. + boolean writeBack = (incrementAmount != 0); + // Carry forward any tags that might have been added by a coprocessor. + List<Tag> tags = Tag.carryForwardTags(inc); + + Cell currentValue = null; + long ts = now; + if (idx < currentValues.size() && CellUtil.matchingQualifier(currentValues.get(idx), inc)) { + currentValue = currentValues.get(idx); + ts = Math.max(now, currentValue.getTimestamp()); + incrementAmount += getLongValue(currentValue); + // Carry forward all tags + tags = Tag.carryForwardTags(tags, currentValue); + if (i < (sortedIncrements.size() - 1) && + !CellUtil.matchingQualifier(inc, sortedIncrements.get(i + 1))) idx++; + } + + // Append new incremented KeyValue to list + byte [] qualifier = CellUtil.cloneQualifier(inc); + byte [] incrementAmountInBytes = Bytes.toBytes(incrementAmount); + tags = carryForwardTTLTag(tags, increment); + + Cell newValue = new KeyValue(row, 0, row.length, + columnFamilyName, 0, columnFamilyName.length, + qualifier, 0, qualifier.length, + ts, KeyValue.Type.Put, + incrementAmountInBytes, 0, incrementAmountInBytes.length, + tags); + + // Don't set an mvcc if none specified. The mvcc may be assigned later in the case where we + // write the memstore AFTER we sync our edit to the log. + if (mvccNum != MultiVersionConsistencyControl.NO_WRITE_NUMBER) { + CellUtil.setSequenceId(newValue, mvccNum); + } + + // Give coprocessors a chance to update the new cell + if (coprocessorHost != null) { + newValue = coprocessorHost.postMutationBeforeWAL( + RegionObserver.MutationType.INCREMENT, increment, currentValue, newValue); } - closeRegionOperation(Operation.INCREMENT); - if (this.metricsRegion != null) { - this.metricsRegion.updateIncrement(); + allKVs.add(newValue); + if (writeBack) { + results.add(newValue); } } + return results; + } - if (flush) { - // Request a cache flush. Do it outside update lock. - requestFlush(); + /** + * @return Get the long out of the passed in Cell + * @throws DoNotRetryIOException + */ + private static long getLongValue(final Cell cell) throws DoNotRetryIOException { + int len = cell.getValueLength(); + if (len != Bytes.SIZEOF_LONG) { + // throw DoNotRetryIOException instead of IllegalArgumentException + throw new DoNotRetryIOException("Field is not a long, it's " + len + " bytes wide"); + } + return Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), len); + } + + /** + * Do a specific Get on the passed <code>columnFamily</code> and the column qualifiers + * from the passed <code>increments</code> only.
+ * @param increment + * @param columnFamily + * @param increments + * @return The Cells to Increment + * @throws IOException + */ + private List<Cell> getIncrementCurrentValue(final Increment increment, byte [] columnFamily, + final List<Cell> increments, final IsolationLevel isolation) + throws IOException { + Get get = new Get(increment.getRow()); + if (isolation != null) get.setIsolationLevel(isolation); + for (Cell cell: increments) { + get.addColumn(columnFamily, CellUtil.cloneQualifier(cell)); } - return increment.isReturnResults() ? Result.create(allKVs) : null; + TimeRange tr = increment.getTimeRange(); + get.setTimeRange(tr.getMin(), tr.getMax()); + return get(get, false); + } + + private static List<Tag> carryForwardTTLTag(final Mutation mutation) { + return carryForwardTTLTag(null, mutation); + } + + /** + * @return Carry forward the TTL tag if the increment is carrying one + */ + private static List<Tag> carryForwardTTLTag(final List<Tag> tagsOrNull, + final Mutation mutation) { + long ttl = mutation.getTTL(); + if (ttl == Long.MAX_VALUE) return tagsOrNull; + List<Tag> tags = tagsOrNull; + // If we are making the array in here, given we are the last thing checked, we'll be the only thing + // in the array so set its size to '1' (I saw this being done in an earlier version of + // tag-handling). + if (tags == null) tags = new ArrayList<Tag>(1); + tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl))); + return tags; } // http://git-wip-us.apache.org/repos/asf/hbase/blob/c5cfea4e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java index fee15dd..269c258 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java @@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.util.ClassSize; */ @InterfaceAudience.Private public class MultiVersionConsistencyControl { - private static final long NO_WRITE_NUMBER = 0; + static final long NO_WRITE_NUMBER = 0; private volatile long memstoreRead = 0; private final Object readWaiters = new Object(); http://git-wip-us.apache.org/repos/asf/hbase/blob/c5cfea4e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index eab1e43..f31a910 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -1234,7 +1234,9 @@ public class FSHLog implements WAL { // Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the // edit with its edit/sequence id. The below entry.getRegionSequenceId will wait on the // latch to be thrown. TODO: reuse FSWALEntry as we do SyncFuture rather than create one per append.
- entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri, memstoreCells); + entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri, + /* Passing memstoreCells seems redundant when they are in edits.getCells already */ + (memstoreCells != null)? memstoreCells: edits == null? null: edits.getCells()); truck.loadPayload(entry, scope.detach()); } finally { this.disruptor.getRingBuffer().publish(sequence); http://git-wip-us.apache.org/repos/asf/hbase/blob/c5cfea4e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 7a62873..1a10fe6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -51,7 +51,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -120,6 +119,7 @@ import org.junit.experimental.categories.Category; @Category(LargeTests.class) @SuppressWarnings ("deprecation") public class TestFromClientSide { + // NOTE: Increment tests were moved to their own class, TestIncrementsFromClientSide. final Log LOG = LogFactory.getLog(getClass()); protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static byte [] ROW = Bytes.toBytes("testRow"); @@ -3139,7 +3139,7 @@ public class TestFromClientSide { equals(value, CellUtil.cloneValue(key))); } - private void assertIncrementKey(Cell key, byte [] row, byte [] family, + static void assertIncrementKey(Cell key, byte [] row, byte [] family, byte [] qualifier, long value) throws Exception { assertTrue("Expected row [" + Bytes.toString(row) + "] " + @@ -3363,7 +3363,7 @@ public class TestFromClientSide { return stamps; } - private boolean equals(byte [] left, byte [] right) { + static boolean equals(byte [] left, byte [] right) { if (left == null && right == null) return true; if (left == null && right.length == 0) return true; if (right == null && left.length == 0) return true; @@ -4483,264 +4483,6 @@ public class TestFromClientSide { } @Test - public void testIncrementWithDeletes() throws Exception { - LOG.info("Starting testIncrementWithDeletes"); - final TableName TABLENAME = - TableName.valueOf("testIncrementWithDeletes"); - Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); - final byte[] COLUMN = Bytes.toBytes("column"); - - ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5); - TEST_UTIL.flush(TABLENAME); - - Delete del = new Delete(ROW); - ht.delete(del); - - ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5); - - Get get = new Get(ROW); - Result r = ht.get(get); - assertEquals(1, r.size()); - assertEquals(5, Bytes.toLong(r.getValue(FAMILY, COLUMN))); - } - - @Test - public void testIncrementingInvalidValue() throws Exception { - LOG.info("Starting testIncrementingInvalidValue"); - final TableName TABLENAME = TableName.valueOf("testIncrementingInvalidValue"); - Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); - final byte[] COLUMN = 
Bytes.toBytes("column"); - Put p = new Put(ROW); - // write an integer here (not a Long) - p.add(FAMILY, COLUMN, Bytes.toBytes(5)); - ht.put(p); - try { - ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5); - fail("Should have thrown DoNotRetryIOException"); - } catch (DoNotRetryIOException iox) { - // success - } - Increment inc = new Increment(ROW); - inc.addColumn(FAMILY, COLUMN, 5); - try { - ht.increment(inc); - fail("Should have thrown DoNotRetryIOException"); - } catch (DoNotRetryIOException iox) { - // success - } - } - - @Test - public void testIncrementInvalidArguments() throws Exception { - LOG.info("Starting testIncrementInvalidArguments"); - final TableName TABLENAME = TableName.valueOf("testIncrementInvalidArguments"); - Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); - final byte[] COLUMN = Bytes.toBytes("column"); - try { - // try null row - ht.incrementColumnValue(null, FAMILY, COLUMN, 5); - fail("Should have thrown IOException"); - } catch (IOException iox) { - // success - } - try { - // try null family - ht.incrementColumnValue(ROW, null, COLUMN, 5); - fail("Should have thrown IOException"); - } catch (IOException iox) { - // success - } - try { - // try null qualifier - ht.incrementColumnValue(ROW, FAMILY, null, 5); - fail("Should have thrown IOException"); - } catch (IOException iox) { - // success - } - // try null row - try { - Increment incNoRow = new Increment((byte [])null); - incNoRow.addColumn(FAMILY, COLUMN, 5); - fail("Should have thrown IllegalArgumentException"); - } catch (IllegalArgumentException iax) { - // success - } catch (NullPointerException npe) { - // success - } - // try null family - try { - Increment incNoFamily = new Increment(ROW); - incNoFamily.addColumn(null, COLUMN, 5); - fail("Should have thrown IllegalArgumentException"); - } catch (IllegalArgumentException iax) { - // success - } - // try null qualifier - try { - Increment incNoQualifier = new Increment(ROW); - incNoQualifier.addColumn(FAMILY, null, 5); - fail("Should have thrown IllegalArgumentException"); - } catch (IllegalArgumentException iax) { - // success - } - } - - @Test - public void testIncrementOutOfOrder() throws Exception { - LOG.info("Starting testIncrementOutOfOrder"); - final TableName TABLENAME = TableName.valueOf("testIncrementOutOfOrder"); - Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); - - byte [][] QUALIFIERS = new byte [][] { - Bytes.toBytes("B"), Bytes.toBytes("A"), Bytes.toBytes("C") - }; - - Increment inc = new Increment(ROW); - for (int i=0; i<QUALIFIERS.length; i++) { - inc.addColumn(FAMILY, QUALIFIERS[i], 1); - } - ht.increment(inc); - - // Verify expected results - Result r = ht.get(new Get(ROW)); - Cell [] kvs = r.rawCells(); - assertEquals(3, kvs.length); - assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[1], 1); - assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[0], 1); - assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 1); - - // Now try multiple columns again - inc = new Increment(ROW); - for (int i=0; i<QUALIFIERS.length; i++) { - inc.addColumn(FAMILY, QUALIFIERS[i], 1); - } - ht.increment(inc); - - // Verify - r = ht.get(new Get(ROW)); - kvs = r.rawCells(); - assertEquals(3, kvs.length); - assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[1], 2); - assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[0], 2); - assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 2); - } - - @Test - public void testIncrementOnSameColumn() throws Exception { - LOG.info("Starting testIncrementOnSameColumn"); - final byte[] TABLENAME = 
Bytes.toBytes("testIncrementOnSameColumn"); - HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILY); - - byte[][] QUALIFIERS = - new byte[][] { Bytes.toBytes("A"), Bytes.toBytes("B"), Bytes.toBytes("C") }; - - Increment inc = new Increment(ROW); - for (int i = 0; i < QUALIFIERS.length; i++) { - inc.addColumn(FAMILY, QUALIFIERS[i], 1); - inc.addColumn(FAMILY, QUALIFIERS[i], 1); - } - ht.increment(inc); - - // Verify expected results - Result r = ht.get(new Get(ROW)); - Cell[] kvs = r.rawCells(); - assertEquals(3, kvs.length); - assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[0], 1); - assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[1], 1); - assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 1); - - // Now try multiple columns again - inc = new Increment(ROW); - for (int i = 0; i < QUALIFIERS.length; i++) { - inc.addColumn(FAMILY, QUALIFIERS[i], 1); - inc.addColumn(FAMILY, QUALIFIERS[i], 1); - } - ht.increment(inc); - - // Verify - r = ht.get(new Get(ROW)); - kvs = r.rawCells(); - assertEquals(3, kvs.length); - assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[0], 2); - assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[1], 2); - assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 2); - - ht.close(); - } - - @Test - public void testIncrement() throws Exception { - LOG.info("Starting testIncrement"); - final TableName TABLENAME = TableName.valueOf("testIncrement"); - Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); - - byte [][] ROWS = new byte [][] { - Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"), - Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"), - Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i") - }; - byte [][] QUALIFIERS = new byte [][] { - Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"), - Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"), - Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i") - }; - - // Do some simple single-column increments - - // First with old API - ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[0], 1); - ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[1], 2); - ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[2], 3); - ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[3], 4); - - // Now increment things incremented with old and do some new - Increment inc = new Increment(ROW); - inc.addColumn(FAMILY, QUALIFIERS[1], 1); - inc.addColumn(FAMILY, QUALIFIERS[3], 1); - inc.addColumn(FAMILY, QUALIFIERS[4], 1); - ht.increment(inc); - - // Verify expected results - Result r = ht.get(new Get(ROW)); - Cell [] kvs = r.rawCells(); - assertEquals(5, kvs.length); - assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[0], 1); - assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[1], 3); - assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 3); - assertIncrementKey(kvs[3], ROW, FAMILY, QUALIFIERS[3], 5); - assertIncrementKey(kvs[4], ROW, FAMILY, QUALIFIERS[4], 1); - - // Now try multiple columns by different amounts - inc = new Increment(ROWS[0]); - for (int i=0;i<QUALIFIERS.length;i++) { - inc.addColumn(FAMILY, QUALIFIERS[i], i+1); - } - ht.increment(inc); - // Verify - r = ht.get(new Get(ROWS[0])); - kvs = r.rawCells(); - assertEquals(QUALIFIERS.length, kvs.length); - for (int i=0;i<QUALIFIERS.length;i++) { - assertIncrementKey(kvs[i], ROWS[0], FAMILY, QUALIFIERS[i], i+1); - } - - // Re-increment them - inc = new Increment(ROWS[0]); - for (int i=0;i<QUALIFIERS.length;i++) { - inc.addColumn(FAMILY, QUALIFIERS[i], i+1); - } - ht.increment(inc); - // Verify - r = ht.get(new Get(ROWS[0])); - kvs = 
r.rawCells(); - assertEquals(QUALIFIERS.length, kvs.length); - for (int i=0;i<QUALIFIERS.length;i++) { - assertIncrementKey(kvs[i], ROWS[0], FAMILY, QUALIFIERS[i], 2*(i+1)); - } - } - - - @Test public void testClientPoolRoundRobin() throws IOException { final TableName tableName = TableName.valueOf("testClientPoolRoundRobin"); http://git-wip-us.apache.org/repos/asf/hbase/blob/c5cfea4e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index dd29e9e..1e4ee21 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; import java.util.ArrayList; @@ -32,14 +31,14 @@ import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.junit.After; http://git-wip-us.apache.org/repos/asf/hbase/blob/c5cfea4e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideNoCodec.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideNoCodec.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideNoCodec.java index ae96849..66fb69c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideNoCodec.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideNoCodec.java @@ -99,4 +99,4 @@ public class TestFromClientSideNoCodec { String codec = AbstractRpcClient.getDefaultCodec(c); assertTrue(codec == null || codec.length() == 0); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/c5cfea4e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java index 2671af7..cd2409e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java @@ -27,7 +27,7 @@ import 
org.junit.experimental.categories.Category; /** * Test all client operations with a coprocessor that - * just implements the default flush/compact/scan policy + * just implements the default flush/compact/scan policy. */ @Category(LargeTests.class) public class TestFromClientSideWithCoprocessor extends TestFromClientSide { http://git-wip-us.apache.org/repos/asf/hbase/blob/c5cfea4e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java index cd63679..3b97721 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.IsolationLevel; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -79,7 +80,7 @@ public class TestAtomicOperation { static final Log LOG = LogFactory.getLog(TestAtomicOperation.class); @Rule public TestName name = new TestName(); - Region region = null; + HRegion region = null; private HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU(); // Test names @@ -134,16 +135,35 @@ public class TestAtomicOperation { } /** - * Test multi-threaded increments. + * Test multi-threaded increments. Take the fast but narrow consistency path through HRegion. */ @Test - public void testIncrementMultiThreads() throws IOException { + public void testIncrementMultiThreadsFastPath() throws IOException { + Configuration conf = TEST_UTIL.getConfiguration(); + String oldValue = conf.get(HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY); + conf.setBoolean(HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY, true); + try { + testIncrementMultiThreads(true); + } finally { + if (oldValue != null) conf.set(HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY, oldValue); + } + } + + /** + * Test multi-threaded increments. Take the slow but consistent path through HRegion. + */ + @Test + public void testIncrementMultiThreadsSlowPath() throws IOException { + testIncrementMultiThreads(false); + } + private void testIncrementMultiThreads(final boolean fast) throws IOException { LOG.info("Starting test testIncrementMultiThreads"); // Run with mixed column families (1 and 3 versions) initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2); - // create 100 threads, each will increment by its own quantity + // Create 100 threads, each will increment by its own quantity. All 100 threads update the + // same row over two column families.
int numThreads = 100; int incrementsPerThread = 1000; Incrementer[] all = new Incrementer[numThreads]; @@ -167,9 +187,10 @@ public class TestAtomicOperation { } catch (InterruptedException e) { } } - assertICV(row, fam1, qual1, expectedTotal); - assertICV(row, fam1, qual2, expectedTotal*2); - assertICV(row, fam2, qual3, expectedTotal*3); + + assertICV(row, fam1, qual1, expectedTotal, fast); + assertICV(row, fam1, qual2, expectedTotal*2, fast); + assertICV(row, fam2, qual3, expectedTotal*3, fast); LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal); } @@ -178,9 +199,11 @@ private void assertICV(byte [] row, byte [] familiy, byte[] qualifier, - long amount) throws IOException { + long amount, + boolean fast) throws IOException { // run a get and see? Get get = new Get(row); + if (fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); get.addColumn(familiy, qualifier); Result result = region.get(get); assertEquals(1, result.size()); @@ -211,20 +234,24 @@ } /** - * A thread that makes a few increment calls + * A thread that makes increment calls, always against the same row (this.row) and against two + * column families on that row. */ public static class Incrementer extends Thread { - private final Region region; + private final HRegion region; private final int numIncrements; private final int amount; + private final boolean fast; - public Incrementer(Region region, - int threadNumber, int amount, int numIncrements) { + public Incrementer(HRegion region, int threadNumber, int amount, int numIncrements) { + super("Incrementer." + threadNumber); this.region = region; this.numIncrements = numIncrements; this.amount = amount; + this.fast = region.getBaseConf().
+ getBoolean(HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY, false); setDaemon(true); } @@ -237,13 +264,13 @@ public class TestAtomicOperation { inc.addColumn(fam1, qual2, amount*2); inc.addColumn(fam2, qual3, amount*3); inc.setDurability(Durability.ASYNC_WAL); - region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE); - - // verify: Make sure we only see completed increments - Get g = new Get(row); - Result result = region.get(g); - assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2, Bytes.toLong(result.getValue(fam1, qual2))); - assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*3, Bytes.toLong(result.getValue(fam2, qual3))); + Result result = region.increment(inc); + assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2, + Bytes.toLong(result.getValue(fam1, qual2))); + long fam1Increment = Bytes.toLong(result.getValue(fam1, qual1))*3; + long fam2Increment = Bytes.toLong(result.getValue(fam2, qual3)); + assertEquals("fam1=" + fam1Increment + ", fam2=" + fam2Increment, + fam1Increment, fam2Increment); } catch (IOException e) { e.printStackTrace(); } @@ -499,13 +526,13 @@ public class TestAtomicOperation { } public static class AtomicOperation extends Thread { - protected final Region region; + protected final HRegion region; protected final int numOps; protected final AtomicLong timeStamps; protected final AtomicInteger failures; protected final Random r = new Random(); - public AtomicOperation(Region region, int numOps, AtomicLong timeStamps, + public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps, AtomicInteger failures) { this.region = region; this.numOps = numOps; @@ -568,8 +595,8 @@ public class TestAtomicOperation { } private class PutThread extends TestThread { - private Region region; - PutThread(TestContext ctx, Region region) { + private HRegion region; + PutThread(TestContext ctx, HRegion region) { super(ctx); this.region = region; } @@ -585,8 +612,8 @@ public class TestAtomicOperation { } private class CheckAndPutThread extends TestThread { - private Region region; - CheckAndPutThread(TestContext ctx, Region region) { + private HRegion region; + CheckAndPutThread(TestContext ctx, HRegion region) { super(ctx); this.region = region; } http://git-wip-us.apache.org/repos/asf/hbase/blob/c5cfea4e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java index 94e2028..4c7a204 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java @@ -549,7 +549,7 @@ public class TestTags { public static class TestCoprocessorForTags extends BaseRegionObserver { - public static boolean checkTagPresence = false; + public static volatile boolean checkTagPresence = false; public static List<Tag> tags = null; @Override
