Author: jimk Date: Tue Aug 14 18:12:29 2007 New Revision: 565993 URL: http://svn.apache.org/viewvc?view=rev&rev=565993 Log: HADOOP-1710 All updates should be batch updates
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestBatchUpdate.java lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHTable.java lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java?view=diff&rev=565993&r1=565992&r2=565993 ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java Tue Aug 14 18:12:29 2007 @@ -412,7 +412,7 @@ if(this.table.get() == null) { throw new IllegalStateException("Must open table first"); } - return this.table.get().startBatchUpdate(row); + return this.table.get().startUpdate(row); } /** @@ -423,7 +423,7 @@ if(this.table.get() == null) { throw new IllegalStateException("Must open table first"); } - this.table.get().abortBatch(lockid); + this.table.get().abort(lockid); } /** @@ -448,7 +448,7 @@ if(this.table.get() == null) { throw new IllegalStateException("Must open table first"); } - this.table.get().commitBatch(lockid, timestamp); + this.table.get().commit(lockid, timestamp); } /** @@ -464,9 +464,8 @@ * * @param row Name of row to start update against. * @return Row lockid. - * @throws IOException */ - public long startUpdate(final Text row) throws IOException { + public long startUpdate(final Text row) { if(this.table.get() == null) { throw new IllegalStateException("Must open table first"); } @@ -480,9 +479,8 @@ * @param lockid lock id returned from startUpdate * @param column column whose value is being set * @param val new value for column - * @throws IOException */ - public void put(long lockid, Text column, byte val[]) throws IOException { + public void put(long lockid, Text column, byte val[]) { if(this.table.get() == null) { throw new IllegalStateException("Must open table first"); } @@ -494,9 +492,8 @@ * * @param lockid - lock id returned from startUpdate * @param column - name of column whose value is to be deleted - * @throws IOException */ - public void delete(long lockid, Text column) throws IOException { + public void delete(long lockid, Text column) { if(this.table.get() == null) { throw new IllegalStateException("Must open table first"); } @@ -507,9 +504,8 @@ * Abort a row mutation * * @param lockid - lock id returned from startUpdate - * @throws IOException */ - public void abort(long lockid) throws IOException { + public void abort(long lockid) { if(this.table.get() == null) { throw new IllegalStateException("Must open table first"); } @@ -544,13 +540,11 @@ * Renew lease on update * * @param lockid - lock id returned from startUpdate - * @throws IOException */ - public void renewLease(long lockid) throws IOException { + public void renewLease(@SuppressWarnings("unused") long lockid) { if(this.table.get() == null) { throw new IllegalStateException("Must open table first"); } - this.table.get().renewLease(lockid); } private void printUsage() { Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java?view=diff&rev=565993&r1=565992&r2=565993 ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java Tue Aug 14 18:12:29 2007 @@ -502,7 +502,7 @@ } catch (IOException e) { if (tries < numRetries - 1) { - findServersForTable(META_TABLE_NAME); + metaServers = findServersForTable(META_TABLE_NAME); success = false; break; } Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?view=diff&rev=565993&r1=565992&r2=565993 ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Tue Aug 14 18:12:29 2007 @@ -1796,21 +1796,20 @@ // Remove server from root/meta entries - long clientId = rand.nextLong(); for (ToDoEntry e: toDoList) { - long lockid = server.startUpdate(regionName, clientId, e.row); + BatchUpdate b = new BatchUpdate(); + long lockid = b.startUpdate(e.row); if (e.deleteRegion) { - server.delete(regionName, clientId, lockid, COL_REGIONINFO); + b.delete(lockid, COL_REGIONINFO); } else if (e.regionOffline) { e.info.offLine = true; - server.put(regionName, clientId, lockid, COL_REGIONINFO, - Writables.getBytes(e.info)); + b.put(lockid, COL_REGIONINFO, Writables.getBytes(e.info)); } - server.delete(regionName, clientId, lockid, COL_SERVER); - server.delete(regionName, clientId, lockid, COL_STARTCODE); - server.commit(regionName, clientId, lockid, System.currentTimeMillis()); + b.delete(lockid, COL_SERVER); + b.delete(lockid, COL_STARTCODE); + server.batchUpdate(regionName, System.currentTimeMillis(), b); } // Get regions reassigned @@ -2053,23 +2052,20 @@ server = connection.getHRegionConnection(r.server); } - long clientId = rand.nextLong(); try { - long lockid = server.startUpdate(metaRegionName, clientId, - regionInfo.regionName); + BatchUpdate b = new BatchUpdate(); + long lockid = b.startUpdate(regionInfo.regionName); if (deleteRegion) { - server.delete(metaRegionName, clientId, lockid, COL_REGIONINFO); + b.delete(lockid, COL_REGIONINFO); } else if (!reassignRegion ) { regionInfo.offLine = true; - server.put(metaRegionName, clientId, lockid, COL_REGIONINFO, - Writables.getBytes(regionInfo)); + b.put(lockid, COL_REGIONINFO, Writables.getBytes(regionInfo)); } - server.delete(metaRegionName, clientId, lockid, COL_SERVER); - server.delete(metaRegionName, clientId, lockid, COL_STARTCODE); - server.commit(metaRegionName, clientId, lockid, - System.currentTimeMillis()); + b.delete(lockid, COL_SERVER); + b.delete(lockid, COL_STARTCODE); + server.batchUpdate(metaRegionName, System.currentTimeMillis(), b); break; @@ -2199,18 +2195,15 @@ LOG.info("updating row " + region.getRegionName() + " in table " + metaRegionName); - long clientId = rand.nextLong(); try { - long lockid = server.startUpdate(metaRegionName, clientId, - region.getRegionName()); + BatchUpdate b = new BatchUpdate(); + long lockid = b.startUpdate(region.getRegionName()); - server.put(metaRegionName, clientId, lockid, COL_SERVER, + b.put(lockid, COL_SERVER, Writables.stringToBytes(serverAddress.toString())); - server.put(metaRegionName, clientId, lockid, COL_STARTCODE, startCode); - - server.commit(metaRegionName, clientId, lockid, - System.currentTimeMillis()); + b.put(lockid, COL_STARTCODE, startCode); + server.batchUpdate(metaRegionName, System.currentTimeMillis(), b); if (region.tableDesc.getName().equals(META_TABLE_NAME)) { // It's a meta region. @@ -2335,11 +2328,11 @@ newRegion.getTableDesc().getName()).lastKey())); Text metaRegionName = m.regionName; - HRegionInterface r = connection.getHRegionConnection(m.server); - long scannerid = r.openScanner(metaRegionName, COL_REGIONINFO_ARRAY, + HRegionInterface server = connection.getHRegionConnection(m.server); + long scannerid = server.openScanner(metaRegionName, COL_REGIONINFO_ARRAY, tableName, System.currentTimeMillis(), null); try { - KeyedData[] data = r.next(scannerid); + KeyedData[] data = server.next(scannerid); // Test data and that the row for the data is for our table. If table // does not exist, scanner will return row after where our table would @@ -2355,7 +2348,7 @@ } } finally { - r.close(scannerid); + server.close(scannerid); } // 2. Create the HRegion @@ -2367,13 +2360,10 @@ HRegionInfo info = region.getRegionInfo(); Text regionName = region.getRegionName(); - long clientId = rand.nextLong(); - long lockid = r.startUpdate(metaRegionName, clientId, regionName); - - r.put(metaRegionName, clientId, lockid, COL_REGIONINFO, - Writables.getBytes(info)); - - r.commit(metaRegionName, clientId, lockid, System.currentTimeMillis()); + BatchUpdate b = new BatchUpdate(); + long lockid = b.startUpdate(regionName); + b.put(lockid, COL_REGIONINFO, Writables.getBytes(info)); + server.batchUpdate(metaRegionName, System.currentTimeMillis(), b); // 4. Close the new region to flush it to disk. Close its log file too. @@ -2608,7 +2598,6 @@ new HashMap<String, HashSet<HRegionInfo>>(); protected long lockid; - protected long clientId; ChangeTableState(Text tableName, boolean onLine) throws IOException { super(tableName); @@ -2653,41 +2642,36 @@ LOG.debug("updating columns in row: " + i.regionName); } - lockid = -1L; - clientId = rand.nextLong(); - try { - lockid = server.startUpdate(m.regionName, clientId, i.regionName); - updateRegionInfo(server, m.regionName, i); - server.delete(m.regionName, clientId, lockid, COL_SERVER); - server.delete(m.regionName, clientId, lockid, COL_STARTCODE); - server.commit(m.regionName, clientId, lockid, - System.currentTimeMillis()); - - lockid = -1L; - - if (LOG.isDebugEnabled()) { - LOG.debug("updated columns in row: " + i.regionName); - } - - } catch (IOException e) { - if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException( - (RemoteException) e); - } - LOG.error("column update failed in row: " + i.regionName, e); + BatchUpdate b = new BatchUpdate(); + lockid = b.startUpdate(i.regionName); + updateRegionInfo(b, i); + b.delete(lockid, COL_SERVER); + b.delete(lockid, COL_STARTCODE); - } finally { + for (int tries = 0; tries < numRetries; tries++) { try { - if (lockid != -1L) { - server.abort(m.regionName, clientId, lockid); + server.batchUpdate(m.regionName, System.currentTimeMillis(), b); + + if (LOG.isDebugEnabled()) { + LOG.debug("updated columns in row: " + i.regionName); } + break; - } catch (IOException iex) { - if (iex instanceof RemoteException) { - iex = RemoteExceptionHandler.decodeRemoteException( - (RemoteException) iex); + } catch (IOException e) { + if (tries == numRetries - 1) { + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); + } + LOG.error("column update failed in row: " + i.regionName, e); + break; } - LOG.error("", iex); + } + try { + Thread.sleep(threadWakeFrequency); + + } catch (InterruptedException e) { + // continue } } @@ -2738,12 +2722,11 @@ servedRegions.clear(); } - protected void updateRegionInfo(final HRegionInterface server, - final Text regionName, final HRegionInfo i) throws IOException { + protected void updateRegionInfo(final BatchUpdate b, final HRegionInfo i) + throws IOException { i.offLine = !online; - server.put(regionName, clientId, lockid, COL_REGIONINFO, - Writables.getBytes(i)); + b.put(lockid, COL_REGIONINFO, Writables.getBytes(i)); } } @@ -2790,11 +2773,10 @@ } @Override - protected void updateRegionInfo( - @SuppressWarnings("hiding") HRegionInterface server, Text regionName, - @SuppressWarnings("unused") HRegionInfo i) throws IOException { + protected void updateRegionInfo(BatchUpdate b, + @SuppressWarnings("unused") HRegionInfo i) { - server.delete(regionName, clientId, lockid, COL_REGIONINFO); + b.delete(lockid, COL_REGIONINFO); } } @@ -2816,39 +2798,34 @@ protected void updateRegionInfo(HRegionInterface server, Text regionName, HRegionInfo i) throws IOException { + + BatchUpdate b = new BatchUpdate(); + long lockid = b.startUpdate(i.regionName); + b.put(lockid, COL_REGIONINFO, Writables.getBytes(i)); - long lockid = -1L; - long clientId = rand.nextLong(); - try { - lockid = server.startUpdate(regionName, clientId, i.regionName); - server.put(regionName, clientId, lockid, COL_REGIONINFO, - Writables.getBytes(i)); - - server.commit(regionName, clientId, lockid, System.currentTimeMillis()); - lockid = -1L; + for (int tries = 0; tries < numRetries; tries++) { + try { + server.batchUpdate(regionName, System.currentTimeMillis(), b); - if (LOG.isDebugEnabled()) { - LOG.debug("updated columns in row: " + i.regionName); - } + if (LOG.isDebugEnabled()) { + LOG.debug("updated columns in row: " + i.regionName); + } + break; - } catch (Exception e) { - if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - LOG.error("column update failed in row: " + i.regionName, e); - - } finally { - if (lockid != -1L) { - try { - server.abort(regionName, clientId, lockid); - - } catch (IOException iex) { - if (iex instanceof RemoteException) { - iex = RemoteExceptionHandler.decodeRemoteException( - (RemoteException) iex); + } catch (IOException e) { + if (tries == numRetries - 1) { + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); } - LOG.error("", iex); + LOG.error("column update failed in row: " + i.regionName, e); + break; } + } + try { + Thread.sleep(threadWakeFrequency); + + } catch (InterruptedException e) { + // continue } } } Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java?view=diff&rev=565993&r1=565992&r2=565993 ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java Tue Aug 14 18:12:29 2007 @@ -283,13 +283,8 @@ LOG.debug("updated columns in row: " + regionsToDelete[r]); } } finally { - try { - if(lockid != -1L) { - table.abort(lockid); - } - - } catch(IOException iex) { - LOG.error(iex); + if(lockid != -1L) { + table.abort(lockid); } } } @@ -309,13 +304,8 @@ + newRegion.getRegionName()); } } finally { - try { - if(lockid != -1L) { - table.abort(lockid); - } - - } catch(IOException iex) { - LOG.error(iex); + if(lockid != -1L) { + table.abort(lockid); } } } Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java?view=diff&rev=565993&r1=565992&r2=565993 ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java Tue Aug 14 18:12:29 2007 @@ -97,7 +97,7 @@ * @throws IOException */ public KeyedData[] getRow(final Text regionName, final Text row) - throws IOException; + throws IOException; //TODO ////////////////////////////////////////////////////////////////////////////// // Start an atomic row insertion/update. No changes are committed until the @@ -126,7 +126,10 @@ * @param row Name of row to start update against. * @return Row lockid. * @throws IOException + * + * Deprecated. Use @see [EMAIL PROTECTED] #batchUpdate(Text, long, BatchUpdate)} instead. */ + @Deprecated public long startUpdate(final Text regionName, final long clientid, final Text row) throws IOException; @@ -140,7 +143,10 @@ * @param column column whose value is being set * @param val new value for column * @throws IOException + * + * Deprecated. Use @see [EMAIL PROTECTED] #batchUpdate(Text, long, BatchUpdate)} instead. */ + @Deprecated public void put(final Text regionName, final long clientid, final long lockid, final Text column, final byte [] val) throws IOException; @@ -153,7 +159,10 @@ * @param lockid lock id returned from startUpdate * @param column name of column whose value is to be deleted * @throws IOException + * + * Deprecated. Use @see [EMAIL PROTECTED] #batchUpdate(Text, long, BatchUpdate)} instead. */ + @Deprecated public void delete(final Text regionName, final long clientid, final long lockid, final Text column) throws IOException; @@ -165,7 +174,10 @@ * @param clientid a unique value to identify the client * @param lockid lock id returned from startUpdate * @throws IOException + * + * Deprecated. Use @see [EMAIL PROTECTED] #batchUpdate(Text, long, BatchUpdate)} instead. */ + @Deprecated public void abort(final Text regionName, final long clientid, final long lockid) throws IOException; @@ -178,7 +190,10 @@ * @param lockid lock id returned from startUpdate * @param timestamp the time (in milliseconds to associate with this change) * @throws IOException + * + * Deprecated. Use @see [EMAIL PROTECTED] #batchUpdate(Text, long, BatchUpdate)} instead. */ + @Deprecated public void commit(final Text regionName, final long clientid, final long lockid, final long timestamp) throws IOException; @@ -189,7 +204,10 @@ * @param lockid lock id returned from startUpdate * @param clientid a unique value to identify the client * @throws IOException + * + * Deprecated. Use @see [EMAIL PROTECTED] #batchUpdate(Text, long, BatchUpdate)} instead. */ + @Deprecated public void renewLease(long lockid, long clientid) throws IOException; ////////////////////////////////////////////////////////////////////////////// @@ -229,7 +247,7 @@ * @return array of values * @throws IOException */ - public KeyedData[] next(long scannerId) throws IOException; + public KeyedData[] next(long scannerId) throws IOException; //TODO /** * Close a scanner Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?view=diff&rev=565993&r1=565992&r2=565993 ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Tue Aug 14 18:12:29 2007 @@ -225,7 +225,7 @@ // Remove old region from META // NOTE: there is no need for retry logic here. HTable does it for us. - long lockid = t.startBatchUpdate(oldRegionInfo.getRegionName()); + long lockid = t.startUpdate(oldRegionInfo.getRegionName()); oldRegionInfo.offLine = true; oldRegionInfo.split = true; t.put(lockid, COL_REGIONINFO, Writables.getBytes(oldRegionInfo)); @@ -235,17 +235,17 @@ t.put(lockid, COL_SPLITB, Writables.getBytes( newRegions[1].getRegionInfo())); - t.commitBatch(lockid); + t.commit(lockid); // Add new regions to META for (int i = 0; i < newRegions.length; i++) { - lockid = t.startBatchUpdate(newRegions[i].getRegionName()); + lockid = t.startUpdate(newRegions[i].getRegionName()); t.put(lockid, COL_REGIONINFO, Writables.getBytes( newRegions[i].getRegionInfo())); - t.commitBatch(lockid); + t.commit(lockid); } // Now tell the master about the new regions @@ -975,18 +975,14 @@ // HRegionInterface ////////////////////////////////////////////////////////////////////////////// - /** - * [EMAIL PROTECTED] - */ + /** [EMAIL PROTECTED] */ public HRegionInfo getRegionInfo(final Text regionName) throws NotServingRegionException { requestCount.incrementAndGet(); return getRegion(regionName).getRegionInfo(); } - /** - * [EMAIL PROTECTED] - */ + /** [EMAIL PROTECTED] */ public void batchUpdate(Text regionName, long timestamp, BatchUpdate b) throws IOException { requestCount.incrementAndGet(); @@ -1006,9 +1002,7 @@ commit(regionName, clientid, lockid, timestamp); } - /** - * [EMAIL PROTECTED] - */ + /** [EMAIL PROTECTED] */ public byte [] get(final Text regionName, final Text row, final Text column) throws IOException { @@ -1016,9 +1010,7 @@ return getRegion(regionName).get(row, column); } - /** - * [EMAIL PROTECTED] - */ + /** [EMAIL PROTECTED] */ public byte [][] get(final Text regionName, final Text row, final Text column, final int numVersions) throws IOException { @@ -1026,18 +1018,14 @@ return getRegion(regionName).get(row, column, numVersions); } - /** - * [EMAIL PROTECTED] - */ + /** [EMAIL PROTECTED] */ public byte [][] get(final Text regionName, final Text row, final Text column, final long timestamp, final int numVersions) throws IOException { requestCount.incrementAndGet(); return getRegion(regionName).get(row, column, timestamp, numVersions); } - /** - * [EMAIL PROTECTED] - */ + /** [EMAIL PROTECTED] */ public KeyedData[] getRow(final Text regionName, final Text row) throws IOException { requestCount.incrementAndGet(); @@ -1052,9 +1040,7 @@ return result; } - /** - * [EMAIL PROTECTED] - */ + /** [EMAIL PROTECTED] */ public KeyedData[] next(final long scannerId) throws IOException { requestCount.incrementAndGet(); @@ -1096,18 +1082,15 @@ return values.toArray(new KeyedData[values.size()]); } - /** - * [EMAIL PROTECTED] + /* + * NOTE: When startUpdate, put, delete, abort, commit and renewLease are + * removed from HRegionInterface, these methods (with the exception of + * renewLease) must remain, as they are called by batchUpdate (renewLease + * can just be removed) + * + * However, the remaining methods can become protected instead of public + * at that point. */ - public long startUpdate(Text regionName, long clientid, Text row) - throws IOException { - requestCount.incrementAndGet(); - HRegion region = getRegion(regionName); - long lockid = region.startUpdate(row); - this.leases.createLease(clientid, lockid, - new RegionListener(region, lockid)); - return lockid; - } /** Create a lease for an update. If it times out, the update is aborted */ private static class RegionListener implements LeaseListener { @@ -1119,9 +1102,7 @@ this.localLockId = lockId; } - /** - * [EMAIL PROTECTED] - */ + /** [EMAIL PROTECTED] */ public void leaseExpired() { try { localRegion.abort(localLockId); @@ -1139,9 +1120,18 @@ } } - /** - * [EMAIL PROTECTED] - */ + /** [EMAIL PROTECTED] */ + public long startUpdate(Text regionName, long clientid, Text row) + throws IOException { + requestCount.incrementAndGet(); + HRegion region = getRegion(regionName); + long lockid = region.startUpdate(row); + this.leases.createLease(clientid, lockid, + new RegionListener(region, lockid)); + return lockid; + } + + /** [EMAIL PROTECTED] */ public void put(final Text regionName, final long clientid, final long lockid, final Text column, final byte [] val) throws IOException { @@ -1151,9 +1141,7 @@ region.put(lockid, column, val); } - /** - * [EMAIL PROTECTED] - */ + /** [EMAIL PROTECTED] */ public void delete(Text regionName, long clientid, long lockid, Text column) throws IOException { requestCount.incrementAndGet(); @@ -1162,9 +1150,7 @@ region.delete(lockid, column); } - /** - * [EMAIL PROTECTED] - */ + /** [EMAIL PROTECTED] */ public void abort(Text regionName, long clientid, long lockid) throws IOException { requestCount.incrementAndGet(); @@ -1173,9 +1159,7 @@ region.abort(lockid); } - /** - * [EMAIL PROTECTED] - */ + /** [EMAIL PROTECTED] */ public void commit(Text regionName, final long clientid, final long lockid, final long timestamp) throws IOException { requestCount.incrementAndGet(); @@ -1184,9 +1168,7 @@ region.commit(lockid, timestamp); } - /** - * [EMAIL PROTECTED] - */ + /** [EMAIL PROTECTED] */ public void renewLease(long lockid, long clientid) throws IOException { requestCount.incrementAndGet(); leases.renewLease(clientid, lockid); Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java?view=diff&rev=565993&r1=565992&r2=565993 ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java Tue Aug 14 18:12:29 2007 @@ -52,11 +52,6 @@ // For row mutation operations - protected volatile long currentLockId; - protected volatile Text currentRegion; - protected volatile HRegionInterface currentServer; - protected volatile long clientid; - protected volatile boolean closed; protected void checkClosed() { @@ -81,7 +76,6 @@ this.rand = new Random(); tableServers = connection.getTableServers(tableName); this.batch = null; - this.currentLockId = -1L; closed = false; } @@ -116,10 +110,6 @@ closed = true; tableServers = null; batch = null; - currentLockId = -1L; - currentRegion = null; - currentServer = null; - clientid = -1L; connection.close(tableName); } @@ -127,8 +117,28 @@ * Verifies that no update is in progress */ public synchronized void checkUpdateInProgress() { - if (batch != null || currentLockId != -1L) { - throw new IllegalStateException("update in progress"); + updateInProgress(false); + } + + /* + * Checks to see if an update is in progress + * + * @param updateMustBeInProgress + * If true, an update must be in progress. An IllegalStateException will be + * thrown if not. + * + * If false, an update must not be in progress. An IllegalStateException + * will be thrown if an update is in progress. + */ + private void updateInProgress(boolean updateMustBeInProgress) { + if (updateMustBeInProgress) { + if (batch == null) { + throw new IllegalStateException("no update in progress"); + } + } else { + if (batch != null) { + throw new IllegalStateException("update in progress"); + } } } @@ -415,27 +425,25 @@ * * @param row name of row to be updated * @return lockid to be used in subsequent put, delete and commit calls + * + * Deprecated. Batch operations are now the default. startBatchUpdate is now + * implemented by @see [EMAIL PROTECTED] #startUpdate(Text)} */ + @Deprecated public synchronized long startBatchUpdate(final Text row) { - checkClosed(); - checkUpdateInProgress(); - batch = new BatchUpdate(); - return batch.startUpdate(row); + return startUpdate(row); } /** * Abort a batch mutation * @param lockid lock id returned by startBatchUpdate + * + * Deprecated. Batch operations are now the default. abortBatch is now + * implemented by @see [EMAIL PROTECTED] #abort(long)} */ + @Deprecated public synchronized void abortBatch(final long lockid) { - checkClosed(); - if (batch == null) { - throw new IllegalStateException("no batch update in progress"); - } - if (batch.getLockid() != lockid) { - throw new IllegalArgumentException("invalid lock id " + lockid); - } - batch = null; + abort(lockid); } /** @@ -443,9 +451,13 @@ * * @param lockid lock id returned by startBatchUpdate * @throws IOException + * + * Deprecated. Batch operations are now the default. commitBatch(long) is now + * implemented by @see [EMAIL PROTECTED] #commit(long)} */ + @Deprecated public void commitBatch(final long lockid) throws IOException { - commitBatch(lockid, System.currentTimeMillis()); + commit(lockid, System.currentTimeMillis()); } /** @@ -454,104 +466,32 @@ * @param lockid lock id returned by startBatchUpdate * @param timestamp time to associate with all the changes * @throws IOException + * + * Deprecated. Batch operations are now the default. commitBatch(long, long) + * is now implemented by @see [EMAIL PROTECTED] #commit(long, long)} */ + @Deprecated public synchronized void commitBatch(final long lockid, final long timestamp) throws IOException { - checkClosed(); - if (batch == null) { - throw new IllegalStateException("no batch update in progress"); - } - if (batch.getLockid() != lockid) { - throw new IllegalArgumentException("invalid lock id " + lockid); - } - - try { - for (int tries = 0; tries < numRetries; tries++) { - HRegionLocation r = getRegionLocation(batch.getRow()); - HRegionInterface server = - connection.getHRegionConnection(r.getServerAddress()); - - try { - server.batchUpdate(r.getRegionInfo().getRegionName(), timestamp, batch); - break; - - } catch (IOException e) { - if (tries < numRetries -1) { - tableServers = connection.reloadTableServers(tableName); - - } else { - if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - } - try { - Thread.sleep(pause); - - } catch (InterruptedException e) { - } - } - } finally { - batch = null; - } + commit(lockid, timestamp); } /** * Start an atomic row insertion/update. No changes are committed until the - * call to commit() returns. A call to abort() will abandon any updates in progress. - * - * Callers to this method are given a lease for each unique lockid; before the - * lease expires, either abort() or commit() must be called. If it is not - * called, the system will automatically call abort() on the client's behalf. + * call to commit() returns. + * + * A call to abort() will abandon any updates in progress. * - * The client can gain extra time with a call to renewLease(). - * Start an atomic row insertion or update * * @param row Name of row to start update against. * @return Row lockid. - * @throws IOException */ - public synchronized long startUpdate(final Text row) throws IOException { + public synchronized long startUpdate(final Text row) { checkClosed(); - checkUpdateInProgress(); - for (int tries = 0; tries < numRetries; tries++) { - IOException e = null; - HRegionLocation info = getRegionLocation(row); - try { - currentServer = - connection.getHRegionConnection(info.getServerAddress()); - - currentRegion = info.getRegionInfo().getRegionName(); - clientid = rand.nextLong(); - currentLockId = currentServer.startUpdate(currentRegion, clientid, row); - - break; - - } catch (IOException ex) { - e = ex; - } - if (tries < numRetries - 1) { - try { - Thread.sleep(this.pause); - - } catch (InterruptedException ex) { - } - try { - tableServers = connection.reloadTableServers(tableName); - - } catch (IOException ex) { - e = ex; - } - } else { - if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - } - return currentLockId; + updateInProgress(false); + batch = new BatchUpdate(); + return batch.startUpdate(row); } /** @@ -561,37 +501,14 @@ * @param lockid lock id returned from startUpdate * @param column column whose value is being set * @param val new value for column - * @throws IOException */ - public void put(long lockid, Text column, byte val[]) throws IOException { + public void put(long lockid, Text column, byte val[]) { checkClosed(); if (val == null) { throw new IllegalArgumentException("value cannot be null"); } - if (batch != null) { - batch.put(lockid, column, val); - return; - } - - if (lockid != currentLockId) { - throw new IllegalArgumentException("invalid lockid"); - } - try { - this.currentServer.put(this.currentRegion, this.clientid, lockid, column, - val); - } catch (IOException e) { - try { - this.currentServer.abort(this.currentRegion, this.clientid, lockid); - } catch (IOException e2) { - LOG.warn(e2); - } - this.currentServer = null; - this.currentRegion = null; - if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } + updateInProgress(true); + batch.put(lockid, column, val); } /** @@ -599,67 +516,25 @@ * * @param lockid - lock id returned from startUpdate * @param column - name of column whose value is to be deleted - * @throws IOException */ - public void delete(long lockid, Text column) throws IOException { + public void delete(long lockid, Text column) { checkClosed(); - if (batch != null) { - batch.delete(lockid, column); - return; - } - - if (lockid != currentLockId) { - throw new IllegalArgumentException("invalid lockid"); - } - try { - this.currentServer.delete(this.currentRegion, this.clientid, lockid, - column); - } catch (IOException e) { - try { - this.currentServer.abort(this.currentRegion, this.clientid, lockid); - } catch(IOException e2) { - LOG.warn(e2); - } - this.currentServer = null; - this.currentRegion = null; - if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } + updateInProgress(true); + batch.delete(lockid, column); } /** * Abort a row mutation * * @param lockid - lock id returned from startUpdate - * @throws IOException */ - public synchronized void abort(long lockid) throws IOException { + public synchronized void abort(long lockid) { checkClosed(); - if (batch != null) { - abortBatch(lockid); - return; - } - - if (lockid != currentLockId) { - throw new IllegalArgumentException("invalid lockid"); - } - - try { - try { - this.currentServer.abort(this.currentRegion, this.clientid, lockid); - } catch (IOException e) { - this.currentServer = null; - this.currentRegion = null; - if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - } finally { - currentLockId = -1L; + updateInProgress(true); + if (batch.getLockid() != lockid) { + throw new IllegalArgumentException("invalid lock id " + lockid); } + batch = null; } /** @@ -679,32 +554,45 @@ * @param timestamp - time to associate with the change * @throws IOException */ - public synchronized void commit(long lockid, long timestamp) throws IOException { + public synchronized void commit(long lockid, long timestamp) + throws IOException { + checkClosed(); - if (batch != null) { - commitBatch(lockid, timestamp); - return; + updateInProgress(true); + if (batch.getLockid() != lockid) { + throw new IllegalArgumentException("invalid lock id " + lockid); } + + try { + for (int tries = 0; tries < numRetries; tries++) { + HRegionLocation r = getRegionLocation(batch.getRow()); + HRegionInterface server = + connection.getHRegionConnection(r.getServerAddress()); - if (lockid != currentLockId) { - throw new IllegalArgumentException("invalid lockid"); - } + try { + server.batchUpdate(r.getRegionInfo().getRegionName(), timestamp, batch); + break; - try { - try { - this.currentServer.commit(this.currentRegion, this.clientid, lockid, - timestamp); + } catch (IOException e) { + if (tries < numRetries -1) { + tableServers = connection.reloadTableServers(tableName); - } catch (IOException e) { - this.currentServer = null; - this.currentRegion = null; - if(e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } else { + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); + } + throw e; + } + } + try { + Thread.sleep(pause); + + } catch (InterruptedException e) { } - throw e; } } finally { - currentLockId = -1L; + batch = null; } } @@ -712,32 +600,12 @@ * Renew lease on update * * @param lockid - lock id returned from startUpdate - * @throws IOException + * + * Deprecated. Batch updates are now the default. Consequently this method + * does nothing. */ - public synchronized void renewLease(long lockid) throws IOException { - checkClosed(); - if (batch != null) { - return; - } - - if (lockid != currentLockId) { - throw new IllegalArgumentException("invalid lockid"); - } - try { - this.currentServer.renewLease(lockid, this.clientid); - } catch (IOException e) { - try { - this.currentServer.abort(this.currentRegion, this.clientid, lockid); - } catch (IOException e2) { - LOG.warn(e2); - } - this.currentServer = null; - this.currentRegion = null; - if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } + @Deprecated + public synchronized void renewLease(@SuppressWarnings("unused") long lockid) { } /** Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestBatchUpdate.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestBatchUpdate.java?view=diff&rev=565993&r1=565992&r2=565993 ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestBatchUpdate.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestBatchUpdate.java Tue Aug 14 18:12:29 2007 @@ -63,7 +63,7 @@ /** the test case */ public void testBatchUpdate() { try { - table.commitBatch(-1L); + table.commit(-1L); } catch (IllegalStateException e) { // expected @@ -72,7 +72,7 @@ fail(); } - long lockid = table.startBatchUpdate(new Text("row1")); + long lockid = table.startUpdate(new Text("row1")); try { try { @@ -86,9 +86,9 @@ } table.put(lockid, CONTENTS, value); table.delete(lockid, CONTENTS); - table.commitBatch(lockid); + table.commit(lockid); - lockid = table.startBatchUpdate(new Text("row2")); + lockid = table.startUpdate(new Text("row2")); table.put(lockid, CONTENTS, value); table.commit(lockid); Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHTable.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHTable.java?view=diff&rev=565993&r1=565992&r2=565993 ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHTable.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHTable.java Tue Aug 14 18:12:29 2007 @@ -59,7 +59,7 @@ byte[] value = "value".getBytes(UTF8_ENCODING); HTable a = new HTable(conf, tableAname); - long lockid = a.startBatchUpdate(row); + long lockid = a.startUpdate(row); a.put(lockid, COLUMN_FAMILY, value); a.commit(lockid); @@ -77,7 +77,7 @@ HStoreKey key = new HStoreKey(); TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>(); while(s.next(key, results)) { - lockid = b.startBatchUpdate(key.getRow()); + lockid = b.startUpdate(key.getRow()); for(Map.Entry<Text, byte[]> e: results.entrySet()) { b.put(lockid, e.getKey(), e.getValue()); } Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java?view=diff&rev=565993&r1=565992&r2=565993 ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java Tue Aug 14 18:12:29 2007 @@ -280,7 +280,7 @@ private void removeRegionFromMETA(final HTable t, final Text regionName) throws IOException { try { - long lockid = t.startBatchUpdate(regionName); + long lockid = t.startUpdate(regionName); t.delete(lockid, HConstants.COL_REGIONINFO); t.delete(lockid, HConstants.COL_SERVER); t.delete(lockid, HConstants.COL_STARTCODE); Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java?view=diff&rev=565993&r1=565992&r2=565993 ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java Tue Aug 14 18:12:29 2007 @@ -461,7 +461,7 @@ for (char e = thirdCharStart; e <= LAST_CHAR; e++) { byte [] bytes = new byte [] {(byte)c, (byte)d, (byte)e}; Text t = new Text(new String(bytes)); - long lockid = table.startBatchUpdate(t); + long lockid = table.startUpdate(t); try { table.put(lockid, new Text(column), bytes); table.commit(lockid, System.currentTimeMillis());