Author: rawson
Date: Tue Oct 19 01:01:52 2010
New Revision: 1024074
URL: http://svn.apache.org/viewvc?rev=1024074&view=rev
Log:
HBASE-2985 HRegionServer.multi() no longer calls HRegion.put(List) when
possible
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
Modified: hbase/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1024074&r1=1024073&r2=1024074&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue Oct 19 01:01:52 2010
@@ -596,6 +596,8 @@ Release 0.21.0 - Unreleased
HBASE-3121 [rest] Do not perform cache control when returning results
HBASE-2669 HCM.shutdownHook causes data loss with
hbase.client.write.buffer != 0
+ HBASE-2985 HRegionServer.multi() no longer calls HRegion.put(List) when
+ possible
IMPROVEMENTS
HBASE-1760 Cleanup TODOs in HTable
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1024074&r1=1024073&r2=1024074&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
Tue Oct 19 01:01:52 2010
@@ -1068,7 +1068,7 @@ public class HConnectionManager {
}
private Callable<MultiResponse> createCallable(
- final HServerAddress address,
+ final HServerAddress address,
final MultiAction multi,
final byte [] tableName) {
final HConnection connection = this;
@@ -1098,11 +1098,11 @@ public class HConnectionManager {
if (results.length != list.size()) {
throw new IllegalArgumentException("argument results must be the same
size as argument list");
}
-
+
if (list.size() == 0) {
return;
}
-
+
List<Row> workingList = new ArrayList<Row>(list);
final boolean singletonList = (list.size() == 1);
boolean retry = true;
@@ -1114,8 +1114,8 @@ public class HConnectionManager {
if (tries >= 1) {
long sleepTime = getPauseTime(tries);
LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!");
- try {
- Thread.sleep(sleepTime);
+ try {
+ Thread.sleep(sleepTime);
} catch (InterruptedException ignore) {
LOG.debug("Interupted");
Thread.currentThread().interrupt();
@@ -1132,38 +1132,38 @@ public class HConnectionManager {
HRegionLocation loc = locateRegion(tableName, row.getRow(), true);
HServerAddress address = loc.getServerAddress();
byte[] regionName = loc.getRegionInfo().getRegionName();
-
+
MultiAction actions = actionsByServer.get(address);
if (actions == null) {
actions = new MultiAction();
actionsByServer.put(address, actions);
}
-
- Action action = new Action(regionName, row, i);
+
+ Action action = new Action(regionName, row, i);
actions.add(regionName, action);
}
}
-
+
// step 2: make the requests
- Map<HServerAddress,Future<MultiResponse>> futures =
+ Map<HServerAddress,Future<MultiResponse>> futures =
new HashMap<HServerAddress,
Future<MultiResponse>>(actionsByServer.size());
-
+
for (Entry<HServerAddress, MultiAction> e :
actionsByServer.entrySet()) {
futures.put(e.getKey(), pool.submit(createCallable(e.getKey(),
e.getValue(), tableName)));
}
-
+
// step 3: collect the failures and successes and prepare for retry
for (Entry<HServerAddress, Future<MultiResponse>> responsePerServer :
futures.entrySet()) {
HServerAddress address = responsePerServer.getKey();
-
+
try {
// Gather the results for one server
Future<MultiResponse> future = responsePerServer.getValue();
// Not really sure what a reasonable timeout value is. Here's a
first try.
-
+
MultiResponse resp = future.get();
if (resp == null) {
@@ -1176,7 +1176,7 @@ public class HConnectionManager {
List<Pair<Integer, Result>> regionResults = e.getValue();
for (Pair<Integer, Result> regionResult : regionResults) {
if (regionResult == null) {
- // failed
+ // if the first/only record is 'null' the entire region
failed.
LOG.debug("Failures for region: " +
Bytes.toStringBinary(regionName) + ", removing from cache");
} else {
// success
@@ -1196,12 +1196,12 @@ public class HConnectionManager {
if (e.getCause() instanceof DoNotRetryIOException) {
throw (DoNotRetryIOException) e.getCause();
}
-
+
if (singletonList) {
// be richer for reporting in a 1 row case.
singleRowCause = e.getCause();
}
- }
+ }
}
// Find failures (i.e. null Result), and add them to the workingList
(in
@@ -1224,7 +1224,7 @@ public class HConnectionManager {
if (Thread.currentThread().isInterrupted()) {
throw new IOException("Aborting attempt because of a thread
interruption");
}
-
+
if (retry) {
// ran out of retries and didn't successfully finish everything!
if (singleRowCause != null) {
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1024074&r1=1024073&r2=1024074&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java Tue
Oct 19 01:01:52 2010
@@ -57,7 +57,7 @@ import org.apache.hadoop.hbase.util.Writ
/**
* Used to communicate with a single HBase table.
- *
+ *
* This class is not thread safe for updates; the underlying write buffer can
* be corrupted if multiple threads contend over a single HTable instance.
*
@@ -100,7 +100,7 @@ public class HTable implements HTableInt
/**
* Creates an object to access a HBase table.
* Internally it creates a new instance of {@link Configuration} and a new
- * client to zookeeper as well as other resources. It also comes up with
+ * client to zookeeper as well as other resources. It also comes up with
* a fresh view of the cluster and must do discovery from scratch of region
* locations; i.e. it will not make use of already-cached region locations if
* available. Use only when being quick and dirty.
@@ -115,7 +115,7 @@ public class HTable implements HTableInt
/**
* Creates an object to access a HBase table.
* Internally it creates a new instance of {@link Configuration} and a new
- * client to zookeeper as well as other resources. It also comes up with
+ * client to zookeeper as well as other resources. It also comes up with
* a fresh view of the cluster and must do discovery from scratch of region
* locations; i.e. it will not make use of already-cached region locations if
* available. Use only when being quick and dirty.
@@ -176,7 +176,7 @@ public class HTable implements HTableInt
HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1);
-
+
int nrThreads = conf.getInt("hbase.htable.threads.max", getCurrentNrHRS());
if (nrThreads == 0) {
nrThreads = 1; // is there a better default?
@@ -551,7 +551,10 @@ public class HTable implements HTableInt
}
/**
- * Method that does a batch call on Deletes, Gets and Puts.
+ * Method that does a batch call on Deletes, Gets and Puts. The ordering of
+ * execution of the actions is not defined. Meaning if you do a Put and a
+ * Get in the same {@link #batch} call, you will not necessarily be
+ * guaranteed that the Get returns what the Put had put.
*
* @param actions list of Get, Put, Delete objects
* @param results Empty Result[], same size as actions. Provides access to
partial
@@ -566,7 +569,7 @@ public class HTable implements HTableInt
/**
* Method that does a batch call on Deletes, Gets and Puts.
- *
+ *
* @param actions list of Get, Put, Delete objects
* @return the results from the actions. A null in the return array means
that
* the call for that action failed, even after retries
@@ -581,7 +584,7 @@ public class HTable implements HTableInt
/**
* Deletes the specified cells/row.
- *
+ *
* @param delete The object that specifies what to delete.
* @throws IOException if a remote or network exception occurs.
* @since 0.20.0
@@ -602,7 +605,7 @@ public class HTable implements HTableInt
/**
* Deletes the specified cells/rows in bulk.
* @param deletes List of things to delete. As a side effect, it will be
modified:
- * successful {@link Delete}s are removed. The ordering of the list will
not change.
+ * successful {@link Delete}s are removed. The ordering of the list will
not change.
* @throws IOException if a remote or network exception occurs. In that case
* the {@code deletes} argument will contain the {@link Delete}
instances
* that have not be successfully applied.
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java?rev=1024074&r1=1024073&r2=1024074&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
Tue Oct 19 01:01:52 2010
@@ -42,8 +42,8 @@ public class MultiResponse implements Wr
// map of regionName to list of (Results paired to the original index for
that
// Result)
- private Map<byte[], List<Pair<Integer, Result>>> results = new
TreeMap<byte[], List<Pair<Integer, Result>>>(
- Bytes.BYTES_COMPARATOR);
+ private Map<byte[], List<Pair<Integer, Result>>> results =
+ new TreeMap<byte[], List<Pair<Integer, Result>>>(Bytes.BYTES_COMPARATOR);
public MultiResponse() {
}
@@ -111,7 +111,7 @@ public class MultiResponse implements Wr
for (int j = 0; j < listSize; j++) {
Integer idx = in.readInt();
if (idx == -1) {
- lst.add(null);
+ lst.add(null);
} else {
Result r = (Result) HbaseObjectWritable.readObject(in, null);
lst.add(new Pair<Integer, Result>(idx, r));
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1024074&r1=1024073&r2=1024074&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Tue Oct 19 01:01:52 2010
@@ -2330,9 +2330,11 @@ public class HRegionServer implements HR
// actions in the list.
Collections.sort(actionsForRegion);
Row action = null;
+ List<Action> puts = new ArrayList<Action>();
try {
for (Action a : actionsForRegion) {
action = a.getAction();
+ // TODO catch exceptions so we can report them on a per-item basis.
if (action instanceof Delete) {
delete(regionName, (Delete) action);
response.add(regionName, new Pair<Integer, Result>(
@@ -2341,14 +2343,50 @@ public class HRegionServer implements HR
response.add(regionName, new Pair<Integer, Result>(
a.getOriginalIndex(), get(regionName, (Get) action)));
} else if (action instanceof Put) {
- put(regionName, (Put) action);
- response.add(regionName, new Pair<Integer, Result>(
- a.getOriginalIndex(), new Result()));
+ puts.add(a);
} else {
LOG.debug("Error: invalid Action, row must be a Get, Delete or
Put.");
throw new IllegalArgumentException("Invalid Action, row must be a
Get, Delete or Put.");
}
}
+
+ // We do the puts with result.put so we can get the batching efficiency
+ // we so need. All this data munging doesn't seem great, but at least
+ // we arent copying bytes or anything.
+ if (!puts.isEmpty()) {
+ HRegion region = getRegion(regionName);
+ if (!region.getRegionInfo().isMetaTable()) {
+ this.cacheFlusher.reclaimMemStoreMemory();
+ }
+
+ Pair<Put,Integer> [] putsWithLocks = new Pair[puts.size()];
+ int i = 0;
+ for (Action a : puts) {
+ Put p = (Put) a.getAction();
+
+ Integer lock = getLockFromId(p.getLockId());
+ putsWithLocks[i++] = new Pair<Put, Integer>(p, lock);
+ }
+
+ this.requestCount.addAndGet(puts.size());
+
+ OperationStatusCode[] codes = region.put(putsWithLocks);
+ for( i = 0 ; i < codes.length ; i++) {
+ OperationStatusCode code = codes[i];
+
+ Action theAction = puts.get(i);
+ Result result = null;
+
+ if (code == OperationStatusCode.SUCCESS) {
+ result = new Result();
+ }
+ // TODO turning the alternate exception into a different result
+
+ response.add(regionName,
+ new Pair<Integer, Result>(
+ theAction.getOriginalIndex(), result));
+ }
+ }
} catch (IOException ioe) {
if (multi.size() == 1) throw ioe;
LOG.debug("Exception processing " +
Modified:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java?rev=1024074&r1=1024073&r2=1024074&view=diff
==============================================================================
---
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
(original)
+++
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
Tue Oct 19 01:01:52 2010
@@ -140,7 +140,7 @@ public class TestMultiParallel {
/**
* Only run one Multi test with a forced RegionServer abort. Otherwise, the
* unit tests will take an unnecessarily long time to run.
- *
+ *
* @throws Exception
*/
@Test public void testFlushCommitsWithAbort() throws Exception {
@@ -354,17 +354,11 @@ public class TestMultiParallel {
get.addColumn(BYTES_FAMILY, QUALIFIER);
actions.add(get);
- // 5 get of the put in #2 (entire family)
- get = new Get(KEYS[10]);
- get.addFamily(BYTES_FAMILY);
- actions.add(get);
-
- // 6 get of the delete from #3
- get = new Get(KEYS[20]);
- get.addColumn(BYTES_FAMILY, QUALIFIER);
- actions.add(get);
+ // There used to be a 'get' of a previous put here, but removed
+ // since this API really cannot guarantee order in terms of mixed
+ // get/puts.
- // 7 put of new column
+ // 5 put of new column
put = new Put(KEYS[40]);
put.add(BYTES_FAMILY, qual2, val2);
actions.add(put);
@@ -378,10 +372,7 @@ public class TestMultiParallel {
validateEmpty(results[2]);
validateEmpty(results[3]);
validateResult(results[4]);
- validateResult(results[5]);
- validateResult(results[5], qual2, val2); // testing second column in #5
- validateEmpty(results[6]); // deleted
- validateEmpty(results[7]);
+ validateEmpty(results[5]);
// validate last put, externally from the batch
get = new Get(KEYS[40]);