Author: stack
Date: Sat Sep 4 03:57:21 2010
New Revision: 992530
URL: http://svn.apache.org/viewvc?rev=992530&view=rev
Log:
HBASE-1845 MultiGet, MultiDelete, and MultiPut - batched to the appropriate
region servers; commit again -- was removed by hbase-2692 commit
Removed:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestMultiParallelPut.java
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Get.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPutResponse.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Row.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Get.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Get.java?rev=992530&r1=992529&r2=992530&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Get.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Get.java Sat Sep
4 03:57:21 2010
@@ -60,7 +60,7 @@ import java.util.TreeSet;
* <p>
* To add a filter, execute {@link #setFilter(Filter) setFilter}.
*/
-public class Get implements Writable {
+public class Get implements Writable, Row, Comparable<Row> {
private static final byte GET_VERSION = (byte)1;
private byte [] row = null;
@@ -325,6 +325,11 @@ public class Get implements Writable {
return sb.toString();
}
+ //Row
+ public int compareTo(Row other) {
+ return Bytes.compareTo(this.getRow(), other.getRow());
+ }
+
//Writable
public void readFields(final DataInput in)
throws IOException {
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=992530&r1=992529&r2=992530&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
Sat Sep 4 03:57:21 2010
@@ -211,6 +211,21 @@ public interface HConnection {
public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
throws IOException, RuntimeException;
+ /**
+ * Process a mixed batch of Get, Put and Delete actions. All actions for a
+ * RegionServer are forwarded in one RPC call.
+ *
+ * @param actions The collection of actions.
+ * @param tableName Name of the hbase table
+ * @param pool thread pool for parallel execution
+ * @param results An empty array, same size as list. If an exception is
thrown,
+ * you can inspect it for partial results and determine which actions
+ * were processed successfully.
+ * @throws IOException
+ */
+ public void processBatch(List<Row> actions, final byte[] tableName,
+ ExecutorService pool, Result[] results)
+ throws IOException;
/**
* Process a batch of Puts. Does the retries.
@@ -218,20 +233,32 @@ public interface HConnection {
* @param tableName The name of the table
* @return Count of committed Puts. On fault, < list.size().
* @throws IOException if a remote or network exception occurs
+ * @deprecated Use HConnectionManager::processBatch instead.
*/
- public int processBatchOfRows(ArrayList<Put> list, byte[] tableName)
+ public int processBatchOfRows(ArrayList<Put> list, byte[] tableName,
ExecutorService pool)
throws IOException;
/**
* Process a batch of Deletes. Does the retries.
* @param list A batch of Deletes to process.
- * @return Count of committed Deletes. On fault, < list.size().
* @param tableName The name of the table
+ * @return Count of committed Deletes. On fault, < list.size().
* @throws IOException if a remote or network exception occurs
+ * @deprecated Use HConnectionManager::processBatch instead.
*/
- public int processBatchOfDeletes(List<Delete> list, byte[] tableName)
+ public int processBatchOfDeletes(List<Delete> list, byte[] tableName,
ExecutorService pool)
throws IOException;
+ /**
+ * Process a batch of Puts.
+ *
+ * @param list The collection of actions. The list is mutated: all
successful Puts
+ * are removed from the list.
+ * @param tableName Name of the hbase table
+ * @param pool Thread pool for parallel execution
+ * @throws IOException
+ * @deprecated Use HConnectionManager::processBatch instead.
+ */
public void processBatchOfPuts(List<Put> list,
final byte[] tableName, ExecutorService pool)
throws IOException;
@@ -248,7 +275,7 @@ public interface HConnection {
/**
* Check whether region cache prefetch is enabled or not.
* @param tableName name of table to check
- * @return true if table's region cache prefecth is enabled. Otherwise
+ * @return true if table's region cache prefetch is enabled. Otherwise
* it is disabled.
*/
public boolean getRegionCachePrefetch(final byte[] tableName);
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=992530&r1=992529&r2=992530&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
Sat Sep 4 03:57:21 2010
@@ -29,12 +29,15 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
+import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
@@ -65,6 +68,7 @@ import org.apache.hadoop.hbase.util.Soft
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.zookeeper.ZKTableDisable;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;
@@ -863,7 +867,7 @@ public class HConnectionManager {
* Allows flushing the region cache.
*/
public void clearRegionCache() {
- cachedRegionLocations.clear();
+ cachedRegionLocations.clear();
}
/*
@@ -1105,176 +1109,38 @@ public class HConnectionManager {
return location;
}
- /*
- * Helper class for batch updates.
- * Holds code shared doing batch puts and batch deletes.
+ /**
+ * @deprecated Use HConnectionManager::processBatch instead.
*/
- private abstract class Batch {
- final HConnection c;
-
- private Batch(final HConnection c) {
- this.c = c;
- }
-
- /**
- * This is the method subclasses must implement.
- * @param currentList current list of rows
- * @param tableName table we are processing
- * @param row row
- * @return Count of items processed or -1 if all.
- * @throws IOException if a remote or network exception occurs
- * @throws RuntimeException other undefined exception
- */
- abstract int doCall(final List<? extends Row> currentList,
- final byte [] row, final byte [] tableName)
- throws IOException, RuntimeException;
-
- /**
- * Process the passed <code>list</code>.
- * @param list list of rows to process
- * @param tableName table we are processing
- * @return Count of how many added or -1 if all added.
- * @throws IOException if a remote or network exception occurs
- */
- int process(final List<? extends Row> list, final byte[] tableName)
- throws IOException {
- byte [] region = getRegionName(tableName, list.get(0).getRow(), false);
- byte [] currentRegion = region;
- boolean isLastRow;
- boolean retryOnlyOne = false;
- List<Row> currentList = new ArrayList<Row>();
- int i, tries;
- for (i = 0, tries = 0; i < list.size() && tries < numRetries; i++) {
- Row row = list.get(i);
- currentList.add(row);
- // If the next record goes to a new region, then we are to clear
- // currentList now during this cycle.
- isLastRow = (i + 1) == list.size();
- if (!isLastRow) {
- region = getRegionName(tableName, list.get(i + 1).getRow(), false);
- }
- if (!Bytes.equals(currentRegion, region) || isLastRow ||
retryOnlyOne) {
- int index = doCall(currentList, row.getRow(), tableName);
- // index is == -1 if all processed successfully, else its index
- // of last record successfully processed.
- if (index != -1) {
- if (tries == numRetries - 1) {
- throw new RetriesExhaustedException("Some server,
retryOnlyOne=" +
- retryOnlyOne + ", index=" + index + ", islastrow=" +
isLastRow +
- ", tries=" + tries + ", numtries=" + numRetries + ", i=" + i
+
- ", listsize=" + list.size() + ", region=" +
- Bytes.toStringBinary(region), currentRegion, row.getRow(),
- tries, new ArrayList<Throwable>());
- }
- tries = doBatchPause(currentRegion, tries);
- i = i - currentList.size() + index;
- retryOnlyOne = true;
- // Reload location.
- region = getRegionName(tableName, list.get(i + 1).getRow(),
true);
- } else {
- // Reset these flags/counters on successful batch Put
- retryOnlyOne = false;
- tries = 0;
- }
- currentRegion = region;
- currentList.clear();
- }
- }
- return i;
- }
-
- /*
- * @param t
- * @param r
- * @param re
- * @return Region name that holds passed row <code>r</code>
- * @throws IOException
- */
- private byte [] getRegionName(final byte [] t, final byte [] r,
- final boolean re)
- throws IOException {
- HRegionLocation location = getRegionLocationForRowWithRetries(t, r,
re);
- return location.getRegionInfo().getRegionName();
- }
-
- /*
- * Do pause processing before retrying...
- * @param currentRegion
- * @param tries
- * @return New value for tries.
- */
- private int doBatchPause(final byte [] currentRegion, final int tries) {
- int localTries = tries;
- long sleepTime = getPauseTime(tries);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Reloading region " + Bytes.toStringBinary(currentRegion) +
- " location because regionserver didn't accept updates; tries=" +
- tries + " of max=" + numRetries + ", waiting=" + sleepTime + "ms");
- }
- try {
- Thread.sleep(sleepTime);
- localTries++;
- } catch (InterruptedException e) {
- // continue
- }
- return localTries;
- }
- }
-
- public int processBatchOfRows(final ArrayList<Put> list,
- final byte[] tableName)
+ public int processBatchOfRows(final ArrayList<Put> list, final byte[]
tableName, ExecutorService pool)
throws IOException {
- if (list.isEmpty()) {
- return 0;
- }
- if (list.size() > 1) {
- Collections.sort(list);
- }
- Batch b = new Batch(this) {
- @SuppressWarnings("unchecked")
- @Override
- int doCall(final List<? extends Row> currentList, final byte [] row,
- final byte [] tableName)
- throws IOException, RuntimeException {
- final List<Put> puts = (List<Put>)currentList;
- return getRegionServerWithRetries(new ServerCallable<Integer>(this.c,
- tableName, row) {
- public Integer call() throws IOException {
- return server.put(location.getRegionInfo().getRegionName(),
puts);
- }
- });
+ Result[] results = new Result[list.size()];
+ processBatch((List) list, tableName, pool, results);
+ int count = 0;
+ for (Result r : results) {
+ if (r != null) {
+ count++;
}
- };
- return b.process(list, tableName);
+ }
+ return (count == list.size() ? -1 : count);
}
+ /**
+ * @deprecated Use HConnectionManager::processBatch instead.
+ */
public int processBatchOfDeletes(final List<Delete> list,
- final byte[] tableName)
+ final byte[] tableName, ExecutorService pool)
throws IOException {
- if (list.isEmpty()) {
- return 0;
- }
- if (list.size() > 1) {
- Collections.sort(list);
- }
- Batch b = new Batch(this) {
- @SuppressWarnings("unchecked")
- @Override
- int doCall(final List<? extends Row> currentList, final byte [] row,
- final byte [] tableName)
- throws IOException, RuntimeException {
- final List<Delete> deletes = (List<Delete>)currentList;
- return getRegionServerWithRetries(new ServerCallable<Integer>(this.c,
- tableName, row) {
- public Integer call() throws IOException {
- return server.delete(location.getRegionInfo().getRegionName(),
- deletes);
- }
- });
- }
- };
- return b.process(list, tableName);
+ Result[] results = new Result[list.size()];
+ processBatch((List) list, tableName, pool, results);
+ int count = 0;
+ for (Result r : results) {
+ if (r != null) {
+ count++;
+ }
}
+ return (count == list.size() ? -1 : count);
+ }
void close(boolean stopProxy) {
if (master != null) {
@@ -1291,168 +1157,196 @@ public class HConnectionManager {
}
}
- /**
- * Process a batch of Puts on the given executor service.
- *
- * @param list the puts to make - successful puts will be removed.
- * @param pool thread pool to execute requests on
- *
- * In the case of an exception, we take different actions depending on the
- * situation:
- * - If the exception is a DoNotRetryException, we rethrow it and leave
the
- * 'list' parameter in an indeterminate state.
- * - If the 'list' parameter is a singleton, we directly throw the
specific
- * exception for that put.
- * - Otherwise, we throw a generic exception indicating that an error
occurred.
- * The 'list' parameter is mutated to contain those puts that did not
succeed.
- */
- public void processBatchOfPuts(List<Put> list,
- final byte[] tableName, ExecutorService
pool) throws IOException {
- boolean singletonList = list.size() == 1;
+ private Callable<MultiResponse> createCallable(
+ final HServerAddress address,
+ final MultiAction multi,
+ final byte [] tableName) {
+ final HConnection connection = this;
+ return new Callable<MultiResponse>() {
+ public MultiResponse call() throws IOException {
+ return getRegionServerWithoutRetries(
+ new ServerCallable<MultiResponse>(connection, tableName,
null) {
+ public MultiResponse call() throws IOException {
+ return server.multi(multi);
+ }
+ @Override
+ public void instantiateServer(boolean reload) throws
IOException {
+ server = connection.getHRegionConnection(address);
+ }
+ }
+ );
+ }
+ };
+ }
+
+ public void processBatch(List<Row> list,
+ final byte[] tableName,
+ ExecutorService pool,
+ Result[] results) throws IOException {
+
+ // results must be the same size as list
+ if (results.length != list.size()) {
+ throw new IllegalArgumentException("argument results must be the same
size as argument list");
+ }
+
+ if (list.size() == 0) {
+ return;
+ }
+
+ List<Row> workingList = new ArrayList<Row>(list);
+ final boolean singletonList = (list.size() == 1);
+ boolean retry = true;
Throwable singleRowCause = null;
- for ( int tries = 0 ; tries < numRetries && !list.isEmpty(); ++tries) {
- Collections.sort(list);
- Map<HServerAddress, MultiPut> regionPuts =
- new HashMap<HServerAddress, MultiPut>();
- // step 1:
- // break up into regionserver-sized chunks and build the data structs
- for ( Put put : list ) {
- byte [] row = put.getRow();
-
- HRegionLocation loc = locateRegion(tableName, row, true);
- HServerAddress address = loc.getServerAddress();
- byte [] regionName = loc.getRegionInfo().getRegionName();
-
- MultiPut mput = regionPuts.get(address);
- if (mput == null) {
- mput = new MultiPut(address);
- regionPuts.put(address, mput);
- }
- mput.add(regionName, put);
- }
-
- // step 2:
- // make the requests
- // Discard the map, just use a list now, makes error recovery easier.
- List<MultiPut> multiPuts = new
ArrayList<MultiPut>(regionPuts.values());
-
- List<Future<MultiPutResponse>> futures =
- new ArrayList<Future<MultiPutResponse>>(regionPuts.size());
- for ( MultiPut put : multiPuts ) {
- futures.add(pool.submit(createPutCallable(put.address,
- put,
- tableName)));
- }
- // RUN!
- List<Put> failed = new ArrayList<Put>();
-
- // step 3:
- // collect the failures and tries from step 1.
- for (int i = 0; i < futures.size(); i++ ) {
- Future<MultiPutResponse> future = futures.get(i);
- MultiPut request = multiPuts.get(i);
+
+ for (int tries = 0; tries < numRetries && retry; ++tries) {
+
+ // sleep first, if this is a retry
+ if (tries >= 1) {
+ long sleepTime = getPauseTime(tries);
+ LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!");
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException ignore) {
+ LOG.debug("Interupted");
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+
+ // step 1: break up into regionserver-sized chunks and build the data
structs
+
+ Map<HServerAddress, MultiAction> actionsByServer = new
HashMap<HServerAddress, MultiAction>();
+ for (int i=0; i<workingList.size(); i++) {
+ Row row = workingList.get(i);
+ if (row != null) {
+ HRegionLocation loc = locateRegion(tableName, row.getRow(), true);
+ HServerAddress address = loc.getServerAddress();
+ byte[] regionName = loc.getRegionInfo().getRegionName();
+
+ MultiAction actions = actionsByServer.get(address);
+ if (actions == null) {
+ actions = new MultiAction();
+ actionsByServer.put(address, actions);
+ }
+
+ Action action = new Action(regionName, row, i);
+ actions.add(regionName, action);
+ }
+ }
+
+ // step 2: make the requests
+
+ Map<HServerAddress,Future<MultiResponse>> futures =
+ new HashMap<HServerAddress,
Future<MultiResponse>>(actionsByServer.size());
+
+ for (Entry<HServerAddress, MultiAction> e :
actionsByServer.entrySet()) {
+ futures.put(e.getKey(), pool.submit(createCallable(e.getKey(),
e.getValue(), tableName)));
+ }
+
+ // step 3: collect the failures and successes and prepare for retry
+
+ for (Entry<HServerAddress, Future<MultiResponse>> responsePerServer :
futures.entrySet()) {
+ HServerAddress address = responsePerServer.getKey();
+
try {
- MultiPutResponse resp = future.get();
+ // Gather the results for one server
+ Future<MultiResponse> future = responsePerServer.getValue();
- // For each region
- for (Map.Entry<byte[], List<Put>> e : request.puts.entrySet()) {
- Integer result = resp.getAnswer(e.getKey());
- if (result == null) {
- // failed
- LOG.debug("Failed all for region: " +
- Bytes.toStringBinary(e.getKey()) + ", removing from
cache");
- failed.addAll(e.getValue());
- } else if (result >= 0) {
- // some failures
- List<Put> lst = e.getValue();
- failed.addAll(lst.subList(result, lst.size()));
- LOG.debug("Failed past " + result + " for region: " +
- Bytes.toStringBinary(e.getKey()) + ", removing from
cache");
+ // Not really sure what a reasonable timeout value is. Here's a
first try.
+
+ MultiResponse resp = future.get(1000, TimeUnit.MILLISECONDS);
+
+ if (resp == null) {
+ // Entire server failed
+ LOG.debug("Failed all for server: " + address + ", removing from
cache");
+ } else {
+ // For each region
+ for (Entry<byte[], List<Pair<Integer,Result>>> e :
resp.getResults().entrySet()) {
+ byte[] regionName = e.getKey();
+ List<Pair<Integer, Result>> regionResults = e.getValue();
+ for (int i = 0; i < regionResults.size(); i++) {
+ Pair<Integer, Result> regionResult = regionResults.get(i);
+ if (regionResult.getSecond() == null) {
+ // failed
+ LOG.debug("Failures for region: " +
Bytes.toStringBinary(regionName) + ", removing from cache");
+ } else {
+ // success
+ results[regionResult.getFirst()] =
regionResult.getSecond();
+ }
+ }
}
}
+ } catch (TimeoutException e) {
+ LOG.debug("Timeout for region server: " + address + ", removing
from cache");
} catch (InterruptedException e) {
- // go into the failed list.
- LOG.debug("Failed all from " + request.address, e);
- failed.addAll(request.allPuts());
+ LOG.debug("Failed all from " + address, e);
+ Thread.currentThread().interrupt();
+ break;
} catch (ExecutionException e) {
- Throwable cause = e.getCause();
- // Don't print stack trace if NSRE; NSRE is 'normal' operation.
- if (cause instanceof NotServingRegionException) {
- String msg = cause.getMessage();
- if (msg != null && msg.length() > 0) {
- // msg is the exception as a String... we just want first line.
- msg = msg.split("[\\n\\r]+\\s*at")[0];
- }
- LOG.debug("Failed execution of all on " + request.address +
- " because: " + msg);
- } else {
- // all go into the failed list.
- LOG.debug("Failed execution of all on " + request.address,
- e.getCause());
- }
- failed.addAll(request.allPuts());
+ LOG.debug("Failed all from " + address, e);
- // Just give up, leaving the batch put list in an
untouched/semi-committed state
+ // Just give up, leaving the batch incomplete
if (e.getCause() instanceof DoNotRetryIOException) {
throw (DoNotRetryIOException) e.getCause();
}
-
+
if (singletonList) {
// be richer for reporting in a 1 row case.
singleRowCause = e.getCause();
}
- }
+ }
}
- list.clear();
- if (!failed.isEmpty()) {
- for (Put failedPut: failed) {
- deleteCachedLocation(tableName, failedPut.getRow());
- }
-
- list.addAll(failed);
- long sleepTime = getPauseTime(tries);
- LOG.debug("processBatchOfPuts had some failures, sleeping for " +
sleepTime +
- " ms!");
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException ignored) {
+ // Find failures (i.e. null Result), and add them to the workingList
(in
+ // order), so they can be retried.
+ retry = false;
+ workingList.clear();
+ for (int i = 0; i < results.length; i++) {
+ if (results[i] == null) {
+ retry = true;
+ Row row = list.get(i);
+ workingList.add(row);
+ deleteCachedLocation(tableName, row.getRow());
+ } else {
+ // add null to workingList, so the order remains consistent with
the original list argument.
+ workingList.add(null);
}
}
}
- if (!list.isEmpty()) {
- if (singletonList && singleRowCause != null) {
+
+ if (Thread.currentThread().isInterrupted()) {
+ throw new IOException("Aborting attempt because of a thread
interruption");
+ }
+
+ if (retry) {
+ // ran out of retries and didn't successfully finish everything!
+ if (singleRowCause != null) {
throw new IOException(singleRowCause);
+ } else {
+ throw new RetriesExhaustedException("Still had " + workingList.size()
+ + " actions left after retrying " + numRetries + " times.");
}
-
- // ran out of retries and didnt succeed everything!
- throw new RetriesExhaustedException("Still had " + list.size() + "
puts left after retrying " +
- numRetries + " times.");
}
}
-
- private Callable<MultiPutResponse> createPutCallable(
- final HServerAddress address, final MultiPut puts,
- final byte [] tableName) {
- final HConnection connection = this;
- return new Callable<MultiPutResponse>() {
- public MultiPutResponse call() throws IOException {
- return getRegionServerWithoutRetries(
- new ServerCallable<MultiPutResponse>(connection, tableName,
null) {
- public MultiPutResponse call() throws IOException {
- MultiPutResponse resp = server.multiPut(puts);
- resp.request = puts;
- return resp;
- }
- @Override
- public void instantiateServer(boolean reload) throws
IOException {
- server = connection.getHRegionConnection(address);
- }
- }
- );
+ /**
+ * @deprecated Use HConnectionManager::processBatch instead.
+ */
+ public void processBatchOfPuts(List<Put> list,
+ final byte[] tableName,
+ ExecutorService pool) throws IOException {
+ Result[] results = new Result[list.size()];
+ processBatch((List) list, tableName, pool, results);
+
+ // mutate list so that it is empty for complete success, or contains
only failed records
+ // results are returned in the same order as the requests in list
+ // walk the list backwards, so we can remove from list without impacting
the indexes of earlier members
+ for (int i = results.length - 1; i>=0; i--) {
+ // if result is not null, it succeeded
+ if (results[i] != null) {
+ list.remove(i);
}
- };
+ }
}
private Throwable translateException(Throwable t) throws IOException {
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=992530&r1=992529&r2=992530&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java Sat
Sep 4 03:57:21 2010
@@ -46,6 +46,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
@@ -77,7 +78,7 @@ public class HTable implements HTableInt
private long currentWriteBufferSize;
protected int scannerCaching;
private int maxKeyValueSize;
-
+ private ExecutorService pool; // For Multi
private long maxScannerResultSize;
/**
@@ -144,13 +145,11 @@ public class HTable implements HTableInt
HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1);
-
- int nrHRS = getCurrentNrHRS();
- if (nrHRS == 0) {
- // No servers running -- set default of 10 threads.
- nrHRS = 10;
+
+ int nrThreads = conf.getInt("hbase.htable.threads.max", getCurrentNrHRS());
+ if (nrThreads == 0) {
+ nrThreads = 1; // is there a better default?
}
- int nrThreads = conf.getInt("hbase.htable.threads.max", nrHRS);
// Unfortunately Executors.newCachedThreadPool does not allow us to
// set the maximum size of the pool, so we have to do it ourselves.
@@ -175,9 +174,6 @@ public class HTable implements HTableInt
return admin.getClusterStatus().getServers();
}
- // For multiput
- private ExecutorService pool;
-
/**
* Tells whether or not a table is enabled or not.
* @param tableName Name of table to check.
@@ -508,6 +504,40 @@ public class HTable implements HTableInt
);
}
+ /**
+ * Method that does a batch call on Deletes, Gets and Puts.
+ *
+ * @param actions list of Get, Put, Delete objects
+ * @param results Empty Result[], same size as actions. Provides access to
partial
+ * results, in case an exception is thrown. A null in the result array means
that
+ * the call for that action failed, even after retries
+ * @throws IOException
+ */
+ public synchronized void batch(final List<Row> actions, final Result[]
results) throws IOException {
+ connection.processBatch(actions, tableName, pool, results);
+ }
+
+ /**
+ * Method that does a batch call on Deletes, Gets and Puts.
+ *
+ * @param actions list of Get, Put, Delete objects
+ * @return the results from the actions. A null in the return array means
that
+ * the call for that action failed, even after retries
+ * @throws IOException
+ */
+ public synchronized Result[] batch(final List<Row> actions) throws
IOException {
+ Result[] results = new Result[actions.size()];
+ connection.processBatch(actions, tableName, pool, results);
+ return results;
+ }
+
+ /**
+ * Deletes the specified cells/row.
+ *
+ * @param delete The object that specifies what to delete.
+ * @throws IOException if a remote or network exception occurs.
+ * @since 0.20.0
+ */
public void delete(final Delete delete)
throws IOException {
connection.getRegionServerWithRetries(
@@ -520,13 +550,28 @@ public class HTable implements HTableInt
);
}
+ /**
+ * Deletes the specified cells/rows in bulk.
+ * @param deletes List of things to delete. As a side effect, it will be
modified:
+ * successful {@link Delete}s are removed. The ordering of the list will
not change.
+ * @throws IOException if a remote or network exception occurs. In that case
+ * the {@code deletes} argument will contain the {@link Delete}
instances
+ * that have not been successfully applied.
+ * @since 0.20.1
+ */
public void delete(final List<Delete> deletes)
throws IOException {
- int last = 0;
- try {
- last = connection.processBatchOfDeletes(deletes, this.tableName);
- } finally {
- deletes.subList(0, last).clear();
+ Result[] results = new Result[deletes.size()];
+ connection.processBatch((List) deletes, tableName, pool, results);
+
+ // mutate list so that it is empty for complete success, or contains only
failed records
+ // results are returned in the same order as the requests in list
+ // walk the list backwards, so we can remove from list without impacting
the indexes of earlier members
+ for (int i = results.length - 1; i>=0; i--) {
+ // if result is not null, it succeeded
+ if (results[i] != null) {
+ deletes.remove(i);
+ }
}
}
@@ -657,10 +702,17 @@ public class HTable implements HTableInt
);
}
+ /**
+ * Executes all the buffered {@link Put} operations.
+ * <p>
+ * This method gets called once automatically for every {@link Put} or
batch
+ * of {@link Put}s (when {@link #batch(List)} is used) when
+ * {@link #isAutoFlush()} is {@code true}.
+ * @throws IOException if a remote or network exception occurs.
+ */
public void flushCommits() throws IOException {
try {
- connection.processBatchOfPuts(writeBuffer,
- tableName, pool);
+ connection.processBatchOfPuts(writeBuffer, tableName, pool);
} finally {
// the write buffer was adjusted by processBatchOfPuts
currentWriteBufferSize = 0;
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java?rev=992530&r1=992529&r2=992530&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java Sat
Sep 4 03:57:21 2010
@@ -34,6 +34,7 @@ import java.util.Map;
import java.util.TreeMap;
/**
+ * @deprecated Use MultiAction instead
* Data type class for putting multiple regions worth of puts in one RPC.
*/
public class MultiPut implements Writable {
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPutResponse.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPutResponse.java?rev=992530&r1=992529&r2=992530&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPutResponse.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPutResponse.java
Sat Sep 4 03:57:21 2010
@@ -30,6 +30,7 @@ import java.util.Map;
import java.util.TreeMap;
/**
+ * @deprecated Replaced by MultiResponse
* Response class for MultiPut.
*/
public class MultiPutResponse implements Writable {
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Row.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Row.java?rev=992530&r1=992529&r2=992530&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Row.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Row.java Sat Sep
4 03:57:21 2010
@@ -29,5 +29,4 @@ public interface Row extends WritableCom
* @return The row.
*/
public byte [] getRow();
-
}
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=992530&r1=992529&r2=992530&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
Sat Sep 4 03:57:21 2010
@@ -46,8 +46,12 @@ import org.apache.hadoop.hbase.client.De
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.MultiPut;
import org.apache.hadoop.hbase.client.MultiPutResponse;
+import org.apache.hadoop.hbase.client.MultiAction;
+import org.apache.hadoop.hbase.client.Action;
+import org.apache.hadoop.hbase.client.MultiResponse;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
@@ -187,6 +191,13 @@ public class HbaseObjectWritable impleme
addToMap(NavigableSet.class, code++);
addToMap(ColumnPrefixFilter.class, code++);
+
+ // Multi
+ addToMap(Row.class, code++);
+ addToMap(Action.class, code++);
+ addToMap(MultiAction.class, code++);
+ addToMap(MultiResponse.class, code++);
+
}
private Class<?> declaredClass;
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=992530&r1=992529&r2=992530&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Sat Sep 4 03:57:21 2010
@@ -30,6 +30,8 @@ import org.apache.hadoop.hbase.NotServin
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.MultiAction;
+import org.apache.hadoop.hbase.client.MultiResponse;
import org.apache.hadoop.hbase.client.MultiPut;
import org.apache.hadoop.hbase.client.MultiPutResponse;
import org.apache.hadoop.hbase.client.Put;
@@ -266,6 +268,13 @@ public interface HRegionInterface extend
*/
public HServerInfo getHServerInfo() throws IOException;
+ /**
+ * Method used for doing multiple actions(Deletes, Gets and Puts) in one call
+ * @param multi
+ * @return MultiResult
+ * @throws IOException
+ */
+ public MultiResponse multi(MultiAction multi) throws IOException;
/**
* Multi put for putting multiple regions worth of puts at once.
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=992530&r1=992529&r2=992530&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sat Sep 4 03:57:21 2010
@@ -82,6 +82,10 @@ import org.apache.hadoop.hbase.catalog.C
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.catalog.RootLocationEditor;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Action;
+import org.apache.hadoop.hbase.client.MultiAction;
+import org.apache.hadoop.hbase.client.MultiResponse;
+import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.MultiPut;
import org.apache.hadoop.hbase.client.MultiPutResponse;
@@ -2205,6 +2209,54 @@ public class HRegionServer implements HR
}
@Override
+ public MultiResponse multi(MultiAction multi) throws IOException {
+ MultiResponse response = new MultiResponse();
+ for (Map.Entry<byte[], List<Action>> e : multi.actions.entrySet()) {
+ byte[] regionName = e.getKey();
+ List<Action> actionsForRegion = e.getValue();
+ // sort based on the row id - this helps in the case where we reach the
+ // end of a region, so that we don't have to try the rest of the
+ // actions in the list.
+ Collections.sort(actionsForRegion);
+ Row action = null;
+ try {
+ for (Action a : actionsForRegion) {
+ action = a.getAction();
+ if (action instanceof Delete) {
+ delete(regionName, (Delete) action);
+ response.add(regionName, new Pair<Integer, Result>(
+ a.getOriginalIndex(), new Result()));
+ } else if (action instanceof Get) {
+ response.add(regionName, new Pair<Integer, Result>(
+ a.getOriginalIndex(), get(regionName, (Get) action)));
+ } else if (action instanceof Put) {
+ put(regionName, (Put) action);
+ response.add(regionName, new Pair<Integer, Result>(
+ a.getOriginalIndex(), new Result()));
+ } else {
+ LOG.debug("Error: invalid Action, row must be a Get, Delete or Put.");
+ throw new IllegalArgumentException("Invalid Action, row must be a Get, Delete or Put.");
+ }
+ }
+ } catch (IOException ioe) {
+ if (multi.size() == 1) {
+ throw ioe;
+ } else {
+ LOG.error("Exception found while attempting " + action.toString()
+ + " " + StringUtils.stringifyException(ioe));
+ response.add(regionName,null);
+ // stop processing on this region, continue to the next.
+ }
+ }
+ }
+
+ return response;
+ }
+
+ /**
+ * @deprecated Use HRegionServer.multi( MultiAction action) instead
+ */
+ @Override
public MultiPutResponse multiPut(MultiPut puts) throws IOException {
MultiPutResponse resp = new MultiPutResponse();
@@ -2246,7 +2298,7 @@ public class HRegionServer implements HR
public CompactionRequestor getCompactionRequester() {
return this.compactSplitThread;
}
-
+
//
// Main program and support routines
//