Author: larsh
Date: Wed Feb 8 18:30:37 2012
New Revision: 1242037
URL: http://svn.apache.org/viewvc?rev=1242037&view=rev
Log:
HBASE-5229 Provide basic building blocks for 'multi-row' local transactions.
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationProtocol.java
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java?rev=1242037&view=auto
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
(added)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
Wed Feb 8 18:30:37 2012
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.WrongRegionException;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * This class demonstrates how to implement atomic multi row transactions using
+ * {@link HRegion#mutateRowsWithLocks(java.util.Collection,
java.util.Collection)}
+ * and Coprocessor endpoints.
+ */
+public class MultiRowMutationEndpoint extends BaseEndpointCoprocessor
implements
+ MultiRowMutationProtocol {
+
+ @Override
+ public void mutateRows(List<Mutation> mutations) throws IOException {
+ // get the coprocessor environment
+ RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment)
getEnvironment();
+
+ // set of rows to lock, sorted to avoid deadlocks
+ SortedSet<byte[]> rowsToLock = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+
+ HRegionInfo regionInfo = env.getRegion().getRegionInfo();
+ for (Mutation m : mutations) {
+ // check whether rows are in range for this region
+ if (!HRegion.rowIsInRange(regionInfo, m.getRow())) {
+ String msg = "Requested row out of range '"
+ + Bytes.toStringBinary(m.getRow()) + "'";
+ if (rowsToLock.isEmpty()) {
+ // if this is the first row, region might have moved,
+ // allow client to retry
+ throw new WrongRegionException(msg);
+ } else {
+ // rows are split between regions, do not retry
+ throw new DoNotRetryIOException(msg);
+ }
+ }
+ rowsToLock.add(m.getRow());
+ }
+ // call utility method on region
+ env.getRegion().mutateRowsWithLocks(mutations, rowsToLock);
+ }
+}
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationProtocol.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationProtocol.java?rev=1242037&view=auto
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationProtocol.java
(added)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationProtocol.java
Wed Feb 8 18:30:37 2012
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+
+/**
+ * Coprocessor protocol for performing multi row transactions.
+ * {@link MultiRowMutationEndpoint} provides the implementation.
+ * <p>
+ * Details and limitations are described at
+ * {@link HRegion#mutateRowsWithLocks(java.util.Collection, java.util.Collection)}.
+ * <p>
+ * Example:
+ * <pre>
+ * List&lt;Mutation&gt; mutations = ...;
+ * Put p1 = new Put(row1);
+ * Put p2 = new Put(row2);
+ * ...
+ * mutations.add(p1);
+ * mutations.add(p2);
+ * MultiRowMutationProtocol mrOp = t.coprocessorProxy(
+ *     MultiRowMutationProtocol.class, row1);
+ * mrOp.mutateRows(mutations);
+ * </pre>
+ */
+public interface MultiRowMutationProtocol extends CoprocessorProtocol {
+  /**
+   * Performs the given mutations as one multi row transaction.
+   *
+   * @param mutations the mutations to perform
+   * @throws IOException if the mutations cannot be performed
+   */
+  void mutateRows(List<Mutation> mutations) throws IOException;
+}
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1242037&r1=1242036&r2=1242037&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
Wed Feb 8 18:30:37 2012
@@ -4148,12 +4148,26 @@ public class HRegion implements HeapSize
return results;
}
- public void mutateRow(RowMutation rm,
- Integer lockid) throws IOException {
+ public void mutateRow(RowMutation rm) throws IOException {
+ mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
+ }
+
+ /**
+ * Perform atomic mutations within the region.
+ * @param mutations The list of mutations to perform.
+ * <code>mutations</code> can contain operations for multiple rows.
+ * Caller has to ensure that all rows are contained in this region.
+ * @param rowsToLock Rows to lock
+ * If multiple rows are locked care should be taken that
+ * <code>rowsToLock</code> is sorted in order to avoid deadlocks.
+ * @throws IOException
+ */
+ public void mutateRowsWithLocks(Collection<Mutation> mutations,
+ Collection<byte[]> rowsToLock) throws IOException {
boolean flush = false;
startRegionOperation();
- Integer lid = null;
+ List<Integer> acquiredLocks = null;
try {
// 1. run all pre-hooks before the atomic operation
// if any pre hook indicates "bypass", bypass the entire operation
@@ -4161,7 +4175,7 @@ public class HRegion implements HeapSize
// one WALEdit is used for all edits.
WALEdit walEdit = new WALEdit();
if (coprocessorHost != null) {
- for (Mutation m : rm.getMutations()) {
+ for (Mutation m : mutations) {
if (m instanceof Put) {
if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) {
// by pass everything
@@ -4178,8 +4192,17 @@ public class HRegion implements HeapSize
}
}
- // 2. acquire the row lock
- lid = getLock(lockid, rm.getRow(), true);
+ // 2. acquire the row lock(s)
+ acquiredLocks = new ArrayList<Integer>(rowsToLock.size());
+ for (byte[] row : rowsToLock) {
+ // attempt to lock all involved rows, fail if one lock times out
+ Integer lid = getLock(null, row, true);
+ if (lid == null) {
+ throw new IOException("Failed to acquire lock on "
+ + Bytes.toStringBinary(row));
+ }
+ acquiredLocks.add(lid);
+ }
// 3. acquire the region lock
this.updatesLock.readLock().lock();
@@ -4191,7 +4214,7 @@ public class HRegion implements HeapSize
byte[] byteNow = Bytes.toBytes(now);
try {
// 5. Check mutations and apply edits to a single WALEdit
- for (Mutation m : rm.getMutations()) {
+ for (Mutation m : mutations) {
if (m instanceof Put) {
Map<byte[], List<KeyValue>> familyMap = m.getFamilyMap();
checkFamilies(familyMap.keySet());
@@ -4218,7 +4241,7 @@ public class HRegion implements HeapSize
// 7. apply to memstore
long addedSize = 0;
- for (Mutation m : rm.getMutations()) {
+ for (Mutation m : mutations) {
addedSize += applyFamilyMapToMemstore(m.getFamilyMap(), w);
}
flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
@@ -4231,7 +4254,7 @@ public class HRegion implements HeapSize
}
// 10. run all coprocessor post hooks, after region lock is released
if (coprocessorHost != null) {
- for (Mutation m : rm.getMutations()) {
+ for (Mutation m : mutations) {
if (m instanceof Put) {
coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL());
} else if (m instanceof Delete) {
@@ -4240,9 +4263,11 @@ public class HRegion implements HeapSize
}
}
} finally {
- if (lid != null) {
+ if (acquiredLocks != null) {
// 11. release the row lock
- releaseRowLock(lid);
+ for (Integer lid : acquiredLocks) {
+ releaseRowLock(lid);
+ }
}
if (flush) {
// 12. Flush cache if needed. Do it outside update lock.
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1242037&r1=1242036&r2=1242037&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Wed Feb 8 18:30:37 2012
@@ -3160,7 +3160,7 @@ public class HRegionServer implements HR
throws IOException {
checkOpen();
if (regionName == null) {
- throw new IOException("Invalid arguments to atomicMutation " +
+ throw new IOException("Invalid arguments to mutateRow " +
"regionName is null");
}
requestCount.incrementAndGet();
@@ -3169,7 +3169,7 @@ public class HRegionServer implements HR
if (!region.getRegionInfo().isMetaTable()) {
this.cacheFlusher.reclaimMemStoreMemory();
}
- region.mutateRow(rm, null);
+ region.mutateRow(rm);
} catch (IOException e) {
checkFileSystem();
throw e;
Modified:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1242037&r1=1242036&r2=1242037&view=diff
==============================================================================
---
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
(original)
+++
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
Wed Feb 8 18:30:37 2012
@@ -36,7 +36,6 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -65,6 +64,9 @@ import org.apache.hadoop.hbase.LargeTest
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HTable.DaemonThreadFactory;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
+import org.apache.hadoop.hbase.coprocessor.MultiRowMutationProtocol;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@@ -112,6 +114,9 @@ public class TestFromClientSide {
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+ MultiRowMutationEndpoint.class.getName());
// We need more than one region server in this test
TEST_UTIL.startMiniCluster(SLAVES);
}
@@ -4041,6 +4046,31 @@ public class TestFromClientSide {
}
@Test
+ public void testMultiRowMutation() throws Exception {
+   LOG.info("Starting testMultiRowMutation");
+   final byte[] tableNameBytes = Bytes.toBytes("testMultiRowMutation");
+   final byte[] secondRow = Bytes.toBytes("testRow1");
+   // both rows receive the same cell and are verified the same way
+   final byte[][] rows = { ROW, secondRow };
+
+   HTable table = TEST_UTIL.createTable(tableNameBytes, FAMILY);
+
+   // one Put per row, all collected into a single multi-row mutation
+   List<Mutation> mutations = new ArrayList<Mutation>();
+   for (byte[] row : rows) {
+     Put put = new Put(row);
+     put.add(FAMILY, QUALIFIER, VALUE);
+     mutations.add(put);
+   }
+
+   // invoke the endpoint through a proxy obtained for ROW
+   table.coprocessorProxy(MultiRowMutationProtocol.class, ROW)
+       .mutateRows(mutations);
+
+   // each row must now return VALUE for the mutated column
+   for (byte[] row : rows) {
+     Result result = table.get(new Get(row));
+     assertEquals(0,
+         Bytes.compareTo(VALUE, result.getValue(FAMILY, QUALIFIER)));
+   }
+ }
+
+ @Test
public void testRowMutation() throws Exception {
LOG.info("Starting testRowMutation");
final byte [] TABLENAME = Bytes.toBytes("testRowMutation");
Modified:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java?rev=1242037&r1=1242036&r2=1242037&view=diff
==============================================================================
---
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
(original)
+++
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
Wed Feb 8 18:30:37 2012
@@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.regionse
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -29,11 +32,13 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RowMutation;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
@@ -246,11 +251,11 @@ public class TestAtomicOperation extends
}
/**
- * Test multi-threaded increments.
+ * Test multi-threaded row mutations.
*/
public void testRowMutationMultiThreads() throws IOException {
- LOG.info("Starting test testMutationMultiThreads");
+ LOG.info("Starting test testRowMutationMultiThreads");
initHRegion(tableName, getName(), fam1);
// create 100 threads, each will alternate between adding and
@@ -263,7 +268,52 @@ public class TestAtomicOperation extends
AtomicInteger failures = new AtomicInteger(0);
// create all threads
for (int i = 0; i < numThreads; i++) {
- all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures);
+ all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures)
{
+ @Override
+ public void run() {
+ boolean op = true;
+ for (int i=0; i<numOps; i++) {
+ try {
+ // throw in some flushes
+ if (r.nextFloat() < 0.001) {
+ LOG.debug("flushing");
+ region.flushcache();
+ }
+ long ts = timeStamps.incrementAndGet();
+ RowMutation rm = new RowMutation(row);
+ if (op) {
+ Put p = new Put(row, ts);
+ p.add(fam1, qual1, value1);
+ rm.add(p);
+ Delete d = new Delete(row);
+ d.deleteColumns(fam1, qual2, ts);
+ rm.add(d);
+ } else {
+ Delete d = new Delete(row);
+ d.deleteColumns(fam1, qual1, ts);
+ rm.add(d);
+ Put p = new Put(row, ts);
+ p.add(fam1, qual2, value2);
+ rm.add(p);
+ }
+ region.mutateRow(rm);
+ op ^= true;
+ // check: should always see exactly one column
+ Get g = new Get(row);
+ Result r = region.get(g, null);
+ if (r.size() != 1) {
+ LOG.debug(r);
+ failures.incrementAndGet();
+ fail();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ failures.incrementAndGet();
+ fail();
+ }
+ }
+ }
+ };
}
// run all threads
@@ -282,62 +332,104 @@ public class TestAtomicOperation extends
}
+ /**
+  * Test multi-threaded multi-row mutations.
+  * <p>
+  * Every thread repeatedly calls
+  * <code>region.mutateRowsWithLocks(mrm, rowsToLock)</code> with a Put on one
+  * of <code>row</code>/<code>row2</code> and a Delete on the other, and then
+  * scans starting at <code>row</code>, expecting exactly one KeyValue. Any
+  * other count, or an IOException, is recorded in <code>failures</code>,
+  * which must be zero at the end.
+  *
+  * @throws IOException if the region cannot be initialized
+  */
+ public void testMultiRowMutationMultiThreads() throws IOException {
+
+   LOG.info("Starting test testMultiRowMutationMultiThreads");
+   initHRegion(tableName, getName(), fam1);
+
+   // create 100 threads, each will alternate between adding and
+   // removing a column
+   int numThreads = 100;
+   int opsPerThread = 1000;
+   AtomicOperation[] all = new AtomicOperation[numThreads];
+
+   AtomicLong timeStamps = new AtomicLong(0);
+   AtomicInteger failures = new AtomicInteger(0);
+   // the same two rows, in the same order, are locked by every thread
+   final List<byte[]> rowsToLock = Arrays.asList(row, row2);
+   // create all threads
+   for (int i = 0; i < numThreads; i++) {
+     all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
+       @Override
+       public void run() {
+         boolean op = true;
+         for (int i = 0; i < numOps; i++) {
+           try {
+             // throw in some flushes
+             if (r.nextFloat() < 0.001) {
+               LOG.debug("flushing");
+               region.flushcache();
+             }
+             long ts = timeStamps.incrementAndGet();
+             List<Mutation> mrm = new ArrayList<Mutation>();
+             if (op) {
+               Put p = new Put(row2, ts);
+               p.add(fam1, qual1, value1);
+               mrm.add(p);
+               Delete d = new Delete(row);
+               d.deleteColumns(fam1, qual1, ts);
+               mrm.add(d);
+             } else {
+               Delete d = new Delete(row2);
+               d.deleteColumns(fam1, qual1, ts);
+               mrm.add(d);
+               Put p = new Put(row, ts);
+               p.add(fam1, qual1, value2);
+               mrm.add(p);
+             }
+             region.mutateRowsWithLocks(mrm, rowsToLock);
+             op ^= true;
+             // check: should always see exactly one column
+             Scan s = new Scan(row);
+             RegionScanner rs = region.getScanner(s);
+             List<KeyValue> kvs = new ArrayList<KeyValue>();
+             try {
+               while (rs.next(kvs)) {
+                 // drain the scanner into kvs
+               }
+             } finally {
+               // always release the scanner; one is opened per iteration
+               rs.close();
+             }
+             if (kvs.size() != 1) {
+               LOG.debug(kvs);
+               failures.incrementAndGet();
+               fail();
+             }
+           } catch (IOException e) {
+             e.printStackTrace();
+             failures.incrementAndGet();
+             fail();
+           }
+         }
+       }
+     };
+   }
+
+   // run all threads
+   for (int i = 0; i < numThreads; i++) {
+     all[i].start();
+   }
+
+   // wait for all threads to finish
+   for (int i = 0; i < numThreads; i++) {
+     try {
+       all[i].join();
+     } catch (InterruptedException e) {
+       // NOTE(review): an interrupt is ignored here and the loop simply
+       // moves on to joining the next thread
+     }
+   }
+   assertEquals(0, failures.get());
+ }
+
public static class AtomicOperation extends Thread {
- private final HRegion region;
- private final int numOps;
- private final AtomicLong timeStamps;
- private final AtomicInteger failures;
- private final Random r = new Random();
- public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps,
AtomicInteger failures) {
+ protected final HRegion region;
+ protected final int numOps;
+ protected final AtomicLong timeStamps;
+ protected final AtomicInteger failures;
+ protected final Random r = new Random();
+
+ public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps,
+ AtomicInteger failures) {
this.region = region;
this.numOps = numOps;
this.timeStamps = timeStamps;
this.failures = failures;
}
- @Override
- public void run() {
- boolean op = true;
- for (int i=0; i<numOps; i++) {
- try {
- // throw in some flushes
- if (r.nextFloat() < 0.001) {
- LOG.debug("flushing");
- region.flushcache();
- }
- long ts = timeStamps.incrementAndGet();
- RowMutation arm = new RowMutation(row);
- if (op) {
- Put p = new Put(row, ts);
- p.add(fam1, qual1, value1);
- arm.add(p);
- Delete d = new Delete(row);
- d.deleteColumns(fam1, qual2, ts);
- arm.add(d);
- } else {
- Delete d = new Delete(row);
- d.deleteColumns(fam1, qual1, ts);
- arm.add(d);
- Put p = new Put(row, ts);
- p.add(fam1, qual2, value2);
- arm.add(p);
- }
- region.mutateRow(arm, null);
- op ^= true;
- // check: should always see exactly one column
- Get g = new Get(row);
- Result r = region.get(g, null);
- if (r.size() != 1) {
- LOG.debug(r);
- failures.incrementAndGet();
- fail();
- }
- } catch (IOException e) {
- e.printStackTrace();
- failures.incrementAndGet();
- fail();
- }
- }
- }
}
@org.junit.Rule