Author: jimk
Date: Thu Jul 26 14:58:22 2007
New Revision: 560014

URL: http://svn.apache.org/viewvc?view=rev&rev=560014
Log:
HADOOP-1468 Add HBase batch update to reduce RPC overhead (restrict batches to 
a single row at a time)

Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java
    
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
    
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java
    
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestBatchUpdate.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?view=diff&rev=560014&r1=560013&r2=560014
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Thu Jul 26 14:58:22 2007
@@ -77,3 +77,5 @@
      region server
  49. HADOOP-1646 RegionServer OOME's under sustained, substantial loading by
      10 concurrent clients
+ 50. HADOOP-1468 Add HBase batch update to reduce RPC overhead (restrict batches
+     to a single row at a time)

Modified: 
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java?view=diff&rev=560014&r1=560013&r2=560014
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java Thu Jul 26 14:58:22 2007
@@ -22,13 +22,15 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Random;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceArray;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -61,99 +63,9 @@
   int numRetries;
   HMasterInterface master;
   private final Configuration conf;
-  private volatile long currentLockId;
+  private AtomicLong currentLockId;
   private Class<? extends HRegionInterface> serverInterfaceClass;
-  
-  protected class BatchHandler {
-    private HashMap<RegionLocation, BatchUpdate> regionToBatch;
-    private HashMap<Long, BatchUpdate> lockToBatch;
-    
-    /** constructor */
-    public BatchHandler() {
-      this.regionToBatch = new HashMap<RegionLocation, BatchUpdate>();
-      this.lockToBatch = new HashMap<Long, BatchUpdate>();
-    }
-    
-    /** 
-     * Start a batch row insertion/update.
-     * 
-     * Manages multiple batch updates that are targeted for multiple servers,
-     * should the rows span several region servers.
-     * 
-     * No changes are committed until the client commits the batch operation via
-     * HClient.batchCommit().
-     * 
-     * The entire batch update can be abandoned by calling HClient.batchAbort();
-     *
-     * Callers to this method are given a handle that corresponds to the row being
-     * changed. The handle must be supplied on subsequent put or delete calls so
-     * that the row can be identified.
-     * 
-     * @param row Name of row to start update against.
-     * @return Row lockid.
-     */
-    public synchronized long startUpdate(Text row) {
-      RegionLocation info = getRegionLocation(row);
-      BatchUpdate batch = regionToBatch.get(info);
-      if(batch == null) {
-        batch = new BatchUpdate();
-        regionToBatch.put(info, batch);
-      }
-      long lockid = batch.startUpdate(row);
-      lockToBatch.put(lockid, batch);
-      return lockid;
-    }
-    
-    /**
-     * Change the value for the specified column
-     * 
-     * @param lockid lock id returned from startUpdate
-     * @param column column whose value is being set
-     * @param value new value for column
-     */
-    public synchronized void put(long lockid, Text column, byte[] value) {
-      BatchUpdate batch = lockToBatch.get(lockid);
-      if (batch == null) {
-        throw new IllegalArgumentException("invalid lock id " + lockid);
-      }
-      batch.put(lockid, column, value);
-    }
-    
-    /** 
-     * Delete the value for a column
-     *
-     * @param lockid              - lock id returned from startUpdate
-     * @param column              - name of column whose value is to be deleted
-     */
-    public synchronized void delete(long lockid, Text column) {
-      BatchUpdate batch = lockToBatch.get(lockid);
-      if (batch == null) {
-        throw new IllegalArgumentException("invalid lock id " + lockid);
-      }
-      batch.delete(lockid, column);
-    }
-    
-    /** 
-     * Finalize a batch mutation
-     *
-     * @param timestamp time to associate with all the changes
-     * @throws IOException
-     */
-    public synchronized void commit(long timestamp) throws IOException {
-      try {
-        for(Map.Entry<RegionLocation, BatchUpdate> e: regionToBatch.entrySet()) {
-          RegionLocation r = e.getKey();
-          HRegionInterface server = getHRegionConnection(r.serverAddress);
-          server.batchUpdate(r.regionInfo.getRegionName(), timestamp,
-              e.getValue());
-        }
-      } catch (RemoteException e) {
-        throw RemoteExceptionHandler.decodeRemoteException(e);
-      }
-    }
-  }
-
-  private BatchHandler batch;
+  private AtomicReference<BatchUpdate> batch;
   
   /*
    * Data structure that holds current location for a region and its info.
@@ -606,8 +518,8 @@
    */
   public HClient(Configuration conf) {
     this.conf = conf;
-    this.batch = null;
-    this.currentLockId = -1;
+    this.batch = new AtomicReference<BatchUpdate>();
+    this.currentLockId = new AtomicLong(-1L);
 
     this.pause = conf.getLong("hbase.client.pause", 30 * 1000);
     this.numRetries = conf.getInt("hbase.client.retries.number", 5);
@@ -1159,7 +1071,7 @@
     if(tableName == null || tableName.getLength() == 0) {
       throw new IllegalArgumentException("table name cannot be null or zero length");
     }
-    if(this.currentLockId != -1 || batch != null) {
+    if(this.currentLockId.get() != -1L || batch.get() != null) {
       throw new IllegalStateException("update in progress");
     }
     this.currentTableServers = tableServers.getTableServers(tableName);
@@ -1481,51 +1393,90 @@
    * 
    * No changes are committed until the call to commitBatchUpdate returns.
    * A call to abortBatchUpdate will abandon the entire batch.
-   * 
-   * Note that in batch mode, calls to commit or abort are ignored.
+   *
+   * @param row name of row to be updated
+   * @return lockid to be used in subsequent put, delete and commit calls
    */
-  public synchronized void startBatchUpdate() {
-    if(this.currentTableServers == null) {
+  public synchronized long startBatchUpdate(final Text row) {
+    if (this.currentTableServers == null) {
       throw new IllegalStateException("Must open table first");
     }
-    
-    if(batch == null) {
-      batch = new BatchHandler();
+    if (batch.get() != null) {
+      throw new IllegalStateException("batch update in progress");
     }
+    batch.set(new BatchUpdate());
+    return batch.get().startUpdate(row);
   }
   
   /** 
    * Abort a batch mutation
+   * @param lockid lock id returned by startBatchUpdate
    */
-  public synchronized void abortBatch() {
-    batch = null;
+  public synchronized void abortBatch(final long lockid) {
+    BatchUpdate u = batch.get();
+    if (u == null) {
+      throw new IllegalStateException("no batch update in progress");
+    }
+    if (u.getLockid() != lockid) {
+      throw new IllegalArgumentException("invalid lock id " + lockid);
+    }
+    batch.set(null);
   }
   
   /** 
    * Finalize a batch mutation
    *
+   * @param lockid lock id returned by startBatchUpdate
    * @throws IOException
    */
-  public synchronized void commitBatch() throws IOException {
-    commitBatch(System.currentTimeMillis());
+  public void commitBatch(final long lockid) throws IOException {
+    commitBatch(lockid, System.currentTimeMillis());
   }
 
   /** 
    * Finalize a batch mutation
    *
+   * @param lockid lock id returned by startBatchUpdate
    * @param timestamp time to associate with all the changes
    * @throws IOException
    */
-  public synchronized void commitBatch(long timestamp) throws IOException {
-    if(batch == null) {
+  public synchronized void commitBatch(final long lockid, final long timestamp)
+  throws IOException {
+    BatchUpdate u = batch.get();
+    if (u == null) {
       throw new IllegalStateException("no batch update in progress");
     }
+    if (u.getLockid() != lockid) {
+      throw new IllegalArgumentException("invalid lock id " + lockid);
+    }
     
     try {
-      batch.commit(timestamp);
-      
+      for (int tries = 0; tries < numRetries; tries++) {
+        RegionLocation r = getRegionLocation(u.getRow());
+        HRegionInterface server = getHRegionConnection(r.serverAddress);
+        try {
+          server.batchUpdate(r.getRegionInfo().getRegionName(), timestamp, u);
+          break;
+
+        } catch (IOException e) {
+          if (tries < numRetries -1) {
+            reloadCurrentTable(r);
+
+          } else {
+            if (e instanceof RemoteException) {
+              e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+            }
+            throw e;
+          }
+        }
+        try {
+          Thread.sleep(pause);
+
+        } catch (InterruptedException e) {
+        }
+      }
     } finally {
-      batch = null;
+      batch.set(null);
     }
   }
   
@@ -1545,26 +1496,27 @@
    * @throws IOException
    */
   public synchronized long startUpdate(final Text row) throws IOException {
-    if(this.currentLockId != -1) {
+    if (this.currentLockId.get() != -1L) {
       throw new IllegalStateException("update in progress");
     }
-    if(batch != null) {
-      return batch.startUpdate(row);
+    if (batch.get() != null) {
+      throw new IllegalStateException("batch update in progress");
     }
-    for(int tries = 0; tries < numRetries; tries++) {
+    for (int tries = 0; tries < numRetries; tries++) {
       IOException e = null;
       RegionLocation info = getRegionLocation(row);
       try {
         currentServer = getHRegionConnection(info.serverAddress);
         currentRegion = info.regionInfo.regionName;
         clientid = rand.nextLong();
-        this.currentLockId = currentServer.startUpdate(currentRegion, clientid, row);
+        this.currentLockId.set(
+            currentServer.startUpdate(currentRegion, clientid, row));
         break;
         
       } catch (IOException ex) {
         e = ex;
       }
-      if(tries < numRetries - 1) {
+      if (tries < numRetries - 1) {
         try {
           Thread.sleep(this.pause);
           
@@ -1577,13 +1529,13 @@
           e = ex;
         }
       } else {
-        if(e instanceof RemoteException) {
+        if (e instanceof RemoteException) {
           e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
         }
         throw e;
       }
     }
-    return this.currentLockId;
+    return this.currentLockId.get();
   }
   
   /** 
@@ -1596,29 +1548,29 @@
    * @throws IOException
    */
   public void put(long lockid, Text column, byte val[]) throws IOException {
-    if(val == null) {
+    if (val == null) {
       throw new IllegalArgumentException("value cannot be null");
     }
-    if(batch != null) {
-      batch.put(lockid, column, val);
+    if (batch.get() != null) {
+      batch.get().put(lockid, column, val);
       return;
     }
     
-    if(lockid != this.currentLockId) {
+    if (lockid != this.currentLockId.get()) {
       throw new IllegalArgumentException("invalid lockid");
     }
     try {
       this.currentServer.put(this.currentRegion, this.clientid, lockid, column,
         val);
-    } catch(IOException e) {
+    } catch (IOException e) {
       try {
         this.currentServer.abort(this.currentRegion, this.clientid, lockid);
-      } catch(IOException e2) {
+      } catch (IOException e2) {
         LOG.warn(e2);
       }
       this.currentServer = null;
       this.currentRegion = null;
-      if(e instanceof RemoteException) {
+      if (e instanceof RemoteException) {
         e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
       }
       throw e;
@@ -1633,18 +1585,18 @@
    * @throws IOException
    */
   public void delete(long lockid, Text column) throws IOException {
-    if(batch != null) {
-      batch.delete(lockid, column);
+    if (batch.get() != null) {
+      batch.get().delete(lockid, column);
       return;
     }
     
-    if(lockid != this.currentLockId) {
+    if (lockid != this.currentLockId.get()) {
       throw new IllegalArgumentException("invalid lockid");
     }
     try {
       this.currentServer.delete(this.currentRegion, this.clientid, lockid,
         column);
-    } catch(IOException e) {
+    } catch (IOException e) {
       try {
         this.currentServer.abort(this.currentRegion, this.clientid, lockid);
       } catch(IOException e2) {
@@ -1652,7 +1604,7 @@
       }
       this.currentServer = null;
       this.currentRegion = null;
-      if(e instanceof RemoteException) {
+      if (e instanceof RemoteException) {
         e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
       }
       throw e;
@@ -1666,24 +1618,25 @@
    * @throws IOException
    */
   public void abort(long lockid) throws IOException {
-    if(batch != null) {
+    if (batch.get() != null) {
+      abortBatch(lockid);
       return;
     }
     
-    if(lockid != this.currentLockId) {
+    if (lockid != this.currentLockId.get()) {
       throw new IllegalArgumentException("invalid lockid");
     }
     try {
       this.currentServer.abort(this.currentRegion, this.clientid, lockid);
-    } catch(IOException e) {
+    } catch (IOException e) {
       this.currentServer = null;
       this.currentRegion = null;
-      if(e instanceof RemoteException) {
+      if (e instanceof RemoteException) {
         e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
       }
       throw e;
     } finally {
-      this.currentLockId = -1;
+      this.currentLockId.set(-1L);
     }
   }
   
@@ -1705,11 +1658,12 @@
    * @throws IOException
    */
   public void commit(long lockid, long timestamp) throws IOException {
-    if(batch != null) {
+    if (batch.get() != null) {
+      commitBatch(lockid, timestamp);
       return;
     }
     
-    if(lockid != this.currentLockId) {
+    if (lockid != this.currentLockId.get()) {
       throw new IllegalArgumentException("invalid lockid");
     }
     try {
@@ -1724,7 +1678,7 @@
       }
       throw e;
     } finally {
-      this.currentLockId = -1;
+      this.currentLockId.set(-1L);
     }
   }
   
@@ -1735,24 +1689,24 @@
    * @throws IOException
    */
   public void renewLease(long lockid) throws IOException {
-    if(batch != null) {
+    if (batch.get() != null) {
       return;
     }
     
-    if(lockid != this.currentLockId) {
+    if (lockid != this.currentLockId.get()) {
       throw new IllegalArgumentException("invalid lockid");
     }
     try {
       this.currentServer.renewLease(lockid, this.clientid);
-    } catch(IOException e) {
+    } catch (IOException e) {
       try {
         this.currentServer.abort(this.currentRegion, this.clientid, lockid);
-      } catch(IOException e2) {
+      } catch (IOException e2) {
         LOG.warn(e2);
       }
       this.currentServer = null;
       this.currentRegion = null;
-      if(e instanceof RemoteException) {
+      if (e instanceof RemoteException) {
         e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
       }
       throw e;
@@ -1770,7 +1724,7 @@
     private Text startRow;
     private long scanTime;
     private boolean closed;
-    private volatile RegionLocation[] regions;
+    private AtomicReferenceArray<RegionLocation> regions;
     @SuppressWarnings("hiding")
     private int currentRegion;
     private HRegionInterface server;
@@ -1791,7 +1745,8 @@
       Collection<RegionLocation> info =
         currentTableServers.tailMap(firstServer).values();
       
-      this.regions = info.toArray(new RegionLocation[info.size()]);
+      this.regions = new AtomicReferenceArray<RegionLocation>(
+          info.toArray(new RegionLocation[info.size()]));
     }
     
     ClientScanner(Text[] columns, Text startRow, long timestamp,
@@ -1821,14 +1776,14 @@
         this.scannerId = -1L;
       }
       this.currentRegion += 1;
-      if(this.currentRegion == this.regions.length) {
+      if(this.currentRegion == this.regions.length()) {
         close();
         return false;
       }
       try {
         for(int tries = 0; tries < numRetries; tries++) {
-          RegionLocation info = this.regions[currentRegion];
-          this.server = getHRegionConnection(this.regions[currentRegion].serverAddress);
+          RegionLocation info = this.regions.get(currentRegion);
+          this.server = getHRegionConnection(info.serverAddress);
           
           try {
             if (this.filter == null) {

Modified: 
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?view=diff&rev=560014&r1=560013&r2=560014
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Thu Jul 26 14:58:22 2007
@@ -976,23 +976,20 @@
    */
   public void batchUpdate(Text regionName, long timestamp, BatchUpdate b)
   throws IOException {
-    for(Map.Entry<Text, ArrayList<BatchOperation>> e: b) {
-      Text row = e.getKey();
-      long clientid = rand.nextLong();
-      long lockid = startUpdate(regionName, clientid, row);
-      for(BatchOperation op: e.getValue()) {
-        switch(op.getOp()) {
-        case BatchOperation.PUT_OP:
-          put(regionName, clientid, lockid, op.getColumn(), op.getValue());
-          break;
-          
-        case BatchOperation.DELETE_OP:
-          delete(regionName, clientid, lockid, op.getColumn());
-          break;
-        }
+    long clientid = rand.nextLong();
+    long lockid = startUpdate(regionName, clientid, b.getRow());
+    for(BatchOperation op: b) {
+      switch(op.getOp()) {
+      case BatchOperation.PUT_OP:
+        put(regionName, clientid, lockid, op.getColumn(), op.getValue());
+        break;
+
+      case BatchOperation.DELETE_OP:
+        delete(regionName, clientid, lockid, op.getColumn());
+        break;
       }
-      commit(regionName, clientid, lockid, timestamp);
     }
+    commit(regionName, clientid, lockid, timestamp);
   }
   
   /**

Modified: 
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java?view=diff&rev=560014&r1=560013&r2=560014
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java Thu Jul 26 14:58:22 2007
@@ -23,9 +23,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
-import java.util.Map;
 import java.util.Random;
 
 import org.apache.hadoop.io.Text;
@@ -38,25 +36,38 @@
  * can result in multiple BatchUpdate objects if the batch contains rows that
  * are served by multiple region servers.
  */
-public class BatchUpdate implements Writable,
-Iterable<Map.Entry<Text, ArrayList<BatchOperation>>> {
+public class BatchUpdate implements Writable, Iterable<BatchOperation> {
   
   // used to generate lock ids
   private Random rand;
+
+  // the row being updated
+  private Text row;
   
-  // used on client side to map lockid to a set of row updates
-  private HashMap<Long, ArrayList<BatchOperation>> lockToRowOps;
+  // the lockid
+  private long lockid;
   
-  // the operations for each row
-  private HashMap<Text, ArrayList<BatchOperation>> operations;
+  // the batched operations
+  private ArrayList<BatchOperation> operations;
   
   /** constructor */
   public BatchUpdate() {
     this.rand = new Random();
-    this.lockToRowOps = new HashMap<Long, ArrayList<BatchOperation>>();
-    this.operations = new HashMap<Text, ArrayList<BatchOperation>>();
+    this.row = new Text();
+    this.lockid = -1L;
+    this.operations = new ArrayList<BatchOperation>();
   }
 
+  /** @return the lock id */
+  public long getLockid() {
+    return lockid;
+  }
+  
+  /** @return the row */
+  public Text getRow() {
+    return row;
+  }
+  
   /** 
    * Start a batch row insertion/update.
    * 
@@ -66,21 +77,15 @@
    * The entire batch update can be abandoned by calling HClient.batchAbort();
    *
   * Callers to this method are given a handle that corresponds to the row being
-   * changed. The handle must be supplied on subsequent put or delete calls so
-   * that the row can be identified.
+   * changed. The handle must be supplied on subsequent put or delete calls.
    * 
    * @param row Name of row to start update against.
    * @return Row lockid.
    */
-  public synchronized long startUpdate(Text row) {
-    Long lockid = Long.valueOf(Math.abs(rand.nextLong()));
-    ArrayList<BatchOperation> ops = operations.get(row);
-    if(ops == null) {
-      ops = new ArrayList<BatchOperation>();
-      operations.put(row, ops);
-    }
-    lockToRowOps.put(lockid, ops);
-    return lockid.longValue();
+  public synchronized long startUpdate(final Text row) {
+    this.row = row;
+    this.lockid = Long.valueOf(Math.abs(rand.nextLong()));
+    return this.lockid;
   }
   
   /** 
@@ -90,12 +95,12 @@
    * @param column              - column whose value is being set
    * @param val                 - new value for column
    */
-  public synchronized void put(long lockid, Text column, byte val[]) {
-    ArrayList<BatchOperation> ops = lockToRowOps.get(lockid);
-    if(ops == null) {
-      throw new IllegalArgumentException("no row for lockid " + lockid);
+  public synchronized void put(final long lockid, final Text column,
+      final byte val[]) {
+    if(this.lockid != lockid) {
+      throw new IllegalArgumentException("invalid lockid " + lockid);
     }
-    ops.add(new BatchOperation(column, val));
+    operations.add(new BatchOperation(column, val));
   }
   
   /** 
@@ -104,12 +109,11 @@
    * @param lockid              - lock id returned from startUpdate
    * @param column              - name of column whose value is to be deleted
    */
-  public synchronized void delete(long lockid, Text column) {
-    ArrayList<BatchOperation> ops = lockToRowOps.get(lockid);
-    if(ops == null) {
-      throw new IllegalArgumentException("no row for lockid " + lockid);
+  public synchronized void delete(final long lockid, final Text column) {
+    if(this.lockid != lockid) {
+      throw new IllegalArgumentException("invalid lockid " + lockid);
     }
-    ops.add(new BatchOperation(column));
+    operations.add(new BatchOperation(column));
   }
 
   //
@@ -117,11 +121,10 @@
   //
   
   /**
-   * @return Iterator<Map.Entry<Text, ArrayList<BatchOperation>>>
-   *         Text row -> ArrayList<BatchOperation> changes
+   * @return Iterator<BatchOperation>
    */
-  public Iterator<Map.Entry<Text, ArrayList<BatchOperation>>> iterator() {
-    return operations.entrySet().iterator();
+  public Iterator<BatchOperation> iterator() {
+    return operations.iterator();
   }
   
   //
@@ -132,20 +135,12 @@
   * {@inheritDoc}
    */
   public void readFields(DataInput in) throws IOException {
+    row.readFields(in);
     int nOps = in.readInt();
     for (int i = 0; i < nOps; i++) {
-      Text row = new Text();
-      row.readFields(in);
-      
-      int nRowOps = in.readInt();
-      ArrayList<BatchOperation> rowOps = new ArrayList<BatchOperation>();
-      for(int j = 0; j < nRowOps; j++) {
-        BatchOperation op = new BatchOperation();
-        op.readFields(in);
-        rowOps.add(op);
-      }
-      
-      operations.put(row, rowOps);
+      BatchOperation op = new BatchOperation();
+      op.readFields(in);
+      operations.add(op);
     }
   }
 
@@ -153,16 +148,10 @@
   * {@inheritDoc}
    */
   public void write(DataOutput out) throws IOException {
+    row.write(out);
     out.writeInt(operations.size());
-    for (Map.Entry<Text, ArrayList<BatchOperation>> e: operations.entrySet()) {
-      e.getKey().write(out);
-      
-      ArrayList<BatchOperation> ops = e.getValue();
-      out.writeInt(ops.size());
-      
-      for(BatchOperation op: ops) {
-        op.write(out);
-      }
+    for (BatchOperation op: operations) {
+      op.write(out);
     }
   }
 }

Modified: 
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestBatchUpdate.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestBatchUpdate.java?view=diff&rev=560014&r1=560013&r2=560014
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestBatchUpdate.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestBatchUpdate.java Thu Jul 26 14:58:22 2007
@@ -19,6 +19,7 @@
  */
 package org.apache.hadoop.hbase;
 
+import java.io.UnsupportedEncodingException;
 import java.util.Map;
 import java.util.TreeMap;
 import org.apache.hadoop.io.Text;
@@ -29,11 +30,21 @@
 public class TestBatchUpdate extends HBaseClusterTestCase {
   private static final String CONTENTS_STR = "contents:";
   private static final Text CONTENTS = new Text(CONTENTS_STR);
-  private static final byte[] value = { 1, 2, 3, 4 };
+  private byte[] value;
 
   private HTableDescriptor desc = null;
   private HClient client = null;
 
+  /** constructor */
+  public TestBatchUpdate() {
+    try {
+      value = "abcd".getBytes(HConstants.UTF8_ENCODING);
+      
+    } catch (UnsupportedEncodingException e) {
+      fail();
+    }
+  }
+  
   /**
   * {@inheritDoc}
    */
@@ -56,7 +67,7 @@
   /** the test case */
   public void testBatchUpdate() {
     try {
-      client.commitBatch();
+      client.commitBatch(-1L);
       
     } catch (IllegalStateException e) {
       // expected
@@ -65,7 +76,7 @@
       fail();
     }
 
-    client.startBatchUpdate();
+    long lockid = client.startBatchUpdate(new Text("row1"));
     
     try {
       client.openTable(HConstants.META_TABLE_NAME);
@@ -77,14 +88,22 @@
       fail();
     }
     try {
-      long lockid = client.startUpdate(new Text("row1"));
+      try {
+        @SuppressWarnings("unused")
+        long dummy = client.startUpdate(new Text("row2"));
+      } catch (IllegalStateException e) {
+        // expected
+      } catch (Exception e) {
+        e.printStackTrace();
+        fail();
+      }
       client.put(lockid, CONTENTS, value);
       client.delete(lockid, CONTENTS);
+      client.commitBatch(lockid);
       
-      lockid = client.startUpdate(new Text("row2"));
+      lockid = client.startBatchUpdate(new Text("row2"));
       client.put(lockid, CONTENTS, value);
-      
-      client.commitBatch();
+      client.commit(lockid);
  
       Text[] columns = { CONTENTS };
       HScannerInterface scanner = client.obtainScanner(columns, new Text());


Reply via email to