Author: stack
Date: Mon Mar 9 22:15:27 2009
New Revision: 751874
URL: http://svn.apache.org/viewvc?rev=751874&view=rev
Log:
HBASE-803 Atomic increment operations
Modified:
hadoop/hbase/trunk/CHANGES.txt
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Modified: hadoop/hbase/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=751874&r1=751873&r2=751874&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Mon Mar 9 22:15:27 2009
@@ -86,6 +86,7 @@
(Erik Holstad via Stack)
HBASE-1240 Would be nice if RowResult could be comparable
(Erik Holstad via Stack)
+ HBASE-803 Atomic increment operations (Ryan Rawson and Jon Gray via Stack)
Release 0.19.0 - 01/21/2009
INCOMPATIBLE CHANGES
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java
URL:
http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java?rev=751874&r1=751873&r2=751874&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java Mon Mar 9 22:15:27 2009
@@ -1479,6 +1479,18 @@
public ArrayList<BatchUpdate> getWriteBuffer() {
return writeBuffer;
}
+
+  /**
+   * Atomically increments the value of a single column in a row.
+   * The call is handed to <code>connection.getRegionServerWithRetries</code>,
+   * which invokes <code>incrementColumnValue</code> on the region server for
+   * the region located for <code>row</code>.
+   *
+   * @param row row holding the counter
+   * @param column column whose value is incremented
+   * @param amount amount to add to the current value
+   * @return the value returned by the region server's incrementColumnValue
+   * @throws IOException if the remote call fails
+   */
+  public long incrementColumnValue(final byte [] row, final byte [] column,
+      final int amount) throws IOException {
+    final ServerCallable<Long> incrementCall =
+      new ServerCallable<Long>(connection, tableName, row) {
+        public Long call() throws IOException {
+          final byte [] regionName = location.getRegionInfo().getRegionName();
+          return server.incrementColumnValue(regionName, row, column, amount);
+        }
+      };
+    return connection.getRegionServerWithRetries(incrementCall);
+  }
/**
* Implements the scanner interface for the HBase client.
Modified:
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java
URL:
http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java?rev=751874&r1=751873&r2=751874&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java Mon Mar 9 22:15:27 2009
@@ -68,7 +68,8 @@
* <li>Version 16: Removed HMasterRegionInterface.getRootRegionLocation and
* HMasterInterface.findRootRegion. We use ZooKeeper to store root region
* location instead.</li>
+ * <li>Version 17: Added incrementColumnValue.</li>
* </ul>
*/
- public static final long versionID = 16L;
+ public static final long versionID = 17L;
}
Modified:
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL:
http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=751874&r1=751873&r2=751874&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Mon Mar 9 22:15:27 2009
@@ -292,4 +292,18 @@
*/
public void unlockRow(final byte [] regionName, final long lockId)
throws IOException;
+
+ /**
+ * Atomically increments a column value. If the column value isn't
+ * long-like, this could throw an exception.
+ *
+ * @param regionName name of the region that holds the row
+ * @param row row containing the column to increment
+ * @param column column whose value is incremented
+ * @param amount amount to add to the current column value
+ * @return new incremented column value
+ * @throws IOException if the increment fails
+ */
+ public long incrementColumnValue(byte [] regionName, byte [] row,
+ byte [] column, long amount) throws IOException;
}
Modified:
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL:
http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=751874&r1=751873&r2=751874&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Mon Mar 9 22:15:27 2009
@@ -43,6 +43,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ColumnNameParseException;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -2585,4 +2586,79 @@
}
}
}
-}
\ No newline at end of file
+
+
+  /**
+   * Atomically increments the long stored at <code>row</code>/<code>column</code>
+   * by <code>amount</code> while holding the row lock.
+   *
+   * <p>The current value is looked up in the memcache first and, if not found
+   * there, in the store. When the value comes from the memcache the update is
+   * written with that cell's timestamp; otherwise the current time is used.
+   * If no previous value exists, the column is created with
+   * <code>amount</code> as its value.
+   *
+   * @param row row containing the counter
+   * @param column column containing the counter
+   * @param amount amount to add to the current value
+   * @return the new value of the column
+   * @throws IOException if more than one current value is returned by the
+   * memcache or the store, if the existing value is longer than a long, or if
+   * any underlying check, read or update fails
+   */
+  public long incrementColumnValue(byte[] row, byte[] column, long amount)
+  throws IOException {
+    checkRow(row);
+    checkColumn(column);
+
+    Integer lid = obtainRowLock(row);
+    try {
+      splitsAndClosesLock.readLock().lock();
+      try {
+        HStoreKey hsk = new HStoreKey(row, column);
+        long ts = System.currentTimeMillis();
+        byte [] value = null;
+
+        Store store = getStore(column);
+
+        // Try the memcache first.
+        List<Cell> c;
+        store.lock.readLock().lock();
+        try {
+          c = store.memcache.get(hsk, 1);
+        } finally {
+          store.lock.readLock().unlock();
+        }
+        if (c.size() == 1) {
+          // Use the memcache timestamp value so the update below is written
+          // at the same timestamp as the existing memcache cell.
+          LOG.debug("Overwriting the memcache value for " + Bytes.toString(row) +
+            "/" + Bytes.toString(column));
+          ts = c.get(0).getTimestamp();
+          value = c.get(0).getValue();
+        } else if (c.size() > 1) {
+          throw new DoNotRetryIOException(
+            "more than 1 value returned in incrementColumnValue from memcache");
+        }
+
+        if (value == null) {
+          // Check the store (including disk) for the previous value.
+          Cell[] cell = store.get(hsk, 1);
+          if (cell != null && cell.length == 1) {
+            LOG.debug("Using HFile previous value for " + Bytes.toString(row) +
+              "/" + Bytes.toString(column));
+            value = cell[0].getValue();
+          } else if (cell != null && cell.length > 1) {
+            // Must test the store result here; the memcache list is already
+            // known to hold at most one entry at this point.
+            throw new DoNotRetryIOException(
+              "more than 1 value returned in incrementColumnValue from Store");
+          }
+        }
+
+        long newval; // the new value.
+        if (value == null) {
+          // Doesn't exist
+          LOG.debug("Creating new counter value for " + Bytes.toString(row) +
+            "/"+ Bytes.toString(column));
+          newval = amount;
+        } else {
+          newval = incrementBytes(value, amount);
+        }
+
+        BatchUpdate b = new BatchUpdate(row, ts);
+        b.put(column, Bytes.toBytes(newval));
+        batchUpdate(b, lid, true);
+        return newval;
+      } finally {
+        splitsAndClosesLock.readLock().unlock();
+      }
+    } finally {
+      // Nested so the row lock is released even if releasing the read lock fails.
+      releaseRowLock(lid);
+    }
+  }
+
+  /**
+   * Reads <code>value</code> as a long and adds <code>amount</code> to it.
+   * A value shorter than <code>Bytes.SIZEOF_LONG</code> bytes is first copied
+   * into the trailing bytes of a new array of that size.
+   *
+   * @param value bytes of the current column value
+   * @param amount amount to add
+   * @return the long read from <code>value</code> plus <code>amount</code>
+   * @throws IOException a DoNotRetryIOException if <code>value</code> is
+   * longer than <code>Bytes.SIZEOF_LONG</code> bytes
+   */
+  private long incrementBytes(byte[] value, long amount) throws IOException {
+    final int width = value.length;
+    if (width > Bytes.SIZEOF_LONG) {
+      throw new DoNotRetryIOException("Increment Bytes - value too big: " + width);
+    }
+    byte [] longBytes = value;
+    if (width < Bytes.SIZEOF_LONG) {
+      // Hopefully this doesn't happen too often.
+      longBytes = new byte[Bytes.SIZEOF_LONG];
+      System.arraycopy(value, 0, longBytes, Bytes.SIZEOF_LONG - width, width);
+    }
+    return Bytes.toLong(longBytes) + amount;
+  }
+}
Modified:
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL:
http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=751874&r1=751873&r2=751874&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Mon Mar 9 22:15:27 2009
@@ -2287,4 +2287,34 @@
.getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
doMain(args, regionServerClass);
}
+
+  /** {@inheritDoc} */
+  public long incrementColumnValue(byte[] regionName, byte[] row,
+      byte[] column, long amount) throws IOException {
+    checkOpen();
+
+    // Find the first null argument, checked in declaration order.
+    String nullArgument = null;
+    if (regionName == null) {
+      nullArgument = "regionName is null";
+    } else if (row == null) {
+      nullArgument = "row is null";
+    } else if (column == null) {
+      nullArgument = "column is null";
+    }
+    if (nullArgument != null) {
+      // Report the problem as an IOException whose cause is the NPE.
+      throw new IOException("Invalid arguments to incrementColumnValue",
+        new NullPointerException(nullArgument));
+    }
+
+    requestCount.incrementAndGet();
+    try {
+      return getRegion(regionName).incrementColumnValue(row, column, amount);
+    } catch (IOException e) {
+      // Run the file system check before passing the failure on to the caller.
+      checkFileSystem();
+      throw e;
+    }
+  }
}