Repository: hbase Updated Branches: refs/heads/master fb789b340 -> 540ede376
HBASE-16648 [JDK8] Use computeIfAbsent instead of get and putIfAbsent Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/540ede37 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/540ede37 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/540ede37 Branch: refs/heads/master Commit: 540ede376ba25db13cdb11ba86830fe0e6cec118 Parents: fb789b3 Author: zhangduo <zhang...@apache.org> Authored: Thu Dec 1 12:12:17 2016 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Thu Dec 1 21:17:55 2016 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/client/AsyncProcess.java | 19 +-- .../hbase/client/ConnectionImplementation.java | 156 ++++++------------- .../apache/hadoop/hbase/client/MetaCache.java | 23 +-- .../hadoop/hbase/client/MetricsConnection.java | 63 +++----- .../client/PreemptiveFastFailInterceptor.java | 50 +++--- .../hbase/client/ServerStatisticTracker.java | 26 +--- .../hadoop/hbase/util/CollectionUtils.java | 26 +++- .../ZKSplitLogManagerCoordination.java | 15 +- .../hadoop/hbase/master/ServerManager.java | 32 ++-- .../hadoop/hbase/master/SplitLogManager.java | 15 +- .../apache/hadoop/hbase/quotas/QuotaCache.java | 36 ++--- .../hadoop/hbase/regionserver/HRegion.java | 16 +- .../regionserver/wal/SequenceIdAccounting.java | 14 +- .../hbase/security/access/TableAuthManager.java | 28 ++-- .../hbase/wal/BoundedGroupingStrategy.java | 13 +- .../hadoop/hbase/master/TestMasterShutdown.java | 2 +- .../hbase/master/TestSplitLogManager.java | 20 ++- 17 files changed, 189 insertions(+), 365 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/540ede37/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index debb602..50a2a11 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; + import com.google.common.annotations.VisibleForTesting; import java.io.IOException; @@ -645,23 +647,10 @@ class AsyncProcess { protected void incTaskCounters(Collection<byte[]> regions, ServerName sn) { tasksInProgress.incrementAndGet(); - AtomicInteger serverCnt = taskCounterPerServer.get(sn); - if (serverCnt == null) { - taskCounterPerServer.putIfAbsent(sn, new AtomicInteger()); - serverCnt = taskCounterPerServer.get(sn); - } - serverCnt.incrementAndGet(); + computeIfAbsent(taskCounterPerServer, sn, AtomicInteger::new).incrementAndGet(); for (byte[] regBytes : regions) { - AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes); - if (regionCnt == null) { - regionCnt = new AtomicInteger(); - AtomicInteger oldCnt = taskCounterPerRegion.putIfAbsent(regBytes, regionCnt); - if (oldCnt != null) { - regionCnt = oldCnt; - } - } - regionCnt.incrementAndGet(); + computeIfAbsent(taskCounterPerRegion, regBytes, AtomicInteger::new).incrementAndGet(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/540ede37/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 96452f9..aa984b1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -22,6 +22,8 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR; import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey; import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY; +import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; +import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsentEx; import com.google.common.annotations.VisibleForTesting; @@ -921,11 +923,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } // Map keyed by service name + regionserver to service stub implementation - private final ConcurrentHashMap<String, Object> stubs = - new ConcurrentHashMap<String, Object>(); - // Map of locks used creating service stubs per regionserver. - private final ConcurrentHashMap<String, String> connectionLock = - new ConcurrentHashMap<String, String>(); + private final ConcurrentMap<String, Object> stubs = new ConcurrentHashMap<String, Object>(); /** * State of the MasterService connection/setup. @@ -1008,7 +1006,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { long result; ServerErrors errorStats = errorsByServer.get(server); if (errorStats != null) { - result = ConnectionUtils.getPauseTime(basePause, errorStats.getCount()); + result = ConnectionUtils.getPauseTime(basePause, Math.max(0, errorStats.getCount() - 1)); } else { result = 0; // yes, if the server is not in our list we don't wait before retrying. } @@ -1017,19 +1015,10 @@ class ConnectionImplementation implements ClusterConnection, Closeable { /** * Reports that there was an error on the server to do whatever bean-counting necessary. - * * @param server The server in question. */ void reportServerError(ServerName server) { - ServerErrors errors = errorsByServer.get(server); - if (errors != null) { - errors.addError(); - } else { - errors = errorsByServer.putIfAbsent(server, new ServerErrors()); - if (errors != null){ - errors.addError(); - } - } + computeIfAbsent(errorsByServer, server, ServerErrors::new).addError(); } long getStartTrackingTime() { @@ -1053,32 +1042,26 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } /** - * Makes a client-side stub for master services. Sub-class to specialize. - * Depends on hosting class so not static. Exists so we avoid duplicating a bunch of code - * when setting up the MasterMonitorService and MasterAdminService. + * Class to make a MasterServiceStubMaker stub. */ - abstract class StubMaker { - /** - * Returns the name of the service stub being created. - */ - protected abstract String getServiceName(); - - /** - * Make stub and cache it internal so can be used later doing the isMasterRunning call. - */ - protected abstract Object makeStub(final BlockingRpcChannel channel); + private final class MasterServiceStubMaker { - /** - * Once setup, check it works by doing isMasterRunning check. - */ - protected abstract void isMasterRunning() throws IOException; + private void isMasterRunning(MasterProtos.MasterService.BlockingInterface stub) + throws IOException { + try { + stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest()); + } catch (ServiceException e) { + throw ProtobufUtil.handleRemoteException(e); + } + } /** - * Create a stub. Try once only. It is not typed because there is no common type to - * protobuf services nor their interfaces. Let the caller do appropriate casting. + * Create a stub. Try once only. It is not typed because there is no common type to protobuf + * services nor their interfaces. Let the caller do appropriate casting. * @return A stub for master services. */ - private Object makeStubNoRetries() throws IOException, KeeperException { + private MasterProtos.MasterService.BlockingInterface makeStubNoRetries() + throws IOException, KeeperException { ZooKeeperKeepAliveConnection zkw; try { zkw = getKeepAliveZooKeeperWatcher(); @@ -1098,18 +1081,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable { throw new MasterNotRunningException(sn + " is dead."); } // Use the security info interface name as our stub key - String key = getStubKey(getServiceName(), sn, hostnamesCanChange); - connectionLock.putIfAbsent(key, key); - Object stub = null; - synchronized (connectionLock.get(key)) { - stub = stubs.get(key); - if (stub == null) { - BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout); - stub = makeStub(channel); - isMasterRunning(); - stubs.put(key, stub); - } - } + String key = getStubKey(MasterProtos.MasterService.getDescriptor().getName(), sn, + hostnamesCanChange); + MasterProtos.MasterService.BlockingInterface stub = + (MasterProtos.MasterService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> { + BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout); + return MasterProtos.MasterService.newBlockingStub(channel); + }); + isMasterRunning(stub); return stub; } finally { zkw.close(); @@ -1121,9 +1100,9 @@ class ConnectionImplementation implements ClusterConnection, Closeable { * @return A stub to do <code>intf</code> against the master * @throws org.apache.hadoop.hbase.MasterNotRunningException if master is not running */ - Object makeStub() throws IOException { + MasterProtos.MasterService.BlockingInterface makeStub() throws IOException { // The lock must be at the beginning to prevent multiple master creations - // (and leaks) in a multithread context + // (and leaks) in a multithread context synchronized (masterAndZKLock) { Exception exceptionCaught = null; if (!closed) { @@ -1142,80 +1121,33 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } } - /** - * Class to make a MasterServiceStubMaker stub. - */ - class MasterServiceStubMaker extends StubMaker { - private MasterProtos.MasterService.BlockingInterface stub; - @Override - protected String getServiceName() { - return MasterProtos.MasterService.getDescriptor().getName(); - } - - @Override - MasterProtos.MasterService.BlockingInterface makeStub() throws IOException { - return (MasterProtos.MasterService.BlockingInterface)super.makeStub(); - } - - @Override - protected Object makeStub(BlockingRpcChannel channel) { - this.stub = MasterProtos.MasterService.newBlockingStub(channel); - return this.stub; - } - - @Override - protected void isMasterRunning() throws IOException { - try { - this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest()); - } catch (Exception e) { - throw ProtobufUtil.handleRemoteException(e); - } - } - } - @Override - public AdminProtos.AdminService.BlockingInterface getAdmin(final ServerName serverName) + public AdminProtos.AdminService.BlockingInterface getAdmin(ServerName serverName) throws IOException { if (isDeadServer(serverName)) { throw new RegionServerStoppedException(serverName + " is dead."); } String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), serverName, this.hostnamesCanChange); - this.connectionLock.putIfAbsent(key, key); - AdminProtos.AdminService.BlockingInterface stub; - synchronized (this.connectionLock.get(key)) { - stub = (AdminProtos.AdminService.BlockingInterface)this.stubs.get(key); - if (stub == null) { - BlockingRpcChannel channel = + return (AdminProtos.AdminService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> { + BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout); - stub = AdminProtos.AdminService.newBlockingStub(channel); - this.stubs.put(key, stub); - } - } - return stub; + return AdminProtos.AdminService.newBlockingStub(channel); + }); } @Override - public BlockingInterface getClient(final ServerName sn) - throws IOException { - if (isDeadServer(sn)) { - throw new RegionServerStoppedException(sn + " is dead."); + public BlockingInterface getClient(ServerName serverName) throws IOException { + if (isDeadServer(serverName)) { + throw new RegionServerStoppedException(serverName + " is dead."); } - String key = getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(), sn, - this.hostnamesCanChange); - this.connectionLock.putIfAbsent(key, key); - ClientProtos.ClientService.BlockingInterface stub = null; - synchronized (this.connectionLock.get(key)) { - stub = (ClientProtos.ClientService.BlockingInterface)this.stubs.get(key); - if (stub == null) { - BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout); - stub = ClientProtos.ClientService.newBlockingStub(channel); - // In old days, after getting stub/proxy, we'd make a call. We are not doing that here. - // Just fail on first actual call rather than in here on setup. - this.stubs.put(key, stub); - } - } - return stub; + String key = getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(), + serverName, this.hostnamesCanChange); + return (ClientProtos.ClientService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> { + BlockingRpcChannel channel = + this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout); + return ClientProtos.ClientService.newBlockingStub(channel); + }); } private ZooKeeperKeepAliveConnection keepAliveZookeeper; http://git-wip-us.apache.org/repos/asf/hbase/blob/540ede37/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java index 3914df5..14e0afd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; + import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -49,8 +51,7 @@ public class MetaCache { * Map of table to table {@link HRegionLocation}s. */ private final ConcurrentMap<TableName, ConcurrentNavigableMap<byte[], RegionLocations>> - cachedRegionLocations = - new CopyOnWriteArrayMap<>(); + cachedRegionLocations = new CopyOnWriteArrayMap<>(); // The presence of a server in the map implies it's likely that there is an // entry in cachedRegionLocations that map to this server; but the absence @@ -191,21 +192,11 @@ public class MetaCache { * @param tableName * @return Map of cached locations for passed <code>tableName</code> */ - private ConcurrentNavigableMap<byte[], RegionLocations> - getTableLocations(final TableName tableName) { + private ConcurrentNavigableMap<byte[], RegionLocations> getTableLocations( + final TableName tableName) { // find the map of cached locations for this table - ConcurrentNavigableMap<byte[], RegionLocations> result; - result = this.cachedRegionLocations.get(tableName); - // if tableLocations for this table isn't built yet, make one - if (result == null) { - result = new CopyOnWriteArrayMap<>(Bytes.BYTES_COMPARATOR); - ConcurrentNavigableMap<byte[], RegionLocations> old = - this.cachedRegionLocations.putIfAbsent(tableName, result); - if (old != null) { - return old; - } - } - return result; + return computeIfAbsent(cachedRegionLocations, tableName, + () -> new CopyOnWriteArrayMap<>(Bytes.BYTES_COMPARATOR)); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/540ede37/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java index 36627bd..64b1661 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java @@ -17,29 +17,31 @@ */ package org.apache.hadoop.hbase.client; -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; +import static com.codahale.metrics.MetricRegistry.name; +import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; + import com.codahale.metrics.Counter; import com.codahale.metrics.Histogram; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Timer; import com.codahale.metrics.JmxReporter; +import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.RatioGauge; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; -import org.apache.hadoop.hbase.util.Bytes; +import com.codahale.metrics.Timer; +import com.google.common.annotations.VisibleForTesting; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import static com.codahale.metrics.MetricRegistry.name; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; +import org.apache.hadoop.hbase.util.Bytes; /** * This class is for maintaining the various connection statistics and publishing them through @@ -207,32 +209,15 @@ public class MetricsConnection implements StatisticTrackable { } @Override - public void updateRegionStats(ServerName serverName, byte[] regionName, - RegionLoadStats stats) { + public void updateRegionStats(ServerName serverName, byte[] regionName, RegionLoadStats stats) { String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName); - ConcurrentMap<byte[], RegionStats> rsStats = null; - if (serverStats.containsKey(serverName)) { - rsStats = serverStats.get(serverName); - } else { - rsStats = serverStats.putIfAbsent(serverName, - new ConcurrentSkipListMap<byte[], RegionStats>(Bytes.BYTES_COMPARATOR)); - if (rsStats == null) { - rsStats = serverStats.get(serverName); - } - } - RegionStats regionStats = null; - if (rsStats.containsKey(regionName)) { - regionStats = rsStats.get(regionName); - } else { - regionStats = rsStats.putIfAbsent(regionName, new RegionStats(this.registry, name)); - if (regionStats == null) { - regionStats = rsStats.get(regionName); - } - } + ConcurrentMap<byte[], RegionStats> rsStats = computeIfAbsent(serverStats, serverName, + () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR)); + RegionStats regionStats = + computeIfAbsent(rsStats, regionName, () -> new RegionStats(this.registry, name)); regionStats.update(stats); } - /** A lambda for dispatching to the appropriate metric factory method */ private static interface NewMetric<T> { T newMetric(Class<?> clazz, String name, String scope); @@ -407,13 +392,7 @@ public class MetricsConnection implements StatisticTrackable { * Get a metric for {@code key} from {@code map}, or create it with {@code factory}. */ private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory) { - T t = map.get(key); - if (t == null) { - t = factory.newMetric(this.getClass(), key, scope); - T tmp = map.putIfAbsent(key, t); - t = (tmp == null) ? t : tmp; - } - return t; + return computeIfAbsent(map, key, () -> factory.newMetric(getClass(), key, scope)); } /** Update call stats for non-critical-path methods */ http://git-wip-us.apache.org/repos/asf/hbase/blob/540ede37/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java index 7ac5c45..448e5b1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java @@ -17,6 +17,10 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; + +import com.google.common.annotations.VisibleForTesting; + import java.io.IOException; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; @@ -35,34 +39,29 @@ import org.apache.hadoop.hbase.ipc.CallTimeoutException; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.ipc.RemoteException; -import com.google.common.annotations.VisibleForTesting; - /** - * * The concrete {@link RetryingCallerInterceptor} class that implements the preemptive fast fail * feature. - * - * The motivation is as follows : - * In case where a large number of clients try and talk to a particular region server in hbase, if - * the region server goes down due to network problems, we might end up in a scenario where - * the clients would go into a state where they all start to retry. + * <p> + * The motivation is as follows : In case where a large number of clients try and talk to a + * particular region server in hbase, if the region server goes down due to network problems, we + * might end up in a scenario where the clients would go into a state where they all start to retry. * This behavior will set off many of the threads in pretty much the same path and they all would be * sleeping giving rise to a state where the client either needs to create more threads to send new * requests to other hbase machines or block because the client cannot create anymore threads. - * + * <p> * In most cases the clients might prefer to have a bound on the number of threads that are created * in order to send requests to hbase. This would mostly result in the client thread starvation. - * - * To circumvent this problem, the approach that is being taken here under is to let 1 of the many - * threads who are trying to contact the regionserver with connection problems and let the other - * threads get a {@link PreemptiveFastFailException} so that they can move on and take other - * requests. - * - * This would give the client more flexibility on the kind of action he would want to take in cases - * where the regionserver is down. He can either discard the requests and send a nack upstream - * faster or have an application level retry or buffer the requests up so as to send them down to - * hbase later. - * + * <p> + * To circumvent this problem, the approach that is being taken here under is to let 1 of the many + * threads who are trying to contact the regionserver with connection problems and let the other + * threads get a {@link PreemptiveFastFailException} so that they can move on and take other + * requests. + * <p> + * This would give the client more flexibility on the kind of action he would want to take in cases + * where the regionserver is down. He can either discard the requests and send a nack upstream + * faster or have an application level retry or buffer the requests up so as to send them down to + * hbase later. */ @InterfaceAudience.Private class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor { @@ -155,15 +154,8 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor { return; } long currentTime = EnvironmentEdgeManager.currentTime(); - FailureInfo fInfo = repeatedFailuresMap.get(serverName); - if (fInfo == null) { - fInfo = new FailureInfo(currentTime); - FailureInfo oldfInfo = repeatedFailuresMap.putIfAbsent(serverName, fInfo); - - if (oldfInfo != null) { - fInfo = oldfInfo; - } - } + FailureInfo fInfo = + computeIfAbsent(repeatedFailuresMap, serverName, () -> new FailureInfo(currentTime)); fInfo.timeOfLatestAttemptMilliSec = currentTime; fInfo.numConsecutiveFailures.incrementAndGet(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/540ede37/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java index cb21e8b..f66e7fc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java @@ -17,15 +17,18 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; + import com.google.common.annotations.VisibleForTesting; + +import java.util.concurrent.ConcurrentHashMap; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.backoff.ServerStatistics; -import java.util.concurrent.ConcurrentHashMap; - /** * Tracks the statistics for multiple regions */ @@ -36,23 +39,8 @@ public class ServerStatisticTracker implements StatisticTrackable { new ConcurrentHashMap<ServerName, ServerStatistics>(); @Override - public void updateRegionStats(ServerName server, byte[] region, RegionLoadStats - currentStats) { - ServerStatistics stat = stats.get(server); - - if (stat == null) { - stat = stats.get(server); - // We don't have stats for that server yet, so we need to make an entry. - // If we race with another thread it's a harmless unnecessary allocation. - if (stat == null) { - stat = new ServerStatistics(); - ServerStatistics old = stats.putIfAbsent(server, stat); - if (old != null) { - stat = old; - } - } - } - stat.update(region, currentStats); + public void updateRegionStats(ServerName server, byte[] region, RegionLoadStats currentStats) { + computeIfAbsent(stats, server, ServerStatistics::new).update(region, currentStats); } public ServerStatistics getStats(ServerName server) { http://git-wip-us.apache.org/repos/asf/hbase/blob/540ede37/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java index 4e19b77..8cc71a3 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java @@ -110,15 +110,12 @@ public class CollectionUtils { /** * In HBASE-16648 we found that ConcurrentHashMap.get is much faster than computeIfAbsent if the - * value already exists. So here we copy the implementation of - * {@link ConcurrentMap#computeIfAbsent(Object, java.util.function.Function)}. It uses get and - * putIfAbsent to implement computeIfAbsent. And notice that the implementation does not guarantee - * that the supplier will only be executed once. + * value already exists. Notice that the implementation does not guarantee that the supplier will + * only be executed once. */ public static <K, V> V computeIfAbsent(ConcurrentMap<K, V> map, K key, Supplier<V> supplier) { - V v, newValue; - return ((v = map.get(key)) == null && (newValue = supplier.get()) != null - && (v = map.putIfAbsent(key, newValue)) == null) ? newValue : v; + return computeIfAbsent(map, key, supplier, () -> { + }); } /** @@ -142,4 +139,19 @@ public class CollectionUtils { return ((v = map.get(key)) == null && (newValue = supplier.get()) != null && (v = map.putIfAbsent(key, newValue)) == null) ? newValue : v; } + + public static <K, V> V computeIfAbsent(ConcurrentMap<K, V> map, K key, Supplier<V> supplier, + Runnable actionIfAbsent) { + V v = map.get(key); + if (v != null) { + return v; + } + V newValue = supplier.get(); + v = map.putIfAbsent(key, newValue); + if (v != null) { + return v; + } + actionIfAbsent.run(); + return newValue; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/540ede37/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java index cb5df75..a226eb6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.coordination; +import static org.apache.hadoop.hbase.util.CollectionUtils.*; import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK; import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE; import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED; @@ -52,6 +53,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLo import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WALSplitter; +import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; @@ -449,7 +451,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements setDone(path, FAILURE); return; } - data = this.watcher.getRecoverableZooKeeper().removeMetaData(data); + data = RecoverableZooKeeper.removeMetaData(data); SplitLogTask slt = SplitLogTask.parseFrom(data); if (slt.isUnassigned()) { LOG.debug("task not yet acquired " + path + " ver = " + version); @@ -531,16 +533,11 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements return; } - Task findOrCreateOrphanTask(String path) { - Task orphanTask = new Task(); - Task task; - task = details.getTasks().putIfAbsent(path, orphanTask); - if (task == null) { + private Task findOrCreateOrphanTask(String path) { + return computeIfAbsent(details.getTasks(), path, Task::new, () -> { LOG.info("creating orphan task " + path); SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet(); - task = orphanTask; - } - return task; + }); } private void heartbeat(String path, int new_version, ServerName workerName) { http://git-wip-us.apache.org/repos/asf/hbase/blob/540ede37/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 278030f..a567e1d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -18,6 +18,10 @@ */ package org.apache.hadoop.hbase.master; +import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; + +import com.google.common.annotations.VisibleForTesting; + import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList; @@ -56,6 +60,11 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RegionOpeningState; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; @@ -67,9 +76,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.Reg import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.RegionOpeningState; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.RetryCounter; @@ -78,11 +84,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; -import com.google.common.annotations.VisibleForTesting; - -import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; - /** * The ServerManager class manages info about region servers. * <p> @@ -273,18 +274,6 @@ public class ServerManager { return sn; } - private ConcurrentNavigableMap<byte[], Long> getOrCreateStoreFlushedSequenceId( - byte[] regionName) { - ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId = - storeFlushedSequenceIdsByRegion.get(regionName); - if (storeFlushedSequenceId != null) { - return storeFlushedSequenceId; - } - storeFlushedSequenceId = new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR); - ConcurrentNavigableMap<byte[], Long> alreadyPut = - storeFlushedSequenceIdsByRegion.putIfAbsent(regionName, storeFlushedSequenceId); - return alreadyPut == null ? storeFlushedSequenceId : alreadyPut; - } /** * Updates last flushed sequence Ids for the regions on server sn * @param sn @@ -309,7 +298,8 @@ public class ServerManager { + existingValue + ") for region " + Bytes.toString(entry.getKey()) + " Ignoring."); } ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId = - getOrCreateStoreFlushedSequenceId(encodedRegionName); + computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName, + () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR)); for (StoreSequenceId storeSeqId : entry.getValue().getStoreCompleteSequenceId()) { byte[] family = storeSeqId.getFamilyName().toByteArray(); existingValue = storeFlushedSequenceId.get(family); http://git-wip-us.apache.org/repos/asf/hbase/blob/540ede37/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index 9328687..589da14 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -117,7 +117,8 @@ public class SplitLogManager { */ protected final ReentrantLock recoveringRegionLock = new ReentrantLock(); - private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>(); + @VisibleForTesting + final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>(); private TimeoutMonitor timeoutMonitor; private volatile Set<ServerName> deadWorkers = null; @@ -504,18 +505,6 @@ public class SplitLogManager { } } - Task findOrCreateOrphanTask(String path) { - Task orphanTask = new Task(); - Task task; - task = tasks.putIfAbsent(path, orphanTask); - if (task == null) { - LOG.info("creating orphan task " + path); - SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet(); - task = orphanTask; - } - return task; - } - public void stop() { if (choreService != null) { choreService.shutdown(); http://git-wip-us.apache.org/repos/asf/hbase/blob/540ede37/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java index 15962d2..1451052 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java @@ -18,6 +18,10 @@ package org.apache.hadoop.hbase.quotas; +import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; + +import com.google.common.annotations.VisibleForTesting; + import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -38,8 +42,6 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.security.UserGroupInformation; -import com.google.common.annotations.VisibleForTesting; - /** * Cache that keeps track of the quota settings for the users and tables that * are interacting with it. @@ -114,20 +116,12 @@ public class QuotaCache implements Stoppable { /** * Returns the QuotaState associated to the specified user. - * * @param ugi the user * @return the quota info associated to specified user */ public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) { - String key = ugi.getShortUserName(); - UserQuotaState quotaInfo = userQuotaCache.get(key); - if (quotaInfo == null) { - quotaInfo = new UserQuotaState(); - if (userQuotaCache.putIfAbsent(key, quotaInfo) == null) { - triggerCacheRefresh(); - } - } - return quotaInfo; + return computeIfAbsent(userQuotaCache, ugi.getShortUserName(), UserQuotaState::new, + this::triggerCacheRefresh); } /** @@ -151,24 +145,12 @@ public class QuotaCache implements Stoppable { } /** - * Returns the QuotaState requested. - * If the quota info is not in cache an empty one will be returned - * and the quota request will be enqueued for the next cache refresh. + * Returns the QuotaState requested. If the quota info is not in cache an empty one will be + * returned and the quota request will be enqueued for the next cache refresh. */ private <K> QuotaState getQuotaState(final ConcurrentHashMap<K, QuotaState> quotasMap, final K key) { - QuotaState quotaInfo = quotasMap.get(key); - if (quotaInfo == null) { - quotaInfo = new QuotaState(); - if (quotasMap.putIfAbsent(key, quotaInfo) == null) { - triggerCacheRefresh(); - } - } - return quotaInfo; - } - - private Configuration getConfiguration() { - return rsServices.getConfiguration(); + return computeIfAbsent(quotasMap, key, QuotaState::new, this::triggerCacheRefresh); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/hbase/blob/540ede37/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 19e1235..d2a1838 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; +import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; @@ -5314,19 +5315,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Keep trying until we have a lock or error out. // TODO: do we need to add a time component here? while (result == null) { - - // Try adding a RowLockContext to the lockedRows. - // If we can add it then there's no other transactions currently running. - rowLockContext = new RowLockContext(rowKey); - RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext); - - // if there was a running transaction then there's already a context. - if (existingContext != null) { - rowLockContext = existingContext; - } - + rowLockContext = computeIfAbsent(lockedRows, rowKey, () -> new RowLockContext(rowKey)); // Now try an get the lock. - // // This can fail as if (readLock) { result = rowLockContext.newReadLock(); http://git-wip-us.apache.org/repos/asf/hbase/blob/540ede37/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java index 62dea53..6e7ad9b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; + import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; @@ -215,16 +217,8 @@ class SequenceIdAccounting { @VisibleForTesting ConcurrentMap<ImmutableByteArray, Long> getOrCreateLowestSequenceIds(byte[] encodedRegionName) { // Intentionally, this access is done outside of this.regionSequenceIdLock. Done per append. - ConcurrentMap<ImmutableByteArray, Long> m = this.lowestUnflushedSequenceIds - .get(encodedRegionName); - if (m != null) { - return m; - } - m = new ConcurrentHashMap<>(); - // Another thread may have added it ahead of us. - ConcurrentMap<ImmutableByteArray, Long> alreadyPut = this.lowestUnflushedSequenceIds - .putIfAbsent(encodedRegionName, m); - return alreadyPut == null ? m : alreadyPut; + return computeIfAbsent(this.lowestUnflushedSequenceIds, encodedRegionName, + ConcurrentHashMap::new); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/540ede37/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java index 25cfc8b..5032d96 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,6 +18,13 @@ package org.apache.hadoop.hbase.security.access; +import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; + import java.io.Closeable; import java.io.IOException; import java.util.HashMap; @@ -28,11 +35,11 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.AuthUtil; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.AuthUtil; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; @@ -41,11 +48,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.ListMultimap; -import com.google.common.collect.Lists; - /** * Performs authorization checks for a given user's assigned permissions */ @@ -276,17 +278,11 @@ public class TableAuthManager implements Closeable { } private PermissionCache<TablePermission> getTablePermissions(TableName table) { - if (!tableCache.containsKey(table)) { - tableCache.putIfAbsent(table, new PermissionCache<TablePermission>()); - } - return tableCache.get(table); + return computeIfAbsent(tableCache, table, PermissionCache::new); } private PermissionCache<TablePermission> getNamespacePermissions(String namespace) { - if (!nsCache.containsKey(namespace)) { - nsCache.putIfAbsent(namespace, new PermissionCache<TablePermission>()); - } - return nsCache.get(namespace); + return computeIfAbsent(nsCache, namespace, PermissionCache::new); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/540ede37/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java index 06f8792..5b32347 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hbase.wal; +import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; + import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -44,15 +46,8 @@ public class BoundedGroupingStrategy implements RegionGroupingStrategy{ @Override public String group(byte[] identifier, byte[] namespace) { String idStr = Bytes.toString(identifier); - String groupName = groupNameCache.get(idStr); - if (null == groupName) { - groupName = groupNames[getAndIncrAtomicInteger(counter, groupNames.length)]; - String extantName = groupNameCache.putIfAbsent(idStr, groupName); - if (extantName != null) { - return extantName; - } - } - return groupName; + return computeIfAbsent(groupNameCache, idStr, + () -> groupNames[getAndIncrAtomicInteger(counter, groupNames.length)]); } // Non-blocking incrementing & resetting of AtomicInteger. http://git-wip-us.apache.org/repos/asf/hbase/blob/540ede37/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterShutdown.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterShutdown.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterShutdown.java index 7f95e75..ebf16b7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterShutdown.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterShutdown.java @@ -122,7 +122,7 @@ public class TestMasterShutdown { final MasterThread master = cluster.getMasters().get(MASTER_INDEX); master.start(); LOG.info("Called master start on " + master.getName()); - Thread shutdownThread = new Thread() { + Thread shutdownThread = new Thread("Shutdown-Thread") { public void run() { LOG.info("Before call to shutdown master"); try { http://git-wip-us.apache.org/repos/asf/hbase/blob/540ede37/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java index 8791e3e..022e7b6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java @@ -58,8 +58,8 @@ import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; import org.apache.hadoop.hbase.master.SplitLogManager.Task; import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; @@ -196,6 +196,14 @@ public class TestSplitLogManager { assertEquals(newval, e.eval()); } + private Task findOrCreateOrphanTask(String path) { + return slm.tasks.computeIfAbsent(path, k -> { + LOG.info("creating orphan task " + k); + SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet(); + return new Task(); + }); + } + private String submitTaskAndWait(TaskBatch batch, String name) throws KeeperException, InterruptedException { String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name); @@ -205,7 +213,7 @@ public class TestSplitLogManager { slm.enqueueSplitTask(name, batch); assertEquals(1, batch.installed); - assertTrue(slm.findOrCreateOrphanTask(tasknode).batch == batch); + assertTrue(findOrCreateOrphanTask(tasknode).batch == batch); assertEquals(1L, tot_mgr_node_create_queued.get()); LOG.debug("waiting for task node creation"); @@ -244,7 +252,7 @@ public class TestSplitLogManager { slm = new SplitLogManager(master, conf); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); - Task task = slm.findOrCreateOrphanTask(tasknode); + Task task = findOrCreateOrphanTask(tasknode); assertTrue(task.isOrphan()); waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); assertFalse(task.isUnassigned()); @@ -270,12 +278,12 @@ public class TestSplitLogManager { slm = new SplitLogManager(master, conf); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); - Task task = slm.findOrCreateOrphanTask(tasknode); + Task task = findOrCreateOrphanTask(tasknode); assertTrue(task.isOrphan()); assertTrue(task.isUnassigned()); // wait for RESCAN node to be created - waitForCounter(tot_mgr_rescan, 0, 1, to/2); - Task task2 = slm.findOrCreateOrphanTask(tasknode); + waitForCounter(tot_mgr_rescan, 0, 1, to / 2); + Task task2 = findOrCreateOrphanTask(tasknode); assertTrue(task == task2); LOG.debug("task = " + task); assertEquals(1L, tot_mgr_resubmit.get());