Author: liyin Date: Wed Apr 2 20:49:31 2014 New Revision: 1584170 URL: http://svn.apache.org/r1584170 Log: [HBASE-10808] Forward porting Leases and Histogram bug fixes to trunk.
Author: manukranthk Summary: This diff attempts to forward port the Leases and Histogram bug fixes back to trunk. Test Plan: Run existing unit tests. Reviewers: liyintang, daviddeng Reviewed By: liyintang CC: hbase-eng@ Differential Revision: https://phabricator.fb.com/D1226083 Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/PointScanBenchmark.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/histogram/TestConversionUtils.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestLeases.java Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/LeaseListener.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/Leases.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/UniformSplitHFileHistogram.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionUtilities.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/ScanSearch.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHFileHistogramE2E.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/histogram/TestUniformSplitHistogram.java Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java?rev=1584170&r1=1584169&r2=1584170&view=diff ============================================================================== --- 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java Wed Apr 2 20:49:31 2014 @@ -105,7 +105,7 @@ public class HColumnDescriptor implement public static final String BLOOMFILTER_ERRORRATE = "BLOOMFILTER_ERRORRATE"; public static final String FOREVER = "FOREVER"; public static final String REPLICATION_SCOPE = "REPLICATION_SCOPE"; - public static final String ROWKEY_PREFIX_LENGTH_FOR_BLOOMFILTER = "ROWKEY_PREFIX_LENGTH"; + public static final String ROWKEY_PREFIX_LENGTH = "ROWKEY_PREFIX_LENGTH"; public static final String HFILEHISTOGRAM_BUCKET_COUNT = "HFILEHISTOGRAM_BUCKET_COUNT"; @@ -206,7 +206,7 @@ public class HColumnDescriptor implement String.valueOf(DEFAULT_ENCODE_ON_DISK)); DEFAULT_VALUES.put(DATA_BLOCK_ENCODING, String.valueOf(DEFAULT_DATA_BLOCK_ENCODING)); - DEFAULT_VALUES.put(ROWKEY_PREFIX_LENGTH_FOR_BLOOMFILTER, + DEFAULT_VALUES.put(ROWKEY_PREFIX_LENGTH, String.valueOf(DEFAULT_ROWKEY_PREFIX_LENGTH_FOR_BLOOM)); DEFAULT_VALUES.put(HFILEHISTOGRAM_BUCKET_COUNT, String.valueOf(HFileHistogram.DEFAULT_HFILEHISTOGRAM_BINCOUNT)); @@ -762,14 +762,14 @@ public class HColumnDescriptor implement */ public int getRowPrefixLengthForBloom() { return getIntValueFromString( - ROWKEY_PREFIX_LENGTH_FOR_BLOOMFILTER, + ROWKEY_PREFIX_LENGTH, DEFAULT_ROWKEY_PREFIX_LENGTH_FOR_BLOOM, "Cannot parse row key prefix length"); } public void setRowKeyPrefixLengthForBloom(int prefixLength) { if (prefixLength > 0) { - setValue(ROWKEY_PREFIX_LENGTH_FOR_BLOOMFILTER, String.valueOf(prefixLength)); + setValue(ROWKEY_PREFIX_LENGTH, String.valueOf(prefixLength)); } } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1584170&r1=1584169&r2=1584170&view=diff 
============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Wed Apr 2 20:49:31 2014 @@ -1066,6 +1066,11 @@ public final class HConstants { "hbase.regionserver.use.guava.bytes.comparision"; public static boolean DEFAULT_USE_GUAVA_BYTES_COMPARISION = false; + public static final long DEFAULT_REGIONSERVER_LEASE_THREAD_WAKE_FREQUENCY = + 15000; + public static final String REGIONSERVER_LEASE_THREAD_WAKE_FREQUENCY = + "hbase.regionserver.thread.wakefrequency"; + private HConstants() { // Can't be instantiated with this constructor. } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/LeaseListener.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/LeaseListener.java?rev=1584170&r1=1584169&r2=1584170&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/LeaseListener.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/LeaseListener.java Wed Apr 2 20:49:31 2014 @@ -28,7 +28,40 @@ package org.apache.hadoop.hbase; * lease. Users of the Leases class can use a LeaseListener subclass to, for * example, clean up resources after a lease has expired. */ -public interface LeaseListener { +public abstract class LeaseListener { + + private String leaseName; + private long leaseStartTS; + + public LeaseListener(String leaseName, long leaseStartTS) { + this.leaseName = leaseName; + this.leaseStartTS = leaseStartTS; + } + + /** + * Adds current system milli seconds as lease start time. 
+ * @param leaseName name of the lease + */ + public LeaseListener(String leaseName) { + this(leaseName, System.currentTimeMillis()); + } + + public String getLeaseName() { + return leaseName; + } + + public void setLeaseName(String leaseName) { + this.leaseName = leaseName; + } + + public long getLeaseStartTS() { + return leaseStartTS; + } + + public void setLeaseStartTS(long leaseStartTS) { + this.leaseStartTS = leaseStartTS; + } + /** When a lease expires, this method is called. */ - public void leaseExpired(); + public abstract void leaseExpired(); } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/Leases.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/Leases.java?rev=1584170&r1=1584169&r2=1584170&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/Leases.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/Leases.java Wed Apr 2 20:49:31 2014 @@ -24,7 +24,9 @@ import org.apache.commons.logging.LogFac import java.util.ConcurrentModificationException; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Delayed; import java.util.concurrent.DelayQueue; import java.util.concurrent.TimeUnit; @@ -32,43 +34,48 @@ import java.util.concurrent.TimeUnit; import java.io.IOException; import org.apache.hadoop.hbase.util.HasThread; +import com.google.common.base.Preconditions; + /** * Leases * - * There are several server classes in HBase that need to track external - * clients that occasionally send heartbeats. + * There are several server classes in HBase that need to track external clients + * that occasionally send heartbeats. * - * <p>These external clients hold resources in the server class. 
- * Those resources need to be released if the external client fails to send a - * heartbeat after some interval of time passes. + * <p> + * These external clients hold resources in the server class. Those resources + * need to be released if the external client fails to send a heartbeat after + * some interval of time passes. * - * <p>The Leases class is a general reusable class for this kind of pattern. - * An instance of the Leases class will create a thread to do its dirty work. - * You should close() the instance if you want to clean up the thread properly. + * <p> + * The Leases class is a general reusable class for this kind of pattern. An + * instance of the Leases class will create a thread to do its dirty work. You + * should close() the instance if you want to clean up the thread properly. * * <p> * NOTE: This class extends HasThread rather than Chore because the sleep time - * can be interrupted when there is something to do, rather than the Chore - * sleep time which is invariant. + * can be interrupted when there is something to do, rather than the Chore sleep + * time which is invariant. 
*/ public class Leases extends HasThread { private static final Log LOG = LogFactory.getLog(Leases.class.getName()); private final int leasePeriod; - private final int leaseCheckFrequency; - private volatile DelayQueue<Lease> leaseQueue = new DelayQueue<Lease>(); - protected final Map<String, Lease> leases = new HashMap<String, Lease>(); + protected final ConcurrentHashMap<String, LeaseListener> leaseMap = + new ConcurrentHashMap<String, LeaseListener>(); private volatile boolean stopRequested = false; + private final long threadWakeFrequencyMS; /** * Creates a lease monitor * - * @param leasePeriod - length of time (milliseconds) that the lease is valid - * @param leaseCheckFrequency - how often the lease should be checked - * (milliseconds) + * @param leasePeriod + * - length of time (milliseconds) that the lease is valid + * @param threadWakeFrequencyMS + * - how often the lease should be checked (milliseconds) */ - public Leases(final int leasePeriod, final int leaseCheckFrequency) { + public Leases(final int leasePeriod, long threadWakeFrequencyMS) { this.leasePeriod = leasePeriod; - this.leaseCheckFrequency = leaseCheckFrequency; + this.threadWakeFrequencyMS = threadWakeFrequencyMS; } /** @@ -76,81 +83,74 @@ public class Leases extends HasThread { */ @Override public void run() { - while (!stopRequested || (stopRequested && leaseQueue.size() > 0) ) { - Lease lease = null; + HashSet<String> expiredSet = new HashSet<String>(); + while (!stopRequested) { try { - lease = leaseQueue.poll(leaseCheckFrequency, TimeUnit.MILLISECONDS); + expiredSet.clear(); + long now = System.currentTimeMillis(); + for (Map.Entry<String, LeaseListener> entry : leaseMap.entrySet()) { + Long startTS = entry.getValue().getLeaseStartTS(); + if ((now - startTS) >= leasePeriod) { + expiredSet.add(entry.getKey()); + entry.getValue().leaseExpired(); + } + } + leaseMap.keySet().removeAll(expiredSet); + Thread.sleep(threadWakeFrequencyMS); } catch (InterruptedException e) { + 
LOG.error(e.getMessage(), e); continue; } catch (ConcurrentModificationException e) { + LOG.error(e.getMessage(), e); + assert false; // This should fail in unit tests. continue; } catch (Throwable e) { LOG.fatal("Unexpected exception killed leases thread", e); break; } - if (lease == null) { - continue; - } - // A lease expired. Run the expired code before removing from queue - // since its presence in queue is used to see if lease exists still. - if (lease.getListener() == null) { - LOG.error("lease listener is null for lease " + lease.getLeaseName()); - } else { - lease.getListener().leaseExpired(); - } - synchronized (leaseQueue) { - leases.remove(lease.getLeaseName()); - } } close(); } /** - * Shuts down this lease instance when all outstanding leases expire. - * Like {@link #close()} but rather than violently end all leases, waits - * first on extant leases to finish. Use this method if the lease holders - * could loose data, leak locks, etc. Presumes client has shutdown - * allocation of new leases. + * Shuts down this lease instance when all outstanding leases expire. Like + * {@link #close()} but rather than violently ending all leases, + * waits first on extant leases to finish. + * Use this method if the lease holders could lose data, leak locks, etc. + * Presumes client has shut down allocation of new leases. */ public void closeAfterLeasesExpire() { this.stopRequested = true; } /** - * Shut down this Leases instance. All pending leases will be destroyed, + * Shut down this Leases instance. All pending leases will be destroyed, * without any cancellation calls. */ public void close() { LOG.info(Thread.currentThread().getName() + " closing leases"); this.stopRequested = true; - synchronized (leaseQueue) { - leaseQueue.clear(); - leases.clear(); - leaseQueue.notifyAll(); - } + leaseMap.clear(); LOG.info(Thread.currentThread().getName() + " closed leases"); } /** - * Obtain a lease + * Obtain a lease. 
* * @param leaseName name of the lease * @param listener listener that will process lease expirations + * + * @return returns the existing lease listener associated with the key, + * null if this is a new key. * @throws LeaseStillHeldException */ public void createLease(String leaseName, final LeaseListener listener) - throws LeaseStillHeldException { + throws LeaseStillHeldException { if (stopRequested) { return; } - Lease lease = new Lease(leaseName, listener, - System.currentTimeMillis() + leasePeriod); - synchronized (leaseQueue) { - if (leases.containsKey(leaseName)) { - throw new LeaseStillHeldException(leaseName); - } - leases.put(leaseName, lease); - leaseQueue.add(lease); + if (leaseMap.put(leaseName, listener) != null) { + throw new LeaseStillHeldException(leaseName); } } @@ -176,107 +176,30 @@ public class Leases extends HasThread { } /** - * Renew a lease + * Renew a lease. * - * @param leaseName name of lease + * @param leaseName + * name of lease * @throws LeaseException */ public void renewLease(final String leaseName) throws LeaseException { - synchronized (leaseQueue) { - Lease lease = leases.get(leaseName); - // We need to check to see if the remove is successful as the poll in the run() - // method could have completed between the get and the remove which will result - // in a corrupt leaseQueue. - if (lease == null || !leaseQueue.remove(lease)) { - throw new LeaseException("lease '" + leaseName + - "' does not exist or has already expired"); - } - lease.setExpirationTime(System.currentTimeMillis() + leasePeriod); - leaseQueue.add(lease); + LeaseListener listener; + if ((listener = leaseMap.get(leaseName)) == null) { + throw new LeaseException("lease '" + leaseName + "' does not exist"); } + listener.setLeaseStartTS(System.currentTimeMillis()); } /** * Client explicitly cancels a lease. 
* - * @param leaseName name of lease + * @param leaseName + * name of lease * @throws LeaseException */ public void cancelLease(final String leaseName) throws LeaseException { - synchronized (leaseQueue) { - Lease lease = leases.remove(leaseName); - if (lease == null) { - throw new LeaseException("lease '" + leaseName + "' does not exist"); - } - leaseQueue.remove(lease); - } - } - - /** This class tracks a single Lease. */ - private static class Lease implements Delayed { - private final String leaseName; - private final LeaseListener listener; - private long expirationTime; - - Lease(final String leaseName, LeaseListener listener, long expirationTime) { - this.leaseName = leaseName; - this.listener = listener; - this.expirationTime = expirationTime; - } - - /** @return the lease name */ - public String getLeaseName() { - return leaseName; - } - - /** @return listener */ - public LeaseListener getListener() { - return this.listener; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - return this.hashCode() == ((Lease) obj).hashCode(); - } - - @Override - public int hashCode() { - return this.leaseName.hashCode(); - } - - public long getDelay(TimeUnit unit) { - return unit.convert(this.expirationTime - System.currentTimeMillis(), - TimeUnit.MILLISECONDS); + if (leaseMap.remove(leaseName) == null) { + throw new LeaseException("lease '" + leaseName + "' does not exist"); } - - public int compareTo(Delayed o) { - long delta = this.getDelay(TimeUnit.MILLISECONDS) - - o.getDelay(TimeUnit.MILLISECONDS); - - return this.equals(o) ? 0 : (delta > 0 ? 
1 : -1); - } - - /** @param expirationTime the expirationTime to set */ - public void setExpirationTime(long expirationTime) { - this.expirationTime = expirationTime; - } - - /** - * Get the expiration time for that lease - * @return expiration time - */ - public long getExpirationTime() { - return this.expirationTime; - } - } } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/UniformSplitHFileHistogram.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/UniformSplitHFileHistogram.java?rev=1584170&r1=1584169&r2=1584170&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/UniformSplitHFileHistogram.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/UniformSplitHFileHistogram.java Wed Apr 2 20:49:31 2014 @@ -28,6 +28,7 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.hadoop.hbase.KeyValue; @@ -47,8 +48,24 @@ import com.google.common.collect.Lists; */ public class UniformSplitHFileHistogram implements HFileHistogram { protected NumericHistogram underlyingHistogram; - // TODO manukranthk : make this configurable. - int padding = 8; + public static final int PADDING = 8; + private static final byte[] INFINITY; + // Infinity but padded with a zero at the start to avoid messing with 2's complement. + private static final byte[] INFINITY_PADDED; + private static final double INFINITY_DOUBLE; + static { + /** + * Returns {0xff, 0xff .... 
0xff} + * <---- padding ----> + */ + INFINITY = new byte[PADDING]; + INFINITY_PADDED = new byte[PADDING + 1]; + for (int i = 0; i < PADDING; i++) { + INFINITY[i] = (byte)0xFF; + INFINITY_PADDED[i + 1] = (byte)0xFF; + } + INFINITY_DOUBLE = (new BigInteger(getPaddedInfinityArr())).doubleValue(); + } public UniformSplitHFileHistogram(int binCount) { this.underlyingHistogram = new HiveBasedNumericHistogram( @@ -66,27 +83,35 @@ public class UniformSplitHFileHistogram @Override public void add(KeyValue kv) { - double val = convertBytesToDouble(kv.getRow()); + double val = convertBytesToDouble(kv.getBuffer(), + kv.getRowOffset(), kv.getRowLength()); underlyingHistogram.add(val); } - private double getInfinity() { - return new BigInteger(getInfinityArr()).doubleValue(); + protected static double getInfinity() { + return INFINITY_DOUBLE; } /** * This returns the maximum number that we can represent using padding bytes. - * Returns {0x00, 0xff, 0xff .... 0xff } - * <---- padding ----> + * Returns {0xff, 0xff .... 0xff} + * <---- padding ----> * @return */ - private byte[] getInfinityArr() { - byte[] row = new byte[1]; - row[0] = (byte) 0; - return Bytes.appendToTail(row, padding, (byte)0xFF); + protected static byte[] getInfinityArr() { + return Arrays.copyOf(INFINITY, PADDING); + } + + /** + * To use while converting to a BigInteger. + * Contains a 0 in the 0'th index and 0xFF in the rest, + * containing a total of PADDING + 1 bytes. 
+ */ + protected static byte[] getPaddedInfinityArr() { + return Arrays.copyOf(INFINITY_PADDED, PADDING + 1); } - private double getMinusInfinity() { + protected static double getMinusInfinity() { return 0.0; } @@ -104,12 +129,20 @@ public class UniformSplitHFileHistogram * @param row * @return */ - protected double convertBytesToDouble(byte[] row) { - byte[] tmpRow = Bytes.head(row, Math.min(row.length, padding)); - byte[] newRow = Bytes.padTail(tmpRow, padding - tmpRow.length); + protected static double convertBytesToDouble(byte[] row) { + return convertBytesToDouble(row, 0, row.length); + } + + protected static double convertBytesToDouble(byte[] rowbuffer, int offset, + int length) { + byte[] paddedRow = new byte[PADDING + 1]; + // To avoid messing with 2's complement. - newRow = Bytes.padHead(newRow, 1); - return new BigInteger(newRow).doubleValue(); + paddedRow[0] = 0; + int minlength = Math.min(length, PADDING); + System.arraycopy(rowbuffer, offset, paddedRow, 1, minlength); + + return new BigInteger(paddedRow).doubleValue(); } /** @@ -118,7 +151,7 @@ public class UniformSplitHFileHistogram * @param d * @return */ - protected byte[] convertDoubleToBytes(double d) { + protected static byte[] convertDoubleToBytes(double d) { BigDecimal tmpDecimal = new BigDecimal(d); BigInteger tmp = tmpDecimal.toBigInteger(); byte[] arr = tmp.toByteArray(); @@ -128,14 +161,12 @@ public class UniformSplitHFileHistogram Preconditions.checkArgument(arr.length == 1 || arr[1] != 0); arr = Bytes.tail(arr, arr.length - 1); } - if (arr.length > padding) { - // Can happen due to loose precision guarentee in double. - // while doing the conversion, + if (arr.length > PADDING) { // {0x00, 0xff, ... , 0xff, 0xff}=>double=>{0x01, 0x00, ... , 0x00, 0x00} // might happen. 
- arr = Bytes.tail(getInfinityArr(), padding); + arr = getInfinityArr(); } - return Bytes.padHead(arr, padding - arr.length); + return Bytes.padHead(arr, PADDING - arr.length); } @Override @@ -166,8 +197,8 @@ public class UniformSplitHFileHistogram private HFileHistogram.Bucket getFromNumericHistogramBucket( NumericHistogram.Bucket bucket) { Bucket b = (new Bucket.Builder()) - .setStartRow(this.convertDoubleToBytes(bucket.getStart())) - .setEndRow(this.convertDoubleToBytes(bucket.getEnd())) + .setStartRow(convertDoubleToBytes(bucket.getStart())) + .setEndRow(convertDoubleToBytes(bucket.getEnd())) .setNumRows(bucket.getCount()).create(); return b; } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1584170&r1=1584169&r2=1584170&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Apr 2 20:49:31 2014 @@ -676,7 +676,8 @@ public class HRegionServer implements HR this.leases = new Leases( (int) conf.getLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD), - this.threadWakeFrequency); + conf.getLong(HConstants.REGIONSERVER_LEASE_THREAD_WAKE_FREQUENCY, + HConstants.DEFAULT_REGIONSERVER_LEASE_THREAD_WAKE_FREQUENCY)); } /** @@ -3035,17 +3036,16 @@ public class HRegionServer implements HR * Instantiated as a scanner lease. 
* If the lease times out, the scanner is closed */ - private class ScannerListener implements LeaseListener { - private final String scannerName; + private class ScannerListener extends LeaseListener { ScannerListener(final String n) { - this.scannerName = n; + super(n); } @Override public void leaseExpired() { - LOG.info("Scanner " + this.scannerName + " lease expired"); - InternalScanner s = scanners.remove(this.scannerName); + LOG.info("Scanner " + this.getLeaseName() + " lease expired"); + InternalScanner s = scanners.remove(this.getLeaseName()); if (s != null) { try { s.close(); @@ -3238,19 +3238,18 @@ public class HRegionServer implements HR * Instantiated as a row lock lease. * If the lease times out, the row lock is released */ - private class RowLockListener implements LeaseListener { - private final String lockName; + private class RowLockListener extends LeaseListener { private final HRegion region; RowLockListener(final String lockName, final HRegion region) { - this.lockName = lockName; + super(lockName); this.region = region; } @Override public void leaseExpired() { - LOG.info("Row Lock " + this.lockName + " lease expired"); - Integer r = rowlocks.remove(this.lockName); + LOG.info("Row Lock " + this.getLeaseName() + " lease expired"); + Integer r = rowlocks.remove(this.getLeaseName()); if(r != null) { region.releaseRowLock(r); } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionUtilities.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionUtilities.java?rev=1584170&r1=1584169&r2=1584170&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionUtilities.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionUtilities.java Wed Apr 2 20:49:31 2014 @@ -21,6 +21,7 @@ package 
org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; @@ -115,9 +116,43 @@ public class HRegionUtilities { } /** - * Adjusting the startRow of startBucket to region's startRow - * and endRow of endBucket to region's endRow. - * Modifies the current list + * In some cases the end region can have empty end row. Special casing this + * in the isValid check. + * @param regionEndKey + * @return + */ + public static boolean isRegionWithEmptyEndKey(byte[] regionEndKey) { + return regionEndKey.length == 0; + } + + /** + * A bucket is not valid if + * * bucket falls before the region boundaries. + * * bucket falls after the region boundaries. + * * bucket has same start row and end row. + * @param b + * @param regionStartKey + * @param regionEndKey + * @return + */ + public static boolean isValidBucket(Bucket b, byte[] regionStartKey, + byte[] regionEndKey) { + if (Bytes.compareTo(regionStartKey, b.getEndRow()) >= 0) { + return false; + } + if (!isRegionWithEmptyEndKey(regionEndKey) + && Bytes.compareTo(regionEndKey, b.getStartRow()) <= 0) { + return false; + } + if (Bytes.compareTo(b.getStartRow(), b.getEndRow()) == 0) { + return false; + } + return true; + } + + /** + * Picking the buckets within the valid range of the [startKey, endKey) + * and adjust the start and end rows of the start and end buckets of the * @param buckets * @return */ @@ -125,12 +160,20 @@ public class HRegionUtilities { List<Bucket> buckets, byte[] startKey, byte[] endKey) { int size = buckets.size(); Preconditions.checkArgument(size > 1); - Bucket startBucket = buckets.get(0); - Bucket endBucket = buckets.get(size - 1); - buckets.set(0, new HFileHistogram.Bucket.Builder(startBucket) + List<Bucket> retbuckets = new ArrayList<Bucket> (size); + for (Bucket b : buckets) { + if (isValidBucket(b, startKey, endKey)) { + retbuckets.add(b); + } + } + size = retbuckets.size(); + if (size == 0) 
return null; + Bucket startBucket = retbuckets.get(0); + Bucket endBucket = retbuckets.get(size - 1); + retbuckets.set(0, new HFileHistogram.Bucket.Builder(startBucket) .setStartRow(startKey).create()); - buckets.set(size - 1, new HFileHistogram.Bucket.Builder(endBucket) + retbuckets.set(size - 1, new HFileHistogram.Bucket.Builder(endBucket) .setEndRow(endKey).create()); - return buckets; + return retbuckets; } } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java?rev=1584170&r1=1584169&r2=1584170&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java Wed Apr 2 20:49:31 2014 @@ -1905,4 +1905,12 @@ public class Bytes { TProtocol protocol = new TFacebookCompactProtocol(buffer); return codec.read(protocol); } + + public static int longestCommonPrefix(byte[] arr1, byte[] arr2) { + int len = Math.min(arr1.length, arr2.length); + for (int i = 0; i < len; i++) { + if (arr1[i] != arr2[i]) return i; + } + return len; + } } Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java?rev=1584170&r1=1584169&r2=1584170&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java Wed Apr 2 20:49:31 2014 @@ -118,7 +118,9 @@ public abstract class Benchmark { */ public void printBenchmarkResults() { System.out.println("Benchmark results"); - 
benchmarkResults.prettyPrint(); + if (benchmarkResults != null) { + benchmarkResults.prettyPrint(); + } } Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/PointScanBenchmark.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/PointScanBenchmark.java?rev=1584170&view=auto ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/PointScanBenchmark.java (added) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/PointScanBenchmark.java Wed Apr 2 20:49:31 2014 @@ -0,0 +1,153 @@ +package org.apache.hadoop.hbase.benchmarks; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.metrics.PercentileMetric; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Histogram; + +/** + * Performs random point scans using 4 byte startRow and (startRow + 1) + * @author manukranthk + */ +public class PointScanBenchmark { + + private final Log LOG = LogFactory.getLog(PointScanBenchmark.class); + private byte[] tableName; + private String zkQuorum; + private byte[] family; + private int times; + private long threshold; + 
private boolean profiling; + + public PointScanBenchmark( + byte[] tableName, + String zookeeperQuorum, + byte[] family, + int times, + long threshold, + boolean profiling) throws IOException { + this.tableName = tableName; + this.zkQuorum = zookeeperQuorum; + this.family = family; + this.times = times; + this.threshold = threshold; + this.profiling = profiling; + } + + public void runBenchmarks() throws IOException { + Configuration conf = HBaseConfiguration.create(); + conf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum); + HTable table = new HTable(conf, tableName); + long totalTime = 0; + Histogram hist = new Histogram(10, 0, 100); + table.setProfiling(profiling); + List<Long> thresholdList = new ArrayList<Long>(); + List<Byte> thresholdBytes = new ArrayList<Byte>(); + int thresholdCount = 10000; + Random rand = new Random(); + byte[] startRow = new byte[10]; + for (int i = 0; i < times; i++) { + for (byte b = 0; b < 127; b++) { + long startTime = System.nanoTime(); + rand.nextBytes(startRow); + Scan s = getScan(startRow); + ResultScanner scanner = table.getScanner(s); + int cnt = 0; + for (Result r : scanner) { + cnt++; + } + long endTime = System.nanoTime(); + long curTime = endTime - startTime; + totalTime += curTime; + if (curTime > threshold) { + LOG.debug("Adding to threshold list : " + curTime); + if (profiling) { + LOG.debug(table.getProfilingData().toPrettyString()); + } + if (thresholdList.size() < thresholdCount) { + thresholdList.add(curTime); + thresholdBytes.add(b); + } + } + table.getProfilingData(); + hist.addValue(curTime); + LOG.debug(String.format("Printing the stats: Row Cnt : %d, Time Taken : %d ns, Byte : %d", cnt, curTime, b)); + } + } + LOG.debug(String.format("Avg time : %d ns", totalTime/times)); + LOG.debug("Histogram stats : P99 : " +hist.getPercentileEstimate(PercentileMetric.P95) + ", P95 : " + hist.getPercentileEstimate(PercentileMetric.P99)); + for (int i = 0; i< thresholdList.size(); i++) { + LOG.debug("(" + thresholdBytes.get(i) + 
", " + thresholdList.get(i) + ")"); + } + } + + public Scan getScan(byte[] startRow) { + Scan s = new Scan(); + s.setStartRow(startRow); + return s; + } + + /** + * @param args + * @throws ParseException + * @throws IOException + */ + public static void main(String[] args) throws ParseException, IOException { + Options opt = new Options(); + opt.addOption(OptionBuilder.withArgName("tableName").hasArg() + .withDescription("Table Name").create("t")); + opt.addOption(OptionBuilder.withArgName("zookeeper").hasArg() + .withDescription("Zookeeper Quorum").create("zk")); + opt.addOption(OptionBuilder.withArgName("times").hasArg() + .withDescription("Number of times to perform the scan").create("times")); + opt.addOption(OptionBuilder.withArgName("family").hasArg() + .withDescription("Column Family").create("cf")); + opt.addOption(OptionBuilder.withArgName("threshold").hasArg() + .withDescription("Threshold").create("th")); + opt.addOption(OptionBuilder.withArgName("profiling").hasArg() + .withDescription("Enable per request profiling").create("prof")); + CommandLine cmd = new GnuParser().parse(opt, args); + byte[] tableName = Bytes.toBytes(cmd.getOptionValue("t")); + String zkQuorum = ""; + byte[] family = null; + int times = 1000; + long threshold = 100000000; + boolean profiling = false; + if (cmd.hasOption("zk")) { + zkQuorum = cmd.getOptionValue("zk"); + } + if (cmd.hasOption("cf")) { + family = Bytes.toBytes(cmd.getOptionValue("cf")); + } + if (cmd.hasOption("times")) { + times = Integer.parseInt(cmd.getOptionValue("times")); + } + if (cmd.hasOption("th")) { + threshold = Long.parseLong(cmd.getOptionValue("th")); + } + if (cmd.hasOption("prof")) { + profiling = Boolean.parseBoolean(cmd.getOptionValue("prof")); + } + PointScanBenchmark bench = + new PointScanBenchmark(tableName, zkQuorum, family, times, threshold, profiling); + bench.runBenchmarks(); + } +} Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/ScanSearch.java URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/ScanSearch.java?rev=1584170&r1=1584169&r2=1584170&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/ScanSearch.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/ScanSearch.java Wed Apr 2 20:49:31 2014 @@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.util.Byte */ public class ScanSearch extends Benchmark { public static final Log LOG = LogFactory.getLog(ScanSearch.class); - private static final long PRINT_INTERVAL_KVS = 1000000; + private static long PRINT_INTERVAL_KVS = 1000000; public static byte[] tableName = null; public static int cachingSize = 10000; public static boolean prefetch = true; @@ -42,6 +42,7 @@ public class ScanSearch extends Benchmar public static int nonBlockingPreloadingCount = 0; public static boolean clientSideScan = false; public static int max_regions = Integer.MAX_VALUE; + public static boolean doProfiling = false; public void runBenchmark() throws Throwable { ArrayList<HRegionInfo> regions = this.getRegions(); @@ -143,12 +144,14 @@ public class ScanSearch extends Benchmar long numKVs = 0; long numBytes = 0; - Result kv; long printAfterNumKVs = PRINT_INTERVAL_KVS; long startTime = System.currentTimeMillis(); // read all the KV's ResultScanner scanner = null; + if (doProfiling) { + htable.setProfiling(true); + } try { if (!clientSideScan) { scanner = htable.getScanner(scan); @@ -162,7 +165,7 @@ public class ScanSearch extends Benchmar return; } try { - while ((kv = scanner.next()) != null) { + for (Result kv : scanner) { numKVs += kv.size(); if (kv.raw() != null) { for (KeyValue k : kv.raw()) @@ -170,6 +173,9 @@ public class ScanSearch extends Benchmar } if (numKVs > printAfterNumKVs) { + if (doProfiling) { + System.out.println(htable.getProfilingData().toPrettyString()); + } 
printAfterNumKVs += PRINT_INTERVAL_KVS; if (printStats) { printStats(region.getRegionNameAsString(), numKVs, numBytes, @@ -177,7 +183,9 @@ public class ScanSearch extends Benchmar } } } - } catch (IOException e) { + } catch (Exception e) { + LOG.debug("Caught exception", e); + } finally { scanner.close(); } @@ -185,7 +193,6 @@ public class ScanSearch extends Benchmar printStats(region.getRegionNameAsString(), numKVs, numBytes, startTime, caching, prefetch, preloadBlocks); } - scanner.close(); } } @@ -219,6 +226,11 @@ public class ScanSearch extends Benchmar .withDescription("Number of scanners preloading").create("x")); opt.addOption(OptionBuilder.withArgName("maxregions").hasArg() .withDescription("Max number of regions to scan").create("n")); + opt.addOption(OptionBuilder.withArgName("print-interval").hasArg() + .withDescription("Number of key values after which we " + + "can print the stats.").create("pi")); + opt.addOption(OptionBuilder.withArgName("useProfiling").hasArg() + .withDescription("Set per request profiling data and get it").create("prof")); CommandLine cmd = new GnuParser().parse(opt, args); ScanSearch.tableName = Bytes.toBytes(cmd.getOptionValue("t")); @@ -241,6 +253,12 @@ public class ScanSearch extends Benchmar if (cmd.hasOption("n")) { ScanSearch.max_regions = Integer.parseInt(cmd.getOptionValue("n")); } + if (cmd.hasOption("pi")) { + ScanSearch.PRINT_INTERVAL_KVS = Integer.parseInt(cmd.getOptionValue("pi")); + } + if (cmd.hasOption("prof")) { + ScanSearch.doProfiling = Boolean.parseBoolean(cmd.getOptionValue("prof")); + } String className = Thread.currentThread().getStackTrace()[1].getClassName(); System.out.println("Running benchmark " + className); @SuppressWarnings("unchecked") Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHFileHistogramE2E.java URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHFileHistogramE2E.java?rev=1584170&r1=1584169&r2=1584170&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHFileHistogramE2E.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHFileHistogramE2E.java Wed Apr 2 20:49:31 2014 @@ -57,6 +57,7 @@ public class TestHFileHistogramE2E { assertTrue(hist != null); boolean first = true; List<Bucket> buckets = hist.getUniformBuckets(); + int idx = 0; assertTrue(buckets != null); assertTrue(buckets.size() > 0); Bucket prevBucket = buckets.get(0); @@ -64,10 +65,15 @@ public class TestHFileHistogramE2E { if (first) { first = false; prevBucket = b; + idx++; continue; } assertTrue(Bytes.compareTo(b.getStartRow(), prevBucket.getEndRow()) >= 0); - assertTrue(Bytes.compareTo(b.getEndRow(), prevBucket.getStartRow()) > 0); + assertTrue(Bytes.toStringBinary(b.getEndRow()) + " : " + + Bytes.toStringBinary(prevBucket.getStartRow()), + ++idx >= buckets.size() || // The last bucket + Bytes.compareTo(b.getEndRow(), prevBucket.getStartRow()) > 0); + prevBucket = b; } } Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/histogram/TestConversionUtils.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/histogram/TestConversionUtils.java?rev=1584170&view=auto ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/histogram/TestConversionUtils.java (added) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/histogram/TestConversionUtils.java Wed Apr 2 20:49:31 2014 @@ -0,0 +1,56 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + 
* or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.io.hfile.histogram; + +import java.util.Random; + +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import static org.apache.hadoop.hbase.io.hfile.histogram.UniformSplitHFileHistogram.*; +import static org.junit.Assert.*; + +public class TestConversionUtils { + + @Test + public void testDoubleConversion() { + double d = convertBytesToDouble(getPaddedInfinityArr()); + assertTrue("d:" + d, d > 0.0); + byte[] b = convertDoubleToBytes(d); + assertTrue(b.length == PADDING); + + for (int i = 0; i < 1; i++) { + testDoubleConversionOnce(); + } + } + + public void testDoubleConversionOnce() { + Random r = new Random(); + byte[] arr = new byte[PADDING]; + r.nextBytes(arr); + + double d = convertBytesToDouble(arr); + byte[] arrret = convertDoubleToBytes(d); + assertTrue("arr: " + Bytes.toStringBinary(arr) + ", arrret : " + + Bytes.toStringBinary(arrret), Bytes.longestCommonPrefix(arr, arrret) + >= 4); + } + +} Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/histogram/TestUniformSplitHistogram.java URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/histogram/TestUniformSplitHistogram.java?rev=1584170&r1=1584169&r2=1584170&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/histogram/TestUniformSplitHistogram.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/histogram/TestUniformSplitHistogram.java Wed Apr 2 20:49:31 2014 @@ -42,8 +42,11 @@ public class TestUniformSplitHistogram { List<Bucket> lst = hist.getUniformBuckets(); assertTrue(lst.size() > 0); Bucket prevBucket = null; + int bucketIndex = 0; for (Bucket b : lst) { + bucketIndex++; if (prevBucket != null) { + System.out.println(bucketIndex); assertTrue(Bytes.toStringBinary(b.getStartRow()) + " not greater than " + Bytes.toStringBinary(prevBucket.getStartRow()), Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestLeases.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestLeases.java?rev=1584170&view=auto ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestLeases.java (added) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestLeases.java Wed Apr 2 20:49:31 2014 @@ -0,0 +1,154 @@ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.*; + +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.LeaseException; +import org.apache.hadoop.hbase.LeaseListener; +import org.apache.hadoop.hbase.Leases; 
+import org.apache.hadoop.hbase.Leases.LeaseStillHeldException; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.Test; + +public class TestLeases { + private static final Log LOG = LogFactory.getLog(TestLeases.class); + private static final int NUM_CALLS = 100; + private static final int MAX_WAIT = 4; + private Leases leases; + private final int leasePeriod = 50; // ms + private final long wakeFreq = 1; // ms + + @Test + public void test() + throws LeaseException, InterruptedException, ExecutionException { + int cnt = 0; + for (int i = 0; i < 20; i++) { + if (testOneInstance()) cnt++; + } + } + + public boolean testOneInstance() + throws LeaseException, InterruptedException, ExecutionException { + final AtomicInteger expiredLeaseCnt = new AtomicInteger(0); + final AtomicInteger cancelledLeaseCnt = new AtomicInteger(0); + LOG.debug(String.format("Creating leases with lease period : %d, wake frequency : %d", + leasePeriod, (int)wakeFreq)); + + this.leases = new Leases(leasePeriod, wakeFreq); + leases.setDaemon(true); + leases.setName("Lease Thread"); + leases.start(); + + // Simulating a bunch of add scanner calls + final Random rand = new Random(); + final Random rand2 = new Random(); + final Map<Integer, Boolean> leaseIds = new ConcurrentHashMap<Integer, Boolean>(); + int numLeasesCreated = 0; + for (int i = 0; i < NUM_CALLS; i++) { + int leaseId = rand2.nextInt(); + try { + leases.createLease(String.valueOf(leaseId), + new MockLeaseListener(String.valueOf(leaseId), expiredLeaseCnt)); + numLeasesCreated++; + leaseIds.put(leaseId, true); + + // Testing the LeaseStillHeldException case on one of the attempts. 
+ if (numLeasesCreated == NUM_CALLS/2) { + try { + leases.createLease(String.valueOf(leaseId), + new MockLeaseListener(String.valueOf(leaseId), expiredLeaseCnt)); + } catch (LeaseStillHeldException e) { + // It works + LOG.debug("Inserting duplicate lease id resulted in " + + "LeaseStillHeldException"); + continue; + } + assertTrue("Duplicate attempt of lease creation.", false); + } + + // Testing the + if (numLeasesCreated == NUM_CALLS/3) { + try { + leases.renewLease("invalid_lease_id"); + } catch (LeaseException e) { + // It works + LOG.debug("Renewing invalid id throws " + + "LeaseException"); + continue; + } + assertTrue("Duplicate attempt of lease creation.", false); + } + } catch (LeaseStillHeldException e) { + continue; + } + } + + for (int i = 0; i < NUM_CALLS / 2; i++) { + final int waitTime = rand.nextInt(MAX_WAIT * 10); + Threads.sleep(waitTime); + int idx = rand.nextInt(leaseIds.size()); + int leaseId = leaseIds.keySet().toArray(new Integer[0])[idx]; + if (idx % 2 == 0) { + try { + leases.cancelLease(String.valueOf(leaseId)); + LOG.debug("Lease cancelled :" + leaseId); + } catch (LeaseException e) { + continue; + } catch (Exception e) { + e.printStackTrace(); + } + cancelledLeaseCnt.addAndGet(1); + leaseIds.remove(leaseId); + } else { + try { + leases.renewLease(String.valueOf(leaseId)); + } catch (LeaseException e) { + } + } + } + + // wait for leases to drain all the leases. 
+ leases.closeAfterLeasesExpire(); + leases.join(); + LOG.debug("LeaseIds.size() : " + leaseIds.size() + + ", expiredLeaseCnt.get() : " + expiredLeaseCnt.get() + + ", cancelledLeaseCnt.get() : " + cancelledLeaseCnt.get() + + ", numLeasesCreated : " + numLeasesCreated); + assertTrue("LeaseIds.size() : " + leaseIds.size() + + ", expiredLeaseCnt.get() : " + expiredLeaseCnt.get() + + ", cancelledLeaseCnt.get() : " + cancelledLeaseCnt.get() + + ", numLeasesCreated : " + numLeasesCreated, + expiredLeaseCnt.get() + cancelledLeaseCnt.get() >= + numLeasesCreated); + if (expiredLeaseCnt.get() + cancelledLeaseCnt.get() > + numLeasesCreated) { + // This is the case where the ConcurrentHashMap gives unpredicatable + // behavior because we aren't guarding readers and writers from each other + return false; + } + return true; + } + + private static class MockLeaseListener extends LeaseListener { + private final AtomicInteger leaseCnt; + private final String leaseName_visible; + MockLeaseListener(String leaseName, AtomicInteger leaseCnt) { + super(leaseName); + leaseName_visible = leaseName; + this.leaseCnt = leaseCnt; + } + + @Override + public void leaseExpired() { + LOG.debug("Expiring lease : " + leaseName_visible ); + this.leaseCnt.addAndGet(1); + } + } +}
