Author: rawson
Date: Tue Oct 19 01:01:52 2010
New Revision: 1024074

URL: http://svn.apache.org/viewvc?rev=1024074&view=rev
Log:
HBASE-2985  HRegionServer.multi() no longer calls HRegion.put(List) when 
possible


Modified:
    hbase/trunk/CHANGES.txt
    
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
    
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java

Modified: hbase/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1024074&r1=1024073&r2=1024074&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue Oct 19 01:01:52 2010
@@ -596,6 +596,8 @@ Release 0.21.0 - Unreleased
    HBASE-3121  [rest] Do not perform cache control when returning results
    HBASE-2669  HCM.shutdownHook causes data loss with
                hbase.client.write.buffer != 0
+   HBASE-2985  HRegionServer.multi() no longer calls HRegion.put(List) when 
+              possible
 
   IMPROVEMENTS
    HBASE-1760  Cleanup TODOs in HTable

Modified: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1024074&r1=1024073&r2=1024074&view=diff
==============================================================================
--- 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
 (original)
+++ 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
 Tue Oct 19 01:01:52 2010
@@ -1068,7 +1068,7 @@ public class HConnectionManager {
     }
 
     private Callable<MultiResponse> createCallable(
-        final HServerAddress address, 
+        final HServerAddress address,
         final MultiAction multi,
         final byte [] tableName) {
          final HConnection connection = this;
@@ -1098,11 +1098,11 @@ public class HConnectionManager {
       if (results.length != list.size()) {
         throw new IllegalArgumentException("argument results must be the same 
size as argument list");
       }
-      
+
       if (list.size() == 0) {
         return;
       }
-      
+
       List<Row> workingList = new ArrayList<Row>(list);
       final boolean singletonList = (list.size() == 1);
       boolean retry = true;
@@ -1114,8 +1114,8 @@ public class HConnectionManager {
         if (tries >= 1) {
           long sleepTime = getPauseTime(tries);
           LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!");
-          try { 
-            Thread.sleep(sleepTime); 
+          try {
+            Thread.sleep(sleepTime);
           } catch (InterruptedException ignore) {
             LOG.debug("Interupted");
             Thread.currentThread().interrupt();
@@ -1132,38 +1132,38 @@ public class HConnectionManager {
             HRegionLocation loc = locateRegion(tableName, row.getRow(), true);
             HServerAddress address = loc.getServerAddress();
             byte[] regionName = loc.getRegionInfo().getRegionName();
-  
+
             MultiAction actions = actionsByServer.get(address);
             if (actions == null) {
               actions = new MultiAction();
               actionsByServer.put(address, actions);
             }
-            
-            Action action = new Action(regionName, row, i); 
+
+            Action action = new Action(regionName, row, i);
             actions.add(regionName, action);
           }
         }
-        
+
         // step 2: make the requests
 
-        Map<HServerAddress,Future<MultiResponse>> futures = 
+        Map<HServerAddress,Future<MultiResponse>> futures =
             new HashMap<HServerAddress, 
Future<MultiResponse>>(actionsByServer.size());
-         
+
         for (Entry<HServerAddress, MultiAction> e : 
actionsByServer.entrySet()) {
           futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), 
e.getValue(), tableName)));
         }
-        
+
         // step 3: collect the failures and successes and prepare for retry
 
         for (Entry<HServerAddress, Future<MultiResponse>> responsePerServer : 
futures.entrySet()) {
           HServerAddress address = responsePerServer.getKey();
-          
+
           try {
             // Gather the results for one server
             Future<MultiResponse> future = responsePerServer.getValue();
 
             // Not really sure what a reasonable timeout value is. Here's a 
first try.
-            
+
             MultiResponse resp = future.get();
 
             if (resp == null) {
@@ -1176,7 +1176,7 @@ public class HConnectionManager {
                 List<Pair<Integer, Result>> regionResults = e.getValue();
                 for (Pair<Integer, Result> regionResult : regionResults) {
                   if (regionResult == null) {
-                    // failed
+                    // if the first/only record is 'null' the entire region 
failed.
                     LOG.debug("Failures for region: " + 
Bytes.toStringBinary(regionName) + ", removing from cache");
                   } else {
                     // success
@@ -1196,12 +1196,12 @@ public class HConnectionManager {
             if (e.getCause() instanceof DoNotRetryIOException) {
               throw (DoNotRetryIOException) e.getCause();
             }
-            
+
             if (singletonList) {
               // be richer for reporting in a 1 row case.
               singleRowCause = e.getCause();
             }
-          } 
+          }
         }
 
         // Find failures (i.e. null Result), and add them to the workingList 
(in
@@ -1224,7 +1224,7 @@ public class HConnectionManager {
       if (Thread.currentThread().isInterrupted()) {
         throw new IOException("Aborting attempt because of a thread 
interruption");
       }
-      
+
       if (retry) {
         // ran out of retries and didn't successfully finish everything!
         if (singleRowCause != null) {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1024074&r1=1024073&r2=1024074&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java 
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java Tue 
Oct 19 01:01:52 2010
@@ -57,7 +57,7 @@ import org.apache.hadoop.hbase.util.Writ
 
 /**
  * Used to communicate with a single HBase table.
- * 
+ *
  * This class is not thread safe for updates; the underlying write buffer can
  * be corrupted if multiple threads contend over a single HTable instance.
  *
@@ -100,7 +100,7 @@ public class HTable implements HTableInt
   /**
    * Creates an object to access a HBase table.
    * Internally it creates a new instance of {@link Configuration} and a new
-   * client to zookeeper as well as other resources.  It also comes up with 
+   * client to zookeeper as well as other resources.  It also comes up with
    * a fresh view of the cluster and must do discovery from scratch of region
    * locations; i.e. it will not make use of already-cached region locations if
    * available. Use only when being quick and dirty.
@@ -115,7 +115,7 @@ public class HTable implements HTableInt
   /**
    * Creates an object to access a HBase table.
    * Internally it creates a new instance of {@link Configuration} and a new
-   * client to zookeeper as well as other resources.  It also comes up with 
+   * client to zookeeper as well as other resources.  It also comes up with
    * a fresh view of the cluster and must do discovery from scratch of region
    * locations; i.e. it will not make use of already-cached region locations if
    * available. Use only when being quick and dirty.
@@ -176,7 +176,7 @@ public class HTable implements HTableInt
       HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
       HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
     this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1);
-    
+
     int nrThreads = conf.getInt("hbase.htable.threads.max", getCurrentNrHRS());
     if (nrThreads == 0) {
       nrThreads = 1; // is there a better default?
@@ -551,7 +551,10 @@ public class HTable implements HTableInt
   }
 
   /**
-   * Method that does a batch call on Deletes, Gets and Puts.
+   * Method that does a batch call on Deletes, Gets and Puts.  The ordering of
+   * execution of the actions is not defined. Meaning if you do a Put and a
+   * Get in the same {@link #batch} call, you will not necessarily be
+   * guaranteed that the Get returns what the Put had put.
    *
    * @param actions list of Get, Put, Delete objects
    * @param results Empty Result[], same size as actions. Provides access to 
partial
@@ -566,7 +569,7 @@ public class HTable implements HTableInt
 
   /**
    * Method that does a batch call on Deletes, Gets and Puts.
-   * 
+   *
    * @param actions list of Get, Put, Delete objects
    * @return the results from the actions. A null in the return array means 
that
    * the call for that action failed, even after retries
@@ -581,7 +584,7 @@ public class HTable implements HTableInt
 
   /**
    * Deletes the specified cells/row.
-   * 
+   *
    * @param delete The object that specifies what to delete.
    * @throws IOException if a remote or network exception occurs.
    * @since 0.20.0
@@ -602,7 +605,7 @@ public class HTable implements HTableInt
   /**
    * Deletes the specified cells/rows in bulk.
    * @param deletes List of things to delete. As a side effect, it will be 
modified:
-   * successful {@link Delete}s are removed. The ordering of the list will 
not change. 
+   * successful {@link Delete}s are removed. The ordering of the list will 
not change.
    * @throws IOException if a remote or network exception occurs. In that case
    * the {@code deletes} argument will contain the {@link Delete} 
instances
    * that have not be successfully applied.

Modified: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java?rev=1024074&r1=1024073&r2=1024074&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java 
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java 
Tue Oct 19 01:01:52 2010
@@ -42,8 +42,8 @@ public class MultiResponse implements Wr
 
   // map of regionName to list of (Results paired to the original index for 
that
   // Result)
-  private Map<byte[], List<Pair<Integer, Result>>> results = new 
TreeMap<byte[], List<Pair<Integer, Result>>>(
-      Bytes.BYTES_COMPARATOR);
+  private Map<byte[], List<Pair<Integer, Result>>> results =
+      new TreeMap<byte[], List<Pair<Integer, Result>>>(Bytes.BYTES_COMPARATOR);
 
   public MultiResponse() {
   }
@@ -111,7 +111,7 @@ public class MultiResponse implements Wr
       for (int j = 0; j < listSize; j++) {
         Integer idx = in.readInt();
         if (idx == -1) {
-          lst.add(null); 
+          lst.add(null);
         } else {
           Result r = (Result) HbaseObjectWritable.readObject(in, null);
           lst.add(new Pair<Integer, Result>(idx, r));

Modified: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1024074&r1=1024073&r2=1024074&view=diff
==============================================================================
--- 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 (original)
+++ 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 Tue Oct 19 01:01:52 2010
@@ -2330,9 +2330,11 @@ public class HRegionServer implements HR
       // actions in the list.
       Collections.sort(actionsForRegion);
       Row action = null;
+      List<Action> puts = new ArrayList<Action>();
       try {
         for (Action a : actionsForRegion) {
           action = a.getAction();
+          // TODO catch exceptions so we can report them on a per-item basis.
           if (action instanceof Delete) {
             delete(regionName, (Delete) action);
             response.add(regionName, new Pair<Integer, Result>(
@@ -2341,14 +2343,50 @@ public class HRegionServer implements HR
             response.add(regionName, new Pair<Integer, Result>(
                 a.getOriginalIndex(), get(regionName, (Get) action)));
           } else if (action instanceof Put) {
-            put(regionName, (Put) action);
-            response.add(regionName, new Pair<Integer, Result>(
-                a.getOriginalIndex(), new Result()));
+            puts.add(a);
           } else {
             LOG.debug("Error: invalid Action, row must be a Get, Delete or 
Put.");
             throw new IllegalArgumentException("Invalid Action, row must be a 
Get, Delete or Put.");
           }
         }
+
+        // We do the puts with result.put so we can get the batching efficiency
+        // we so need. All this data munging doesn't seem great, but at least
+        // we arent copying bytes or anything.
+        if (!puts.isEmpty()) {
+          HRegion region = getRegion(regionName);
+          if (!region.getRegionInfo().isMetaTable()) {
+            this.cacheFlusher.reclaimMemStoreMemory();
+          }
+
+          Pair<Put,Integer> [] putsWithLocks = new Pair[puts.size()];
+          int i = 0;
+          for (Action a : puts) {
+            Put p = (Put) a.getAction();
+
+            Integer lock = getLockFromId(p.getLockId());
+            putsWithLocks[i++] = new Pair<Put, Integer>(p, lock);
+          }
+
+          this.requestCount.addAndGet(puts.size());
+
+          OperationStatusCode[] codes = region.put(putsWithLocks);
+          for( i = 0 ; i < codes.length ; i++) {
+            OperationStatusCode code = codes[i];
+
+            Action theAction = puts.get(i);
+            Result result = null;
+
+            if (code == OperationStatusCode.SUCCESS) {
+              result = new Result();
+            }
+            // TODO turning the alternate exception into a different result
+
+            response.add(regionName,
+                new Pair<Integer, Result>(
+                    theAction.getOriginalIndex(), result));
+          }
+        }
       } catch (IOException ioe) {
         if (multi.size() == 1) throw ioe;
         LOG.debug("Exception processing " +

Modified: 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java?rev=1024074&r1=1024073&r2=1024074&view=diff
==============================================================================
--- 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java 
(original)
+++ 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java 
Tue Oct 19 01:01:52 2010
@@ -140,7 +140,7 @@ public class TestMultiParallel {
   /**
    * Only run one Multi test with a forced RegionServer abort. Otherwise, the
    * unit tests will take an unnecessarily long time to run.
-   * 
+   *
    * @throws Exception
    */
   @Test public void testFlushCommitsWithAbort() throws Exception {
@@ -354,17 +354,11 @@ public class TestMultiParallel {
     get.addColumn(BYTES_FAMILY, QUALIFIER);
     actions.add(get);
 
-    // 5 get of the put in #2 (entire family)
-    get = new Get(KEYS[10]);
-    get.addFamily(BYTES_FAMILY);
-    actions.add(get);
-
-    // 6 get of the delete from #3
-    get = new Get(KEYS[20]);
-    get.addColumn(BYTES_FAMILY, QUALIFIER);
-    actions.add(get);
+    // There used to be a 'get' of a previous put here, but removed
+    // since this API really cannot guarantee order in terms of mixed
+    // get/puts.
 
-    // 7 put of new column
+    // 5 put of new column
     put = new Put(KEYS[40]);
     put.add(BYTES_FAMILY, qual2, val2);
     actions.add(put);
@@ -378,10 +372,7 @@ public class TestMultiParallel {
     validateEmpty(results[2]);
     validateEmpty(results[3]);
     validateResult(results[4]);
-    validateResult(results[5]);
-    validateResult(results[5], qual2, val2); // testing second column in #5
-    validateEmpty(results[6]); // deleted
-    validateEmpty(results[7]);
+    validateEmpty(results[5]);
 
     // validate last put, externally from the batch
     get = new Get(KEYS[40]);


Reply via email to