Revert "--whitespace=fix" Revert bad commit. This reverts commit 930f68c0b976a600066b838283a0f3dce050256f.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/492db89d Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/492db89d Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/492db89d Branch: refs/heads/branch-1.2 Commit: 492db89d42e490dff0b521f0b1d623d1ac7af9f4 Parents: ab0651e Author: stack <[email protected]> Authored: Fri Jan 15 14:50:10 2016 -0800 Committer: stack <[email protected]> Committed: Fri Jan 15 14:50:10 2016 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/client/Scan.java | 2 - .../hadoop/hbase/client/TestIncrement.java | 2 +- .../main/java/org/apache/hadoop/hbase/Tag.java | 29 +- .../hadoop/hbase/regionserver/HRegion.java | 556 +++++++------------ .../MultiVersionConcurrencyControl.java | 5 +- .../hadoop/hbase/regionserver/wal/FSHLog.java | 14 +- .../hadoop/hbase/IncrementPerformanceTest.java | 129 ----- .../hadoop/hbase/client/TestFromClientSide.java | 263 ++++++++- .../hbase/client/TestFromClientSide3.java | 5 +- .../hbase/client/TestFromClientSideNoCodec.java | 2 +- .../TestFromClientSideWithCoprocessor.java | 2 +- ...tIncrementFromClientSideWithCoprocessor.java | 49 -- .../client/TestIncrementsFromClientSide.java | 433 --------------- .../mapreduce/TestTableInputFormatScanBase.java | 5 +- .../hbase/regionserver/TestAtomicOperation.java | 31 +- .../hbase/regionserver/TestRegionIncrement.java | 254 --------- .../hadoop/hbase/regionserver/TestTags.java | 2 +- 17 files changed, 493 insertions(+), 1290 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/492db89d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index b13837d..4825cca 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -220,7 +220,6 @@ public class Scan extends Query { filter = scan.getFilter(); // clone? loadColumnFamiliesOnDemand = scan.getLoadColumnFamiliesOnDemandValue(); consistency = scan.getConsistency(); - this.setIsolationLevel(scan.getIsolationLevel()); reversed = scan.isReversed(); small = scan.isSmall(); allowPartialResults = scan.getAllowPartialResults(); @@ -263,7 +262,6 @@ public class Scan extends Query { this.familyMap = get.getFamilyMap(); this.getScan = true; this.consistency = get.getConsistency(); - this.setIsolationLevel(get.getIsolationLevel()); for (Map.Entry<String, byte[]> attr : get.getAttributesMap().entrySet()) { setAttribute(attr.getKey(), attr.getValue()); } http://git-wip-us.apache.org/repos/asf/hbase/blob/492db89d/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java index 39cde45..8a2c447 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java @@ -29,7 +29,7 @@ import org.junit.experimental.categories.Category; @Category(SmallTests.class) public class TestIncrement { @Test - public void testIncrementInstance() { + public void test() { final long expected = 13; Increment inc = new Increment(new byte [] {'r'}); int total = 0; http://git-wip-us.apache.org/repos/asf/hbase/blob/492db89d/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java index d0719f0..2e7314d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -181,7 +180,6 @@ public class Tag { * @return the serialized tag data as bytes */ public static byte[] fromList(List<Tag> tags) { - if (tags == null || tags.size() <= 0) return null; int length = 0; for (Tag tag: tags) { length += tag.length; @@ -228,29 +226,4 @@ public class Tag { int getOffset() { return this.offset; } - - - /** - * @return A List<Tag> of any Tags found in <code>cell</code> else null. - */ - public static List<Tag> carryForwardTags(final Cell cell) { - return carryForwardTags(null, cell); - } - - /** - * @return Add to <code>tagsOrNull</code> any Tags <code>cell</code> is carrying or null if - * it is carrying no Tags AND the passed in <code>tagsOrNull</code> is null (else we return new - * List<Tag> with Tags found). - */ - public static List<Tag> carryForwardTags(final List<Tag> tagsOrNull, final Cell cell) { - List<Tag> tags = tagsOrNull; - if (cell.getTagsLength() <= 0) return tags; - Iterator<Tag> itr = - CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); - if (tags == null) tags = new ArrayList<Tag>(); - while (itr.hasNext()) { - tags.add(itr.next()); - } - return tags; - } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/492db89d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index f1566af..cfd057a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -29,7 +29,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -151,8 +150,8 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputCont import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; -import org.apache.hadoop.hbase.regionserver.wal.ReplayHLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.regionserver.wal.ReplayHLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.security.User; @@ -216,16 +215,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private static final int DEFAULT_MAX_WAIT_FOR_SEQ_ID = 30000; /** - * Set region to take the fast increment path. Constraint is that caller can only access the - * Cell via Increment; intermixing Increment with other Mutations will give indeterminate - * results. A Get with {@link IsolationLevel#READ_UNCOMMITTED} will get the latest increment - * or an Increment of zero will do the same. - */ - public static final String INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY = - "hbase.increment.fast.but.narrow.consistency"; - private final boolean incrementFastButNarrowConsistency; - - /** * This is the global default value for durability. All tables/mutations not * defining a durability or using USE_DEFAULT will default to this value. */ @@ -756,10 +745,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi false : conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); - - // See #INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY for what this flag is about. - this.incrementFastButNarrowConsistency = - this.conf.getBoolean(INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY, false); } void setHTableSpecificConf() { @@ -3607,10 +3592,30 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi int listSize = cells.size(); for (int i = 0; i < listSize; i++) { Cell cell = cells.get(i); - List<Tag> newTags = Tag.carryForwardTags(null, cell); - newTags = carryForwardTTLTag(newTags, m); + List<Tag> newTags = new ArrayList<Tag>(); + Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(), + cell.getTagsOffset(), cell.getTagsLength()); + + // Carry forward existing tags + + while (tagIterator.hasNext()) { + + // Add any filters or tag specific rewrites here + + newTags.add(tagIterator.next()); + } + + // Cell TTL handling + + // Check again if we need to add a cell TTL because early out logic + // above may change when there are more tag based features in core. + if (m.getTTL() != Long.MAX_VALUE) { + // Add a cell TTL tag + newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL()))); + } // Rewrite the cell with the updated set of tags + cells.set(i, new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), @@ -7165,18 +7170,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Process cell tags // Make a union of the set of tags in the old and new KVs - List<Tag> tags = Tag.carryForwardTags(null, oldCell); - tags = Tag.carryForwardTags(tags, cell); - tags = carryForwardTTLTag(tags, mutate); + List<Tag> newTags = carryForwardTags(oldCell, new ArrayList<Tag>()); + newTags = carryForwardTags(cell, newTags); + + // Cell TTL handling + + if (mutate.getTTL() != Long.MAX_VALUE) { + // Add the new TTL tag + newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutate.getTTL()))); + } // Rebuild tags - byte[] tagBytes = Tag.fromList(tags); + byte[] tagBytes = Tag.fromList(newTags); // allocate an empty cell once newCell = new KeyValue(row.length, cell.getFamilyLength(), cell.getQualifierLength(), ts, KeyValue.Type.Put, oldCell.getValueLength() + cell.getValueLength(), - tagBytes == null? 0: tagBytes.length); + tagBytes.length); // copy in row, family, and qualifier System.arraycopy(cell.getRowArray(), cell.getRowOffset(), newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength()); @@ -7195,10 +7206,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi newCell.getValueOffset() + oldCell.getValueLength(), cell.getValueLength()); // Copy in tag data - if (tagBytes != null) { - System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(), - tagBytes.length); - } + System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(), + tagBytes.length); idx++; } else { // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP @@ -7207,6 +7216,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Cell TTL handling if (mutate.getTTL() != Long.MAX_VALUE) { + List<Tag> newTags = new ArrayList<Tag>(1); + newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutate.getTTL()))); // Add the new TTL tag newCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), @@ -7216,7 +7227,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi cell.getQualifierLength(), cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()), cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), - carryForwardTTLTag(mutate)); + newTags); } else { newCell = cell; } @@ -7351,218 +7362,174 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public Result increment(Increment mutation, long nonceGroup, long nonce) throws IOException { Operation op = Operation.INCREMENT; + byte [] row = mutation.getRow(); + checkRow(row, op.toString()); + boolean flush = false; + Durability durability = getEffectiveDurability(mutation.getDurability()); + boolean writeToWAL = durability != Durability.SKIP_WAL; + WALEdit walEdits = null; + List<Cell> allKVs = new ArrayList<Cell>(mutation.size()); + + Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>(); + long size = 0; + long txid = 0; checkReadOnly(); checkResources(); - checkRow(mutation.getRow(), op.toString()); + // Lock row startRegionOperation(op); this.writeRequestsCount.increment(); - try { - // Which Increment is it? Narrow increment-only consistency or slow (default) and general - // row-wide consistency. - - // So, difference between fastAndNarrowConsistencyIncrement and slowButConsistentIncrement is - // that the former holds the row lock until the sync completes; this allows us to reason that - // there are no other writers afoot when we read the current increment value. The row lock - // means that we do not need to wait on mvcc reads to catch up to writes before we proceed - // with the read, the root of the slowdown seen in HBASE-14460. The fast-path also does not - // wait on mvcc to complete before returning to the client. We also reorder the write so that - // the update of memstore happens AFTER sync returns; i.e. the write pipeline does less - // zigzagging now. - // - // See the comment on INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY - // for the constraints that apply when you take this code path; it is correct but only if - // Increments are used mutating an Increment Cell; mixing concurrent Put+Delete and Increment - // will yield indeterminate results. - return this.incrementFastButNarrowConsistency? - fastAndNarrowConsistencyIncrement(mutation, nonceGroup, nonce): - slowButConsistentIncrement(mutation, nonceGroup, nonce); - } finally { - if (this.metricsRegion != null) this.metricsRegion.updateIncrement(); - closeRegionOperation(Operation.INCREMENT); - } - } - - /** - * The bulk of this method is a bulk-and-paste of the slowButConsistentIncrement but with some - * reordering to enable the fast increment (reordering allows us to also drop some state - * carrying Lists and variables so the flow here is more straight-forward). We copy-and-paste - * because cannot break down the method further into smaller pieces. Too much state. Will redo - * in trunk and tip of branch-1 to undo duplication here and in append, checkAnd*, etc. For why - * this route is 'faster' than the alternative slowButConsistentIncrement path, see the comment - * in calling method. - * @return Resulting increment - * @throws IOException - */ - private Result fastAndNarrowConsistencyIncrement(Increment increment, long nonceGroup, - long nonce) - throws IOException { - long accumulatedResultSize = 0; - WALKey walKey = null; - long txid = 0; - // This is all kvs accumulated during this increment processing. Includes increments where the - // increment is zero: i.e. client just wants to get current state of the increment w/o - // changing it. These latter increments by zero are NOT added to the WAL. - List<Cell> allKVs = new ArrayList<Cell>(increment.size()); - Durability effectiveDurability = getEffectiveDurability(increment.getDurability()); - RowLock rowLock = getRowLock(increment.getRow()); - try { - lock(this.updatesLock.readLock()); - try { - if (this.coprocessorHost != null) { - Result r = this.coprocessorHost.preIncrementAfterRowLock(increment); - if (r != null) return r; - } - long now = EnvironmentEdgeManager.currentTime(); - final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL; - WALEdit walEdits = null; - // Process increments a Store/family at a time. - // Accumulate edits for memstore to add later after we've added to WAL. - Map<Store, List<Cell>> forMemStore = new HashMap<Store, List<Cell>>(); - for (Map.Entry<byte [], List<Cell>> entry: increment.getFamilyCellMap().entrySet()) { - byte [] columnFamilyName = entry.getKey(); - List<Cell> increments = entry.getValue(); - Store store = this.stores.get(columnFamilyName); - // Do increment for this store; be sure to 'sort' the increments first so increments - // match order in which we get back current Cells when we get. - List<Cell> results = applyIncrementsToColumnFamily(increment, columnFamilyName, - sort(increments, store.getComparator()), now, - MultiVersionConcurrencyControl.NO_WRITE_NUMBER, allKVs, - IsolationLevel.READ_UNCOMMITTED); - if (!results.isEmpty()) { - forMemStore.put(store, results); - // Prepare WAL updates - if (writeToWAL) { - if (walEdits == null) walEdits = new WALEdit(); - walEdits.getCells().addAll(results); - } - } - } - - // Actually write to WAL now. If walEdits is non-empty, we write the WAL. - if (walEdits != null && !walEdits.isEmpty()) { - // Using default cluster id, as this can only happen in the originating cluster. - // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey - // here instead of WALKey directly to support legacy coprocessors. - walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce, - getMVCC()); - txid = - this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdits, true); - } else { - // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned - walKey = appendEmptyEdit(this.wal); - } - - if (txid != 0) syncOrDefer(txid, effectiveDurability); - - // Now write to memstore. - for (Map.Entry<Store, List<Cell>> entry: forMemStore.entrySet()) { - Store store = entry.getKey(); - List<Cell> results = entry.getValue(); - if (store.getFamily().getMaxVersions() == 1) { - // Upsert if VERSIONS for this CF == 1. Use write sequence id rather than read point - // when doing fast increment. - accumulatedResultSize += store.upsert(results, getSmallestReadPoint()); - } else { - // Otherwise keep older versions around - for (Cell cell: results) { - accumulatedResultSize += store.add(cell); - } - } - } - - // Tell mvcc this write is complete. - this.mvcc.complete(walKey.getWriteEntry()); - walKey = null; - } finally { - this.updatesLock.readLock().unlock(); - } - } finally { - // walKey is not null if above processing failed... cleanup the mvcc transaction. - if (walKey != null) this.mvcc.complete(walKey.getWriteEntry()); - rowLock.release(); - } - // Request a cache flush. Do it outside update lock. - if (isFlushSize(this.addAndGetGlobalMemstoreSize(accumulatedResultSize))) requestFlush(); - return increment.isReturnResults() ? Result.create(allKVs) : null; - } - - private Result slowButConsistentIncrement(Increment increment, long nonceGroup, long nonce) - throws IOException { RowLock rowLock = null; WALKey walKey = null; + MultiVersionConcurrencyControl.WriteEntry writeEntry = null; boolean doRollBackMemstore = false; - long accumulatedResultSize = 0; - List<Cell> allKVs = new ArrayList<Cell>(increment.size()); - List<Cell> memstoreCells = new ArrayList<Cell>(); - Durability effectiveDurability = getEffectiveDurability(increment.getDurability()); + TimeRange tr = mutation.getTimeRange(); try { - rowLock = getRowLock(increment.getRow()); - long txid = 0; + rowLock = getRowLock(row); + assert rowLock != null; try { lock(this.updatesLock.readLock()); try { - // Wait for all prior MVCC transactions to finish - while we hold the row lock - // (so that we are guaranteed to see the latest increment) - this.mvcc.await(); + // wait for all prior MVCC transactions to finish - while we hold the row lock + // (so that we are guaranteed to see the latest state) + mvcc.await(); if (this.coprocessorHost != null) { - Result r = this.coprocessorHost.preIncrementAfterRowLock(increment); - if (r != null) return r; + Result r = this.coprocessorHost.preIncrementAfterRowLock(mutation); + if (r != null) { + return r; + } } long now = EnvironmentEdgeManager.currentTime(); - final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL; - WALEdit walEdits = null; - // Process increments a Store/family at a time. - // Accumulate edits for memstore to add later after we've added to WAL. - Map<Store, List<Cell>> forMemStore = new HashMap<Store, List<Cell>>(); - for (Map.Entry<byte [], List<Cell>> entry: increment.getFamilyCellMap().entrySet()) { - byte [] columnFamilyName = entry.getKey(); - List<Cell> increments = entry.getValue(); - Store store = this.stores.get(columnFamilyName); - // Do increment for this store; be sure to 'sort' the increments first so increments - // match order in which we get back current Cells when we get. - List<Cell> results = applyIncrementsToColumnFamily(increment, columnFamilyName, - sort(increments, store.getComparator()), now, - MultiVersionConcurrencyControl.NO_WRITE_NUMBER, allKVs, null); - if (!results.isEmpty()) { - forMemStore.put(store, results); - // Prepare WAL updates - if (writeToWAL) { - if (walEdits == null) walEdits = new WALEdit(); - walEdits.getCells().addAll(results); + // Process each family + for (Map.Entry<byte [], List<Cell>> family: mutation.getFamilyCellMap().entrySet()) { + Store store = stores.get(family.getKey()); + List<Cell> kvs = new ArrayList<Cell>(family.getValue().size()); + + List<Cell> results = doGet(store, row, family, tr); + + // Iterate the input columns and update existing values if they were + // found, otherwise add new column initialized to the increment amount + + // Avoid as much copying as possible. We may need to rewrite and + // consolidate tags. Bytes are only copied once. + // Would be nice if KeyValue had scatter/gather logic + int idx = 0; + // HERE WE DIVERGE FROM APPEND + List<Cell> edits = family.getValue(); + for (int i = 0; i < edits.size(); i++) { + Cell cell = edits.get(i); + long amount = Bytes.toLong(CellUtil.cloneValue(cell)); + boolean noWriteBack = (amount == 0); + + List<Tag> newTags = carryForwardTags(cell, new ArrayList<Tag>()); + + Cell c = null; + long ts = now; + if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) { + c = results.get(idx); + ts = Math.max(now, c.getTimestamp()); + if(c.getValueLength() == Bytes.SIZEOF_LONG) { + amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG); + } else { + // throw DoNotRetryIOException instead of IllegalArgumentException + throw new org.apache.hadoop.hbase.DoNotRetryIOException( + "Attempted to increment field that isn't 64 bits wide"); + } + // Carry tags forward from previous version + newTags = carryForwardTags(c, newTags); + if (i < (edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1))) { + idx++; + } } + + // Append new incremented KeyValue to list + byte[] q = CellUtil.cloneQualifier(cell); + byte[] val = Bytes.toBytes(amount); + + // Add the TTL tag if the mutation carried one + if (mutation.getTTL() != Long.MAX_VALUE) { + newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutation.getTTL()))); + } + + Cell newKV = new KeyValue(row, 0, row.length, + family.getKey(), 0, family.getKey().length, + q, 0, q.length, + ts, + KeyValue.Type.Put, + val, 0, val.length, + newTags); + + // Give coprocessors a chance to update the new cell + if (coprocessorHost != null) { + newKV = coprocessorHost.postMutationBeforeWAL( + RegionObserver.MutationType.INCREMENT, mutation, c, newKV); + } + allKVs.add(newKV); + + if (!noWriteBack) { + kvs.add(newKV); + + // Prepare WAL updates + if (writeToWAL) { + if (walEdits == null) { + walEdits = new WALEdit(); + } + walEdits.add(newKV); + } + } + } + + //store the kvs to the temporary memstore before writing WAL + if (!kvs.isEmpty()) { + tempMemstore.put(store, kvs); } } - // Actually write to WAL now. If walEdits is non-empty, we write the WAL. + + // Actually write to WAL now if (walEdits != null && !walEdits.isEmpty()) { - // Using default cluster id, as this can only happen in the originating cluster. - // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey - // here instead of WALKey directly to support legacy coprocessors. - walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce, - getMVCC()); - txid = - this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdits, true); - } else { + if (writeToWAL) { + // Using default cluster id, as this can only happen in the originating + // cluster. A slave cluster receives the final value (not the delta) + // as a Put. + // we use HLogKey here instead of WALKey directly to support legacy coprocessors. + walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), + WALKey.NO_SEQUENCE_ID, + nonceGroup, + nonce, + mvcc); + txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), + walKey, walEdits, true); + } else { + recordMutationWithoutWal(mutation.getFamilyCellMap()); + } + } + if (walKey == null) { // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned walKey = this.appendEmptyEdit(this.wal); } - // Now write to memstore, a family at a time. - for (Map.Entry<Store, List<Cell>> entry: forMemStore.entrySet()) { - Store store = entry.getKey(); - List<Cell> results = entry.getValue(); - if (store.getFamily().getMaxVersions() == 1) { - // Upsert if VERSIONS for this CF == 1 - accumulatedResultSize += store.upsert(results, getSmallestReadPoint()); - // TODO: St.Ack 20151222 Why no rollback in this case? - } else { - // Otherwise keep older versions around - for (Cell cell: results) { - accumulatedResultSize += store.add(cell); - doRollBackMemstore = true; + // now start my own transaction + writeEntry = walKey.getWriteEntry(); + + // Actually write to Memstore now + if (!tempMemstore.isEmpty()) { + for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) { + Store store = entry.getKey(); + if (store.getFamily().getMaxVersions() == 1) { + // upsert if VERSIONS for this CF == 1 + // Is this right? It immediately becomes visible? St.Ack 20150907 + size += store.upsert(entry.getValue(), getSmallestReadPoint()); + } else { + // otherwise keep older versions around + for (Cell cell : entry.getValue()) { + CellUtil.setSequenceId(cell, writeEntry.getWriteNumber()); + size += store.add(cell); + doRollBackMemstore = true; + } } } + size = this.addAndGetGlobalMemstoreSize(size); + flush = isFlushSize(size); } } finally { this.updatesLock.readLock().unlock(); @@ -7572,165 +7539,34 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi rowLock = null; } // sync the transaction log outside the rowlock - if(txid != 0) { + if(txid != 0){ syncOrDefer(txid, durability); } - mvcc.completeAndWait(walKey.getWriteEntry()); - walKey = null; doRollBackMemstore = false; } finally { if (rowLock != null) { rowLock.release(); } // if the wal sync was unsuccessful, remove keys from memstore - if (doRollBackMemstore) rollbackMemstore(memstoreCells); - if (walKey != null) mvcc.complete(walKey.getWriteEntry()); + if (doRollBackMemstore) { + for(List<Cell> cells: tempMemstore.values()) { + rollbackMemstore(cells); + } + if (writeEntry != null) mvcc.complete(writeEntry); + } else if (writeEntry != null) { + mvcc.completeAndWait(writeEntry); + } closeRegionOperation(Operation.INCREMENT); if (this.metricsRegion != null) { this.metricsRegion.updateIncrement(); } } - // Request a cache flush. Do it outside update lock. - if (isFlushSize(this.addAndGetGlobalMemstoreSize(accumulatedResultSize))) requestFlush(); - return increment.isReturnResults() ? Result.create(allKVs) : null; - } - - /** - * @return Sorted list of <code>cells</code> using <code>comparator</code> - */ - private static List<Cell> sort(List<Cell> cells, final Comparator<Cell> comparator) { - Collections.sort(cells, comparator); - return cells; - } - - /** - * Apply increments to a column family. - * @param sortedIncrements The passed in increments to apply MUST be sorted so that they match - * the order that they appear in the Get results (get results will be sorted on return). - * Otherwise, we won't be able to find the existing values if the cells are not specified in - * order by the client since cells are in an array list. - * @islation Isolation level to use when running the 'get'. Pass null for default. - * @return Resulting increments after <code>sortedIncrements</code> have been applied to current - * values (if any -- else passed increment is the final result). - * @throws IOException - */ - private List<Cell> applyIncrementsToColumnFamily(Increment increment, byte[] columnFamilyName, - List<Cell> sortedIncrements, long now, long mvccNum, List<Cell> allKVs, - final IsolationLevel isolation) - throws IOException { - List<Cell> results = new ArrayList<Cell>(sortedIncrements.size()); - byte [] row = increment.getRow(); - // Get previous values for all columns in this family - List<Cell> currentValues = - getIncrementCurrentValue(increment, columnFamilyName, sortedIncrements, isolation); - // Iterate the input columns and update existing values if they were found, otherwise - // add new column initialized to the increment amount - int idx = 0; - for (int i = 0; i < sortedIncrements.size(); i++) { - Cell inc = sortedIncrements.get(i); - long incrementAmount = getLongValue(inc); - // If increment amount == 0, then don't write this Increment to the WAL. - boolean writeBack = (incrementAmount != 0); - // Carry forward any tags that might have been added by a coprocessor. - List<Tag> tags = Tag.carryForwardTags(inc); - - Cell currentValue = null; - long ts = now; - if (idx < currentValues.size() && CellUtil.matchingQualifier(currentValues.get(idx), inc)) { - currentValue = currentValues.get(idx); - ts = Math.max(now, currentValue.getTimestamp()); - incrementAmount += getLongValue(currentValue); - // Carry forward all tags - tags = Tag.carryForwardTags(tags, currentValue); - if (i < (sortedIncrements.size() - 1) && - !CellUtil.matchingQualifier(inc, sortedIncrements.get(i + 1))) idx++; - } - - // Append new incremented KeyValue to list - byte [] qualifier = CellUtil.cloneQualifier(inc); - byte [] incrementAmountInBytes = Bytes.toBytes(incrementAmount); - tags = carryForwardTTLTag(tags, increment); - - Cell newValue = new KeyValue(row, 0, row.length, - columnFamilyName, 0, columnFamilyName.length, - qualifier, 0, qualifier.length, - ts, KeyValue.Type.Put, - incrementAmountInBytes, 0, incrementAmountInBytes.length, - tags); - - // Don't set an mvcc if none specified. The mvcc may be assigned later in case where we - // write the memstore AFTER we sync our edit to the log. - if (mvccNum != MultiVersionConcurrencyControl.NO_WRITE_NUMBER) { - CellUtil.setSequenceId(newValue, mvccNum); - } - - // Give coprocessors a chance to update the new cell - if (coprocessorHost != null) { - newValue = coprocessorHost.postMutationBeforeWAL( - RegionObserver.MutationType.INCREMENT, increment, currentValue, newValue); - } - allKVs.add(newValue); - if (writeBack) { - results.add(newValue); - } - } - return results; - } - - /** - * @return Get the long out of the passed in Cell - * @throws DoNotRetryIOException - */ - private static long getLongValue(final Cell cell) throws DoNotRetryIOException { - int len = cell.getValueLength(); - if (len != Bytes.SIZEOF_LONG) { - // throw DoNotRetryIOException instead of IllegalArgumentException - throw new DoNotRetryIOException("Field is not a long, it's " + len + " bytes wide"); + if (flush) { + // Request a cache flush. Do it outside update lock. + requestFlush(); } - return Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), len); - } - - /** - * Do a specific Get on passed <code>columnFamily</code> and column qualifiers - * from <code>incrementCoordinates</code> only. - * @param increment - * @param columnFamily - * @param incrementCoordinates - * @return Return the Cells to Increment - * @throws IOException - */ - private List<Cell> getIncrementCurrentValue(final Increment increment, byte [] columnFamily, - final List<Cell> increments, final IsolationLevel isolation) - throws IOException { - Get get = new Get(increment.getRow()); - if (isolation != null) get.setIsolationLevel(isolation); - for (Cell cell: increments) { - get.addColumn(columnFamily, CellUtil.cloneQualifier(cell)); - } - TimeRange tr = increment.getTimeRange(); - get.setTimeRange(tr.getMin(), tr.getMax()); - return get(get, false); - } - - private static List<Tag> carryForwardTTLTag(final Mutation mutation) { - return carryForwardTTLTag(null, mutation); - } - - /** - * @return Carry forward the TTL tag if the increment is carrying one - */ - private static List<Tag> carryForwardTTLTag(final List<Tag> tagsOrNull, - final Mutation mutation) { - long ttl = mutation.getTTL(); - if (ttl == Long.MAX_VALUE) return tagsOrNull; - List<Tag> tags = tagsOrNull; - // If we are making the array in here, given we are the last thing checked, we'll be only thing - // in the array so set its size to '1' (I saw this being done in earlier version of - // tag-handling). - if (tags == null) tags = new ArrayList<Tag>(1); - tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl))); - return tags; + return mutation.isReturnResults() ? Result.create(allKVs) : null; } // http://git-wip-us.apache.org/repos/asf/hbase/blob/492db89d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java index da9c57a..eba99e0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java @@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.util.ClassSize; @InterfaceAudience.Private public class MultiVersionConcurrencyControl { private static final Log LOG = LogFactory.getLog(MultiVersionConcurrencyControl.class); - static final long NO_WRITE_NUMBER = 0; final AtomicLong readPoint = new AtomicLong(0); final AtomicLong writePoint = new AtomicLong(0); @@ -156,7 +155,7 @@ public class MultiVersionConcurrencyControl { * changes completely) so we can clean up the outstanding transaction. * * How much is the read point advanced? - * + * * Let S be the set of all write numbers that are completed. Set the read point to the highest * numbered write of S. * @@ -280,4 +279,4 @@ public class MultiVersionConcurrencyControl { ClassSize.OBJECT + 2 * Bytes.SIZEOF_LONG + 2 * ClassSize.REFERENCE); -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/492db89d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index e189a30..9ae72e6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -281,6 +281,8 @@ public class FSHLog implements WAL { private final int slowSyncNs; + private final static Object [] NO_ARGS = new Object []{}; + // If live datanode count is lower than the default replicas value, // RollWriter will be triggered in each sync(So the RollWriter will be // triggered one by one in a short time). Using it as a workaround to slow @@ -507,16 +509,16 @@ public class FSHLog implements WAL { FSUtils.getDefaultBlockSize(this.fs, this.fullPathLogDir)); this.logrollsize = (long)(blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f)); - + float memstoreRatio = conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_KEY, - conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_OLD_KEY, + conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_OLD_KEY, HeapMemorySizeUtil.DEFAULT_MEMSTORE_SIZE)); boolean maxLogsDefined = conf.get("hbase.regionserver.maxlogs") != null; if(maxLogsDefined){ LOG.warn("'hbase.regionserver.maxlogs' was deprecated."); } - this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", - Math.max(32, calculateMaxLogFiles(memstoreRatio, logrollsize))); + this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", + Math.max(32, calculateMaxLogFiles(memstoreRatio, logrollsize))); this.minTolerableReplication = conf.getInt("hbase.regionserver.hlog.tolerable.lowreplication", FSUtils.getDefaultReplication(fs, this.fullPathLogDir)); this.lowReplicationRollLimit = @@ -571,7 +573,7 @@ public class FSHLog implements WAL { int maxLogs = Math.round(mu.getMax() * memstoreSizeRatio * 2 / logRollSize); return maxLogs; } - + /** * Get the backing files associated with this WAL. * @return may be null if there are no files. @@ -1084,6 +1086,8 @@ public class FSHLog implements WAL { long sequence = this.disruptor.getRingBuffer().next(); try { RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence); + // Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the + // edit with its edit/sequence id. // TODO: reuse FSWALEntry as we do SyncFuture rather create per append. entry = new FSWALEntry(sequence, key, edits, htd, hri, inMemstore); truck.loadPayload(entry, scope.detach()); http://git-wip-us.apache.org/repos/asf/hbase/blob/492db89d/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java deleted file mode 100644 index bf3a44f..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase; - -import java.io.IOException; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -// import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -import com.yammer.metrics.Metrics; -import com.yammer.metrics.core.MetricName; -import com.yammer.metrics.core.Timer; -import com.yammer.metrics.core.TimerContext; -import com.yammer.metrics.stats.Snapshot; - -/** - * Simple Increments Performance Test. Run this from main. It is to go against a cluster. - * Presumption is the table exists already. Defaults are a zk ensemble of localhost:2181, - * a tableName of 'tableName', a column famly name of 'columnFamilyName', with 80 threads by - * default and 10000 increments per thread. To change any of these configs, pass -DNAME=VALUE as - * in -DtableName="newTableName". It prints out configuration it is running with at the start and - * on the end it prints out percentiles. - */ -public class IncrementPerformanceTest implements Tool { - private static final Log LOG = LogFactory.getLog(IncrementPerformanceTest.class); - private static final byte [] QUALIFIER = new byte [] {'q'}; - private Configuration conf; - private final MetricName metricName = new MetricName(this.getClass(), "increment"); - private static final String TABLENAME = "tableName"; - private static final String COLUMN_FAMILY = "columnFamilyName"; - private static final String THREAD_COUNT = "threadCount"; - private static final int DEFAULT_THREAD_COUNT = 80; - private static final String INCREMENT_COUNT = "incrementCount"; - private static final int DEFAULT_INCREMENT_COUNT = 10000; - - IncrementPerformanceTest() {} - - public int run(final String [] args) throws Exception { - Configuration conf = getConf(); - final TableName tableName = TableName.valueOf(conf.get(TABLENAME), TABLENAME); - final byte [] columnFamilyName = Bytes.toBytes(conf.get(COLUMN_FAMILY, COLUMN_FAMILY)); - int threadCount = conf.getInt(THREAD_COUNT, DEFAULT_THREAD_COUNT); - final int incrementCount = conf.getInt(INCREMENT_COUNT, DEFAULT_INCREMENT_COUNT); - LOG.info("Running test with " + HConstants.ZOOKEEPER_QUORUM + "=" + - getConf().get(HConstants.ZOOKEEPER_QUORUM) + ", tableName=" + tableName + - ", columnFamilyName=" + columnFamilyName + ", threadCount=" + threadCount + - ", incrementCount=" + incrementCount); - - ExecutorService service = Executors.newFixedThreadPool(threadCount); - Set<Future<?>> futures = new HashSet<Future<?>>(); - final AtomicInteger integer = new AtomicInteger(0); // needed a simple "final" counter - while (integer.incrementAndGet() <= threadCount) { - futures.add(service.submit(new Runnable() { - @Override - public void run() { - HTable table; - try { - // ConnectionFactory.createConnection(conf).getTable(TableName.valueOf(TABLE_NAME)); - table = new HTable(getConf(), tableName.getName()); - } catch (Exception e) { - throw new RuntimeException(e); - } - Timer timer = Metrics.newTimer(metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS); - for (int i = 0; i < incrementCount; i++) { - byte[] row = Bytes.toBytes(i); - TimerContext context = timer.time(); - try { - table.incrementColumnValue(row, columnFamilyName, QUALIFIER, 1l); - } catch (IOException e) { - // swallow..it's a test. - } finally { - context.stop(); - } - } - } - })); - } - - for(Future<?> future : futures) future.get(); - service.shutdown(); - Snapshot s = Metrics.newTimer(this.metricName, - TimeUnit.MILLISECONDS, TimeUnit.SECONDS).getSnapshot(); - LOG.info(String.format("75th=%s, 95th=%s, 99th=%s", s.get75thPercentile(), - s.get95thPercentile(), s.get99thPercentile())); - return 0; - } - - @Override - public Configuration getConf() { - return this.conf; - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } - - public static void main(String[] args) throws Exception { - System.exit(ToolRunner.run(HBaseConfiguration.create(), new IncrementPerformanceTest(), args)); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/492db89d/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 28c354f..81253a5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -51,6 +51,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -3138,7 +3139,7 @@ public class TestFromClientSide { equals(value, CellUtil.cloneValue(key))); } - static void assertIncrementKey(Cell key, byte [] row, byte [] family, + private void assertIncrementKey(Cell key, byte [] row, byte [] family, byte [] qualifier, long value) throws Exception { assertTrue("Expected row [" + Bytes.toString(row) + "] " + @@ -3362,7 +3363,7 @@ public class TestFromClientSide { return stamps; } - static boolean equals(byte [] left, byte [] right) { + private boolean equals(byte [] left, byte [] right) { if (left == null && right == null) return true; if (left == null && right.length == 0) return true; if (right == null && left.length == 0) return true; @@ -4482,6 +4483,264 @@ public class TestFromClientSide { } @Test + public void testIncrementWithDeletes() throws Exception { + LOG.info("Starting testIncrementWithDeletes"); + final TableName TABLENAME = + TableName.valueOf("testIncrementWithDeletes"); + Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); + final byte[] COLUMN = Bytes.toBytes("column"); + + ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5); + TEST_UTIL.flush(TABLENAME); + + Delete del = new Delete(ROW); + ht.delete(del); + + ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5); + + Get get = new Get(ROW); + Result r = ht.get(get); + assertEquals(1, r.size()); + assertEquals(5, Bytes.toLong(r.getValue(FAMILY, COLUMN))); + } + + @Test + public void testIncrementingInvalidValue() throws Exception { + LOG.info("Starting testIncrementingInvalidValue"); + final TableName TABLENAME = TableName.valueOf("testIncrementingInvalidValue"); + Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); + final byte[] COLUMN = Bytes.toBytes("column"); + Put p = new Put(ROW); + // write an integer here (not a Long) + p.add(FAMILY, COLUMN, Bytes.toBytes(5)); + ht.put(p); + try { + ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5); + fail("Should have thrown DoNotRetryIOException"); + } catch (DoNotRetryIOException iox) { + // success + } + Increment inc = new Increment(ROW); + inc.addColumn(FAMILY, COLUMN, 5); + try { + ht.increment(inc); + fail("Should have thrown DoNotRetryIOException"); + } catch (DoNotRetryIOException iox) { + // success + } + } + + @Test + public void testIncrementInvalidArguments() throws Exception { + LOG.info("Starting testIncrementInvalidArguments"); + final TableName TABLENAME = TableName.valueOf("testIncrementInvalidArguments"); + Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); + final byte[] COLUMN = Bytes.toBytes("column"); + try { + // try null row + ht.incrementColumnValue(null, FAMILY, COLUMN, 5); + fail("Should have thrown IOException"); + } catch (IOException iox) { + // success + } + try { + // try null family + ht.incrementColumnValue(ROW, null, COLUMN, 5); + fail("Should have thrown IOException"); + } catch (IOException iox) { + // success + } + try { + // try null qualifier + ht.incrementColumnValue(ROW, FAMILY, null, 5); + fail("Should have thrown IOException"); + } catch (IOException iox) { + // success + } + // try null row + try { + Increment incNoRow = new Increment((byte [])null); + incNoRow.addColumn(FAMILY, COLUMN, 5); + fail("Should have thrown IllegalArgumentException"); + } catch (IllegalArgumentException iax) { + // success + } catch (NullPointerException npe) { + // success + } + // try null family + try { + Increment incNoFamily = new Increment(ROW); + incNoFamily.addColumn(null, COLUMN, 5); + fail("Should have thrown IllegalArgumentException"); + } catch (IllegalArgumentException iax) { + // success + } + // try null qualifier + try { + Increment incNoQualifier = new Increment(ROW); + incNoQualifier.addColumn(FAMILY, null, 5); + fail("Should have thrown IllegalArgumentException"); + } catch (IllegalArgumentException iax) { + // success + } + } + + @Test + public void testIncrementOutOfOrder() throws Exception { + LOG.info("Starting testIncrementOutOfOrder"); + final TableName TABLENAME = TableName.valueOf("testIncrementOutOfOrder"); + Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); + + byte [][] QUALIFIERS = new byte [][] { + Bytes.toBytes("B"), Bytes.toBytes("A"), Bytes.toBytes("C") + }; + + Increment inc = new Increment(ROW); + for (int i=0; i<QUALIFIERS.length; i++) { + inc.addColumn(FAMILY, QUALIFIERS[i], 1); + } + ht.increment(inc); + + // Verify expected results + Result r = ht.get(new Get(ROW)); + Cell [] kvs = r.rawCells(); + assertEquals(3, kvs.length); + assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[1], 1); + assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[0], 1); + assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 1); + + // Now try multiple columns again + inc = new Increment(ROW); + for (int i=0; i<QUALIFIERS.length; i++) { + inc.addColumn(FAMILY, QUALIFIERS[i], 1); + } + ht.increment(inc); + + // Verify + r = ht.get(new Get(ROW)); + kvs = r.rawCells(); + assertEquals(3, kvs.length); + assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[1], 2); + assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[0], 2); + assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 2); + } + + @Test + public void testIncrementOnSameColumn() throws Exception { + LOG.info("Starting testIncrementOnSameColumn"); + final byte[] TABLENAME = Bytes.toBytes("testIncrementOnSameColumn"); + HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILY); + + byte[][] QUALIFIERS = + new byte[][] { Bytes.toBytes("A"), Bytes.toBytes("B"), Bytes.toBytes("C") }; + + Increment inc = new Increment(ROW); + for (int i = 0; i < QUALIFIERS.length; i++) { + inc.addColumn(FAMILY, QUALIFIERS[i], 1); + inc.addColumn(FAMILY, QUALIFIERS[i], 1); + } + ht.increment(inc); + + // Verify expected results + Result r = ht.get(new Get(ROW)); + Cell[] kvs = r.rawCells(); + assertEquals(3, kvs.length); + assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[0], 1); + assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[1], 1); + assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 1); + + // Now try multiple columns again + inc = new Increment(ROW); + for (int i = 0; i < QUALIFIERS.length; i++) { + inc.addColumn(FAMILY, QUALIFIERS[i], 1); + inc.addColumn(FAMILY, QUALIFIERS[i], 1); + } + ht.increment(inc); + + // Verify + r = ht.get(new Get(ROW)); + kvs = r.rawCells(); + assertEquals(3, kvs.length); + assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[0], 2); + assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[1], 2); + assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 2); + + ht.close(); + } + + @Test + public void testIncrement() throws Exception { + LOG.info("Starting testIncrement"); + final TableName TABLENAME = TableName.valueOf("testIncrement"); + Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); + + byte [][] ROWS = new byte [][] { + Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"), + Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"), + Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i") + }; + byte [][] QUALIFIERS = new byte [][] { + Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"), + Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"), + Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i") + }; + + // Do some simple single-column increments + + // First with old API + ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[0], 1); + ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[1], 2); + ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[2], 3); + ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[3], 4); + + // Now increment things incremented with old and do some new + Increment inc = new Increment(ROW); + inc.addColumn(FAMILY, QUALIFIERS[1], 1); + inc.addColumn(FAMILY, QUALIFIERS[3], 1); + inc.addColumn(FAMILY, QUALIFIERS[4], 1); + ht.increment(inc); + + // Verify expected results + Result r = ht.get(new Get(ROW)); + Cell [] kvs = r.rawCells(); + assertEquals(5, kvs.length); + assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[0], 1); + assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[1], 3); + assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 3); + assertIncrementKey(kvs[3], ROW, FAMILY, QUALIFIERS[3], 5); + assertIncrementKey(kvs[4], ROW, FAMILY, QUALIFIERS[4], 1); + + // Now try multiple columns by different amounts + inc = new Increment(ROWS[0]); + for (int i=0;i<QUALIFIERS.length;i++) { + inc.addColumn(FAMILY, QUALIFIERS[i], i+1); + } + ht.increment(inc); + // Verify + r = ht.get(new Get(ROWS[0])); + kvs = r.rawCells(); + assertEquals(QUALIFIERS.length, kvs.length); + for (int i=0;i<QUALIFIERS.length;i++) { + assertIncrementKey(kvs[i], ROWS[0], FAMILY, QUALIFIERS[i], i+1); + } + + // Re-increment them + inc = new Increment(ROWS[0]); + for (int i=0;i<QUALIFIERS.length;i++) { + inc.addColumn(FAMILY, QUALIFIERS[i], i+1); + } + ht.increment(inc); + // Verify + r = ht.get(new Get(ROWS[0])); + kvs = r.rawCells(); + assertEquals(QUALIFIERS.length, kvs.length); + for (int i=0;i<QUALIFIERS.length;i++) { + assertIncrementKey(kvs[i], ROWS[0], FAMILY, QUALIFIERS[i], 2*(i+1)); + } + } + + + @Test public void testClientPoolRoundRobin() throws IOException { final TableName tableName = TableName.valueOf("testClientPoolRoundRobin"); http://git-wip-us.apache.org/repos/asf/hbase/blob/492db89d/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index 09c7e86..a0a8747 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; import java.util.ArrayList; @@ -31,14 +32,14 @@ import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; -import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.junit.After; http://git-wip-us.apache.org/repos/asf/hbase/blob/492db89d/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideNoCodec.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideNoCodec.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideNoCodec.java index 66fb69c..ae96849 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideNoCodec.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideNoCodec.java @@ -99,4 +99,4 @@ public class TestFromClientSideNoCodec { String codec = AbstractRpcClient.getDefaultCodec(c); assertTrue(codec == null || codec.length() == 0); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/492db89d/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java index cd2409e..2671af7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java @@ -27,7 +27,7 @@ import org.junit.experimental.categories.Category; /** * Test all client operations with a coprocessor that - * just implements the default flush/compact/scan policy. + * just implements the default flush/compact/scan policy */ @Category(LargeTests.class) public class TestFromClientSideWithCoprocessor extends TestFromClientSide { http://git-wip-us.apache.org/repos/asf/hbase/blob/492db89d/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementFromClientSideWithCoprocessor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementFromClientSideWithCoprocessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementFromClientSideWithCoprocessor.java deleted file mode 100644 index a67cc45..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementFromClientSideWithCoprocessor.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; -import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; -import org.apache.hadoop.hbase.regionserver.NoOpScanPolicyObserver; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.junit.Before; -import org.junit.experimental.categories.Category; - -/** - * Test all {@link Increment} client operations with a coprocessor that - * just implements the default flush/compact/scan policy. - * - * This test takes a long time. The test it derives from is parameterized so we run through both - * options of the test. - */ -@Category(LargeTests.class) -public class TestIncrementFromClientSideWithCoprocessor extends TestIncrementsFromClientSide { - public TestIncrementFromClientSideWithCoprocessor(final boolean fast) { - super(fast); - } - - @Before - public void before() throws Exception { - Configuration conf = TEST_UTIL.getConfiguration(); - conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, - MultiRowMutationEndpoint.class.getName(), NoOpScanPolicyObserver.class.getName()); - conf.setBoolean("hbase.table.sanity.checks", true); // enable for below tests - super.before(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/492db89d/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java deleted file mode 100644 index 54a54a0..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java +++ /dev/null @@ -1,433 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; -import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Threads; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -/** - * Run Increment tests that use the HBase clients; {@link HTable}. - * - * Test is parameterized to run the slow and fast increment code paths. If fast, in the @before, we - * do a rolling restart of the single regionserver so that it can pick up the go fast configuration. - * Doing it this way should be faster than starting/stopping a cluster per test. - * - * Test takes a long time because spin up a cluster between each run -- ugh. - */ -@RunWith(Parameterized.class) -@Category(LargeTests.class) -@SuppressWarnings ("deprecation") -public class TestIncrementsFromClientSide { - final Log LOG = LogFactory.getLog(getClass()); - protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static byte [] ROW = Bytes.toBytes("testRow"); - private static byte [] FAMILY = Bytes.toBytes("testFamily"); - // This test depends on there being only one slave running at at a time. See the @Before - // method where we do rolling restart. - protected static int SLAVES = 1; - private String oldINCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY; - @Rule public TestName name = new TestName(); - @Parameters(name = "fast={0}") - public static Collection<Object []> data() { - return Arrays.asList(new Object[] {Boolean.FALSE}, new Object [] {Boolean.TRUE}); - } - private final boolean fast; - - public TestIncrementsFromClientSide(final boolean fast) { - this.fast = fast; - } - - @BeforeClass - public static void beforeClass() throws Exception { - Configuration conf = TEST_UTIL.getConfiguration(); - conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, - MultiRowMutationEndpoint.class.getName()); - conf.setBoolean("hbase.table.sanity.checks", true); // enable for below tests - // We need more than one region server in this test - TEST_UTIL.startMiniCluster(SLAVES); - } - - @Before - public void before() throws Exception { - Configuration conf = TEST_UTIL.getConfiguration(); - if (this.fast) { - // If fast is set, set our configuration and then do a rolling restart of the one - // regionserver so it picks up the new config. Doing this should be faster than starting - // and stopping a cluster for each test. - this.oldINCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY = - conf.get(HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY); - conf.setBoolean(HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY, this.fast); - HRegionServer rs = - TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().get(0).getRegionServer(); - TEST_UTIL.getHBaseCluster().startRegionServer(); - rs.stop("Restart"); - while(!rs.isStopped()) { - Threads.sleep(100); - LOG.info("Restarting " + rs); - } - TEST_UTIL.waitUntilNoRegionsInTransition(10000); - } - } - - @After - public void after() throws Exception { - Configuration conf = TEST_UTIL.getConfiguration(); - if (this.fast) { - if (this.oldINCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY != null) { - conf.set(HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY, - this.oldINCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY); - } - } - } - - /** - * @throws java.lang.Exception - */ - @AfterClass - public static void afterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - } - - @Test - public void testIncrementWithDeletes() throws Exception { - LOG.info("Starting " + this.name.getMethodName()); - final TableName TABLENAME = - TableName.valueOf(filterStringSoTableNameSafe(this.name.getMethodName())); - Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); - final byte[] COLUMN = Bytes.toBytes("column"); - - ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5); - TEST_UTIL.flush(TABLENAME); - - Delete del = new Delete(ROW); - ht.delete(del); - - ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5); - - Get get = new Get(ROW); - if (this.fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); - Result r = ht.get(get); - assertEquals(1, r.size()); - assertEquals(5, Bytes.toLong(r.getValue(FAMILY, COLUMN))); - } - - @Test - public void testIncrementingInvalidValue() throws Exception { - LOG.info("Starting " + this.name.getMethodName()); - final TableName TABLENAME = - TableName.valueOf(filterStringSoTableNameSafe(this.name.getMethodName())); - Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); - final byte[] COLUMN = Bytes.toBytes("column"); - Put p = new Put(ROW); - // write an integer here (not a Long) - p.add(FAMILY, COLUMN, Bytes.toBytes(5)); - ht.put(p); - try { - ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5); - fail("Should have thrown DoNotRetryIOException"); - } catch (DoNotRetryIOException iox) { - // success - } - Increment inc = new Increment(ROW); - inc.addColumn(FAMILY, COLUMN, 5); - try { - ht.increment(inc); - fail("Should have thrown DoNotRetryIOException"); - } catch (DoNotRetryIOException iox) { - // success - } - } - - @Test - public void testIncrementInvalidArguments() throws Exception { - LOG.info("Starting " + this.name.getMethodName()); - final TableName TABLENAME = - TableName.valueOf(filterStringSoTableNameSafe(this.name.getMethodName())); - Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); - final byte[] COLUMN = Bytes.toBytes("column"); - try { - // try null row - ht.incrementColumnValue(null, FAMILY, COLUMN, 5); - fail("Should have thrown IOException"); - } catch (IOException iox) { - // success - } - try { - // try null family - ht.incrementColumnValue(ROW, null, COLUMN, 5); - fail("Should have thrown IOException"); - } catch (IOException iox) { - // success - } - try { - // try null qualifier - ht.incrementColumnValue(ROW, FAMILY, null, 5); - fail("Should have thrown IOException"); - } catch (IOException iox) { - // success - } - // try null row - try { - Increment incNoRow = new Increment((byte [])null); - incNoRow.addColumn(FAMILY, COLUMN, 5); - fail("Should have thrown IllegalArgumentException"); - } catch (IllegalArgumentException iax) { - // success - } catch (NullPointerException npe) { - // success - } - // try null family - try { - Increment incNoFamily = new Increment(ROW); - incNoFamily.addColumn(null, COLUMN, 5); - fail("Should have thrown IllegalArgumentException"); - } catch (IllegalArgumentException iax) { - // success - } - // try null qualifier - try { - Increment incNoQualifier = new Increment(ROW); - incNoQualifier.addColumn(FAMILY, null, 5); - fail("Should have thrown IllegalArgumentException"); - } catch (IllegalArgumentException iax) { - // success - } - } - - @Test - public void testIncrementOutOfOrder() throws Exception { - LOG.info("Starting " + this.name.getMethodName()); - final TableName TABLENAME = - TableName.valueOf(filterStringSoTableNameSafe(this.name.getMethodName())); - Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); - - byte [][] QUALIFIERS = new byte [][] { - Bytes.toBytes("B"), Bytes.toBytes("A"), Bytes.toBytes("C") - }; - - Increment inc = new Increment(ROW); - for (int i=0; i<QUALIFIERS.length; i++) { - inc.addColumn(FAMILY, QUALIFIERS[i], 1); - } - ht.increment(inc); - - // Verify expected results - Get get = new Get(ROW); - if (this.fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); - Result r = ht.get(get); - Cell [] kvs = r.rawCells(); - assertEquals(3, kvs.length); - assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[1], 1); - assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[0], 1); - assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 1); - - // Now try multiple columns again - inc = new Increment(ROW); - for (int i=0; i<QUALIFIERS.length; i++) { - inc.addColumn(FAMILY, QUALIFIERS[i], 1); - } - ht.increment(inc); - - // Verify - r = ht.get(get); - kvs = r.rawCells(); - assertEquals(3, kvs.length); - assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[1], 2); - assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[0], 2); - assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 2); - } - - @Test - public void testIncrementOnSameColumn() throws Exception { - LOG.info("Starting " + this.name.getMethodName()); - final byte[] TABLENAME = Bytes.toBytes(filterStringSoTableNameSafe(this.name.getMethodName())); - HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILY); - - byte[][] QUALIFIERS = - new byte[][] { Bytes.toBytes("A"), Bytes.toBytes("B"), Bytes.toBytes("C") }; - - Increment inc = new Increment(ROW); - for (int i = 0; i < QUALIFIERS.length; i++) { - inc.addColumn(FAMILY, QUALIFIERS[i], 1); - inc.addColumn(FAMILY, QUALIFIERS[i], 1); - } - ht.increment(inc); - - // Verify expected results - Get get = new Get(ROW); - if (this.fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); - Result r = ht.get(get); - Cell[] kvs = r.rawCells(); - assertEquals(3, kvs.length); - assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[0], 1); - assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[1], 1); - assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 1); - - // Now try multiple columns again - inc = new Increment(ROW); - for (int i = 0; i < QUALIFIERS.length; i++) { - inc.addColumn(FAMILY, QUALIFIERS[i], 1); - inc.addColumn(FAMILY, QUALIFIERS[i], 1); - } - ht.increment(inc); - - // Verify - r = ht.get(get); - kvs = r.rawCells(); - assertEquals(3, kvs.length); - assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[0], 2); - assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[1], 2); - assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 2); - - ht.close(); - } - - @Test - public void testIncrement() throws Exception { - LOG.info("Starting " + this.name.getMethodName()); - final TableName TABLENAME = - TableName.valueOf(filterStringSoTableNameSafe(this.name.getMethodName())); - Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); - - byte [][] ROWS = new byte [][] { - Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"), - Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"), - Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i") - }; - byte [][] QUALIFIERS = new byte [][] { - Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"), - Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"), - Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i") - }; - - // Do some simple single-column increments - - // First with old API - ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[0], 1); - ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[1], 2); - ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[2], 3); - ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[3], 4); - - // Now increment things incremented with old and do some new - Increment inc = new Increment(ROW); - inc.addColumn(FAMILY, QUALIFIERS[1], 1); - inc.addColumn(FAMILY, QUALIFIERS[3], 1); - inc.addColumn(FAMILY, QUALIFIERS[4], 1); - ht.increment(inc); - - // Verify expected results - Get get = new Get(ROW); - if (this.fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); - Result r = ht.get(get); - Cell [] kvs = r.rawCells(); - assertEquals(5, kvs.length); - assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[0], 1); - assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[1], 3); - assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 3); - assertIncrementKey(kvs[3], ROW, FAMILY, QUALIFIERS[3], 5); - assertIncrementKey(kvs[4], ROW, FAMILY, QUALIFIERS[4], 1); - - // Now try multiple columns by different amounts - inc = new Increment(ROWS[0]); - for (int i=0;i<QUALIFIERS.length;i++) { - inc.addColumn(FAMILY, QUALIFIERS[i], i+1); - } - ht.increment(inc); - // Verify - get = new Get(ROWS[0]); - if (this.fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); - r = ht.get(get); - kvs = r.rawCells(); - assertEquals(QUALIFIERS.length, kvs.length); - for (int i=0;i<QUALIFIERS.length;i++) { - assertIncrementKey(kvs[i], ROWS[0], FAMILY, QUALIFIERS[i], i+1); - } - - // Re-increment them - inc = new Increment(ROWS[0]); - for (int i=0;i<QUALIFIERS.length;i++) { - inc.addColumn(FAMILY, QUALIFIERS[i], i+1); - } - ht.increment(inc); - // Verify - r = ht.get(get); - kvs = r.rawCells(); - assertEquals(QUALIFIERS.length, kvs.length); - for (int i=0;i<QUALIFIERS.length;i++) { - assertIncrementKey(kvs[i], ROWS[0], FAMILY, QUALIFIERS[i], 2*(i+1)); - } - - // Verify that an Increment of an amount of zero, returns current count; i.e. same as for above - // test, that is: 2 * (i + 1). - inc = new Increment(ROWS[0]); - for (int i = 0; i < QUALIFIERS.length; i++) { - inc.addColumn(FAMILY, QUALIFIERS[i], 0); - } - ht.increment(inc); - r = ht.get(get); - kvs = r.rawCells(); - assertEquals(QUALIFIERS.length, kvs.length); - for (int i = 0; i < QUALIFIERS.length; i++) { - assertIncrementKey(kvs[i], ROWS[0], FAMILY, QUALIFIERS[i], 2*(i+1)); - } - } - - - /** - * Call over to the adjacent class's method of same name. - */ - static void assertIncrementKey(Cell key, byte [] row, byte [] family, - byte [] qualifier, long value) throws Exception { - TestFromClientSide.assertIncrementKey(key, row, family, qualifier, value); - } - - public static String filterStringSoTableNameSafe(final String str) { - return str.replaceAll("\\[fast\\=(.*)\\]", ".FAST.is.$1"); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/492db89d/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java index ab53e3e..8e451cd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -31,6 +32,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; @@ -235,8 +237,7 @@ public abstract class TestTableInputFormatScanBase { ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); job.setReducerClass(ScanReducer.class); job.setNumReduceTasks(1); // one to get final "first" and "last" key - FileOutputFormat.setOutputPath(job, - new Path(TEST_UTIL.getDataTestDir(), job.getJobName())); + FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); LOG.info("Started " + job.getJobName()); assertTrue(job.waitForCompletion(true)); LOG.info("After map/reduce completion - job " + jobName);
