http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java index c51a437..95d77a2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java @@ -22,7 +22,7 @@ package org.apache.hadoop.hbase; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Set; +import java.util.List; import java.util.Map; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -67,7 +67,7 @@ public class ClusterStatus extends VersionedWritable { private Collection<ServerName> deadServers; private ServerName master; private Collection<ServerName> backupMasters; - private Set<RegionState> intransition; + private List<RegionState> intransition; private String clusterId; private String[] masterCoprocessors; private Boolean balancerOn; @@ -77,7 +77,7 @@ public class ClusterStatus extends VersionedWritable { final Collection<ServerName> deadServers, final ServerName master, final Collection<ServerName> backupMasters, - final Set<RegionState> rit, + final List<RegionState> rit, final String[] masterCoprocessors, final Boolean balancerOn) { this.hbaseVersion = hbaseVersion; @@ -248,7 +248,7 @@ public class ClusterStatus extends VersionedWritable { } @InterfaceAudience.Private - public Set<RegionState> getRegionsInTransition() { + public List<RegionState> getRegionsInTransition() { return this.intransition; }
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java index bc93cc6..5b9cbec 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -168,6 +169,15 @@ public class HRegionInfo implements Comparable<HRegionInfo> { } /** + * @return Return a String of short, printable names for <code>hris</code> + * (usually encoded name) for us logging. + */ + public static String getShortNameToLog(final List<HRegionInfo> hris) { + return hris.stream().map(hri -> hri.getShortNameToLog()). + collect(Collectors.toList()).toString(); + } + + /** * Use logging. * @param encodedRegionName The encoded regionname. 
* @return <code>hbase:meta</code> if passed <code>1028785192</code> else returns http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java index 15bc132..9f1be9f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java @@ -1966,8 +1966,8 @@ public class MetaTableAccessor { * @param regionsInfo list of regions to be deleted from META * @throws IOException */ - public static void deleteRegions(Connection connection, - List<HRegionInfo> regionsInfo, long ts) throws IOException { + public static void deleteRegions(Connection connection, List<HRegionInfo> regionsInfo, long ts) + throws IOException { List<Delete> deletes = new ArrayList<>(regionsInfo.size()); for (HRegionInfo hri: regionsInfo) { Delete e = new Delete(hri.getRegionName()); @@ -2002,10 +2002,10 @@ public class MetaTableAccessor { } mutateMetaTable(connection, mutation); if (regionsToRemove != null && regionsToRemove.size() > 0) { - LOG.debug("Deleted " + regionsToRemove); + LOG.debug("Deleted " + HRegionInfo.getShortNameToLog(regionsToRemove)); } if (regionsToAdd != null && regionsToAdd.size() > 0) { - LOG.debug("Added " + regionsToAdd); + LOG.debug("Added " + HRegionInfo.getShortNameToLog(regionsToAdd)); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 6859cb3..9b3f1a2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -1323,6 +1323,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable { return stub.mergeTableRegions(controller, request); } + public MasterProtos.DispatchMergingRegionsResponse dispatchMergingRegions( + RpcController controller, MasterProtos.DispatchMergingRegionsRequest request) + throws ServiceException { + return stub.dispatchMergingRegions(controller, request); + } + @Override public MasterProtos.AssignRegionResponse assignRegion(RpcController controller, MasterProtos.AssignRegionRequest request) throws ServiceException { @@ -1342,6 +1348,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } @Override + public MasterProtos.SplitTableRegionResponse splitRegion(RpcController controller, + MasterProtos.SplitTableRegionRequest request) throws ServiceException { + return stub.splitRegion(controller, request); + } + + @Override public MasterProtos.DeleteTableResponse deleteTable(RpcController controller, MasterProtos.DeleteTableRequest request) throws ServiceException { return stub.deleteTable(controller, request); http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java index e3b5b12..648fdca 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java +++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java @@ -484,4 +484,15 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection { return stub.listReplicationPeers(controller, request); } -} + @Override + public SplitTableRegionResponse splitRegion(RpcController controller, SplitTableRegionRequest request) + throws ServiceException { + return stub.splitRegion(controller, request); + } + + @Override + public DispatchMergingRegionsResponse dispatchMergingRegions(RpcController controller, + DispatchMergingRegionsRequest request) throws ServiceException { + return stub.dispatchMergingRegions(controller, request); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java index e69b42d..08533b4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java @@ -226,8 +226,8 @@ class NettyRpcDuplexHandler extends ChannelDuplexHandler { switch (idleEvt.state()) { case WRITER_IDLE: if (id2Call.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("shutdown connection to " + conn.remoteId().address + if (LOG.isTraceEnabled()) { + LOG.trace("shutdown connection to " + conn.remoteId().address + " because idle for a long time"); } // It may happen that there are still some pending calls in the event loop queue and http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java ---------------------------------------------------------------------- diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java index b5a7959..98d2256 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java @@ -129,7 +129,11 @@ abstract class RpcConnection { authMethod = AuthMethod.KERBEROS; } - if (LOG.isDebugEnabled()) { + // Log if debug AND non-default auth, else if trace enabled. + // No point logging obvious. + if ((LOG.isDebugEnabled() && !authMethod.equals(AuthMethod.SIMPLE)) || + LOG.isTraceEnabled()) { + // Only log if not default auth. LOG.debug("Use " + authMethod + " authentication for service " + remoteId.serviceName + ", sasl=" + useSasl); } http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java index 0e12ef6..7116763 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java @@ -36,10 +36,8 @@ public class RegionState { @InterfaceStability.Evolving public enum State { OFFLINE, // region is in an offline state - PENDING_OPEN, // same as OPENING, to be removed OPENING, // server has begun to open but not yet done OPEN, // server opened region and updated meta - PENDING_CLOSE, // same as CLOSING, to be removed CLOSING, // server has begun to close but not yet done CLOSED, // server closed region and updated meta SPLITTING, // server started split of a region @@ -64,18 +62,12 @@ public class RegionState { case OFFLINE: rs = ClusterStatusProtos.RegionState.State.OFFLINE; break; - case PENDING_OPEN: - rs = 
ClusterStatusProtos.RegionState.State.PENDING_OPEN; - break; case OPENING: rs = ClusterStatusProtos.RegionState.State.OPENING; break; case OPEN: rs = ClusterStatusProtos.RegionState.State.OPEN; break; - case PENDING_CLOSE: - rs = ClusterStatusProtos.RegionState.State.PENDING_CLOSE; - break; case CLOSING: rs = ClusterStatusProtos.RegionState.State.CLOSING; break; @@ -124,8 +116,6 @@ public class RegionState { state = OFFLINE; break; case PENDING_OPEN: - state = PENDING_OPEN; - break; case OPENING: state = OPENING; break; @@ -133,8 +123,6 @@ public class RegionState { state = OPEN; break; case PENDING_CLOSE: - state = PENDING_CLOSE; - break; case CLOSING: state = CLOSING; break; @@ -231,22 +219,16 @@ public class RegionState { this.ritDuration += (this.stamp - previousStamp); } - /** - * PENDING_CLOSE (to be removed) is the same as CLOSING - */ public boolean isClosing() { - return state == State.PENDING_CLOSE || state == State.CLOSING; + return state == State.CLOSING; } public boolean isClosed() { return state == State.CLOSED; } - /** - * PENDING_OPEN (to be removed) is the same as OPENING - */ public boolean isOpening() { - return state == State.PENDING_OPEN || state == State.OPENING; + return state == State.OPENING; } public boolean isOpened() { http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index d4c4231..2bf386a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -20,19 +20,19 @@ package org.apache.hadoop.hbase.shaded.protobuf; import java.io.ByteArrayOutputStream; import 
java.io.IOException; import java.io.InputStream; +import java.io.InterruptedIOException; import java.lang.reflect.Constructor; import java.lang.reflect.Method; import java.nio.ByteBuffer; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Map.Entry; import java.util.NavigableSet; -import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -90,11 +90,13 @@ import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.procedure2.LockInfo; import org.apache.hadoop.hbase.protobuf.ProtobufMagic; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.MergeRegionsRequest; import org.apache.hadoop.hbase.quotas.QuotaScope; import org.apache.hadoop.hbase.quotas.QuotaType; import org.apache.hadoop.hbase.quotas.ThrottleType; import org.apache.hadoop.hbase.replication.ReplicationLoadSink; import org.apache.hadoop.hbase.replication.ReplicationLoadSource; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.visibility.Authorizations; import org.apache.hadoop.hbase.security.visibility.CellVisibility; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; @@ -108,8 +110,6 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat; import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeResponse; 
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; @@ -175,6 +175,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.DynamicClassLoader; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.hadoop.hbase.util.Methods; @@ -1840,33 +1841,6 @@ public final class ProtobufUtil { } /** - * A helper to close a region for split or merge - * using admin protocol. - * - * @param controller RPC controller - * @param admin Admin service - * @param server the RS that hosts the target region - * @param regionInfo the target region info - * @return true if the region is closed - * @throws IOException - */ - public static boolean closeRegionForSplitOrMerge( - final RpcController controller, - final AdminService.BlockingInterface admin, - final ServerName server, - final HRegionInfo... regionInfo) throws IOException { - CloseRegionForSplitOrMergeRequest closeRegionForRequest = - ProtobufUtil.buildCloseRegionForSplitOrMergeRequest(server, regionInfo); - try { - CloseRegionForSplitOrMergeResponse response = - admin.closeRegionForSplitOrMerge(controller, closeRegionForRequest); - return ResponseConverter.isClosed(response); - } catch (ServiceException se) { - throw getRemoteException(se); - } - } - - /** * A helper to warmup a region given a region name * using admin protocol * @@ -2018,6 +1992,46 @@ public final class ProtobufUtil { } } + /** + * A helper to merge regions using admin protocol. Send request to + * regionserver. 
+ * @param admin + * @param region_a + * @param region_b + * @param forcible true if do a compulsory merge, otherwise we will only merge + * two adjacent regions + * @param user effective user + * @throws IOException + */ + public static void mergeRegions(final RpcController controller, + final AdminService.BlockingInterface admin, + final HRegionInfo region_a, final HRegionInfo region_b, + final boolean forcible, final User user) throws IOException { + final MergeRegionsRequest request = ProtobufUtil.buildMergeRegionsRequest( + region_a.getRegionName(), region_b.getRegionName(),forcible); + if (user != null) { + try { + user.runAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + admin.mergeRegions(controller, request); + return null; + } + }); + } catch (InterruptedException ie) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(ie); + throw iioe; + } + } else { + try { + admin.mergeRegions(controller, request); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + } + // End helpers for Admin /* @@ -3050,8 +3064,8 @@ public final class ProtobufUtil { backupMasters.add(ProtobufUtil.toServerName(sn)); } - Set<RegionState> rit = null; - rit = new HashSet<>(proto.getRegionsInTransitionList().size()); + List<RegionState> rit = + new ArrayList<>(proto.getRegionsInTransitionList().size()); for (RegionInTransition region : proto.getRegionsInTransitionList()) { RegionState value = RegionState.convert(region.getRegionState()); rit.add(value); @@ -3210,26 +3224,6 @@ public final class ProtobufUtil { } /** - * Create a CloseRegionForSplitOrMergeRequest for given regions - * - * @param server the RS server that hosts the region - * @param regionsToClose the info of the regions to close - * @return a CloseRegionForSplitRequest - */ - public static CloseRegionForSplitOrMergeRequest buildCloseRegionForSplitOrMergeRequest( - final ServerName server, - final HRegionInfo... 
regionsToClose) { - CloseRegionForSplitOrMergeRequest.Builder builder = - CloseRegionForSplitOrMergeRequest.newBuilder(); - for(int i = 0; i < regionsToClose.length; i++) { - RegionSpecifier regionToClose = RequestConverter.buildRegionSpecifier( - RegionSpecifierType.REGION_NAME, regionsToClose[i].getRegionName()); - builder.addRegion(regionToClose); - } - return builder.build(); - } - - /** * Create a CloseRegionRequest for a given encoded region name * * @param encodedRegionName the name of the region to close @@ -3267,6 +3261,28 @@ public final class ProtobufUtil { return builder.build(); } + /** + * Create a MergeRegionsRequest for the given regions + * @param regionA name of region a + * @param regionB name of region b + * @param forcible true if it is a compulsory merge + * @return a MergeRegionsRequest + */ + public static MergeRegionsRequest buildMergeRegionsRequest( + final byte[] regionA, final byte[] regionB, final boolean forcible) { + MergeRegionsRequest.Builder builder = MergeRegionsRequest.newBuilder(); + RegionSpecifier regionASpecifier = RequestConverter.buildRegionSpecifier( + RegionSpecifierType.REGION_NAME, regionA); + RegionSpecifier regionBSpecifier = RequestConverter.buildRegionSpecifier( + RegionSpecifierType.REGION_NAME, regionB); + builder.setRegionA(regionASpecifier); + builder.setRegionB(regionBSpecifier); + builder.setForcible(forcible); + // send the master's wall clock time as well, so that the RS can refer to it + builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime()); + return builder.build(); + } + /** * Get a ServerName from the passed in data bytes. 
* @param data Data with a serialize server name in it; can handle the old style http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 366e050..de2544a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -118,7 +118,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOr import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest; @@ -1114,19 +1113,6 @@ public final class RequestConverter { return builder.build(); } - public static SplitTableRegionRequest buildSplitTableRegionRequest( - final HRegionInfo regionInfo, - final byte[] splitPoint, - final long nonceGroup, - final long nonce) { - SplitTableRegionRequest.Builder builder = SplitTableRegionRequest.newBuilder(); - builder.setRegionInfo(HRegionInfo.convert(regionInfo)); - builder.setSplitRow(UnsafeByteOperations.unsafeWrap(splitPoint)); - 
builder.setNonceGroup(nonceGroup); - builder.setNonce(nonce); - return builder.build(); - } - /** * Create a protocol buffer AssignRegionRequest * @@ -1509,7 +1495,7 @@ public final class RequestConverter { /** * Create a RegionOpenInfo based on given region info and version of offline node */ - private static RegionOpenInfo buildRegionOpenInfo( + public static RegionOpenInfo buildRegionOpenInfo( final HRegionInfo region, final List<ServerName> favoredNodes, Boolean openForReplay) { RegionOpenInfo.Builder builder = RegionOpenInfo.newBuilder(); http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java index ecadbbc..c489628 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.SingleResponse; import org.apache.hadoop.hbase.ipc.ServerRpcController; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; @@ -254,18 +253,6 @@ public final class ResponseConverter { } /** - * Check if the region is closed from a CloseRegionForSplitResponse - * - * @param proto the CloseRegionForSplitResponse - * 
@return the region close state - */ - public static boolean isClosed - (final CloseRegionForSplitOrMergeResponse proto) { - if (proto == null || !proto.hasClosed()) return false; - return proto.getClosed(); - } - - /** * A utility to build a GetServerInfoResponse. * * @param serverName http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java index afab54a..c11d896 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java @@ -439,6 +439,10 @@ public class MetaTableLocator { */ public static void setMetaLocation(ZooKeeperWatcher zookeeper, ServerName serverName, int replicaId, RegionState.State state) throws KeeperException { + if (serverName == null) { + LOG.warn("Tried to set null ServerName in hbase:meta; skipping -- ServerName required"); + return; + } LOG.info("Setting hbase:meta region location in ZooKeeper as " + serverName); // Make the MetaRegionServer pb and then get its bytes and save this as // the znode content. 
@@ -448,7 +452,8 @@ public class MetaTableLocator { .setState(state.convert()).build(); byte[] data = ProtobufUtil.prependPBMagic(pbrsr.toByteArray()); try { - ZKUtil.setData(zookeeper, zookeeper.znodePaths.getZNodeForReplica(replicaId), data); + ZKUtil.setData(zookeeper, + zookeeper.znodePaths.getZNodeForReplica(replicaId), data); } catch(KeeperException.NoNodeException nne) { if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) { LOG.debug("META region location doesn't exist, create it"); http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java index 6104c22..36dabdd 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java @@ -80,12 +80,11 @@ public class ProcedureInfo implements Cloneable { @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("Procedure="); sb.append(procName); - sb.append(" (id="); + sb.append(" pid="); sb.append(procId); if (hasParentId()) { - sb.append(", parent="); + sb.append(", ppid="); sb.append(parentId); } if (hasOwner()) { @@ -107,7 +106,6 @@ public class ProcedureInfo implements Cloneable { sb.append(this.exception.getMessage()); sb.append("\""); } - sb.append(")"); return sb.toString(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSource.java ---------------------------------------------------------------------- diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSource.java 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSource.java index fa7bbec..2ebf8c9 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSource.java @@ -47,6 +47,7 @@ public interface MetricsAssignmentManagerSource extends BaseSource { String RIT_OLDEST_AGE_NAME = "ritOldestAge"; String RIT_DURATION_NAME = "ritDuration"; String ASSIGN_TIME_NAME = "assign"; + String UNASSIGN_TIME_NAME = "unassign"; String BULK_ASSIGN_TIME_NAME = "bulkAssign"; String RIT_COUNT_DESC = "Current number of Regions In Transition (Gauge)."; @@ -56,9 +57,7 @@ public interface MetricsAssignmentManagerSource extends BaseSource { String RIT_DURATION_DESC = "Total durations in milliseconds for all Regions in Transition (Histogram)."; - void updateAssignmentTime(long time); - - void updateBulkAssignTime(long time); + String OPERATION_COUNT_NAME = "operationCount"; /** * Set the number of regions in transition. @@ -82,4 +81,19 @@ public interface MetricsAssignmentManagerSource extends BaseSource { void setRITOldestAge(long age); void updateRitDuration(long duration); + + /** + * Increment the count of assignment operation (assign/unassign). 
+ */ + void incrementOperationCounter(); + + /** + * Add the time took to perform the last assign operation + */ + void updateAssignTime(long time); + + /** + * Add the time took to perform the last unassign operation + */ + void updateUnassignTime(long time); } http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSourceImpl.java ---------------------------------------------------------------------- diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSourceImpl.java index faae044..14b7e71 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSourceImpl.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.metrics.BaseSourceImpl; import org.apache.hadoop.metrics2.MetricHistogram; +import org.apache.hadoop.metrics2.lib.MutableFastCounter; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; @InterfaceAudience.Private @@ -32,8 +33,10 @@ public class MetricsAssignmentManagerSourceImpl private MutableGaugeLong ritCountOverThresholdGauge; private MutableGaugeLong ritOldestAgeGauge; private MetricHistogram ritDurationHisto; + + private MutableFastCounter operationCounter; private MetricHistogram assignTimeHisto; - private MetricHistogram bulkAssignTimeHisto; + private MetricHistogram unassignTimeHisto; public MetricsAssignmentManagerSourceImpl() { this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT); @@ -51,30 +54,39 @@ public class MetricsAssignmentManagerSourceImpl RIT_COUNT_OVER_THRESHOLD_DESC,0l); ritOldestAgeGauge = 
metricsRegistry.newGauge(RIT_OLDEST_AGE_NAME, RIT_OLDEST_AGE_DESC, 0l); assignTimeHisto = metricsRegistry.newTimeHistogram(ASSIGN_TIME_NAME); - bulkAssignTimeHisto = metricsRegistry.newTimeHistogram(BULK_ASSIGN_TIME_NAME); + unassignTimeHisto = metricsRegistry.newTimeHistogram(UNASSIGN_TIME_NAME); ritDurationHisto = metricsRegistry.newTimeHistogram(RIT_DURATION_NAME, RIT_DURATION_DESC); + operationCounter = metricsRegistry.getCounter(OPERATION_COUNT_NAME, 0l); } @Override - public void updateAssignmentTime(long time) { - assignTimeHisto.add(time); + public void setRIT(final int ritCount) { + ritGauge.set(ritCount); } @Override - public void updateBulkAssignTime(long time) { - bulkAssignTimeHisto.add(time); + public void setRITCountOverThreshold(final int ritCount) { + ritCountOverThresholdGauge.set(ritCount); } - public void setRIT(int ritCount) { - ritGauge.set(ritCount); + @Override + public void setRITOldestAge(final long ritCount) { + ritOldestAgeGauge.set(ritCount); } - public void setRITCountOverThreshold(int ritCount) { - ritCountOverThresholdGauge.set(ritCount); + @Override + public void incrementOperationCounter() { + operationCounter.incr(); } - public void setRITOldestAge(long ritCount) { - ritOldestAgeGauge.set(ritCount); + @Override + public void updateAssignTime(final long time) { + assignTimeHisto.add(time); + } + + @Override + public void updateUnassignTime(final long time) { + unassignTimeHisto.add(time); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java index fbb066c..64c3e53 100644 --- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java @@ -29,8 +29,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; @InterfaceAudience.Private public abstract class AbstractProcedureScheduler implements ProcedureScheduler { private static final Log LOG = LogFactory.getLog(AbstractProcedureScheduler.class); - private final ReentrantLock schedLock = new ReentrantLock(); - private final Condition schedWaitCond = schedLock.newCondition(); + private final ReentrantLock schedulerLock = new ReentrantLock(); + private final Condition schedWaitCond = schedulerLock.newCondition(); private boolean running = false; // TODO: metrics @@ -88,14 +88,14 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { } protected void push(final Procedure procedure, final boolean addFront, final boolean notify) { - schedLock.lock(); + schedulerLock.lock(); try { enqueue(procedure, addFront); if (notify) { schedWaitCond.signal(); } } finally { - schedLock.unlock(); + schedulerLock.unlock(); } } @@ -219,11 +219,11 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { @Override public void suspendEvent(final ProcedureEvent event) { - final boolean isTraceEnabled = LOG.isTraceEnabled(); + final boolean traceEnabled = LOG.isTraceEnabled(); synchronized (event) { event.setReady(false); - if (isTraceEnabled) { - LOG.trace("Suspend event " + event); + if (traceEnabled) { + LOG.trace("Suspend " + event); } } } @@ -235,18 +235,29 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { @Override public void wakeEvents(final int count, final ProcedureEvent... 
events) { - final boolean isTraceEnabled = LOG.isTraceEnabled(); + final boolean traceEnabled = LOG.isTraceEnabled(); schedLock(); try { int waitingCount = 0; for (int i = 0; i < count; ++i) { final ProcedureEvent event = events[i]; synchronized (event) { - event.setReady(true); - if (isTraceEnabled) { - LOG.trace("Wake event " + event); + if (!event.isReady()) { + // Only set ready if we were not ready; i.e. suspended. Otherwise, we double-wake + // on this event and down in wakeWaitingProcedures, we double decrement this + // finish which messes up child procedure accounting. + event.setReady(true); + if (traceEnabled) { + LOG.trace("Unsuspend " + event); + } + waitingCount += wakeWaitingProcedures(event.getSuspendedProcedures()); + } else { + ProcedureDeque q = event.getSuspendedProcedures(); + if (q != null && !q.isEmpty()) { + LOG.warn("Q is not empty! size=" + q.size() + "; PROCESSING..."); + waitingCount += wakeWaitingProcedures(event.getSuspendedProcedures()); + } } - waitingCount += wakeWaitingProcedures(event.getSuspendedProcedures()); } } wakePollIfNeeded(waitingCount); @@ -275,6 +286,7 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { } protected void wakeProcedure(final Procedure procedure) { + if (LOG.isTraceEnabled()) LOG.trace("Wake " + procedure); push(procedure, /* addFront= */ true, /* notify= */false); } @@ -282,11 +294,11 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { // Internal helpers // ========================================================================== protected void schedLock() { - schedLock.lock(); + schedulerLock.lock(); } protected void schedUnlock() { - schedLock.unlock(); + schedulerLock.unlock(); } protected void wakePollIfNeeded(final int waitingCount) { http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java 
---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index 591c0d0..0184d5d 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -25,6 +25,8 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; @@ -43,21 +45,24 @@ import com.google.common.annotations.VisibleForTesting; * execute() is called each time the procedure is executed. * it may be called multiple times in case of failure and restart, so the * code must be idempotent. - * the return is a set of sub-procedures or null in case the procedure doesn't + * + * <p>The return is a set of sub-procedures or null in case the procedure doesn't * have sub-procedures. Once the sub-procedures are successfully completed * the execute() method is called again, you should think at it as a stack: + * <pre> * -> step 1 * ---> step 2 * -> step 1 - * - * rollback() is called when the procedure or one of the sub-procedures is failed. - * the rollback step is supposed to cleanup the resources created during the - * execute() step. in case of failure and restart rollback() may be called - * multiple times, so the code must be idempotent. + * </pre> + * <p>rollback() is called when the procedure or one of the sub-procedures + * has failed. the rollback step is supposed to cleanup the resources created + * during the execute() step. 
in case of failure and restart rollback() may be + * called multiple times, so again the code must be idempotent. */ @InterfaceAudience.Private @InterfaceStability.Evolving public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { + private static final Log LOG = LogFactory.getLog(Procedure.class); public static final long NO_PROC_ID = -1; protected static final int NO_TIMEOUT = -1; @@ -275,11 +280,11 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { protected StringBuilder toStringSimpleSB() { final StringBuilder sb = new StringBuilder(); - sb.append("procId="); + sb.append("pid="); sb.append(getProcId()); if (hasParent()) { - sb.append(", parentProcId="); + sb.append(", ppid="); sb.append(getParentProcId()); } @@ -288,14 +293,14 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { sb.append(getOwner()); } - sb.append(", state="); + sb.append(", procState="); toStringState(sb); if (hasException()) { sb.append(", exception=" + getException()); } - sb.append(", "); + sb.append("; "); toStringClassDetails(sb); return sb; @@ -344,7 +349,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { * @param builder the string builder to use to append the proc specific information */ protected void toStringClassDetails(StringBuilder builder) { - builder.append(getClass().getName()); + builder.append(getClass().getSimpleName()); } // ========================================================================== @@ -648,6 +653,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { @InterfaceAudience.Private protected synchronized void setChildrenLatch(final int numChildren) { this.childrenLatch = numChildren; + if (LOG.isTraceEnabled()) { + LOG.info("CHILD LATCH INCREMENT SET " + + this.childrenLatch, new Throwable(this.toString())); + } } /** @@ -657,15 +666,34 @@ public abstract class Procedure<TEnvironment> implements 
Comparable<Procedure> { protected synchronized void incChildrenLatch() { // TODO: can this be inferred from the stack? I think so... this.childrenLatch++; + if (LOG.isTraceEnabled()) { + LOG.info("CHILD LATCH INCREMENT " + this.childrenLatch, new Throwable(this.toString())); + } } /** * Called by the ProcedureExecutor to notify that one of the sub-procedures has completed. */ @InterfaceAudience.Private - protected synchronized boolean childrenCountDown() { + private synchronized boolean childrenCountDown() { assert childrenLatch > 0: this; - return --childrenLatch == 0; + boolean b = --childrenLatch == 0; + if (LOG.isTraceEnabled()) { + LOG.info("CHILD LATCH DECREMENT " + childrenLatch, new Throwable(this.toString())); + } + return b; + } + + /** + * Try to set this procedure into RUNNABLE state. + * Succeeds if all subprocedures/children are done. + * @return True if we were able to move procedure to RUNNABLE state. + */ + synchronized boolean tryRunnable() { + // Don't use isWaiting in the below; it returns true for WAITING and WAITING_TIMEOUT + boolean b = getState() == ProcedureState.WAITING && childrenCountDown(); + if (b) setState(ProcedureState.RUNNABLE); + return b; } @InterfaceAudience.Private @@ -732,6 +760,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { /** * Internal method called by the ProcedureExecutor that starts the user-level code execute(). + * @throws ProcedureSuspendedException This is used when procedure wants to halt processing and + * skip out without changing states or releasing any locks held. 
*/ @InterfaceAudience.Private protected Procedure[] doExecute(final TEnvironment env) http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java index 43cce3a..adb27a8 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java @@ -50,6 +50,6 @@ public class ProcedureEvent<T> { @Override public String toString() { return getClass().getSimpleName() + " for " + object + ", ready=" + isReady() + - ", suspended procedures count=" + getSuspendedProcedures().size(); + ", " + getSuspendedProcedures(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 1bb6118..b1db2dc 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -32,6 +32,8 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import 
java.util.concurrent.DelayQueue; @@ -113,9 +115,11 @@ public class ProcedureExecutor<TEnvironment> { * Internal cleaner that removes the completed procedure results after a TTL. * NOTE: This is a special case handled in timeoutLoop(). * - * Since the client code looks more or less like: + * <p>Since the client code looks more or less like: + * <pre> * procId = master.doOperation() * while (master.getProcResult(procId) == ProcInProgress); + * </pre> * The master should not throw away the proc result as soon as the procedure is done * but should wait a result request from the client (see executor.removeResult(procId)) * The client will call something like master.isProcDone() or master.getProcResult() @@ -480,10 +484,10 @@ public class ProcedureExecutor<TEnvironment> { // We have numThreads executor + one timer thread used for timing out // procedures and triggering periodic procedures. this.corePoolSize = numThreads; - LOG.info("Starting executor worker threads=" + corePoolSize); + LOG.info("Starting ProcedureExecutor Worker threads (ProcExecWrkr)=" + corePoolSize); // Create the Thread Group for the executors - threadGroup = new ThreadGroup("ProcedureExecutor"); + threadGroup = new ThreadGroup("ProcExecThrdGrp"); // Create the timeout executor timeoutExecutor = new TimeoutExecutorThread(threadGroup); @@ -1077,13 +1081,16 @@ public class ProcedureExecutor<TEnvironment> { final Long rootProcId = getRootProcedureId(proc); if (rootProcId == null) { // The 'proc' was ready to run but the root procedure was rolledback + LOG.warn("Rollback because parent is done/rolledback proc=" + proc); executeRollback(proc); return; } final RootProcedureState procStack = rollbackStack.get(rootProcId); - if (procStack == null) return; - + if (procStack == null) { + LOG.warn("RootProcedureState is null for " + proc.getProcId()); + return; + } do { // Try to acquire the execution if (!procStack.acquire(proc)) { @@ -1125,16 +1132,21 @@ public class ProcedureExecutor<TEnvironment> { // 
Execute the procedure assert proc.getState() == ProcedureState.RUNNABLE : proc; - switch (acquireLock(proc)) { + // Note that lock is NOT about concurrency but rather about ensuring + // ownership of a procedure of an entity such as a region or table. + LockState lockState = acquireLock(proc); + switch (lockState) { case LOCK_ACQUIRED: execProcedure(procStack, proc); releaseLock(proc, false); break; case LOCK_YIELD_WAIT: + LOG.info(lockState + " " + proc); scheduler.yield(proc); break; case LOCK_EVENT_WAIT: // someone will wake us up when the lock is available + LOG.debug(lockState + " " + proc); break; default: throw new UnsupportedOperationException(); @@ -1150,10 +1162,7 @@ public class ProcedureExecutor<TEnvironment> { if (proc.isSuccess()) { // update metrics on finishing the procedure proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true); - - if (LOG.isDebugEnabled()) { - LOG.debug("Finished " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime())); - } + LOG.info("Finished " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime())); // Finalize the procedure state if (proc.getProcId() == rootProcId) { procedureFinished(proc); @@ -1178,7 +1187,7 @@ public class ProcedureExecutor<TEnvironment> { private void releaseLock(final Procedure proc, final boolean force) { final TEnvironment env = getEnvironment(); - // for how the framework works, we know that we will always have the lock + // For how the framework works, we know that we will always have the lock // when we call releaseLock(), so we can avoid calling proc.hasLock() if (force || !proc.holdLock(env)) { proc.doReleaseLock(env); @@ -1193,6 +1202,8 @@ public class ProcedureExecutor<TEnvironment> { private LockState executeRollback(final long rootProcId, final RootProcedureState procStack) { final Procedure rootProc = procedures.get(rootProcId); RemoteProcedureException exception = rootProc.getException(); + // TODO: This needs doc. The root proc doesn't have an exception. 
Maybe we are + // rolling back because the subprocedure does. Clarify. if (exception == null) { exception = procStack.getException(); rootProc.setFailure(exception); @@ -1269,7 +1280,7 @@ public class ProcedureExecutor<TEnvironment> { return LockState.LOCK_YIELD_WAIT; } catch (Throwable e) { // Catch NullPointerExceptions or similar errors... - LOG.fatal("CODE-BUG: Uncatched runtime exception for procedure: " + proc, e); + LOG.fatal("CODE-BUG: Uncaught runtime exception for " + proc, e); } // allows to kill the executor before something is stored to the wal. @@ -1305,29 +1316,54 @@ } /** - * Executes the specified procedure - * - calls the doExecute() of the procedure - * - if the procedure execution didn't fail (e.g. invalid user input) - * - ...and returned subprocedures - * - the subprocedures are initialized. - * - the subprocedures are added to the store - * - the subprocedures are added to the runnable queue - * - the procedure is now in a WAITING state, waiting for the subprocedures to complete - * - ...if there are no subprocedure - * - the procedure completed successfully - * - if there is a parent (WAITING) - * - the parent state will be set to RUNNABLE - * - in case of failure - * - the store is updated with the new state - * - the executor (caller of this method) will start the rollback of the procedure + * Executes <code>procedure</code> + * <ul> + * <li>Calls the doExecute() of the procedure + * <li>If the procedure execution didn't fail (i.e. valid user input) + * <ul> + * <li>...and returned subprocedures + * <ul><li>The subprocedures are initialized. 
+ * <li>The subprocedures are added to the store + * <li>The subprocedures are added to the runnable queue + * <li>The procedure is now in a WAITING state, waiting for the subprocedures to complete + * </ul> + * </li> + * <li>...if there are no subprocedure + * <ul><li>the procedure completed successfully + * <li>if there is a parent (WAITING) + * <li>the parent state will be set to RUNNABLE + * </ul> + * </li> + * </ul> + * </li> + * <li>In case of failure + * <ul> + * <li>The store is updated with the new state</li> + * <li>The executor (caller of this method) will start the rollback of the procedure</li> + * </ul> + * </li> + * </ul> */ private void execProcedure(final RootProcedureState procStack, final Procedure procedure) { Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE); - // Execute the procedure + // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException. + // The exception is caught below and then we hurry to the exit without disturbing state. The + // idea is that the processing of this procedure will be unsuspended later by an external event + // such as the report of a region open. TODO: Currently, it's possible for two worker threads + // to be working on the same procedure concurrently (locking in procedures is NOT about + // concurrency but about tying an entity to a procedure; i.e. a region to a particular + // procedure instance). This can make for issues if both threads are changing state. + // See env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent()); + // in RegionTransitionProcedure#reportTransition for example of Procedure putting + // itself back on the scheduler making it possible for two threads running against + // the one Procedure. Might be ok if they are both doing different, idempotent sections. boolean suspended = false; + + // Whether to 're-' -execute; run through the loop again. 
boolean reExecute = false; - Procedure[] subprocs = null; + + Procedure<?>[] subprocs = null; do { reExecute = false; try { @@ -1336,14 +1372,18 @@ public class ProcedureExecutor<TEnvironment> { subprocs = null; } } catch (ProcedureSuspendedException e) { + LOG.info("Suspended " + procedure); suspended = true; } catch (ProcedureYieldException e) { if (LOG.isTraceEnabled()) { - LOG.trace("Yield " + procedure + ": " + e.getMessage()); + LOG.trace("Yield " + procedure + ": " + e.getMessage(), e); } scheduler.yield(procedure); return; } catch (InterruptedException e) { + if (LOG.isTraceEnabled()) { + LOG.trace("Yield interrupt " + procedure + ": " + e.getMessage(), e); + } handleInterruptedException(procedure, e); scheduler.yield(procedure); return; @@ -1357,14 +1397,22 @@ public class ProcedureExecutor<TEnvironment> { if (!procedure.isFailed()) { if (subprocs != null) { if (subprocs.length == 1 && subprocs[0] == procedure) { - // quick-shortcut for a state machine like procedure + // Procedure returned itself. + // Quick-shortcut for a state machine like procedure subprocs = null; reExecute = true; + LOG.info("Short-circuit to rexecute for pid=" + procedure.getProcId()); } else { // yield the current procedure, and make the subprocedure runnable subprocs = initializeChildren(procStack, procedure, subprocs); + LOG.info("Initialized subprocedures=" + + Stream.of(subprocs).map(e -> "{" + e.toString() + "}"). + collect(Collectors.toList()).toString()); } } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { + if (LOG.isTraceEnabled()) { + LOG.trace("Added to timeoutExecutor " + procedure); + } timeoutExecutor.add(procedure); } else if (!suspended) { // No subtask, so we are done @@ -1388,12 +1436,13 @@ public class ProcedureExecutor<TEnvironment> { // executor thread to stop. The statement following the method call below seems to check if // store is not running, to prevent scheduling children procedures, re-execution or yield // of this procedure. 
This may need more scrutiny and subsequent cleanup in future - // Commit the transaction + // + // Commit the transaction even if a suspend (state may have changed). Note this append + // can take a bunch of time to complete. updateStoreOnExec(procStack, procedure, subprocs); // if the store is not running we are aborting if (!store.isRunning()) return; - // if the procedure is kind enough to pass the slot to someone else, yield if (procedure.isRunnable() && !suspended && procedure.isYieldAfterExecutionStep(getEnvironment())) { @@ -1403,14 +1452,14 @@ assert (reExecute && subprocs == null) || !reExecute; } while (reExecute); - // Submit the new subprocedures if (subprocs != null && !procedure.isFailed()) { submitChildrenProcedures(subprocs); } - // if the procedure is complete and has a parent, count down the children latch - if (procedure.isFinished() && procedure.hasParent()) { + // if the procedure is complete and has a parent, count down the children latch. + // If 'suspended', do nothing to change state -- let other threads handle unsuspend event. + if (!suspended && procedure.isFinished() && procedure.hasParent()) { countDownChildren(procStack, procedure); } } @@ -1469,17 +1518,16 @@ } // If this procedure is the last child awake the parent procedure - final boolean traceEnabled = LOG.isTraceEnabled(); - if (traceEnabled) { - LOG.trace(parent + " child is done: " + procedure); + if (LOG.isDebugEnabled()) { + LOG.debug("Finished subprocedure " + procedure); } - - if (parent.childrenCountDown() && parent.getState() == ProcedureState.WAITING) { - parent.setState(ProcedureState.RUNNABLE); + if (parent.tryRunnable()) { + // If we succeeded in making the parent runnable -- i.e. all of its + // children have completed, move parent to front of the queue. 
store.update(parent); scheduler.addFront(parent); - if (traceEnabled) { - LOG.trace(parent + " all the children finished their work, resume."); + if (LOG.isDebugEnabled()) { + LOG.debug("Finished ALL subprocedures of " + parent + "; resume."); } return; } @@ -1571,7 +1619,7 @@ public class ProcedureExecutor<TEnvironment> { private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE); public WorkerThread(final ThreadGroup group) { - super(group, "ProcExecWorker-" + workerId.incrementAndGet()); + super(group, "ProcExecWrkr-" + workerId.incrementAndGet()); } @Override @@ -1583,24 +1631,38 @@ public class ProcedureExecutor<TEnvironment> { public void run() { final boolean traceEnabled = LOG.isTraceEnabled(); long lastUpdate = EnvironmentEdgeManager.currentTime(); - while (isRunning() && keepAlive(lastUpdate)) { - final Procedure procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); - if (procedure == null) continue; - - store.setRunningProcedureCount(activeExecutorCount.incrementAndGet()); - executionStartTime.set(EnvironmentEdgeManager.currentTime()); - try { - if (traceEnabled) { - LOG.trace("Trying to start the execution of " + procedure); + try { + while (isRunning() && keepAlive(lastUpdate)) { + final Procedure procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); + if (procedure == null) continue; + int activeCount = activeExecutorCount.incrementAndGet(); + int runningCount = store.setRunningProcedureCount(activeCount); + if (LOG.isDebugEnabled()) { + LOG.debug("Run pid=" + procedure.getProcId() + + " current=" + runningCount + ", active=" + activeCount); + } + executionStartTime.set(EnvironmentEdgeManager.currentTime()); + try { + executeProcedure(procedure); + } catch (AssertionError e) { + LOG.info("ASSERT pid=" + procedure.getProcId(), e); + throw e; + } finally { + activeCount = activeExecutorCount.decrementAndGet(); + runningCount = store.setRunningProcedureCount(activeCount); + if (LOG.isDebugEnabled()) { + 
LOG.debug("Done pid=" + procedure.getProcId() + + " current=" + runningCount + ", active=" + activeCount); + } + lastUpdate = EnvironmentEdgeManager.currentTime(); + executionStartTime.set(Long.MAX_VALUE); } - executeProcedure(procedure); - } finally { - store.setRunningProcedureCount(activeExecutorCount.decrementAndGet()); - lastUpdate = EnvironmentEdgeManager.currentTime(); - executionStartTime.set(Long.MAX_VALUE); } + } catch (Throwable t) { + LOG.warn("Worker terminating because....", t); + } finally { + LOG.debug("Worker terminated."); } - LOG.debug("Worker thread terminated " + this); workerThreads.remove(this); } @@ -1617,14 +1679,15 @@ public class ProcedureExecutor<TEnvironment> { } } - // ========================================================================== - // Timeout Thread - // ========================================================================== + /** + * Runs task on a period such as check for stuck workers. + * @see InlineChore + */ private final class TimeoutExecutorThread extends StoppableThread { private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>(); public TimeoutExecutorThread(final ThreadGroup group) { - super(group, "ProcedureTimeoutExecutor"); + super(group, "ProcExecTimeout"); } @Override @@ -1634,7 +1697,7 @@ public class ProcedureExecutor<TEnvironment> { @Override public void run() { - final boolean isTraceEnabled = LOG.isTraceEnabled(); + final boolean traceEnabled = LOG.isTraceEnabled(); while (isRunning()) { final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue); if (task == null || task == DelayedUtil.DELAYED_POISON) { @@ -1643,8 +1706,8 @@ public class ProcedureExecutor<TEnvironment> { continue; } - if (isTraceEnabled) { - LOG.trace("Trying to start the execution of " + task); + if (traceEnabled) { + LOG.trace("Executing " + task); } // execute the task @@ -1665,6 +1728,8 @@ public class ProcedureExecutor<TEnvironment> { public void add(final Procedure procedure) { assert 
procedure.getState() == ProcedureState.WAITING_TIMEOUT; + LOG.info("ADDED " + procedure + "; timeout=" + procedure.getTimeout() + + ", timestamp=" + procedure.getTimeoutTimestamp()); queue.add(new DelayedProcedure(procedure)); } http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java index bdced10..48bb7d1 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java @@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; /** * Special procedure used as a chore. - * instead of bringing the Chore class in (dependencies reason), + * Instead of bringing the Chore class in (dependencies reason), * we reuse the executor timeout thread for this special case. 
* * The assumption is that procedure is used as hook to dispatch other procedures http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java new file mode 100644 index 0000000..8d5ff3c --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java @@ -0,0 +1,375 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.procedure2; + +import java.io.IOException; +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.procedure2.util.DelayedUtil; +import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedContainerWithTimestamp; +import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout; +import org.apache.hadoop.hbase.procedure2.util.StringUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Threads; + +import com.google.common.collect.ArrayListMultimap; + +/** + * A procedure dispatcher that aggregates and sends after elapsed time or after we hit + * count threshold. Creates its own threadpool to run RPCs with timeout. + * <ul> + * <li>Each server queue has a dispatch buffer</li> + * <li>Once the dispatch buffer reaches a threshold-size/time we send</li> + * </ul> + * <p>Call {@link #start()} and then {@link #submitTask(Callable)}. When done, + * call {@link #stop()}. 
+ */ [email protected] +public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable<TRemote>> { + private static final Log LOG = LogFactory.getLog(RemoteProcedureDispatcher.class); + + public static final String THREAD_POOL_SIZE_CONF_KEY = + "hbase.procedure.remote.dispatcher.threadpool.size"; + private static final int DEFAULT_THREAD_POOL_SIZE = 128; + + public static final String DISPATCH_DELAY_CONF_KEY = + "hbase.procedure.remote.dispatcher.delay.msec"; + private static final int DEFAULT_DISPATCH_DELAY = 150; + + public static final String DISPATCH_MAX_QUEUE_SIZE_CONF_KEY = + "hbase.procedure.remote.dispatcher.max.queue.size"; + private static final int DEFAULT_MAX_QUEUE_SIZE = 32; + + private final AtomicBoolean running = new AtomicBoolean(false); + private final ConcurrentHashMap<TRemote, BufferNode> nodeMap = + new ConcurrentHashMap<TRemote, BufferNode>(); + + private final int operationDelay; + private final int queueMaxSize; + private final int corePoolSize; + + private TimeoutExecutorThread timeoutExecutor; + private ThreadPoolExecutor threadPool; + + protected RemoteProcedureDispatcher(Configuration conf) { + this.corePoolSize = conf.getInt(THREAD_POOL_SIZE_CONF_KEY, DEFAULT_THREAD_POOL_SIZE); + this.operationDelay = conf.getInt(DISPATCH_DELAY_CONF_KEY, DEFAULT_DISPATCH_DELAY); + this.queueMaxSize = conf.getInt(DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, DEFAULT_MAX_QUEUE_SIZE); + } + + public boolean start() { + if (running.getAndSet(true)) { + LOG.warn("Already running"); + return false; + } + + LOG.info("Starting procedure remote dispatcher; threads=" + this.corePoolSize + + ", queueMaxSize=" + this.queueMaxSize + ", operationDelay=" + this.operationDelay); + + // Create the timeout executor + timeoutExecutor = new TimeoutExecutorThread(); + timeoutExecutor.start(); + + // Create the thread pool that will execute RPCs + threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS, + 
Threads.newDaemonThreadFactory(this.getClass().getSimpleName(), + getUncaughtExceptionHandler())); + return true; + } + + public boolean stop() { + if (!running.getAndSet(false)) { + return false; + } + + LOG.info("Stopping procedure remote dispatcher"); + + // send stop signals + timeoutExecutor.sendStopSignal(); + threadPool.shutdownNow(); + return true; + } + + public void join() { + assert !running.get() : "expected not running"; + + // wait the timeout executor + timeoutExecutor.awaitTermination(); + timeoutExecutor = null; + + // wait for the thread pool to terminate + threadPool.shutdownNow(); + try { + while (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { + LOG.warn("Waiting for thread-pool to terminate"); + } + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for thread-pool termination", e); + } + } + + protected UncaughtExceptionHandler getUncaughtExceptionHandler() { + return new UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + LOG.warn("Failed to execute remote procedures " + t.getName(), e); + } + }; + } + + // ============================================================================================ + // Node Helpers + // ============================================================================================ + /** + * Add a node that will be able to execute remote procedures + * @param key the node identifier + */ + public void addNode(final TRemote key) { + assert key != null: "Tried to add a node with a null key"; + final BufferNode newNode = new BufferNode(key); + nodeMap.putIfAbsent(key, newNode); + } + + /** + * Add a remote rpc. Be sure to check result for successful add. + * @param key the node identifier + * @return True if we successfully added the operation. 
+ */ + public boolean addOperationToNode(final TRemote key, RemoteProcedure rp) { + assert key != null : "found null key for node"; + BufferNode node = nodeMap.get(key); + if (node == null) { + return false; + } + node.add(rp); + // Check our node still in the map; could have been removed by #removeNode. + return nodeMap.contains(node); + } + + /** + * Remove a remote node + * @param key the node identifier + */ + public boolean removeNode(final TRemote key) { + final BufferNode node = nodeMap.remove(key); + if (node == null) return false; + node.abortOperationsInQueue(); + return true; + } + + // ============================================================================================ + // Task Helpers + // ============================================================================================ + protected Future<Void> submitTask(Callable<Void> task) { + return threadPool.submit(task); + } + + protected Future<Void> submitTask(Callable<Void> task, long delay, TimeUnit unit) { + final FutureTask<Void> futureTask = new FutureTask(task); + timeoutExecutor.add(new DelayedTask(futureTask, delay, unit)); + return futureTask; + } + + protected abstract void remoteDispatch(TRemote key, Set<RemoteProcedure> operations); + protected abstract void abortPendingOperations(TRemote key, Set<RemoteProcedure> operations); + + /** + * Data structure with reference to remote operation. + */ + public static abstract class RemoteOperation { + private final RemoteProcedure remoteProcedure; + + protected RemoteOperation(final RemoteProcedure remoteProcedure) { + this.remoteProcedure = remoteProcedure; + } + + public RemoteProcedure getRemoteProcedure() { + return remoteProcedure; + } + } + + /** + * Remote procedure reference. 
+ * @param <TEnv> + * @param <TRemote> + */ + public interface RemoteProcedure<TEnv, TRemote> { + RemoteOperation remoteCallBuild(TEnv env, TRemote remote); + void remoteCallCompleted(TEnv env, TRemote remote, RemoteOperation response); + void remoteCallFailed(TEnv env, TRemote remote, IOException exception); + } + + /** + * Account of what procedures are running on remote node. + * @param <TEnv> + * @param <TRemote> + */ + public interface RemoteNode<TEnv, TRemote> { + TRemote getKey(); + void add(RemoteProcedure<TEnv, TRemote> operation); + void dispatch(); + } + + protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByType(final TEnv env, + final TRemote remote, final Set<RemoteProcedure> operations) { + final ArrayListMultimap<Class<?>, RemoteOperation> requestByType = ArrayListMultimap.create(); + for (RemoteProcedure proc: operations) { + RemoteOperation operation = proc.remoteCallBuild(env, remote); + requestByType.put(operation.getClass(), operation); + } + return requestByType; + } + + protected <T extends RemoteOperation> List<T> fetchType( + final ArrayListMultimap<Class<?>, RemoteOperation> requestByType, final Class<T> type) { + return (List<T>)requestByType.removeAll(type); + } + + // ============================================================================================ + // Timeout Helpers + // ============================================================================================ + private final class TimeoutExecutorThread extends Thread { + private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<DelayedWithTimeout>(); + + public TimeoutExecutorThread() { + super("ProcedureDispatcherTimeoutThread"); + } + + @Override + public void run() { + while (running.get()) { + final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue); + if (task == null || task == DelayedUtil.DELAYED_POISON) { + // the executor may be shutting down, and the task is just the shutdown request + continue; + } + if (task 
instanceof DelayedTask) { + threadPool.execute(((DelayedTask)task).getObject()); + } else { + ((BufferNode)task).dispatch(); + } + } + } + + public void add(final DelayedWithTimeout delayed) { + queue.add(delayed); + } + + public void remove(final DelayedWithTimeout delayed) { + queue.remove(delayed); + } + + public void sendStopSignal() { + queue.add(DelayedUtil.DELAYED_POISON); + } + + public void awaitTermination() { + try { + final long startTime = EnvironmentEdgeManager.currentTime(); + for (int i = 0; isAlive(); ++i) { + sendStopSignal(); + join(250); + if (i > 0 && (i % 8) == 0) { + LOG.warn("Waiting termination of thread " + getName() + ", " + + StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime)); + } + } + } catch (InterruptedException e) { + LOG.warn(getName() + " join wait got interrupted", e); + } + } + } + + // ============================================================================================ + // Internals Helpers + // ============================================================================================ + + /** + * Node that contains a set of RemoteProcedures + */ + protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote> + implements RemoteNode<TEnv, TRemote> { + private Set<RemoteProcedure> operations; + + protected BufferNode(final TRemote key) { + super(key, 0); + } + + public TRemote getKey() { + return getObject(); + } + + public synchronized void add(final RemoteProcedure operation) { + if (this.operations == null) { + this.operations = new HashSet<>(); + setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay); + timeoutExecutor.add(this); + } + this.operations.add(operation); + if (this.operations.size() > queueMaxSize) { + timeoutExecutor.remove(this); + dispatch(); + } + } + + public synchronized void dispatch() { + if (operations != null) { + remoteDispatch(getKey(), operations); + this.operations = null; + } + } + + public synchronized void abortOperationsInQueue() 
{ + if (operations != null) { + abortPendingOperations(getKey(), operations); + this.operations = null; + } + } + + @Override + public String toString() { + return super.toString() + ", operations=" + this.operations; + } + } + + /** + * Delayed object that holds a FutureTask. + * used to submit something later to the thread-pool. + */ + private static final class DelayedTask extends DelayedContainerWithTimestamp<FutureTask<Void>> { + public DelayedTask(final FutureTask<Void> task, final long delay, final TimeUnit unit) { + super(task, EnvironmentEdgeManager.currentTime() + unit.toMillis(delay)); + } + }; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java index 1a84070..64bb278 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java @@ -27,12 +27,13 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.SequentialProcedureData; /** - * A SequentialProcedure describes one step in a procedure chain. + * A SequentialProcedure describes one step in a procedure chain: + * <pre> * -> Step 1 -> Step 2 -> Step 3 - * + * </pre> * The main difference from a base Procedure is that the execute() of a - * SequentialProcedure will be called only once, there will be no second - * execute() call once the child are finished. which means once the child + * SequentialProcedure will be called only once; there will be no second + * execute() call once the children are finished. 
which means once the child * of a SequentialProcedure are completed the SequentialProcedure is completed too. */ @InterfaceAudience.Private http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java index ea2a41f..0008c16 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java @@ -21,9 +21,10 @@ package org.apache.hadoop.hbase.procedure2; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.ArrayList; import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -56,7 +57,7 @@ public abstract class StateMachineProcedure<TEnvironment, TState> private int stateCount = 0; private int[] states = null; - private ArrayList<Procedure> subProcList = null; + private List<Procedure<?>> subProcList = null; protected enum Flow { HAS_MORE_STATE, @@ -131,12 +132,15 @@ public abstract class StateMachineProcedure<TEnvironment, TState> * Add a child procedure to execute * @param subProcedure the child procedure */ - protected void addChildProcedure(Procedure... subProcedure) { + protected void addChildProcedure(Procedure<?>... 
subProcedure) { + if (subProcedure == null) return; + final int len = subProcedure.length; + if (len == 0) return; if (subProcList == null) { - subProcList = new ArrayList<>(subProcedure.length); + subProcList = new ArrayList<>(len); } - for (int i = 0; i < subProcedure.length; ++i) { - Procedure proc = subProcedure[i]; + for (int i = 0; i < len; ++i) { + Procedure<?> proc = subProcedure[i]; if (!proc.hasOwner()) proc.setOwner(getOwner()); subProcList.add(proc); } @@ -148,21 +152,17 @@ public abstract class StateMachineProcedure<TEnvironment, TState> updateTimestamp(); try { if (!hasMoreState() || isFailed()) return null; - TState state = getCurrentState(); if (stateCount == 0) { setNextState(getStateId(state)); } - stateFlow = executeFromState(env, state); if (!hasMoreState()) setNextState(EOF_STATE); - - if (subProcList != null && subProcList.size() != 0) { + if (subProcList != null && !subProcList.isEmpty()) { Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]); subProcList = null; return subProcedures; } - return (isWaiting() || isFailed() || !hasMoreState()) ? 
null : new Procedure[] {this}; } finally { updateTimestamp(); http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java index c03e326..9e53f42 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java @@ -52,8 +52,8 @@ public class NoopProcedureStore extends ProcedureStoreBase { } @Override - public void setRunningProcedureCount(final int count) { - // no-op + public int setRunningProcedureCount(final int count) { + return count; } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/ccbc9ec2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java index 385cedb..a690c81 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java @@ -153,8 +153,9 @@ public interface ProcedureStore { /** * Set the number of procedure running. * This can be used, for example, by the store to know how long to wait before a sync. + * @return how many procedures are running (may not be same as <code>count</code>). 
*/ - void setRunningProcedureCount(int count); + int setRunningProcedureCount(int count); /** * Acquire the lease for the procedure store.
