Author: nkeywal Date: Tue Nov 19 13:39:50 2013 New Revision: 1543428 URL: http://svn.apache.org/r1543428 Log: HBASE-9988 DOn't use HRI#getEncodedName in the client
Modified: hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java Modified: hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java?rev=1543428&r1=1543427&r2=1543428&view=diff ============================================================================== --- hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java (original) +++ hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java Tue Nov 19 13:39:50 2013 @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; @@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.HRegionIn import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.cloudera.htrace.Trace; @@ -95,13 +97,12 @@ class AsyncProcess<CResult> { protected final ExecutorService pool; protected final AsyncProcessCallback<CResult> callback; protected final BatchErrors errors = new BatchErrors(); - protected final BatchErrors retriedErrors = new BatchErrors(); protected final AtomicBoolean hasError = new AtomicBoolean(false); protected final AtomicLong tasksSent = new AtomicLong(0); protected final AtomicLong tasksDone = new AtomicLong(0); protected final AtomicLong retriesCnt = new AtomicLong(0); - protected final ConcurrentMap<String, AtomicInteger> taskCounterPerRegion = - new ConcurrentHashMap<String, AtomicInteger>(); + protected final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion = + new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR); protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer = new ConcurrentHashMap<ServerName, AtomicInteger>(); @@ -289,7 +290,7 @@ class AsyncProcess<CResult> { // Remember the previous decisions about regions or region servers we put in the // final multi. - Map<String, Boolean> regionIncluded = new HashMap<String, Boolean>(); + Map<Long, Boolean> regionIncluded = new HashMap<Long, Boolean>(); Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>(); int posInList = -1; @@ -369,15 +370,14 @@ class AsyncProcess<CResult> { * We're taking into account the past decision; if we have already accepted * operation on a given region, we accept all operations for this region. * - * * @param loc; the region and the server name we want to use. * @return true if this region is considered as busy. */ protected boolean canTakeOperation(HRegionLocation loc, - Map<String, Boolean> regionsIncluded, + Map<Long, Boolean> regionsIncluded, Map<ServerName, Boolean> serversIncluded) { - String encodedRegionName = loc.getRegionInfo().getEncodedName(); - Boolean regionPrevious = regionsIncluded.get(encodedRegionName); + long regionId = loc.getRegionInfo().getRegionId(); + Boolean regionPrevious = regionsIncluded.get(regionId); if (regionPrevious != null) { // We already know what to do with this region. @@ -387,14 +387,14 @@ class AsyncProcess<CResult> { Boolean serverPrevious = serversIncluded.get(loc.getServerName()); if (Boolean.FALSE.equals(serverPrevious)) { // It's a new region, on a region server that we have already excluded. - regionsIncluded.put(encodedRegionName, Boolean.FALSE); + regionsIncluded.put(regionId, Boolean.FALSE); return false; } - AtomicInteger regionCnt = taskCounterPerRegion.get(encodedRegionName); + AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName()); if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) { // Too many tasks on this region already. - regionsIncluded.put(encodedRegionName, Boolean.FALSE); + regionsIncluded.put(regionId, Boolean.FALSE); return false; } @@ -417,7 +417,7 @@ class AsyncProcess<CResult> { } if (!ok) { - regionsIncluded.put(encodedRegionName, Boolean.FALSE); + regionsIncluded.put(regionId, Boolean.FALSE); serversIncluded.put(loc.getServerName(), Boolean.FALSE); return false; } @@ -427,7 +427,7 @@ class AsyncProcess<CResult> { assert serverPrevious.equals(Boolean.TRUE); } - regionsIncluded.put(encodedRegionName, Boolean.TRUE); + regionsIncluded.put(regionId, Boolean.TRUE); return true; } @@ -582,18 +582,18 @@ class AsyncProcess<CResult> { if (canRetry && throwable != null && throwable instanceof DoNotRetryIOException) { canRetry = false; } - byte[] region = location == null ? null : location.getRegionInfo().getEncodedNameAsBytes(); + byte[] region = null; if (canRetry && callback != null) { + region = location == null ? null : location.getRegionInfo().getEncodedNameAsBytes(); canRetry = callback.retriableFailure(originalIndex, row, region, throwable); } - if (canRetry) { - if (LOG.isTraceEnabled()) { - retriedErrors.add(throwable, row, location); - } - } else { + if (!canRetry) { if (callback != null) { + if (region == null && location != null) { + region = location.getRegionInfo().getEncodedNameAsBytes(); + } callback.failure(originalIndex, region, row, throwable); } errors.add(throwable, row, location); @@ -875,7 +875,6 @@ class AsyncProcess<CResult> { */ public void clearErrors() { errors.clear(); - retriedErrors.clear(); hasError.set(false); } @@ -897,11 +896,13 @@ class AsyncProcess<CResult> { serverCnt.incrementAndGet(); for (byte[] regBytes : regions) { - String encodedRegionName = HRegionInfo.encodeRegionName(regBytes); - AtomicInteger regionCnt = taskCounterPerRegion.get(encodedRegionName); + AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes); if (regionCnt == null) { - taskCounterPerRegion.putIfAbsent(encodedRegionName, new AtomicInteger()); - regionCnt = taskCounterPerRegion.get(encodedRegionName); + regionCnt = new AtomicInteger(); + AtomicInteger oldCnt = taskCounterPerRegion.putIfAbsent(regBytes, regionCnt); + if (oldCnt != null) { + regionCnt = oldCnt; + } } regionCnt.incrementAndGet(); } @@ -912,8 +913,7 @@ class AsyncProcess<CResult> { */ protected void decTaskCounters(Collection<byte[]> regions, ServerName sn) { for (byte[] regBytes : regions) { - String encodedRegionName = HRegionInfo.encodeRegionName(regBytes); - AtomicInteger regionCnt = taskCounterPerRegion.get(encodedRegionName); + AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes); regionCnt.decrementAndGet(); } Modified: hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java?rev=1543428&r1=1543427&r2=1543428&view=diff ============================================================================== --- hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java (original) +++ hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java Tue Nov 19 13:39:50 2013 @@ -60,11 +60,12 @@ public class TestAsyncProcess { private static ServerName sn = new ServerName("localhost:10,1254"); private static ServerName sn2 = new ServerName("localhost:140,12540"); - private static HRegionInfo hri1 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2); + private static HRegionInfo hri1 = + new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1); private static HRegionInfo hri2 = - new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW); + new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2); private static HRegionInfo hri3 = - new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW); + new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3); private static HRegionLocation loc1 = new HRegionLocation(hri1, sn); private static HRegionLocation loc2 = new HRegionLocation(hri2, sn); private static HRegionLocation loc3 = new HRegionLocation(hri3, sn2); @@ -264,7 +265,7 @@ public class TestAsyncProcess { puts.add(createPut(2, true)); // <== new region, but the rs is ok ap.submit(puts, false); - Assert.assertEquals(1, puts.size()); + Assert.assertEquals(" puts=" + puts, 1, puts.size()); ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer - 1)); ap.submit(puts, false); @@ -338,7 +339,7 @@ public class TestAsyncProcess { final AsyncProcess<Object> ap = new MyAsyncProcess<Object>(hc, mcb, conf); ap.tasksSent.incrementAndGet(); final AtomicInteger ai = new AtomicInteger(1); - ap.taskCounterPerRegion.put(hri1.getEncodedName(), ai); + ap.taskCounterPerRegion.put(hri1.getRegionName(), ai); final AtomicBoolean checkPoint = new AtomicBoolean(false); final AtomicBoolean checkPoint2 = new AtomicBoolean(false); @@ -716,7 +717,7 @@ public class TestAsyncProcess { List<Get> gets = new ArrayList<Get>(NB_REGS); for (int i = 0; i < NB_REGS; i++) { HRegionInfo hri = new HRegionInfo( - DUMMY_TABLE, Bytes.toBytes(i * 10L), Bytes.toBytes(i * 10L + 9L)); + DUMMY_TABLE, Bytes.toBytes(i * 10L), Bytes.toBytes(i * 10L + 9L), false, i); HRegionLocation hrl = new HRegionLocation(hri, i % 2 == 0 ? sn : sn2); hrls.add(hrl);