HBASE-15091 Forward-port to 1.2 HBASE-15031 "Fix merge of MVCC and SequenceID performance regression in branch-1.0"
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a9c00834 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a9c00834 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a9c00834 Branch: refs/heads/branch-1 Commit: a9c008344f7555322c96f193519eefcca308dd7c Parents: d965d14 Author: stack <[email protected]> Authored: Wed Jan 20 11:15:30 2016 -0800 Committer: stack <[email protected]> Committed: Thu Jan 21 10:26:46 2016 -0800 ---------------------------------------------------------------------- .../apache/hadoop/hbase/HColumnDescriptor.java | 14 +- .../hadoop/hbase/client/AsyncProcess.java | 18 +- .../hbase/client/ReversedScannerCallable.java | 5 +- .../org/apache/hadoop/hbase/client/Scan.java | 2 + .../hadoop/hbase/client/TestIncrement.java | 2 +- .../org/apache/hadoop/hbase/HConstants.java | 6 - .../java/org/apache/hadoop/hbase/KeyValue.java | 2 - .../main/java/org/apache/hadoop/hbase/Tag.java | 29 +- .../hadoop/hbase/io/compress/Compression.java | 4 +- .../hadoop/hbase/io/hfile/HFileBlockIndex.java | 6 +- .../hadoop/hbase/io/hfile/HFileReaderV2.java | 3 +- .../hadoop/hbase/io/hfile/HFileWriterV2.java | 6 +- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 18 +- .../hadoop/hbase/regionserver/HRegion.java | 672 ++++++++++++------- .../hbase/regionserver/KeyValueScanner.java | 7 + .../MultiVersionConcurrencyControl.java | 5 +- .../hadoop/hbase/regionserver/StoreScanner.java | 4 +- .../hadoop/hbase/regionserver/wal/FSHLog.java | 13 +- .../token/AuthenticationTokenSecretManager.java | 4 +- .../hadoop/hbase/IncrementPerformanceTest.java | 129 ++++ .../hadoop/hbase/client/TestFromClientSide.java | 263 +------- .../hbase/client/TestFromClientSide3.java | 5 +- .../hbase/client/TestFromClientSideNoCodec.java | 2 +- .../TestFromClientSideWithCoprocessor.java | 2 +- ...tIncrementFromClientSideWithCoprocessor.java | 49 ++ .../client/TestIncrementsFromClientSide.java | 433 ++++++++++++ 
.../mapreduce/TestTableInputFormatScanBase.java | 5 +- .../hbase/regionserver/TestAtomicOperation.java | 61 +- .../hbase/regionserver/TestRegionIncrement.java | 253 +++++++ .../hadoop/hbase/regionserver/TestTags.java | 2 +- 30 files changed, 1440 insertions(+), 584 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java index 480aff9..17ca37a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java @@ -881,7 +881,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor> */ public int getTimeToLive() { String value = getValue(TTL); - return (value != null)? Integer.valueOf(value).intValue(): DEFAULT_TTL; + return (value != null)? Integer.parseInt(value): DEFAULT_TTL; } /** @@ -897,7 +897,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor> */ public int getMinVersions() { String value = getValue(MIN_VERSIONS); - return (value != null)? Integer.valueOf(value).intValue(): 0; + return (value != null)? 
Integer.parseInt(value): 0; } /** @@ -916,7 +916,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor> public boolean isBlockCacheEnabled() { String value = getValue(BLOCKCACHE); if (value != null) - return Boolean.valueOf(value).booleanValue(); + return Boolean.parseBoolean(value); return DEFAULT_BLOCKCACHE; } @@ -954,7 +954,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor> public int getScope() { byte[] value = getValue(REPLICATION_SCOPE_BYTES); if (value != null) { - return Integer.valueOf(Bytes.toString(value)); + return Integer.parseInt(Bytes.toString(value)); } return DEFAULT_REPLICATION_SCOPE; } @@ -1024,7 +1024,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor> private boolean setAndGetBoolean(final String key, final boolean defaultSetting) { String value = getValue(key); - if (value != null) return Boolean.valueOf(value).booleanValue(); + if (value != null) return Boolean.parseBoolean(value); return defaultSetting; } @@ -1343,7 +1343,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor> } String value = getValue(HConstants.VERSIONS); this.cachedMaxVersions = (value != null)? - Integer.valueOf(value).intValue(): DEFAULT_VERSIONS; + Integer.parseInt(value): DEFAULT_VERSIONS; if (version > 10) { configuration.clear(); int numConfigs = in.readInt(); @@ -1542,7 +1542,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor> */ public short getDFSReplication() { String rf = getValue(DFS_REPLICATION); - return rf == null ? DEFAULT_DFS_REPLICATION : Short.valueOf(rf); + return rf == null ? 
DEFAULT_DFS_REPLICATION : Short.parseShort(rf); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 0d093b1..e895a13 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -1581,23 +1581,27 @@ class AsyncProcess { } private String buildDetailedErrorMsg(String string, int index) { - String error = string + "; called for " + index + - ", actionsInProgress " + actionsInProgress.get() + "; replica gets: "; + StringBuilder error = new StringBuilder(string); + error.append("; called for "). + append(index). + append(", actionsInProgress "). + append(actionsInProgress.get()). + append("; replica gets: "); if (replicaGetIndices != null) { for (int i = 0; i < replicaGetIndices.length; ++i) { - error += replicaGetIndices[i] + ", "; + error.append(replicaGetIndices[i]).append(", "); } } else { - error += (hasAnyReplicaGets ? "all" : "none"); + error.append(hasAnyReplicaGets ? "all" : "none"); } - error += "; results "; + error.append("; results "); if (results != null) { for (int i = 0; i < results.length; ++i) { Object o = results[i]; - error += ((o == null) ? "null" : o.toString()) + ", "; + error.append(((o == null) ? 
"null" : o.toString())).append(", "); } } - return error; + return error.toString(); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java index f400e83..e169f7a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java @@ -148,6 +148,8 @@ public class ReversedScannerCallable extends ScannerCallable { * the specified range * @throws IOException */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH", + justification="I thought I'd fixed it but FB still complains; see below") private List<HRegionLocation> locateRegionsInRange(byte[] startKey, byte[] endKey, boolean reload) throws IOException { final boolean endKeyIsEndOfTable = Bytes.equals(endKey, @@ -166,9 +168,10 @@ public class ReversedScannerCallable extends ScannerCallable { if (regionLocation != null && regionLocation.getRegionInfo().containsRow(currentKey)) { regionList.add(regionLocation); } else { + // FindBugs: NP_NULL_ON_SOME_PATH Complaining about regionLocation throw new DoNotRetryIOException("Does hbase:meta exist hole? Locating row " + Bytes.toStringBinary(currentKey) + " returns incorrect region " - + regionLocation.getRegionInfo()); + + (regionLocation != null? 
regionLocation.getRegionInfo(): null)); } currentKey = regionLocation.getRegionInfo().getEndKey(); } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index 4825cca..b13837d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -220,6 +220,7 @@ public class Scan extends Query { filter = scan.getFilter(); // clone? loadColumnFamiliesOnDemand = scan.getLoadColumnFamiliesOnDemandValue(); consistency = scan.getConsistency(); + this.setIsolationLevel(scan.getIsolationLevel()); reversed = scan.isReversed(); small = scan.isSmall(); allowPartialResults = scan.getAllowPartialResults(); @@ -262,6 +263,7 @@ public class Scan extends Query { this.familyMap = get.getFamilyMap(); this.getScan = true; this.consistency = get.getConsistency(); + this.setIsolationLevel(get.getIsolationLevel()); for (Map.Entry<String, byte[]> attr : get.getAttributesMap().entrySet()) { setAttribute(attr.getKey(), attr.getValue()); } http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java index 8a2c447..39cde45 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java @@ -29,7 +29,7 @@ import 
org.junit.experimental.categories.Category; @Category(SmallTests.class) public class TestIncrement { @Test - public void test() { + public void testIncrementInstance() { final long expected = 13; Increment inc = new Increment(new byte [] {'r'}); int total = 0; http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 5ae5ebf..9b4f46b 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase; import static org.apache.hadoop.hbase.io.hfile.BlockType.MAGIC_LENGTH; -import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.Arrays; import java.util.Collections; @@ -998,11 +997,6 @@ public final class HConstants { public static final String LOAD_BALANCER_SLOP_KEY = "hbase.regions.slop"; - /** - * The byte array represents for NO_NEXT_INDEXED_KEY; - * The actual value is irrelevant because this is always compared by reference. 
- */ - public static final Cell NO_NEXT_INDEXED_KEY = new KeyValue(); /** delimiter used between portions of a region name */ public static final int DELIMITER = ','; public static final String HBASE_CONFIG_READ_ZOOKEEPER_CONFIG = http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index b1464d5..aa65f44 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -23,7 +23,6 @@ import static org.apache.hadoop.hbase.util.Bytes.len; import java.io.DataInput; import java.io.DataOutput; -import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -42,7 +41,6 @@ import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.RawComparator; import com.google.common.annotations.VisibleForTesting; http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java index 2e7314d..d0719f0 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import 
org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -180,6 +181,7 @@ public class Tag { * @return the serialized tag data as bytes */ public static byte[] fromList(List<Tag> tags) { + if (tags == null || tags.size() <= 0) return null; int length = 0; for (Tag tag: tags) { length += tag.length; @@ -226,4 +228,29 @@ public class Tag { int getOffset() { return this.offset; } -} + + + /** + * @return A List<Tag> of any Tags found in <code>cell</code> else null. + */ + public static List<Tag> carryForwardTags(final Cell cell) { + return carryForwardTags(null, cell); + } + + /** + * @return Add to <code>tagsOrNull</code> any Tags <code>cell</code> is carrying or null if + * it is carrying no Tags AND the passed in <code>tagsOrNull</code> is null (else we return new + * List<Tag> with Tags found). + */ + public static List<Tag> carryForwardTags(final List<Tag> tagsOrNull, final Cell cell) { + List<Tag> tags = tagsOrNull; + if (cell.getTagsLength() <= 0) return tags; + Iterator<Tag> itr = + CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); + if (tags == null) tags = new ArrayList<Tag>(); + while (itr.hasNext()) { + tags.add(itr.next()); + } + return tags; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java index e8e58da..c6ebff5 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java @@ -237,7 +237,7 @@ public final class Compression { } }; - private final Configuration conf; + private final transient Configuration conf; // FindBugs: 
SE_BAD_FIELD so just made it transient private final String compressName; /** data input buffer size to absorb small reads from application. */ private static final int DATA_IBUF_SIZE = 1 * 1024; @@ -431,4 +431,4 @@ public final class Compression { } } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java index 1c315e9..6dd7fa2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java @@ -32,17 +32,17 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -229,7 +229,7 @@ public class HFileBlockIndex { if (rootLevelIndex < blockKeys.length - 1) { nextIndexedKey = new 
KeyValue.KeyOnlyKeyValue(blockKeys[rootLevelIndex + 1]); } else { - nextIndexedKey = HConstants.NO_NEXT_INDEXED_KEY; + nextIndexedKey = KeyValueScanner.NO_NEXT_INDEXED_KEY; } int lookupLevel = 1; // How many levels deep we are in our lookup. http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java index f02bc3d..12c46e8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.IdLock; import org.apache.hadoop.io.WritableUtils; @@ -597,7 +598,7 @@ public class HFileReaderV2 extends AbstractHFileReader { } else { // The comparison with no_next_index_key has to be checked if (this.nextIndexedKey != null && - (this.nextIndexedKey == HConstants.NO_NEXT_INDEXED_KEY || reader + (this.nextIndexedKey == KeyValueScanner.NO_NEXT_INDEXED_KEY || reader .getComparator() .compareOnlyKeyPortion(key, nextIndexedKey) < 0)) { // The reader shall continue to scan the current data block instead http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java index 28c4655..c14b3d0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java @@ -94,18 +94,18 @@ public class HFileWriterV2 extends AbstractHFileWriter { } @Override - public Writer createWriter(FileSystem fs, Path path, + public Writer createWriter(FileSystem fs, Path path, FSDataOutputStream ostream, KVComparator comparator, HFileContext context) throws IOException { context.setIncludesTags(false);// HFile V2 does not deal with tags at all! - return new HFileWriterV2(conf, cacheConf, fs, path, ostream, + return new HFileWriterV2(conf, cacheConf, fs, path, ostream, comparator, context); } } /** Constructor that takes a path, creates and closes the output stream. */ public HFileWriterV2(Configuration conf, CacheConfig cacheConf, - FileSystem fs, Path path, FSDataOutputStream ostream, + FileSystem fs, Path path, FSDataOutputStream ostream, final KVComparator comparator, final HFileContext context) throws IOException { super(cacheConf, ostream == null ? 
createOutputStream(conf, fs, path, null) : ostream, http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index b2d8154..3659280 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -1959,11 +1959,23 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { data = null; if (!channel.isOpen()) return; - try {socket.shutdownOutput();} catch(Exception ignored) {} // FindBugs DE_MIGHT_IGNORE + try {socket.shutdownOutput();} catch(Exception ignored) { + if (LOG.isTraceEnabled()) { + LOG.trace(ignored); + } + } if (channel.isOpen()) { - try {channel.close();} catch(Exception ignored) {} + try {channel.close();} catch(Exception ignored) { + if (LOG.isTraceEnabled()) { + LOG.trace(ignored); + } + } + } + try {socket.close();} catch(Exception ignored) { + if (LOG.isTraceEnabled()) { + LOG.trace(ignored); + } } - try {socket.close();} catch(Exception ignored) {} } private UserGroupInformation createUser(ConnectionHeader head) { http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 5ad8fa3..1c35832 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import 
java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -142,6 +143,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.Stor import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry; import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; @@ -150,8 +152,8 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputCont import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.ReplayHLogKey; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.security.User; @@ -215,6 +217,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private static final int DEFAULT_MAX_WAIT_FOR_SEQ_ID = 30000; /** + * Set region to take the fast increment path. Constraint is that caller can only access the + * Cell via Increment; intermixing Increment with other Mutations will give indeterminate + * results. 
A Get with {@link IsolationLevel#READ_UNCOMMITTED} will get the latest increment + * or an Increment of zero will do the same. + */ + public static final String INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY = + "hbase.increment.fast.but.narrow.consistency"; + private final boolean incrementFastButNarrowConsistency; + + /** * This is the global default value for durability. All tables/mutations not * defining a durability or using USE_DEFAULT will default to this value. */ @@ -745,6 +757,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi false : conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); + + // See #INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY for what this flag is about. + this.incrementFastButNarrowConsistency = + this.conf.getBoolean(INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY, false); } void setHTableSpecificConf() { @@ -3667,30 +3683,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi int listSize = cells.size(); for (int i = 0; i < listSize; i++) { Cell cell = cells.get(i); - List<Tag> newTags = new ArrayList<Tag>(); - Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(), - cell.getTagsOffset(), cell.getTagsLength()); - - // Carry forward existing tags - - while (tagIterator.hasNext()) { - - // Add any filters or tag specific rewrites here - - newTags.add(tagIterator.next()); - } - - // Cell TTL handling - - // Check again if we need to add a cell TTL because early out logic - // above may change when there are more tag based features in core. 
- if (m.getTTL() != Long.MAX_VALUE) { - // Add a cell TTL tag - newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL()))); - } + List<Tag> newTags = Tag.carryForwardTags(null, cell); + newTags = carryForwardTTLTag(newTags, m); // Rewrite the cell with the updated set of tags - cells.set(i, new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), @@ -7214,7 +7210,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.writeRequestsCount.increment(); RowLock rowLock = null; WALKey walKey = null; - MultiVersionConcurrencyControl.WriteEntry writeEntry = null; boolean doRollBackMemstore = false; try { rowLock = getRowLock(row); @@ -7256,67 +7251,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Process cell tags // Make a union of the set of tags in the old and new KVs - List<Tag> newTags = carryForwardTags(oldCell, new ArrayList<Tag>()); - newTags = carryForwardTags(cell, newTags); + List<Tag> tags = Tag.carryForwardTags(null, oldCell); + tags = Tag.carryForwardTags(tags, cell); + tags = carryForwardTTLTag(tags, mutate); - // Cell TTL handling - - if (mutate.getTTL() != Long.MAX_VALUE) { - // Add the new TTL tag - newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutate.getTTL()))); - } + newCell = getNewCell(row, ts, cell, oldCell, Tag.fromList(tags)); - // Rebuild tags - byte[] tagBytes = Tag.fromList(newTags); - - // allocate an empty cell once - newCell = new KeyValue(row.length, cell.getFamilyLength(), - cell.getQualifierLength(), ts, KeyValue.Type.Put, - oldCell.getValueLength() + cell.getValueLength(), - tagBytes.length); - // copy in row, family, and qualifier - System.arraycopy(cell.getRowArray(), cell.getRowOffset(), - newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength()); - 
System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(), - newCell.getFamilyArray(), newCell.getFamilyOffset(), - cell.getFamilyLength()); - System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(), - newCell.getQualifierArray(), newCell.getQualifierOffset(), - cell.getQualifierLength()); - // copy in the value - System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(), - newCell.getValueArray(), newCell.getValueOffset(), - oldCell.getValueLength()); - System.arraycopy(cell.getValueArray(), cell.getValueOffset(), - newCell.getValueArray(), - newCell.getValueOffset() + oldCell.getValueLength(), - cell.getValueLength()); - // Copy in tag data - System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(), - tagBytes.length); idx++; } else { // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP CellUtil.updateLatestStamp(cell, now); // Cell TTL handling - - if (mutate.getTTL() != Long.MAX_VALUE) { - List<Tag> newTags = new ArrayList<Tag>(1); - newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutate.getTTL()))); - // Add the new TTL tag - newCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(), - cell.getRowLength(), - cell.getFamilyArray(), cell.getFamilyOffset(), - cell.getFamilyLength(), - cell.getQualifierArray(), cell.getQualifierOffset(), - cell.getQualifierLength(), - cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()), - cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), - newTags); - } else { - newCell = cell; - } + newCell = getNewCell(mutate, cell); } // Give coprocessors a chance to update the new cell @@ -7363,10 +7310,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned walKey = this.appendEmptyEdit(this.wal); } - - // now start my own transaction - writeEntry = walKey.getWriteEntry(); - + // Do a get on the write entry... 
this will block until sequenceid is assigned... w/o it, + // TestAtomicOperation fails. + walKey.getWriteEntry(); // Actually write to Memstore now if (!tempMemstore.isEmpty()) { @@ -7379,7 +7325,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } else { // otherwise keep older versions around for (Cell cell: entry.getValue()) { - CellUtil.setSequenceId(cell, writeEntry.getWriteNumber()); + // This stamping of sequenceid seems redundant; it is happening down in + // FSHLog when we consume edits off the ring buffer. + CellUtil.setSequenceId(cell, walKey.getWriteEntry().getWriteNumber()); size += store.add(cell); doRollBackMemstore = true; } @@ -7410,11 +7358,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi rowLock.release(); } // if the wal sync was unsuccessful, remove keys from memstore + WriteEntry we = walKey != null? walKey.getWriteEntry(): null; if (doRollBackMemstore) { rollbackMemstore(allKVs); - if (writeEntry != null) mvcc.complete(writeEntry); - } else if (writeEntry != null) { - mvcc.completeAndWait(writeEntry); + if (we != null) mvcc.complete(we); + } else if (we != null) { + mvcc.completeAndWait(we); } closeRegionOperation(op); @@ -7432,6 +7381,57 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return mutate.isReturnResults() ? Result.create(allKVs) : null; } + private static Cell getNewCell(final byte [] row, final long ts, final Cell cell, + final Cell oldCell, final byte [] tagBytes) { + // allocate an empty cell once + Cell newCell = new KeyValue(row.length, cell.getFamilyLength(), + cell.getQualifierLength(), ts, KeyValue.Type.Put, + oldCell.getValueLength() + cell.getValueLength(), + tagBytes == null? 
0: tagBytes.length); + // copy in row, family, and qualifier + System.arraycopy(cell.getRowArray(), cell.getRowOffset(), + newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength()); + System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(), + newCell.getFamilyArray(), newCell.getFamilyOffset(), + cell.getFamilyLength()); + System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(), + newCell.getQualifierArray(), newCell.getQualifierOffset(), + cell.getQualifierLength()); + // copy in the value + System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(), + newCell.getValueArray(), newCell.getValueOffset(), + oldCell.getValueLength()); + System.arraycopy(cell.getValueArray(), cell.getValueOffset(), + newCell.getValueArray(), + newCell.getValueOffset() + oldCell.getValueLength(), + cell.getValueLength()); + // Copy in tag data + if (tagBytes != null) { + System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(), + tagBytes.length); + } + return newCell; + } + + private static Cell getNewCell(final Mutation mutate, final Cell cell) { + Cell newCell = null; + if (mutate.getTTL() != Long.MAX_VALUE) { + // Add the new TTL tag + newCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(), + cell.getRowLength(), + cell.getFamilyArray(), cell.getFamilyOffset(), + cell.getFamilyLength(), + cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength(), + cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()), + cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), + carryForwardTTLTag(mutate)); + } else { + newCell = cell; + } + return newCell; + } + public Result increment(Increment increment) throws IOException { return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE); } @@ -7448,175 +7448,226 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public Result increment(Increment mutation, long nonceGroup, long nonce) throws 
IOException { Operation op = Operation.INCREMENT; - byte [] row = mutation.getRow(); - checkRow(row, op.toString()); - checkFamilies(mutation.getFamilyCellMap().keySet()); - boolean flush = false; - Durability durability = getEffectiveDurability(mutation.getDurability()); - boolean writeToWAL = durability != Durability.SKIP_WAL; - WALEdit walEdits = null; - List<Cell> allKVs = new ArrayList<Cell>(mutation.size()); - - Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>(); - long size = 0; - long txid = 0; checkReadOnly(); checkResources(); - // Lock row + checkRow(mutation.getRow(), op.toString()); + checkFamilies(mutation.getFamilyCellMap().keySet()); startRegionOperation(op); this.writeRequestsCount.increment(); + try { + // Which Increment is it? Narrow increment-only consistency or slow (default) and general + // row-wide consistency. + + // So, difference between fastAndNarrowConsistencyIncrement and slowButConsistentIncrement is + // that the former holds the row lock until the sync completes; this allows us to reason that + // there are no other writers afoot when we read the current increment value. The row lock + // means that we do not need to wait on mvcc reads to catch up to writes before we proceed + // with the read, the root of the slowdown seen in HBASE-14460. The fast-path also does not + // wait on mvcc to complete before returning to the client. We also reorder the write so that + // the update of memstore happens AFTER sync returns; i.e. the write pipeline does less + // zigzagging now. + // + // See the comment on INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY + // for the constraints that apply when you take this code path; it is correct but only if + // Increments are used mutating an Increment Cell; mixing concurrent Put+Delete and Increment + // will yield indeterminate results. + return this.incrementFastButNarrowConsistency? 
+ fastAndNarrowConsistencyIncrement(mutation, nonceGroup, nonce): + slowButConsistentIncrement(mutation, nonceGroup, nonce); + } finally { + if (this.metricsRegion != null) this.metricsRegion.updateIncrement(); + closeRegionOperation(op); + } + } + + /** + * The bulk of this method is a copy-and-paste of the slowButConsistentIncrement but with some + * reordering to enable the fast increment (reordering allows us to also drop some state + * carrying Lists and variables so the flow here is more straight-forward). We copy-and-paste + * because we cannot break down the method further into smaller pieces. Too much state. Will redo + * in trunk and tip of branch-1 to undo duplication here and in append, checkAnd*, etc. For why + * this route is 'faster' than the alternative slowButConsistentIncrement path, see the comment + * in the calling method. + * @return Resulting increment + * @throws IOException + */ + private Result fastAndNarrowConsistencyIncrement(Increment increment, long nonceGroup, + long nonce) + throws IOException { + long accumulatedResultSize = 0; + WALKey walKey = null; + long txid = 0; + // This is all kvs accumulated during this increment processing. Includes increments where the + // increment is zero: i.e. client just wants to get current state of the increment w/o + // changing it. These latter increments by zero are NOT added to the WAL. + List<Cell> allKVs = new ArrayList<Cell>(increment.size()); + Durability effectiveDurability = getEffectiveDurability(increment.getDurability()); + RowLock rowLock = getRowLock(increment.getRow()); + try { + lock(this.updatesLock.readLock()); + try { + if (this.coprocessorHost != null) { + Result r = this.coprocessorHost.preIncrementAfterRowLock(increment); + if (r != null) return r; + } + long now = EnvironmentEdgeManager.currentTime(); + final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL; + WALEdit walEdits = null; + // Process increments a Store/family at a time. 
+ // Accumulate edits for memstore to add later after we've added to WAL. + Map<Store, List<Cell>> forMemStore = new HashMap<Store, List<Cell>>(); + for (Map.Entry<byte [], List<Cell>> entry: increment.getFamilyCellMap().entrySet()) { + byte [] columnFamilyName = entry.getKey(); + List<Cell> increments = entry.getValue(); + Store store = this.stores.get(columnFamilyName); + // Do increment for this store; be sure to 'sort' the increments first so increments + // match order in which we get back current Cells when we get. + List<Cell> results = applyIncrementsToColumnFamily(increment, columnFamilyName, + sort(increments, store.getComparator()), now, + MultiVersionConcurrencyControl.NO_WRITE_NUMBER, allKVs, + IsolationLevel.READ_UNCOMMITTED); + if (!results.isEmpty()) { + forMemStore.put(store, results); + // Prepare WAL updates + if (writeToWAL) { + if (walEdits == null) walEdits = new WALEdit(); + walEdits.getCells().addAll(results); + } + } + } + + // Actually write to WAL now. If walEdits is non-empty, we write the WAL. + if (walEdits != null && !walEdits.isEmpty()) { + // Using default cluster id, as this can only happen in the originating cluster. + // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey + // here instead of WALKey directly to support legacy coprocessors. + walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce, + getMVCC()); + txid = + this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdits, true); + } else { + // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned + walKey = appendEmptyEdit(this.wal); + } + // Get WriteEntry. Will wait on assign of the sequence id. I seem to need this in + // hbase-1.2... post-12751. + walKey.getWriteEntry(); + + if (txid != 0) syncOrDefer(txid, effectiveDurability); + + // Now write to memstore. 
+ for (Map.Entry<Store, List<Cell>> entry: forMemStore.entrySet()) { + Store store = entry.getKey(); + List<Cell> results = entry.getValue(); + if (store.getFamily().getMaxVersions() == 1) { + // Upsert if VERSIONS for this CF == 1. Use write sequence id rather than read point + // when doing fast increment. + accumulatedResultSize += store.upsert(results, getSmallestReadPoint()); + } else { + // Otherwise keep older versions around + for (Cell cell: results) { + accumulatedResultSize += store.add(cell); + } + } + } + + // Tell mvcc this write is complete. + this.mvcc.complete(walKey.getWriteEntry()); + walKey = null; + } finally { + this.updatesLock.readLock().unlock(); + } + } finally { + // walKey is not null if above processing failed... cleanup the mvcc transaction. + if (walKey != null) this.mvcc.complete(walKey.getWriteEntry()); + rowLock.release(); + } + // Request a cache flush. Do it outside update lock. + if (isFlushSize(this.addAndGetGlobalMemstoreSize(accumulatedResultSize))) requestFlush(); + return increment.isReturnResults() ? 
Result.create(allKVs) : null; + } + + private Result slowButConsistentIncrement(Increment increment, long nonceGroup, long nonce) + throws IOException { RowLock rowLock = null; WALKey walKey = null; - MultiVersionConcurrencyControl.WriteEntry writeEntry = null; boolean doRollBackMemstore = false; - TimeRange tr = mutation.getTimeRange(); + long accumulatedResultSize = 0; + List<Cell> allKVs = new ArrayList<Cell>(increment.size()); + List<Cell> memstoreCells = new ArrayList<Cell>(); + Durability effectiveDurability = getEffectiveDurability(increment.getDurability()); try { - rowLock = getRowLock(row); - assert rowLock != null; + rowLock = getRowLock(increment.getRow()); + long txid = 0; try { lock(this.updatesLock.readLock()); try { - // wait for all prior MVCC transactions to finish - while we hold the row lock - // (so that we are guaranteed to see the latest state) - mvcc.await(); + // Wait for all prior MVCC transactions to finish - while we hold the row lock + // (so that we are guaranteed to see the latest increment) + this.mvcc.await(); if (this.coprocessorHost != null) { - Result r = this.coprocessorHost.preIncrementAfterRowLock(mutation); - if (r != null) { - return r; - } + Result r = this.coprocessorHost.preIncrementAfterRowLock(increment); + if (r != null) return r; } long now = EnvironmentEdgeManager.currentTime(); - // Process each family - for (Map.Entry<byte [], List<Cell>> family: mutation.getFamilyCellMap().entrySet()) { - Store store = stores.get(family.getKey()); - List<Cell> kvs = new ArrayList<Cell>(family.getValue().size()); - - List<Cell> results = doGet(store, row, family, tr); - - // Iterate the input columns and update existing values if they were - // found, otherwise add new column initialized to the increment amount - - // Avoid as much copying as possible. We may need to rewrite and - // consolidate tags. Bytes are only copied once. 
- // Would be nice if KeyValue had scatter/gather logic - int idx = 0; - // HERE WE DIVERGE FROM APPEND - List<Cell> edits = family.getValue(); - for (int i = 0; i < edits.size(); i++) { - Cell cell = edits.get(i); - long amount = Bytes.toLong(CellUtil.cloneValue(cell)); - boolean noWriteBack = (amount == 0); - - List<Tag> newTags = carryForwardTags(cell, new ArrayList<Tag>()); - - Cell c = null; - long ts = now; - if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) { - c = results.get(idx); - ts = Math.max(now, c.getTimestamp()); - if(c.getValueLength() == Bytes.SIZEOF_LONG) { - amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG); - } else { - // throw DoNotRetryIOException instead of IllegalArgumentException - throw new org.apache.hadoop.hbase.DoNotRetryIOException( - "Attempted to increment field that isn't 64 bits wide"); - } - // Carry tags forward from previous version - newTags = carryForwardTags(c, newTags); - if (i < (edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1))) { - idx++; - } - } - - // Append new incremented KeyValue to list - byte[] q = CellUtil.cloneQualifier(cell); - byte[] val = Bytes.toBytes(amount); - - // Add the TTL tag if the mutation carried one - if (mutation.getTTL() != Long.MAX_VALUE) { - newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutation.getTTL()))); - } - - Cell newKV = new KeyValue(row, 0, row.length, - family.getKey(), 0, family.getKey().length, - q, 0, q.length, - ts, - KeyValue.Type.Put, - val, 0, val.length, - newTags); - - // Give coprocessors a chance to update the new cell - if (coprocessorHost != null) { - newKV = coprocessorHost.postMutationBeforeWAL( - RegionObserver.MutationType.INCREMENT, mutation, c, newKV); - } - allKVs.add(newKV); - - if (!noWriteBack) { - kvs.add(newKV); - - // Prepare WAL updates - if (writeToWAL) { - if (walEdits == null) { - walEdits = new WALEdit(); - } - walEdits.add(newKV); - } + final boolean 
writeToWAL = effectiveDurability != Durability.SKIP_WAL; + WALEdit walEdits = null; + // Process increments a Store/family at a time. + // Accumulate edits for memstore to add later after we've added to WAL. + Map<Store, List<Cell>> forMemStore = new HashMap<Store, List<Cell>>(); + for (Map.Entry<byte [], List<Cell>> entry: increment.getFamilyCellMap().entrySet()) { + byte [] columnFamilyName = entry.getKey(); + List<Cell> increments = entry.getValue(); + Store store = this.stores.get(columnFamilyName); + // Do increment for this store; be sure to 'sort' the increments first so increments + // match order in which we get back current Cells when we get. + List<Cell> results = applyIncrementsToColumnFamily(increment, columnFamilyName, + sort(increments, store.getComparator()), now, + MultiVersionConcurrencyControl.NO_WRITE_NUMBER, allKVs, null); + if (!results.isEmpty()) { + forMemStore.put(store, results); + // Prepare WAL updates + if (writeToWAL) { + if (walEdits == null) walEdits = new WALEdit(); + walEdits.getCells().addAll(results); } } - - //store the kvs to the temporary memstore before writing WAL - if (!kvs.isEmpty()) { - tempMemstore.put(store, kvs); - } } - - // Actually write to WAL now + // Actually write to WAL now. If walEdits is non-empty, we write the WAL. if (walEdits != null && !walEdits.isEmpty()) { - if (writeToWAL) { - // Using default cluster id, as this can only happen in the originating - // cluster. A slave cluster receives the final value (not the delta) - // as a Put. - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. 
- walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), - WALKey.NO_SEQUENCE_ID, - nonceGroup, - nonce, - mvcc); - txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), - walKey, walEdits, true); - } else { - recordMutationWithoutWal(mutation.getFamilyCellMap()); - } - } - if (walKey == null) { + // Using default cluster id, as this can only happen in the originating cluster. + // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey + // here instead of WALKey directly to support legacy coprocessors. + walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce, + getMVCC()); + txid = + this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdits, true); + } else { // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned walKey = this.appendEmptyEdit(this.wal); } - - // now start my own transaction - writeEntry = walKey.getWriteEntry(); - - // Actually write to Memstore now - if (!tempMemstore.isEmpty()) { - for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) { - Store store = entry.getKey(); - if (store.getFamily().getMaxVersions() == 1) { - // upsert if VERSIONS for this CF == 1 - // Is this right? It immediately becomes visible? St.Ack 20150907 - size += store.upsert(entry.getValue(), getSmallestReadPoint()); - } else { - // otherwise keep older versions around - for (Cell cell : entry.getValue()) { - CellUtil.setSequenceId(cell, writeEntry.getWriteNumber()); - size += store.add(cell); - doRollBackMemstore = true; - } + // Get WriteEntry. Will wait on assign of the sequence id. + walKey.getWriteEntry(); + + // Now write to memstore, a family at a time. 
+ for (Map.Entry<Store, List<Cell>> entry: forMemStore.entrySet()) { + Store store = entry.getKey(); + List<Cell> results = entry.getValue(); + if (store.getFamily().getMaxVersions() == 1) { + // Upsert if VERSIONS for this CF == 1 + accumulatedResultSize += store.upsert(results, getSmallestReadPoint()); + // TODO: St.Ack 20151222 Why no rollback in this case? + } else { + // Otherwise keep older versions around + for (Cell cell: results) { + // Why we need this? + CellUtil.setSequenceId(cell, walKey.getWriteEntry().getWriteNumber()); + accumulatedResultSize += store.add(cell); + doRollBackMemstore = true; } } - size = this.addAndGetGlobalMemstoreSize(size); - flush = isFlushSize(size); } } finally { this.updatesLock.readLock().unlock(); @@ -7626,7 +7677,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi rowLock = null; } // sync the transaction log outside the rowlock - if(txid != 0){ + if(txid != 0) { syncOrDefer(txid, durability); } doRollBackMemstore = false; @@ -7636,24 +7687,155 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // if the wal sync was unsuccessful, remove keys from memstore if (doRollBackMemstore) { - for(List<Cell> cells: tempMemstore.values()) { - rollbackMemstore(cells); - } - if (writeEntry != null) mvcc.complete(writeEntry); - } else if (writeEntry != null) { - mvcc.completeAndWait(writeEntry); + rollbackMemstore(memstoreCells); + if (walKey != null) mvcc.complete(walKey.getWriteEntry()); + } else { + if (walKey != null) mvcc.completeAndWait(walKey.getWriteEntry()); } - closeRegionOperation(Operation.INCREMENT); - if (this.metricsRegion != null) { - this.metricsRegion.updateIncrement(); + } + + // Request a cache flush. Do it outside update lock. + if (isFlushSize(this.addAndGetGlobalMemstoreSize(accumulatedResultSize))) requestFlush(); + return increment.isReturnResults() ? 
Result.create(allKVs) : null; + } + + /** + * @return Sorted list of <code>cells</code> using <code>comparator</code> + */ + private static List<Cell> sort(List<Cell> cells, final Comparator<Cell> comparator) { + Collections.sort(cells, comparator); + return cells; + } + + /** + * Apply increments to a column family. + * @param sortedIncrements The passed in increments to apply MUST be sorted so that they match + * the order that they appear in the Get results (get results will be sorted on return). + * Otherwise, we won't be able to find the existing values if the cells are not specified in + * order by the client since cells are in an array list. + * @param isolation Isolation level to use when running the 'get'. Pass null for default. + * @return Resulting increments after <code>sortedIncrements</code> have been applied to current + * values (if any -- else passed increment is the final result). + * @throws IOException + */ + private List<Cell> applyIncrementsToColumnFamily(Increment increment, byte[] columnFamilyName, + List<Cell> sortedIncrements, long now, long mvccNum, List<Cell> allKVs, + final IsolationLevel isolation) + throws IOException { + List<Cell> results = new ArrayList<Cell>(sortedIncrements.size()); + byte [] row = increment.getRow(); + // Get previous values for all columns in this family + List<Cell> currentValues = + getIncrementCurrentValue(increment, columnFamilyName, sortedIncrements, isolation); + // Iterate the input columns and update existing values if they were found, otherwise + // add new column initialized to the increment amount + int idx = 0; + for (int i = 0; i < sortedIncrements.size(); i++) { + Cell inc = sortedIncrements.get(i); + long incrementAmount = getLongValue(inc); + // If increment amount == 0, then don't write this Increment to the WAL. + boolean writeBack = (incrementAmount != 0); + // Carry forward any tags that might have been added by a coprocessor. 
+ List<Tag> tags = Tag.carryForwardTags(inc); + + Cell currentValue = null; + long ts = now; + if (idx < currentValues.size() && CellUtil.matchingQualifier(currentValues.get(idx), inc)) { + currentValue = currentValues.get(idx); + ts = Math.max(now, currentValue.getTimestamp()); + incrementAmount += getLongValue(currentValue); + // Carry forward all tags + tags = Tag.carryForwardTags(tags, currentValue); + if (i < (sortedIncrements.size() - 1) && + !CellUtil.matchingQualifier(inc, sortedIncrements.get(i + 1))) idx++; + } + + // Append new incremented KeyValue to list + byte [] qualifier = CellUtil.cloneQualifier(inc); + byte [] incrementAmountInBytes = Bytes.toBytes(incrementAmount); + tags = carryForwardTTLTag(tags, increment); + + Cell newValue = new KeyValue(row, 0, row.length, + columnFamilyName, 0, columnFamilyName.length, + qualifier, 0, qualifier.length, + ts, KeyValue.Type.Put, + incrementAmountInBytes, 0, incrementAmountInBytes.length, + tags); + + // Don't set an mvcc if none specified. The mvcc may be assigned later in case where we + // write the memstore AFTER we sync our edit to the log. + if (mvccNum != MultiVersionConcurrencyControl.NO_WRITE_NUMBER) { + CellUtil.setSequenceId(newValue, mvccNum); + } + + // Give coprocessors a chance to update the new cell + if (coprocessorHost != null) { + newValue = coprocessorHost.postMutationBeforeWAL( + RegionObserver.MutationType.INCREMENT, increment, currentValue, newValue); + } + allKVs.add(newValue); + if (writeBack) { + results.add(newValue); } } + return results; + } - if (flush) { - // Request a cache flush. Do it outside update lock. 
- requestFlush(); + /** + * @return Get the long out of the passed in Cell + * @throws DoNotRetryIOException + */ + private static long getLongValue(final Cell cell) throws DoNotRetryIOException { + int len = cell.getValueLength(); + if (len != Bytes.SIZEOF_LONG) { + // throw DoNotRetryIOException instead of IllegalArgumentException + throw new DoNotRetryIOException("Field is not a long, it's " + len + " bytes wide"); + } + return Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), len); + } + + /** + * Do a specific Get on passed <code>columnFamily</code> and column qualifiers + * from <code>incrementCoordinates</code> only. + * @param increment + * @param columnFamily + * @param incrementCoordinates + * @return Return the Cells to Increment + * @throws IOException + */ + private List<Cell> getIncrementCurrentValue(final Increment increment, byte [] columnFamily, + final List<Cell> increments, final IsolationLevel isolation) + throws IOException { + Get get = new Get(increment.getRow()); + if (isolation != null) get.setIsolationLevel(isolation); + for (Cell cell: increments) { + get.addColumn(columnFamily, CellUtil.cloneQualifier(cell)); + } + TimeRange tr = increment.getTimeRange(); + if (tr != null) { + get.setTimeRange(tr.getMin(), tr.getMax()); } - return mutation.isReturnResults() ? 
Result.create(allKVs) : null; + return get(get, false); + } + + private static List<Tag> carryForwardTTLTag(final Mutation mutation) { + return carryForwardTTLTag(null, mutation); + } + + /** + * @return Carry forward the TTL tag if the increment is carrying one + */ + private static List<Tag> carryForwardTTLTag(final List<Tag> tagsOrNull, + final Mutation mutation) { + long ttl = mutation.getTTL(); + if (ttl == Long.MAX_VALUE) return tagsOrNull; + List<Tag> tags = tagsOrNull; + // If we are making the array in here, given we are the last thing checked, we'll be only thing + // in the array so set its size to '1' (I saw this being done in earlier version of + // tag-handling). + if (tags == null) tags = new ArrayList<Tag>(1); + tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl))); + return tags; } // http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java index e378234..7722d75 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Scan; /** @@ -30,6 +31,12 @@ import org.apache.hadoop.hbase.client.Scan; @InterfaceAudience.Private public interface KeyValueScanner { /** + * The byte array represents for NO_NEXT_INDEXED_KEY; + * The actual value is irrelevant because this is always compared by reference. 
+ */ + public static final Cell NO_NEXT_INDEXED_KEY = new KeyValue(); + + /** * Look at the next Cell in this scanner, but do not iterate scanner. * @return the next Cell */ http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java index eba99e0..da9c57a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.util.ClassSize; @InterfaceAudience.Private public class MultiVersionConcurrencyControl { private static final Log LOG = LogFactory.getLog(MultiVersionConcurrencyControl.class); + static final long NO_WRITE_NUMBER = 0; final AtomicLong readPoint = new AtomicLong(0); final AtomicLong writePoint = new AtomicLong(0); @@ -155,7 +156,7 @@ public class MultiVersionConcurrencyControl { * changes completely) so we can clean up the outstanding transaction. * * How much is the read point advanced? - * + * * Let S be the set of all write numbers that are completed. Set the read point to the highest * numbered write of S. 
* @@ -279,4 +280,4 @@ public class MultiVersionConcurrencyControl { ClassSize.OBJECT + 2 * Bytes.SIZEOF_LONG + 2 * ClassSize.REFERENCE); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index bcb866a..63bb4c0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -652,7 +652,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner case SEEK_NEXT_COL: { Cell nextIndexedKey = getNextIndexedKey(); - if (nextIndexedKey != null && nextIndexedKey != HConstants.NO_NEXT_INDEXED_KEY + if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY && matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) { return qcode == MatchCode.SEEK_NEXT_COL ? MatchCode.SKIP : MatchCode.INCLUDE; } @@ -662,7 +662,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner case SEEK_NEXT_ROW: { Cell nextIndexedKey = getNextIndexedKey(); - if (nextIndexedKey != null && nextIndexedKey != HConstants.NO_NEXT_INDEXED_KEY + if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY && matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) { return qcode == MatchCode.SEEK_NEXT_ROW ? 
MatchCode.SKIP : MatchCode.INCLUDE; } http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 4329ce5..e189a30 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -281,7 +281,6 @@ public class FSHLog implements WAL { private final int slowSyncNs; - private final static Object [] NO_ARGS = new Object []{}; // If live datanode count is lower than the default replicas value, // RollWriter will be triggered in each sync(So the RollWriter will be // triggered one by one in a short time). Using it as a workaround to slow @@ -508,16 +507,16 @@ public class FSHLog implements WAL { FSUtils.getDefaultBlockSize(this.fs, this.fullPathLogDir)); this.logrollsize = (long)(blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f)); - + float memstoreRatio = conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_KEY, - conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_OLD_KEY, + conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_OLD_KEY, HeapMemorySizeUtil.DEFAULT_MEMSTORE_SIZE)); boolean maxLogsDefined = conf.get("hbase.regionserver.maxlogs") != null; if(maxLogsDefined){ LOG.warn("'hbase.regionserver.maxlogs' was deprecated."); } - this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", - Math.max(32, calculateMaxLogFiles(memstoreRatio, logrollsize))); + this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", + Math.max(32, calculateMaxLogFiles(memstoreRatio, logrollsize))); this.minTolerableReplication = conf.getInt("hbase.regionserver.hlog.tolerable.lowreplication", FSUtils.getDefaultReplication(fs, 
this.fullPathLogDir)); this.lowReplicationRollLimit = @@ -572,7 +571,7 @@ public class FSHLog implements WAL { int maxLogs = Math.round(mu.getMax() * memstoreSizeRatio * 2 / logRollSize); return maxLogs; } - + /** * Get the backing files associated with this WAL. * @return may be null if there are no files. @@ -1085,8 +1084,6 @@ public class FSHLog implements WAL { long sequence = this.disruptor.getRingBuffer().next(); try { RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence); - // Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the - // edit with its edit/sequence id. // TODO: reuse FSWALEntry as we do SyncFuture rather create per append. entry = new FSWALEntry(sequence, key, edits, htd, hri, inMemstore); truck.loadPayload(entry, scope.detach()); http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSecretManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSecretManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSecretManager.java index 72f4598..09b8846 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSecretManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSecretManager.java @@ -55,6 +55,8 @@ import org.apache.zookeeper.KeeperException; * </p> */ @InterfaceAudience.Private [email protected](value="IS2_INCONSISTENT_SYNC", + justification="Complaint is about lastKeyUpdate... 
afraid to change it.") public class AuthenticationTokenSecretManager extends SecretManager<AuthenticationTokenIdentifier> { @@ -63,7 +65,7 @@ public class AuthenticationTokenSecretManager private static final Log LOG = LogFactory.getLog( AuthenticationTokenSecretManager.class); - private long lastKeyUpdate; + private long lastKeyUpdate; // FindBugs: IS2_INCONSISTENT_SYNC FIX!! private long keyUpdateInterval; private long tokenMaxLifetime; private ZKSecretWatcher zkWatcher; http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java new file mode 100644 index 0000000..bf3a44f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +// import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import com.yammer.metrics.Metrics; +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.core.Timer; +import com.yammer.metrics.core.TimerContext; +import com.yammer.metrics.stats.Snapshot; + +/** + * Simple Increments Performance Test. Run this from main. It is to go against a cluster. + * Presumption is the table exists already. Defaults are a zk ensemble of localhost:2181, + * a tableName of 'tableName', a column family name of 'columnFamilyName', with 80 threads by + * default and 10000 increments per thread. To change any of these configs, pass -DNAME=VALUE as + * in -DtableName="newTableName". It prints out configuration it is running with at the start and + * on the end it prints out percentiles.
+ */
+public class IncrementPerformanceTest implements Tool {
+  private static final Log LOG = LogFactory.getLog(IncrementPerformanceTest.class);
+  private static final byte [] QUALIFIER = new byte [] {'q'};
+  private Configuration conf;
+  private final MetricName metricName = new MetricName(this.getClass(), "increment");
+  private static final String TABLENAME = "tableName"; // config key, doubles as default value
+  private static final String COLUMN_FAMILY = "columnFamilyName"; // config key and default value
+  private static final String THREAD_COUNT = "threadCount";
+  private static final int DEFAULT_THREAD_COUNT = 80;
+  private static final String INCREMENT_COUNT = "incrementCount";
+  private static final int DEFAULT_INCREMENT_COUNT = 10000;
+  /** Package-private constructor; {@link #main(String[])} creates the instance it runs. */
+  IncrementPerformanceTest() {}
+  /** Runs the increment load on a fixed thread pool, logs latency percentiles, returns 0. */
+  public int run(final String [] args) throws Exception {
+    Configuration conf = getConf(); // every setting below falls back to a default when unset
+    final TableName tableName = TableName.valueOf(conf.get(TABLENAME, TABLENAME));
+    final byte [] columnFamilyName = Bytes.toBytes(conf.get(COLUMN_FAMILY, COLUMN_FAMILY));
+    int threadCount = conf.getInt(THREAD_COUNT, DEFAULT_THREAD_COUNT);
+    final int incrementCount = conf.getInt(INCREMENT_COUNT, DEFAULT_INCREMENT_COUNT);
+    LOG.info("Running test with " + HConstants.ZOOKEEPER_QUORUM + "=" +
+      getConf().get(HConstants.ZOOKEEPER_QUORUM) + ", tableName=" + tableName +
+      ", columnFamilyName=" + Bytes.toString(columnFamilyName) + ", threadCount=" + threadCount +
+      ", incrementCount=" + incrementCount);
+
+    ExecutorService service = Executors.newFixedThreadPool(threadCount);
+    Set<Future<?>> futures = new HashSet<Future<?>>();
+    final AtomicInteger integer = new AtomicInteger(0); // needed a simple "final" counter
+    while (integer.incrementAndGet() <= threadCount) {
+      futures.add(service.submit(new Runnable() {
+        @Override
+        public void run() {
+          HTable table; // NOTE(review): never closed; main() exits the JVM right after run()
+          try {
+            // ConnectionFactory.createConnection(conf).getTable(TableName.valueOf(TABLE_NAME));
+            table = new HTable(getConf(), tableName.getName());
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+          Timer timer = Metrics.newTimer(metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
+          for (int i = 0; i < incrementCount; i++) {
+            byte[] row = Bytes.toBytes(i);
+            TimerContext context = timer.time();
+            try {
+              table.incrementColumnValue(row, columnFamilyName, QUALIFIER, 1L);
+            } catch (IOException ignored) {
+              // Deliberately ignored: this is a load generator; a failed call is still timed.
+            } finally {
+              context.stop();
+            }
+          }
+        }
+      }));
+    }
+    // Wait for all workers; presumably newTimer hands back the timer they shared (same name).
+    for (Future<?> future : futures) { future.get(); }
+    service.shutdown();
+    Snapshot s = Metrics.newTimer(this.metricName,
+        TimeUnit.MILLISECONDS, TimeUnit.SECONDS).getSnapshot();
+    LOG.info(String.format("75th=%s, 95th=%s, 99th=%s", s.get75thPercentile(),
+        s.get95thPercentile(), s.get99thPercentile()));
+    return 0;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+  /** Entry point: runs this tool through ToolRunner and exits with the code it returns. */
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(HBaseConfiguration.create(), new IncrementPerformanceTest(), args));
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 81253a5..28c354f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -51,7 +51,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import
org.apache.hadoop.hbase.HConstants; @@ -3139,7 +3138,7 @@ public class TestFromClientSide { equals(value, CellUtil.cloneValue(key))); } - private void assertIncrementKey(Cell key, byte [] row, byte [] family, + static void assertIncrementKey(Cell key, byte [] row, byte [] family, byte [] qualifier, long value) throws Exception { assertTrue("Expected row [" + Bytes.toString(row) + "] " + @@ -3363,7 +3362,7 @@ public class TestFromClientSide { return stamps; } - private boolean equals(byte [] left, byte [] right) { + static boolean equals(byte [] left, byte [] right) { if (left == null && right == null) return true; if (left == null && right.length == 0) return true; if (right == null && left.length == 0) return true; @@ -4483,264 +4482,6 @@ public class TestFromClientSide { } @Test - public void testIncrementWithDeletes() throws Exception { - LOG.info("Starting testIncrementWithDeletes"); - final TableName TABLENAME = - TableName.valueOf("testIncrementWithDeletes"); - Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); - final byte[] COLUMN = Bytes.toBytes("column"); - - ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5); - TEST_UTIL.flush(TABLENAME); - - Delete del = new Delete(ROW); - ht.delete(del); - - ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5); - - Get get = new Get(ROW); - Result r = ht.get(get); - assertEquals(1, r.size()); - assertEquals(5, Bytes.toLong(r.getValue(FAMILY, COLUMN))); - } - - @Test - public void testIncrementingInvalidValue() throws Exception { - LOG.info("Starting testIncrementingInvalidValue"); - final TableName TABLENAME = TableName.valueOf("testIncrementingInvalidValue"); - Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); - final byte[] COLUMN = Bytes.toBytes("column"); - Put p = new Put(ROW); - // write an integer here (not a Long) - p.add(FAMILY, COLUMN, Bytes.toBytes(5)); - ht.put(p); - try { - ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5); - fail("Should have thrown DoNotRetryIOException"); - } catch (DoNotRetryIOException 
iox) { - // success - } - Increment inc = new Increment(ROW); - inc.addColumn(FAMILY, COLUMN, 5); - try { - ht.increment(inc); - fail("Should have thrown DoNotRetryIOException"); - } catch (DoNotRetryIOException iox) { - // success - } - } - - @Test - public void testIncrementInvalidArguments() throws Exception { - LOG.info("Starting testIncrementInvalidArguments"); - final TableName TABLENAME = TableName.valueOf("testIncrementInvalidArguments"); - Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); - final byte[] COLUMN = Bytes.toBytes("column"); - try { - // try null row - ht.incrementColumnValue(null, FAMILY, COLUMN, 5); - fail("Should have thrown IOException"); - } catch (IOException iox) { - // success - } - try { - // try null family - ht.incrementColumnValue(ROW, null, COLUMN, 5); - fail("Should have thrown IOException"); - } catch (IOException iox) { - // success - } - try { - // try null qualifier - ht.incrementColumnValue(ROW, FAMILY, null, 5); - fail("Should have thrown IOException"); - } catch (IOException iox) { - // success - } - // try null row - try { - Increment incNoRow = new Increment((byte [])null); - incNoRow.addColumn(FAMILY, COLUMN, 5); - fail("Should have thrown IllegalArgumentException"); - } catch (IllegalArgumentException iax) { - // success - } catch (NullPointerException npe) { - // success - } - // try null family - try { - Increment incNoFamily = new Increment(ROW); - incNoFamily.addColumn(null, COLUMN, 5); - fail("Should have thrown IllegalArgumentException"); - } catch (IllegalArgumentException iax) { - // success - } - // try null qualifier - try { - Increment incNoQualifier = new Increment(ROW); - incNoQualifier.addColumn(FAMILY, null, 5); - fail("Should have thrown IllegalArgumentException"); - } catch (IllegalArgumentException iax) { - // success - } - } - - @Test - public void testIncrementOutOfOrder() throws Exception { - LOG.info("Starting testIncrementOutOfOrder"); - final TableName TABLENAME = 
TableName.valueOf("testIncrementOutOfOrder"); - Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); - - byte [][] QUALIFIERS = new byte [][] { - Bytes.toBytes("B"), Bytes.toBytes("A"), Bytes.toBytes("C") - }; - - Increment inc = new Increment(ROW); - for (int i=0; i<QUALIFIERS.length; i++) { - inc.addColumn(FAMILY, QUALIFIERS[i], 1); - } - ht.increment(inc); - - // Verify expected results - Result r = ht.get(new Get(ROW)); - Cell [] kvs = r.rawCells(); - assertEquals(3, kvs.length); - assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[1], 1); - assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[0], 1); - assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 1); - - // Now try multiple columns again - inc = new Increment(ROW); - for (int i=0; i<QUALIFIERS.length; i++) { - inc.addColumn(FAMILY, QUALIFIERS[i], 1); - } - ht.increment(inc); - - // Verify - r = ht.get(new Get(ROW)); - kvs = r.rawCells(); - assertEquals(3, kvs.length); - assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[1], 2); - assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[0], 2); - assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 2); - } - - @Test - public void testIncrementOnSameColumn() throws Exception { - LOG.info("Starting testIncrementOnSameColumn"); - final byte[] TABLENAME = Bytes.toBytes("testIncrementOnSameColumn"); - HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILY); - - byte[][] QUALIFIERS = - new byte[][] { Bytes.toBytes("A"), Bytes.toBytes("B"), Bytes.toBytes("C") }; - - Increment inc = new Increment(ROW); - for (int i = 0; i < QUALIFIERS.length; i++) { - inc.addColumn(FAMILY, QUALIFIERS[i], 1); - inc.addColumn(FAMILY, QUALIFIERS[i], 1); - } - ht.increment(inc); - - // Verify expected results - Result r = ht.get(new Get(ROW)); - Cell[] kvs = r.rawCells(); - assertEquals(3, kvs.length); - assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[0], 1); - assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[1], 1); - assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 1); - - // 
Now try multiple columns again - inc = new Increment(ROW); - for (int i = 0; i < QUALIFIERS.length; i++) { - inc.addColumn(FAMILY, QUALIFIERS[i], 1); - inc.addColumn(FAMILY, QUALIFIERS[i], 1); - } - ht.increment(inc); - - // Verify - r = ht.get(new Get(ROW)); - kvs = r.rawCells(); - assertEquals(3, kvs.length); - assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[0], 2); - assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[1], 2); - assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 2); - - ht.close(); - } - - @Test - public void testIncrement() throws Exception { - LOG.info("Starting testIncrement"); - final TableName TABLENAME = TableName.valueOf("testIncrement"); - Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); - - byte [][] ROWS = new byte [][] { - Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"), - Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"), - Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i") - }; - byte [][] QUALIFIERS = new byte [][] { - Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"), - Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"), - Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i") - }; - - // Do some simple single-column increments - - // First with old API - ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[0], 1); - ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[1], 2); - ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[2], 3); - ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[3], 4); - - // Now increment things incremented with old and do some new - Increment inc = new Increment(ROW); - inc.addColumn(FAMILY, QUALIFIERS[1], 1); - inc.addColumn(FAMILY, QUALIFIERS[3], 1); - inc.addColumn(FAMILY, QUALIFIERS[4], 1); - ht.increment(inc); - - // Verify expected results - Result r = ht.get(new Get(ROW)); - Cell [] kvs = r.rawCells(); - assertEquals(5, kvs.length); - assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[0], 1); - assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[1], 3); 
- assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 3); - assertIncrementKey(kvs[3], ROW, FAMILY, QUALIFIERS[3], 5); - assertIncrementKey(kvs[4], ROW, FAMILY, QUALIFIERS[4], 1); - - // Now try multiple columns by different amounts - inc = new Increment(ROWS[0]); - for (int i=0;i<QUALIFIERS.length;i++) { - inc.addColumn(FAMILY, QUALIFIERS[i], i+1); - } - ht.increment(inc); - // Verify - r = ht.get(new Get(ROWS[0])); - kvs = r.rawCells(); - assertEquals(QUALIFIERS.length, kvs.length); - for (int i=0;i<QUALIFIERS.length;i++) { - assertIncrementKey(kvs[i], ROWS[0], FAMILY, QUALIFIERS[i], i+1); - } - - // Re-increment them - inc = new Increment(ROWS[0]); - for (int i=0;i<QUALIFIERS.length;i++) { - inc.addColumn(FAMILY, QUALIFIERS[i], i+1); - } - ht.increment(inc); - // Verify - r = ht.get(new Get(ROWS[0])); - kvs = r.rawCells(); - assertEquals(QUALIFIERS.length, kvs.length); - for (int i=0;i<QUALIFIERS.length;i++) { - assertIncrementKey(kvs[i], ROWS[0], FAMILY, QUALIFIERS[i], 2*(i+1)); - } - } - - - @Test public void testClientPoolRoundRobin() throws IOException { final TableName tableName = TableName.valueOf("testClientPoolRoundRobin");
