This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit e79532a2d69429e8fdce3f75c1c3121f5673b856
Merge: 0c00ab519a 22a120f36f
Author: Christopher Tubbs <[email protected]>
AuthorDate: Mon Mar 13 13:38:19 2023 -0400

    Merge branch '1.10' into 2.1
    
    Fix merge of #3231 and #3235 from 1.10 to 2.1

 server/manager/src/main/java/org/apache/accumulo/manager/Manager.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 60931b9c2b,0000000000..5c5a5ad203
mode 100644,000000..100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@@ -1,1812 -1,0 +1,1813 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.manager;
 +
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +import static java.util.Collections.emptySortedMap;
 +import static java.util.concurrent.TimeUnit.HOURS;
 +import static java.util.concurrent.TimeUnit.MILLISECONDS;
 +import static java.util.concurrent.TimeUnit.MINUTES;
 +import static java.util.concurrent.TimeUnit.NANOSECONDS;
 +import static java.util.concurrent.TimeUnit.SECONDS;
 +import static 
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
 +
 +import java.io.IOException;
 +import java.lang.reflect.InvocationTargetException;
 +import java.net.UnknownHostException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.SortedMap;
 +import java.util.TreeMap;
 +import java.util.UUID;
 +import java.util.concurrent.ConcurrentSkipListMap;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.ScheduledFuture;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicReference;
 +import java.util.stream.Collectors;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloClient;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
 +import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
 +import 
org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.InstanceId;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.fate.AgeOffStore;
 +import org.apache.accumulo.core.fate.Fate;
 +import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
 +import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
 +import org.apache.accumulo.core.fate.zookeeper.ServiceLock.ServiceLockPath;
 +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 +import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
 +import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 +import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 +import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl;
 +import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl;
 +import org.apache.accumulo.core.manager.balancer.TServerStatusImpl;
 +import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl;
 +import org.apache.accumulo.core.manager.state.tables.TableState;
 +import org.apache.accumulo.core.manager.thrift.ManagerClientService;
 +import org.apache.accumulo.core.manager.thrift.ManagerGoalState;
 +import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo;
 +import org.apache.accumulo.core.manager.thrift.ManagerState;
 +import org.apache.accumulo.core.master.thrift.BulkImportState;
 +import org.apache.accumulo.core.master.thrift.TableInfo;
 +import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.RootTable;
 +import org.apache.accumulo.core.metadata.TServerInstance;
 +import org.apache.accumulo.core.metadata.TabletLocationState;
 +import org.apache.accumulo.core.metadata.TabletState;
 +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 +import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
 +import org.apache.accumulo.core.metrics.MetricsUtil;
 +import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.spi.balancer.BalancerEnvironment;
 +import org.apache.accumulo.core.spi.balancer.SimpleLoadBalancer;
 +import org.apache.accumulo.core.spi.balancer.TabletBalancer;
 +import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
 +import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
 +import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
 +import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
 +import org.apache.accumulo.core.trace.TraceUtil;
 +import org.apache.accumulo.core.util.Halt;
 +import org.apache.accumulo.core.util.Retry;
 +import org.apache.accumulo.core.util.threads.ThreadPools;
 +import org.apache.accumulo.core.util.threads.Threads;
 +import org.apache.accumulo.manager.metrics.ManagerMetrics;
 +import org.apache.accumulo.manager.recovery.RecoveryManager;
 +import org.apache.accumulo.manager.state.TableCounts;
 +import org.apache.accumulo.manager.tableOps.TraceRepo;
 +import org.apache.accumulo.manager.upgrade.UpgradeCoordinator;
 +import org.apache.accumulo.server.AbstractServer;
 +import org.apache.accumulo.server.HighlyAvailableService;
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.accumulo.server.ServerOpts;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.manager.LiveTServerSet;
 +import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection;
 +import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl;
 +import org.apache.accumulo.server.manager.state.CurrentState;
 +import org.apache.accumulo.server.manager.state.DeadServerList;
 +import org.apache.accumulo.server.manager.state.MergeInfo;
 +import org.apache.accumulo.server.manager.state.MergeState;
 +import org.apache.accumulo.server.manager.state.TabletServerState;
 +import org.apache.accumulo.server.manager.state.TabletStateStore;
 +import org.apache.accumulo.server.rpc.HighlyAvailableServiceWrapper;
 +import org.apache.accumulo.server.rpc.ServerAddress;
 +import org.apache.accumulo.server.rpc.TServerUtils;
 +import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
 +import org.apache.accumulo.server.security.SecurityOperation;
 +import 
org.apache.accumulo.server.security.delegation.AuthenticationTokenKeyManager;
 +import 
org.apache.accumulo.server.security.delegation.ZooAuthenticationKeyDistributor;
 +import org.apache.accumulo.server.tables.TableManager;
 +import org.apache.accumulo.server.tables.TableObserver;
 +import org.apache.accumulo.server.util.ScanServerMetadataEntries;
 +import org.apache.accumulo.server.util.ServerBulkImportStatus;
 +import org.apache.accumulo.server.util.TableInfoUtil;
 +import org.apache.hadoop.io.DataInputBuffer;
 +import org.apache.hadoop.io.DataOutputBuffer;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.server.TServer;
 +import org.apache.thrift.transport.TTransportException;
 +import org.apache.zookeeper.KeeperException;
 +import org.apache.zookeeper.KeeperException.NoAuthException;
 +import org.apache.zookeeper.WatchedEvent;
 +import org.apache.zookeeper.Watcher;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.collect.ImmutableSortedMap;
 +import com.google.common.util.concurrent.RateLimiter;
 +
 +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 +import io.opentelemetry.api.trace.Span;
 +import io.opentelemetry.context.Scope;
 +
 +/**
 + * The Manager is responsible for assigning and balancing tablets to tablet 
servers.
 + * <p>
 + * The manager will also coordinate log recoveries and reports general status.
 + */
 +public class Manager extends AbstractServer
 +    implements LiveTServerSet.Listener, TableObserver, CurrentState, 
HighlyAvailableService {
 +
  static final Logger log = LoggerFactory.getLogger(Manager.class);

  // Base time unit, in milliseconds, used to derive the scheduling constants below.
  static final int ONE_SECOND = 1000;
  static final long TIME_TO_WAIT_BETWEEN_SCANS = 60 * ONE_SECOND;
  // made this less than TIME_TO_WAIT_BETWEEN_SCANS, so that the cache is cleared between cycles
  static final long TIME_TO_CACHE_RECOVERY_WAL_EXISTENCE = TIME_TO_WAIT_BETWEEN_SCANS / 4;
  private static final long TIME_BETWEEN_MIGRATION_CLEANUPS = 5 * 60 * ONE_SECOND;
  static final long WAIT_BETWEEN_ERRORS = ONE_SECOND;
  private static final long DEFAULT_WAIT_FOR_WATCHER = 10 * ONE_SECOND;
  private static final int MAX_CLEANUP_WAIT_TIME = ONE_SECOND;
  private static final int TIME_TO_WAIT_BETWEEN_LOCK_CHECKS = ONE_SECOND;
  static final int MAX_TSERVER_WORK_CHUNK = 5000;
  private static final int MAX_BAD_STATUS_COUNT = 3;
  // 10D / 60D == at most ten shutdowns per minute, expressed as a per-second rate
  private static final double MAX_SHUTDOWNS_PER_SEC = 10D / 60D;

  private final Object balancedNotifier = new Object();
  final LiveTServerSet tserverSet;
  private final List<TabletGroupWatcher> watchers = new ArrayList<>();
  final SecurityOperation security;
  // per-tserver failure counters; synchronized wrapper, so iterate under explicit lock if needed
  final Map<TServerInstance,AtomicInteger> badServers =
      Collections.synchronizedMap(new HashMap<>());
  final Set<TServerInstance> serversToShutdown = Collections.synchronizedSet(new HashSet<>());
  // pending tablet migrations: tablet extent -> destination tablet server
  final SortedMap<KeyExtent,TServerInstance> migrations =
      Collections.synchronizedSortedMap(new TreeMap<>());
  final EventCoordinator nextEvent = new EventCoordinator();
  // guards reads/writes of the per-table merge state kept in ZooKeeper (see get/set/clearMergeState)
  private final Object mergeLock = new Object();
  private Thread replicationWorkThread;
  private Thread replicationAssignerThread;
  RecoveryManager recoveryManager = null;
  private final ManagerTime timeKeeper;

  // Delegation Token classes
  private final boolean delegationTokensAvailable;
  private ZooAuthenticationKeyDistributor keyDistributor;
  private AuthenticationTokenKeyManager authenticationTokenKeyManager;

  ServiceLock managerLock = null;
  private TServer clientService = null;
  private volatile TabletBalancer tabletBalancer;
  private final BalancerEnvironment balancerEnvironment;

  // current manager state; guarded by synchronized getManagerState/setManagerState
  private ManagerState state = ManagerState.INITIAL;

  // fateReadyLatch and fateRef go together; when this latch is ready, then the fate reference
  // should already have been set; still need to use atomic reference or volatile for fateRef, so no
  // thread's cached view shows that fateRef is still null after the latch is ready
  private final CountDownLatch fateReadyLatch = new CountDownLatch(1);
  private final AtomicReference<Fate<Manager>> fateRef = new AtomicReference<>(null);

  volatile SortedMap<TServerInstance,TabletServerStatus> tserverStatus = emptySortedMap();
  volatile SortedMap<TabletServerId,TServerStatus> tserverStatusForBalancer = emptySortedMap();
  final ServerBulkImportStatus bulkImportStatus = new ServerBulkImportStatus();

  private final AtomicBoolean managerInitialized = new AtomicBoolean(false);
  private final AtomicBoolean managerUpgrading = new AtomicBoolean(false);
 +
  @Override
  public synchronized ManagerState getManagerState() {
    // synchronized to pair with setManagerState so readers see a consistent value of 'state'
    return state;
  }
 +
 +  public boolean stillManager() {
 +    return getManagerState() != ManagerState.STOP;
 +  }
 +
  /**
   * Retrieve the Fate object, blocking until it is ready. This could cause problems if Fate
   * operations are attempted to be used prior to the Manager being ready for them. If these
   * operations are triggered by a client side request from a tserver or client, it should be safe
   * to wait to handle those until Fate is ready, but if it occurs during an upgrade, or some other
   * time in the Manager before Fate is started, that may result in a deadlock and will need to be
   * fixed.
   *
   * @return the Fate object, only after the fate components are running and ready
   */
  Fate<Manager> fate() {
    try {
      // block up to 30 seconds until it's ready; if it's still not ready, introduce some logging
      if (!fateReadyLatch.await(30, SECONDS)) {
        String msgPrefix = "Unexpected use of fate in thread " + Thread.currentThread().getName()
            + " at time " + System.currentTimeMillis();
        // include stack trace so we know where it's coming from, in case we need to troubleshoot it
        log.warn("{} blocked until fate starts", msgPrefix,
            new IllegalStateException("Attempted fate action before manager finished starting up; "
                + "if this doesn't make progress, please report it as a bug to the developers"));
        int minutes = 0;
        // keep waiting indefinitely, but warn every 5 minutes so the stall is visible in the logs
        while (!fateReadyLatch.await(5, MINUTES)) {
          minutes += 5;
          log.warn("{} still blocked after {} minutes; this is getting weird", msgPrefix, minutes);
        }
        log.debug("{} no longer blocked", msgPrefix);
      }
    } catch (InterruptedException e) {
      // restore the interrupt flag for callers further up the stack
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Thread was interrupted; cannot proceed");
    }
    // safe: the latch only counts down after fateRef has been set (see field comment)
    return fateRef.get();
  }
 +
  // Shorthand so the transition table below stays readable: X = allowed, O = forbidden.
  static final boolean X = true;
  static final boolean O = false;
  // @formatter:off
  // transitionOK[from][to]: indexed by ManagerState.ordinal() in setManagerState; the column
  // order in the header comment must match the ManagerState enum declaration order.
  static final boolean[][] transitionOK = {
      //                            INITIAL HAVE_LOCK SAFE_MODE NORMAL UNLOAD_META UNLOAD_ROOT STOP
      /* INITIAL */                 {X, X, O, O, O, O, X},
      /* HAVE_LOCK */               {O, X, X, X, O, O, X},
      /* SAFE_MODE */               {O, O, X, X, X, O, X},
      /* NORMAL */                  {O, O, X, X, X, O, X},
      /* UNLOAD_METADATA_TABLETS */ {O, O, X, X, X, X, X},
      /* UNLOAD_ROOT_TABLET */      {O, O, O, X, X, X, X},
      /* STOP */                    {O, O, O, O, O, X, X}};
  //@formatter:on
  /**
   * Transition the manager to a new state, firing the state-change event and any side effects tied
   * to the new state (shutdown scheduling for STOP, ZooKeeper/metadata upgrades for
   * HAVE_LOCK/NORMAL). Illegal transitions per {@code transitionOK} are logged but still performed.
   */
  synchronized void setManagerState(ManagerState newState) {
    if (state.equals(newState)) {
      return;
    }
    if (!transitionOK[state.ordinal()][newState.ordinal()]) {
      log.error("Programmer error: manager should not transition from {} to {}", state, newState);
    }
    ManagerState oldState = state;
    state = newState;
    nextEvent.event("State changed from %s to %s", oldState, newState);
    if (newState == ManagerState.STOP) {
      // Give the server a little time before shutdown so the client
      // thread requesting the stop can return
      ScheduledFuture<?> future = getContext().getScheduledExecutor().scheduleWithFixedDelay(() -> {
        // This frees the main thread and will cause the manager to exit
        clientService.stop();
        Manager.this.nextEvent.event("stopped event loop");
      }, 100L, 1000L, MILLISECONDS);
      ThreadPools.watchNonCriticalScheduledTask(future);
    }

    if (oldState != newState && (newState == ManagerState.HAVE_LOCK)) {
      upgradeCoordinator.upgradeZookeeper(getContext(), nextEvent);
    }

    if (oldState != newState && (newState == ManagerState.NORMAL)) {
      // Fate must not have been started before upgrades finished; fail loudly if it was
      if (fateRef.get() != null) {
        throw new IllegalStateException("Access to Fate should not have been"
            + " initialized prior to the Manager finishing upgrades. Please save"
            + " all logs and file a bug.");
      }
      upgradeMetadataFuture = upgradeCoordinator.upgradeMetadata(getContext(), nextEvent);
    }
  }
 +
  // Coordinates one-time ZooKeeper/metadata upgrades; driven from setManagerState
  private final UpgradeCoordinator upgradeCoordinator = new UpgradeCoordinator();

  // Set when transitioning to NORMAL; tracks the asynchronous metadata upgrade
  private Future<Void> upgradeMetadataFuture;

  // Thrift service handlers; initialized elsewhere in the manager lifecycle
  private FateServiceHandler fateServiceHandler;
  private ManagerClientServiceHandler managerClientHandler;
 +
 +  private int assignedOrHosted(TableId tableId) {
 +    int result = 0;
 +    for (TabletGroupWatcher watcher : watchers) {
 +      TableCounts count = watcher.getStats(tableId);
 +      result += count.hosted() + count.assigned();
 +    }
 +    return result;
 +  }
 +
 +  private int totalAssignedOrHosted() {
 +    int result = 0;
 +    for (TabletGroupWatcher watcher : watchers) {
 +      for (TableCounts counts : watcher.getStats().values()) {
 +        result += counts.assigned() + counts.hosted();
 +      }
 +    }
 +    return result;
 +  }
 +
 +  private int nonMetaDataTabletsAssignedOrHosted() {
 +    return totalAssignedOrHosted() - assignedOrHosted(MetadataTable.ID)
 +        - assignedOrHosted(RootTable.ID);
 +  }
 +
 +  private int notHosted() {
 +    int result = 0;
 +    for (TabletGroupWatcher watcher : watchers) {
 +      for (TableCounts counts : watcher.getStats().values()) {
 +        result += counts.assigned() + counts.assignedToDeadServers() + 
counts.suspended();
 +      }
 +    }
 +    return result;
 +  }
 +
 +  // The number of unassigned tablets that should be assigned: displayed on 
the monitor page
 +  int displayUnassigned() {
 +    int result = 0;
 +    switch (getManagerState()) {
 +      case NORMAL:
 +        // Count offline tablets for online tables
 +        for (TabletGroupWatcher watcher : watchers) {
 +          TableManager manager = getContext().getTableManager();
 +          for (Entry<TableId,TableCounts> entry : 
watcher.getStats().entrySet()) {
 +            TableId tableId = entry.getKey();
 +            TableCounts counts = entry.getValue();
 +            if (manager.getTableState(tableId) == TableState.ONLINE) {
 +              result += counts.unassigned() + counts.assignedToDeadServers() 
+ counts.assigned()
 +                  + counts.suspended();
 +            }
 +          }
 +        }
 +        break;
 +      case SAFE_MODE:
 +        // Count offline tablets for the metadata table
 +        for (TabletGroupWatcher watcher : watchers) {
 +          TableCounts counts = watcher.getStats(MetadataTable.ID);
 +          result += counts.unassigned() + counts.suspended();
 +        }
 +        break;
 +      case UNLOAD_METADATA_TABLETS:
 +      case UNLOAD_ROOT_TABLET:
 +        for (TabletGroupWatcher watcher : watchers) {
 +          TableCounts counts = watcher.getStats(MetadataTable.ID);
 +          result += counts.unassigned() + counts.suspended();
 +        }
 +        break;
 +      default:
 +        break;
 +    }
 +    return result;
 +  }
 +
 +  public void mustBeOnline(final TableId tableId) throws 
ThriftTableOperationException {
 +    ServerContext context = getContext();
 +    context.clearTableListCache();
 +    if (context.getTableState(tableId) != TableState.ONLINE) {
 +      throw new ThriftTableOperationException(tableId.canonical(), null, 
TableOperation.MERGE,
 +          TableOperationExceptionType.OFFLINE, "table is not online");
 +    }
 +  }
 +
 +  public TableManager getTableManager() {
 +    return getContext().getTableManager();
 +  }
 +
  /**
   * Process entry point: builds a Manager from the command-line arguments and runs its server loop,
   * closing it (try-with-resources) when the loop exits.
   */
  public static void main(String[] args) throws Exception {
    try (Manager manager = new Manager(new ServerOpts(), args)) {
      manager.runServer();
    }
  }
 +
  /**
   * Constructs the manager: wires up the balancer environment, time keeper, live tserver set, and
   * (when SASL is enabled) the delegation-token key manager and distributor.
   */
  Manager(ServerOpts opts, String[] args) throws IOException {
    super("manager", opts, args);
    ServerContext context = super.getContext();
    balancerEnvironment = new BalancerEnvironmentImpl(context);

    AccumuloConfiguration aconf = context.getConfiguration();

    log.info("Version {}", Constants.VERSION);
    log.info("Instance {}", getInstanceID());
    timeKeeper = new ManagerTime(this, aconf);
    tserverSet = new LiveTServerSet(context, this);
    initializeBalancer();

    this.security = context.getSecurityOperation();

    final long tokenLifetime = aconf.getTimeInMillis(Property.GENERAL_DELEGATION_TOKEN_LIFETIME);

    authenticationTokenKeyManager = null;
    keyDistributor = null;
    if (getConfiguration().getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
      // SASL is enabled, create the key distributor (ZooKeeper) and manager (generates/rolls secret
      // keys)
      log.info("SASL is enabled, creating delegation token key manager and distributor");
      final long tokenUpdateInterval =
          aconf.getTimeInMillis(Property.GENERAL_DELEGATION_TOKEN_UPDATE_INTERVAL);
      keyDistributor = new ZooAuthenticationKeyDistributor(context.getZooReaderWriter(),
          getZooKeeperRoot() + Constants.ZDELEGATION_TOKEN_KEYS);
      authenticationTokenKeyManager = new AuthenticationTokenKeyManager(context.getSecretManager(),
          keyDistributor, tokenUpdateInterval, tokenLifetime);
      delegationTokensAvailable = true;
    } else {
      log.info("SASL is not enabled, delegation tokens will not be available");
      delegationTokensAvailable = false;
    }
  }
 +
  /** Delegates to the server context's instance ID. */
  public InstanceId getInstanceID() {
    return getContext().getInstanceID();
  }
 +
  /** Delegates to the server context's ZooKeeper root path for this instance. */
  public String getZooKeeperRoot() {
    return getContext().getZooKeeperRoot();
  }
 +
  /** Returns the live connection for the given tablet server, via the live tserver set. */
  public TServerConnection getConnection(TServerInstance server) {
    return tserverSet.getConnection(server);
  }
 +
  /**
   * Reads the merge state for a table from ZooKeeper under {@code .../tables/<id>/merge}.
   * Returns an empty {@link MergeInfo} when no merge node exists or on any read/deserialization
   * error (errors are logged, never propagated). Guarded by {@code mergeLock}.
   */
  public MergeInfo getMergeInfo(TableId tableId) {
    ServerContext context = getContext();
    synchronized (mergeLock) {
      try {
        String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId + "/merge";
        if (!context.getZooReaderWriter().exists(path)) {
          return new MergeInfo();
        }
        byte[] data = context.getZooReaderWriter().getData(path);
        DataInputBuffer in = new DataInputBuffer();
        in.reset(data, data.length);
        MergeInfo info = new MergeInfo();
        info.readFields(in);
        return info;
      } catch (KeeperException.NoNodeException ex) {
        // node deleted between exists() and getData(): the merge likely completed; not an error
        log.info("Error reading merge state, it probably just finished");
        return new MergeInfo();
      } catch (Exception ex) {
        log.warn("Unexpected error reading merge state", ex);
        return new MergeInfo();
      }
    }
  }
 +
  /**
   * Persists the merge state for {@code info}'s table to ZooKeeper and notifies waiters on
   * {@code mergeLock}. A state of NONE deletes the node; STARTED fails if the node already exists
   * (only one merge may begin); any other state overwrites. Fires a state-change event afterwards.
   */
  public void setMergeState(MergeInfo info, MergeState state)
      throws KeeperException, InterruptedException {
    ServerContext context = getContext();
    synchronized (mergeLock) {
      String path =
          getZooKeeperRoot() + Constants.ZTABLES + "/" + info.getExtent().tableId() + "/merge";
      info.setState(state);
      if (state.equals(MergeState.NONE)) {
        context.getZooReaderWriter().recursiveDelete(path, NodeMissingPolicy.SKIP);
      } else {
        DataOutputBuffer out = new DataOutputBuffer();
        try {
          info.write(out);
        } catch (IOException ex) {
          // writing to an in-memory buffer should never fail
          throw new AssertionError("Unlikely", ex);
        }
        context.getZooReaderWriter().putPersistentData(path, out.getData(),
            state.equals(MergeState.STARTED) ? ZooUtil.NodeExistsPolicy.FAIL
                : ZooUtil.NodeExistsPolicy.OVERWRITE);
      }
      mergeLock.notifyAll();
    }
    nextEvent.event("Merge state of %s set to %s", info.getExtent(), state);
  }
 +
 +  public void clearMergeState(TableId tableId) throws KeeperException, 
InterruptedException {
 +    synchronized (mergeLock) {
 +      String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId + 
"/merge";
 +      getContext().getZooReaderWriter().recursiveDelete(path, 
NodeMissingPolicy.SKIP);
 +      mergeLock.notifyAll();
 +    }
 +    nextEvent.event("Merge state of %s cleared", tableId);
 +  }
 +
 +  void setManagerGoalState(ManagerGoalState state) {
 +    try {
 +      getContext().getZooReaderWriter().putPersistentData(
 +          getZooKeeperRoot() + Constants.ZMANAGER_GOAL_STATE, 
state.name().getBytes(),
 +          NodeExistsPolicy.OVERWRITE);
 +    } catch (Exception ex) {
 +      log.error("Unable to set manager goal state in zookeeper");
 +    }
 +  }
 +
 +  ManagerGoalState getManagerGoalState() {
 +    while (true) {
 +      try {
 +        byte[] data = getContext().getZooReaderWriter()
 +            .getData(getZooKeeperRoot() + Constants.ZMANAGER_GOAL_STATE);
 +        return ManagerGoalState.valueOf(new String(data));
 +      } catch (Exception e) {
 +        log.error("Problem getting real goal state from zookeeper: ", e);
 +        sleepUninterruptibly(1, SECONDS);
 +      }
 +    }
 +  }
 +
 +  public boolean hasCycled(long time) {
 +    for (TabletGroupWatcher watcher : watchers) {
 +      if (watcher.stats.lastScanFinished() < time) {
 +        return false;
 +      }
 +    }
 +
 +    return true;
 +  }
 +
 +  public void clearMigrations(TableId tableId) {
 +    synchronized (migrations) {
 +      migrations.keySet().removeIf(extent -> 
extent.tableId().equals(tableId));
 +    }
 +  }
 +
  // Desired state for a tablet, paired with the thrift goal sent to a tserver when unloading it.
  enum TabletGoalState {
    HOSTED(TUnloadTabletGoal.UNKNOWN),
    UNASSIGNED(TUnloadTabletGoal.UNASSIGNED),
    DELETED(TUnloadTabletGoal.DELETED),
    SUSPENDED(TUnloadTabletGoal.SUSPENDED);

    private final TUnloadTabletGoal unloadGoal;

    TabletGoalState(TUnloadTabletGoal unloadGoal) {
      this.unloadGoal = unloadGoal;
    }

    /** The purpose of unloading this tablet. */
    public TUnloadTabletGoal howUnload() {
      return unloadGoal;
    }
  }
 +
 +  TabletGoalState getSystemGoalState(TabletLocationState tls) {
 +    switch (getManagerState()) {
 +      case NORMAL:
 +        return TabletGoalState.HOSTED;
 +      case HAVE_LOCK: // fall-through intended
 +      case INITIAL: // fall-through intended
 +      case SAFE_MODE:
 +        if (tls.extent.isMeta()) {
 +          return TabletGoalState.HOSTED;
 +        }
 +        return TabletGoalState.UNASSIGNED;
 +      case UNLOAD_METADATA_TABLETS:
 +        if (tls.extent.isRootTablet()) {
 +          return TabletGoalState.HOSTED;
 +        }
 +        return TabletGoalState.UNASSIGNED;
 +      case UNLOAD_ROOT_TABLET:
 +        return TabletGoalState.UNASSIGNED;
 +      case STOP:
 +        return TabletGoalState.UNASSIGNED;
 +      default:
 +        throw new IllegalStateException("Unknown Manager State");
 +    }
 +  }
 +
 +  TabletGoalState getTableGoalState(KeyExtent extent) {
 +    TableState tableState = 
getContext().getTableManager().getTableState(extent.tableId());
 +    if (tableState == null) {
 +      return TabletGoalState.DELETED;
 +    }
 +    switch (tableState) {
 +      case DELETING:
 +        return TabletGoalState.DELETED;
 +      case OFFLINE:
 +      case NEW:
 +        return TabletGoalState.UNASSIGNED;
 +      default:
 +        return TabletGoalState.HOSTED;
 +    }
 +  }
 +
  /**
   * Computes the overall goal state for a tablet by layering, in order: the manager's system state,
   * pending upgrades, tserver shutdowns, merge transitions, table state, and finally pending
   * migrations. Earlier checks short-circuit later ones.
   */
  TabletGoalState getGoalState(TabletLocationState tls, MergeInfo mergeInfo) {
    KeyExtent extent = tls.extent;
    // Shutting down?
    TabletGoalState state = getSystemGoalState(tls);
    if (state == TabletGoalState.HOSTED) {
      if (!upgradeCoordinator.getStatus().isParentLevelUpgraded(extent)) {
        // The place where this tablet stores its metadata was not upgraded, so do not assign this
        // tablet yet.
        return TabletGoalState.UNASSIGNED;
      }

      // suspend (rather than reassign) tablets on servers that are being shut down
      if (tls.current != null && serversToShutdown.contains(tls.current)) {
        return TabletGoalState.SUSPENDED;
      }
      // Handle merge transitions
      if (mergeInfo.getExtent() != null) {

        final boolean overlaps = mergeInfo.overlaps(extent);

        if (overlaps) {
          log.debug("mergeInfo overlaps: {} true", extent);
          switch (mergeInfo.getState()) {
            case NONE:
            case COMPLETE:
              break;
            case STARTED:
            case SPLITTING:
              return TabletGoalState.HOSTED;
            case WAITING_FOR_CHOPPED:
              // hosted + chopped, or unhosted + chopped with no outstanding walogs, may unload;
              // otherwise keep it hosted so chopping can finish
              if (tls.getState(tserverSet.getCurrentServers()).equals(TabletState.HOSTED)) {
                if (tls.chopped) {
                  return TabletGoalState.UNASSIGNED;
                }
              } else if (tls.chopped && tls.walogs.isEmpty()) {
                return TabletGoalState.UNASSIGNED;
              }

              return TabletGoalState.HOSTED;
            case WAITING_FOR_OFFLINE:
            case MERGING:
              return TabletGoalState.UNASSIGNED;
          }
        } else {
          log.trace("mergeInfo overlaps: {} false", extent);
        }
      }

      // taking table offline?
      state = getTableGoalState(extent);
      if (state == TabletGoalState.HOSTED) {
        // Maybe this tablet needs to be migrated
        TServerInstance dest = migrations.get(extent);
        if (dest != null && tls.current != null && !dest.equals(tls.current)) {
          return TabletGoalState.UNASSIGNED;
        }
      }
    }
    return state;
  }
 +
 +  private class MigrationCleanupThread implements Runnable {
 +
 +    @Override
 +    public void run() {
 +      while (stillManager()) {
 +        if (!migrations.isEmpty()) {
 +          try {
 +            cleanupOfflineMigrations();
 +            cleanupNonexistentMigrations(getContext());
 +          } catch (Exception ex) {
 +            log.error("Error cleaning up migrations", ex);
 +          }
 +        }
 +        sleepUninterruptibly(TIME_BETWEEN_MIGRATION_CLEANUPS, MILLISECONDS);
 +      }
 +    }
 +
 +    /**
 +     * If a migrating tablet splits, and the tablet dies before sending the 
manager a message, the
 +     * migration will refer to a non-existing tablet, so it can never 
complete. Periodically scan
 +     * the metadata table and remove any migrating tablets that no longer 
exist.
 +     */
 +    private void cleanupNonexistentMigrations(final AccumuloClient 
accumuloClient)
 +        throws TableNotFoundException {
 +      Scanner scanner = accumuloClient.createScanner(MetadataTable.NAME, 
Authorizations.EMPTY);
 +      TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
 +      Set<KeyExtent> found = new HashSet<>();
 +      for (Entry<Key,Value> entry : scanner) {
 +        KeyExtent extent = KeyExtent.fromMetaPrevRow(entry);
 +        if (migrations.containsKey(extent)) {
 +          found.add(extent);
 +        }
 +      }
 +      migrations.keySet().retainAll(found);
 +    }
 +
 +    /**
 +     * If migrating a tablet for a table that is offline, the migration can 
never succeed because no
 +     * tablet server will load the tablet. check for offline tables and 
remove their migrations.
 +     */
 +    private void cleanupOfflineMigrations() {
 +      ServerContext context = getContext();
 +      TableManager manager = context.getTableManager();
 +      for (TableId tableId : context.getTableIdToNameMap().keySet()) {
 +        TableState state = manager.getTableState(tableId);
 +        if (state == TableState.OFFLINE) {
 +          clearMigrations(tableId);
 +        }
 +      }
 +    }
 +  }
 +
  /**
   * Background task that drives the manager's goal-state machine (NORMAL / SAFE_MODE / CLEAN_STOP),
   * gathers per-tserver status, and invokes the tablet balancer. The exact ordering of state
   * transitions and status updates here is load-bearing; see individual comments.
   */
  private class StatusThread implements Runnable {

    /**
     * Returns true when every relevant TabletGroupWatcher has reported stats for the current
     * manager state. During shutdown, watchers for already-unloaded levels are excluded: index 0 is
     * the user-level watcher, 1 the metadata-level watcher, 2 the root-level watcher (see the
     * order they are added in run()).
     */
    private boolean goodStats() {
      int start;
      switch (getManagerState()) {
        case UNLOAD_METADATA_TABLETS:
          // user tablets already unloaded; skip watcher 0
          start = 1;
          break;
        case UNLOAD_ROOT_TABLET:
          // user and metadata tablets already unloaded; skip watchers 0 and 1
          start = 2;
          break;
        default:
          start = 0;
      }
      for (int i = start; i < watchers.size(); i++) {
        TabletGroupWatcher watcher = watchers.get(i);
        if (watcher.stats.getLastManagerState() != getManagerState()) {
          log.debug("{}: {} != {}", watcher.getName(), watcher.stats.getLastManagerState(),
              getManagerState());
          return false;
        }
      }
      return true;
    }

    @Override
    public void run() {
      EventCoordinator.Listener eventListener = nextEvent.getListener();
      while (stillManager()) {
        long wait = DEFAULT_WAIT_FOR_WATCHER;
        try {
          // translate the externally requested goal state into the manager's actual state
          switch (getManagerGoalState()) {
            case NORMAL:
              setManagerState(ManagerState.NORMAL);
              break;
            case SAFE_MODE:
              // enter safe mode from either the running or the freshly-locked state
              if (getManagerState() == ManagerState.NORMAL) {
                setManagerState(ManagerState.SAFE_MODE);
              }
              if (getManagerState() == ManagerState.HAVE_LOCK) {
                setManagerState(ManagerState.SAFE_MODE);
              }
              break;
            case CLEAN_STOP:
              // staged shutdown: user tablets, then metadata tablets, then the root tablet,
              // then stop the tablet servers themselves
              switch (getManagerState()) {
                case NORMAL:
                  setManagerState(ManagerState.SAFE_MODE);
                  break;
                case SAFE_MODE: {
                  int count = nonMetaDataTabletsAssignedOrHosted();
                  log.debug(
                      String.format("There are %d non-metadata tablets assigned or hosted", count));
                  // advance only once all user tablets are unloaded and watcher stats are current
                  if (count == 0 && goodStats()) {
                    setManagerState(ManagerState.UNLOAD_METADATA_TABLETS);
                  }
                }
                  break;
                case UNLOAD_METADATA_TABLETS: {
                  int count = assignedOrHosted(MetadataTable.ID);
                  log.debug(
                      String.format("There are %d metadata tablets assigned or hosted", count));
                  if (count == 0 && goodStats()) {
                    setManagerState(ManagerState.UNLOAD_ROOT_TABLET);
                  }
                }
                  break;
                case UNLOAD_ROOT_TABLET:
                  int count = assignedOrHosted(MetadataTable.ID);
                  if (count > 0 && goodStats()) {
                    log.debug(String.format("%d metadata tablets online", count));
                    setManagerState(ManagerState.UNLOAD_ROOT_TABLET);
                  }
                  int root_count = assignedOrHosted(RootTable.ID);
                  if (root_count > 0 && goodStats()) {
                    log.debug("The root tablet is still assigned or hosted");
                  }
                  if (count + root_count == 0 && goodStats()) {
                    // everything is unloaded; halt all tablet servers
                    Set<TServerInstance> currentServers = tserverSet.getCurrentServers();
                    log.debug("stopping {} tablet servers", currentServers.size());
                    for (TServerInstance server : currentServers) {
                      try {
                        serversToShutdown.add(server);
                        tserverSet.getConnection(server).fastHalt(managerLock);
                      } catch (TException e) {
                        // its probably down, and we don't care
                      } finally {
                        tserverSet.remove(server);
                      }
                    }
                    if (currentServers.isEmpty()) {
                      setManagerState(ManagerState.STOP);
                    }
                  }
                  break;
                default:
                  break;
              }
          }
        } catch (Exception t) {
          log.error("Error occurred reading / switching manager goal state. Will"
              + " continue with attempt to update status", t);
        }

        // Regardless of state-machine errors above, refresh tserver status and balance
        Span span = TraceUtil.startSpan(this.getClass(), "run::updateStatus");
        try (Scope scope = span.makeCurrent()) {
          wait = updateStatus();
          eventListener.waitForEvents(wait);
        } catch (Exception t) {
          TraceUtil.setException(span, t, false);
          log.error("Error balancing tablets, will wait for {} (seconds) and then retry ",
              WAIT_BETWEEN_ERRORS / ONE_SECOND, t);
          sleepUninterruptibly(WAIT_BETWEEN_ERRORS, MILLISECONDS);
        } finally {
          span.end();
        }
      }
    }

    /**
     * Refreshes tserver status and, when the cluster is quiescent enough, runs the balancer.
     * Returns the number of milliseconds to wait before the next status update.
     */
    private long updateStatus() {
      Set<TServerInstance> currentServers = tserverSet.getCurrentServers();
      TreeMap<TabletServerId,TServerStatus> temp = new TreeMap<>();
      tserverStatus = gatherTableInformation(currentServers, temp);
      tserverStatusForBalancer = Collections.unmodifiableSortedMap(temp);
      checkForHeldServer(tserverStatus);

      // balance only when status is fresh, all tablets are hosted, and no shutdowns are pending
      if (!badServers.isEmpty()) {
        log.debug("not balancing because the balance information is out-of-date {}",
            badServers.keySet());
      } else if (notHosted() > 0) {
        log.debug("not balancing because there are unhosted tablets: {}", notHosted());
      } else if (getManagerGoalState() == ManagerGoalState.CLEAN_STOP) {
        log.debug("not balancing because the manager is attempting to stop cleanly");
      } else if (!serversToShutdown.isEmpty()) {
        log.debug("not balancing while shutting down servers {}", serversToShutdown);
      } else {
        for (TabletGroupWatcher tgw : watchers) {
          if (!tgw.isSameTserversAsLastScan(currentServers)) {
            log.debug("not balancing just yet, as collection of live tservers is in flux");
            return DEFAULT_WAIT_FOR_WATCHER;
          }
        }
        return balanceTablets();
      }
      return DEFAULT_WAIT_FOR_WATCHER;
    }

    /**
     * Detects a single tablet server stuck holding commits longer than TSERV_HOLD_TIME_SUICIDE and
     * attempts to halt it. Only triggers when exactly one server (of several) is affected, to
     * avoid killing servers during a cluster-wide stall (e.g. shared-filesystem outage).
     * Note: the parameter intentionally shadows the {@code tserverStatus} field.
     */
    private void checkForHeldServer(SortedMap<TServerInstance,TabletServerStatus> tserverStatus) {
      TServerInstance instance = null;
      int crazyHoldTime = 0;
      int someHoldTime = 0;
      final long maxWait = getConfiguration().getTimeInMillis(Property.TSERV_HOLD_TIME_SUICIDE);
      for (Entry<TServerInstance,TabletServerStatus> entry : tserverStatus.entrySet()) {
        if (entry.getValue().getHoldTime() > 0) {
          someHoldTime++;
          if (entry.getValue().getHoldTime() > maxWait) {
            instance = entry.getKey();
            crazyHoldTime++;
          }
        }
      }
      if (crazyHoldTime == 1 && someHoldTime == 1 && tserverStatus.size() > 1) {
        log.warn("Tablet server {} exceeded maximum hold time: attempting to kill it", instance);
        try {
          TServerConnection connection = tserverSet.getConnection(instance);
          if (connection != null) {
            connection.fastHalt(managerLock);
          }
        } catch (TException e) {
          log.error("{}", e.getMessage(), e);
        }
        // mark it bad so balancing pauses until its status is re-gathered
        badServers.putIfAbsent(instance, new AtomicInteger(1));
      }
    }

    /**
     * Invokes the configured balancer and records any sane migrations it requested. Returns the
     * wait time suggested by the balancer.
     */
    private long balanceTablets() {
      BalanceParamsImpl params = BalanceParamsImpl.fromThrift(tserverStatusForBalancer,
          tserverStatus, migrationsSnapshot());
      long wait = tabletBalancer.balance(params);

      for (TabletMigration m : checkMigrationSanity(tserverStatusForBalancer.keySet(),
          params.migrationsOut())) {
        KeyExtent ke = KeyExtent.fromTabletId(m.getTablet());
        // ignore duplicate requests for a tablet that is already migrating
        if (migrations.containsKey(ke)) {
          log.warn("balancer requested migration more than once, skipping {}", m);
          continue;
        }
        TServerInstance tserverInstance = TabletServerIdImpl.toThrift(m.getNewTabletServer());
        migrations.put(ke, tserverInstance);
        log.debug("migration {}", m);
      }
      if (params.migrationsOut().isEmpty()) {
        // nothing to migrate: wake anyone blocked in waitForBalance()
        synchronized (balancedNotifier) {
          balancedNotifier.notifyAll();
        }
      } else {
        nextEvent.event("Migrating %d more tablets, %d total", params.migrationsOut().size(),
            migrations.size());
      }
      return wait;
    }

    /**
     * Filters out malformed balancer migrations (null fields, or source/destination servers that
     * are not currently live), logging the reason each rejected migration is dropped.
     */
    private List<TabletMigration> checkMigrationSanity(Set<TabletServerId> current,
        List<TabletMigration> migrations) {
      return migrations.stream().filter(m -> {
        boolean includeMigration = false;
        if (m.getTablet() == null) {
          log.error("Balancer gave back a null tablet {}", m);
        } else if (m.getNewTabletServer() == null) {
          log.error("Balancer did not set the destination {}", m);
        } else if (m.getOldTabletServer() == null) {
          log.error("Balancer did not set the source {}", m);
        } else if (!current.contains(m.getOldTabletServer())) {
          log.warn("Balancer wants to move a tablet from a server that is not current: {}", m);
        } else if (!current.contains(m.getNewTabletServer())) {
          log.warn("Balancer wants to move a tablet to a server that is not current: {}", m);
        } else {
          includeMigration = true;
        }
        return includeMigration;
      }).collect(Collectors.toList());
    }

  }
 +
 +  private SortedMap<TServerInstance,TabletServerStatus> 
gatherTableInformation(
 +      Set<TServerInstance> currentServers, 
SortedMap<TabletServerId,TServerStatus> balancerMap) {
 +    final long rpcTimeout = 
getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT);
 +    int threads = 
getConfiguration().getCount(Property.MANAGER_STATUS_THREAD_POOL_SIZE);
 +    ExecutorService tp = ThreadPools.getServerThreadPools()
 +        .createExecutorService(getConfiguration(), 
Property.MANAGER_STATUS_THREAD_POOL_SIZE, false);
 +    long start = System.currentTimeMillis();
 +    final SortedMap<TServerInstance,TabletServerStatus> result = new 
ConcurrentSkipListMap<>();
 +    final RateLimiter shutdownServerRateLimiter = 
RateLimiter.create(MAX_SHUTDOWNS_PER_SEC);
 +    for (TServerInstance serverInstance : currentServers) {
 +      final TServerInstance server = serverInstance;
 +      if (threads == 0) {
 +        // Since an unbounded thread pool is being used, rate limit how fast 
task are added to the
 +        // executor. This prevents the threads from growing large unless 
there are lots of
 +        // unresponsive tservers.
 +        sleepUninterruptibly(Math.max(1, rpcTimeout / 120_000), MILLISECONDS);
 +      }
 +      tp.execute(() -> {
 +        try {
 +          Thread t = Thread.currentThread();
 +          String oldName = t.getName();
 +          try {
 +            String message = "Getting status from " + server;
 +            t.setName(message);
 +            long startForServer = System.currentTimeMillis();
 +            log.trace(message);
 +            TServerConnection connection1 = tserverSet.getConnection(server);
 +            if (connection1 == null) {
 +              throw new IOException("No connection to " + server);
 +            }
 +            TabletServerStatus status = connection1.getTableMap(false);
 +            result.put(server, status);
 +
 +            long duration = System.currentTimeMillis() - startForServer;
 +            log.trace("Got status from {} in {} ms", server, duration);
 +
 +          } finally {
 +            t.setName(oldName);
 +          }
 +        } catch (Exception ex) {
 +          log.error("unable to get tablet server status {} {}", server, 
ex.toString());
 +          log.debug("unable to get tablet server status {}", server, ex);
 +          // Attempt to shutdown server only if able to acquire. If unable, 
this tablet server
 +          // will be removed from the badServers set below and status will be 
reattempted again
 +          // MAX_BAD_STATUS_COUNT times
 +          if (badServers.computeIfAbsent(server, k -> new 
AtomicInteger(0)).incrementAndGet()
 +              > MAX_BAD_STATUS_COUNT) {
 +            if (shutdownServerRateLimiter.tryAcquire()) {
 +              log.warn("attempting to stop {}", server);
 +              try {
 +                TServerConnection connection2 = 
tserverSet.getConnection(server);
 +                if (connection2 != null) {
 +                  connection2.halt(managerLock);
 +                }
 +              } catch (TTransportException e1) {
 +                // ignore: it's probably down
 +              } catch (Exception e2) {
 +                log.info("error talking to troublesome tablet server", e2);
 +              }
 +            } else {
 +              log.warn("Unable to shutdown {} as over the shutdown limit of 
{} per minute", server,
 +                  MAX_SHUTDOWNS_PER_SEC * 60);
 +            }
 +            badServers.remove(server);
 +          }
 +        }
 +      });
 +    }
 +    tp.shutdown();
 +    try {
 +      tp.awaitTermination(Math.max(10000, rpcTimeout / 3), MILLISECONDS);
 +    } catch (InterruptedException e) {
 +      log.debug("Interrupted while fetching status");
 +    }
 +
 +    tp.shutdownNow();
 +
 +    // Threads may still modify map after shutdownNow is called, so create an 
immutable snapshot.
 +    SortedMap<TServerInstance,TabletServerStatus> info = 
ImmutableSortedMap.copyOf(result);
 +    tserverStatus.forEach((tsi, status) -> balancerMap.put(new 
TabletServerIdImpl(tsi),
 +        TServerStatusImpl.fromThrift(status)));
 +
 +    synchronized (badServers) {
 +      badServers.keySet().retainAll(currentServers);
 +      badServers.keySet().removeAll(info.keySet());
 +    }
 +    log.debug(String.format("Finished gathering information from %d of %d 
servers in %.2f seconds",
 +        info.size(), currentServers.size(), (System.currentTimeMillis() - 
start) / 1000.));
 +
 +    return info;
 +  }
 +
 +  @Override
 +  public void run() {
 +    final ServerContext context = getContext();
 +    final String zroot = getZooKeeperRoot();
 +
 +    // ACCUMULO-4424 Put up the Thrift servers before getting the lock as a 
sign of process health
 +    // when a hot-standby
 +    //
 +    // Start the Manager's Fate Service
 +    fateServiceHandler = new FateServiceHandler(this);
 +    managerClientHandler = new ManagerClientServiceHandler(this);
 +    // Start the Manager's Client service
 +    // Ensure that calls before the manager gets the lock fail
 +    ManagerClientService.Iface haProxy =
 +        HighlyAvailableServiceWrapper.service(managerClientHandler, this);
 +
 +    ServerAddress sa;
 +    var processor =
 +        ThriftProcessorTypes.getManagerTProcessor(fateServiceHandler, 
haProxy, getContext());
 +
 +    try {
 +      sa = TServerUtils.startServer(context, getHostname(), 
Property.MANAGER_CLIENTPORT, processor,
 +          "Manager", "Manager Client Service Handler", null, 
Property.MANAGER_MINTHREADS,
 +          Property.MANAGER_MINTHREADS_TIMEOUT, Property.MANAGER_THREADCHECK,
 +          Property.GENERAL_MAX_MESSAGE_SIZE);
 +    } catch (UnknownHostException e) {
 +      throw new IllegalStateException("Unable to start server on host " + 
getHostname(), e);
 +    }
 +    clientService = sa.server;
 +    log.info("Started Manager client service at {}", sa.address);
 +
 +    // block until we can obtain the ZK lock for the manager
 +    try {
 +      getManagerLock(ServiceLock.path(zroot + Constants.ZMANAGER_LOCK));
 +    } catch (KeeperException | InterruptedException e) {
 +      throw new IllegalStateException("Exception getting manager lock", e);
 +    }
 +
 +    // If UpgradeStatus is not at complete by this moment, then things are 
currently
 +    // upgrading.
 +    if (upgradeCoordinator.getStatus() != 
UpgradeCoordinator.UpgradeStatus.COMPLETE) {
 +      managerUpgrading.set(true);
 +    }
 +
 +    try {
 +      MetricsUtil.initializeMetrics(getContext().getConfiguration(), 
this.applicationName,
 +          sa.getAddress());
 +      ManagerMetrics.init(getConfiguration(), this);
 +    } catch (ClassNotFoundException | InstantiationException | 
IllegalAccessException
 +        | IllegalArgumentException | InvocationTargetException | 
NoSuchMethodException
 +        | SecurityException e1) {
 +      log.error("Error initializing metrics, metrics will not be emitted.", 
e1);
 +    }
 +
 +    recoveryManager = new RecoveryManager(this, 
TIME_TO_CACHE_RECOVERY_WAL_EXISTENCE);
 +
 +    context.getTableManager().addObserver(this);
 +
 +    Thread statusThread = Threads.createThread("Status Thread", new 
StatusThread());
 +    statusThread.start();
 +
 +    Threads.createThread("Migration Cleanup Thread", new 
MigrationCleanupThread()).start();
 +
 +    tserverSet.startListeningForTabletServerChanges();
 +
 +    try {
 +      blockForTservers();
 +    } catch (InterruptedException ex) {
 +      Thread.currentThread().interrupt();
 +    }
 +
 +    ZooReaderWriter zReaderWriter = context.getZooReaderWriter();
 +
 +    try {
 +      zReaderWriter.getChildren(zroot + Constants.ZRECOVERY, new Watcher() {
 +        @Override
 +        public void process(WatchedEvent event) {
 +          nextEvent.event("Noticed recovery changes %s", event.getType());
 +          try {
 +            // watcher only fires once, add it back
 +            zReaderWriter.getChildren(zroot + Constants.ZRECOVERY, this);
 +          } catch (Exception e) {
 +            log.error("Failed to add log recovery watcher back", e);
 +          }
 +        }
 +      });
 +    } catch (KeeperException | InterruptedException e) {
 +      throw new IllegalStateException("Unable to read " + zroot + 
Constants.ZRECOVERY, e);
 +    }
 +
 +    watchers.add(new TabletGroupWatcher(this,
 +        TabletStateStore.getStoreForLevel(DataLevel.USER, context, this), 
null) {
 +      @Override
 +      boolean canSuspendTablets() {
 +        // Always allow user data tablets to enter suspended state.
 +        return true;
 +      }
 +    });
 +
 +    watchers.add(new TabletGroupWatcher(this,
 +        TabletStateStore.getStoreForLevel(DataLevel.METADATA, context, this), 
watchers.get(0)) {
 +      @Override
 +      boolean canSuspendTablets() {
 +        // Allow metadata tablets to enter suspended state only if so 
configured. Generally
 +        // we'll want metadata tablets to
 +        // be immediately reassigned, even if there's a global 
table.suspension.duration
 +        // setting.
 +        return 
getConfiguration().getBoolean(Property.MANAGER_METADATA_SUSPENDABLE);
 +      }
 +    });
 +
 +    watchers.add(new TabletGroupWatcher(this,
 +        TabletStateStore.getStoreForLevel(DataLevel.ROOT, context), 
watchers.get(1)) {
 +      @Override
 +      boolean canSuspendTablets() {
 +        // Never allow root tablet to enter suspended state.
 +        return false;
 +      }
 +    });
 +    for (TabletGroupWatcher watcher : watchers) {
 +      watcher.start();
 +    }
 +
 +    // Once we are sure the upgrade is complete, we can safely allow fate use.
 +    try {
 +      // wait for metadata upgrade running in background to complete
 +      if (null != upgradeMetadataFuture) {
 +        upgradeMetadataFuture.get();
 +      }
 +      // Everything is fully upgraded by this point.
 +      managerUpgrading.set(false);
 +    } catch (ExecutionException | InterruptedException e) {
 +      throw new IllegalStateException("Metadata upgrade failed", e);
 +    }
 +
 +    try {
 +      final AgeOffStore<Manager> store = new AgeOffStore<>(
 +          new org.apache.accumulo.core.fate.ZooStore<>(getZooKeeperRoot() + 
Constants.ZFATE,
 +              context.getZooReaderWriter()),
 +          HOURS.toMillis(8), System::currentTimeMillis);
 +
 +      Fate<Manager> f = new Fate<>(this, store, TraceRepo::toLogString);
 +      f.startTransactionRunners(getConfiguration());
 +      fateRef.set(f);
 +      fateReadyLatch.countDown();
 +
 +      ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
 +          .scheduleWithFixedDelay(store::ageOff, 63000, 63000, MILLISECONDS));
 +    } catch (KeeperException | InterruptedException e) {
 +      throw new IllegalStateException("Exception setting up FaTE cleanup 
thread", e);
 +    }
 +
 +    ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
 +        .scheduleWithFixedDelay(() -> 
ScanServerMetadataEntries.clean(context), 10, 10, MINUTES));
 +
 +    initializeZkForReplication(zReaderWriter, zroot);
 +
 +    // Make sure that we have a secret key (either a new one or an old one 
from ZK) before we start
 +    // the manager client service.
 +    Thread authenticationTokenKeyManagerThread = null;
 +    if (authenticationTokenKeyManager != null && keyDistributor != null) {
 +      log.info("Starting delegation-token key manager");
 +      try {
 +        keyDistributor.initialize();
 +      } catch (KeeperException | InterruptedException e) {
 +        throw new IllegalStateException("Exception setting up 
delegation-token key manager", e);
 +      }
 +      authenticationTokenKeyManagerThread =
 +          Threads.createThread("Delegation Token Key Manager", 
authenticationTokenKeyManager);
 +      authenticationTokenKeyManagerThread.start();
 +      boolean logged = false;
 +      while (!authenticationTokenKeyManager.isInitialized()) {
 +        // Print out a status message when we start waiting for the key 
manager to get initialized
 +        if (!logged) {
 +          log.info("Waiting for AuthenticationTokenKeyManager to be 
initialized");
 +          logged = true;
 +        }
 +        sleepUninterruptibly(200, MILLISECONDS);
 +      }
 +      // And log when we are initialized
 +      log.info("AuthenticationTokenSecretManager is initialized");
 +    }
 +
 +    String address = sa.address.toString();
 +    log.info("Setting manager lock data to {}", address);
 +    try {
 +      managerLock.replaceLockData(address.getBytes());
 +    } catch (KeeperException | InterruptedException e) {
 +      throw new IllegalStateException("Exception updating manager lock", e);
 +    }
 +
 +    while (!clientService.isServing()) {
 +      sleepUninterruptibly(100, MILLISECONDS);
 +    }
 +
 +    // if the replication name is ever set, then start replication services
 +    final AtomicReference<TServer> replServer = new AtomicReference<>();
 +    ScheduledFuture<?> future = 
context.getScheduledExecutor().scheduleWithFixedDelay(() -> {
 +      try {
 +        @SuppressWarnings("deprecation")
 +        Property p = Property.REPLICATION_NAME;
 +        if ((replServer.get() == null) && 
!getConfiguration().get(p).isEmpty()) {
 +          log.info("{} was set, starting repl services.", p.getKey());
 +          replServer.set(setupReplication());
 +        }
 +      } catch (UnknownHostException | KeeperException | InterruptedException 
e) {
 +        log.error("Error occurred starting replication services. ", e);
 +      }
 +    }, 0, 5000, MILLISECONDS);
 +    ThreadPools.watchNonCriticalScheduledTask(future);
 +
 +    // checking stored user hashes if any of them uses an outdated algorithm
 +    security.validateStoredUserCreditentials();
 +
 +    // The manager is fully initialized. Clients are allowed to connect now.
 +    managerInitialized.set(true);
 +
 +    while (clientService.isServing()) {
 +      sleepUninterruptibly(500, MILLISECONDS);
 +    }
 +    log.info("Shutting down fate.");
 +    fate().shutdown();
 +
 +    final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME;
 +    try {
 +      statusThread.join(remaining(deadline));
 +      if (null != replicationAssignerThread) {
 +        replicationAssignerThread.join(remaining(deadline));
 +      }
 +      if (null != replicationWorkThread) {
 +        replicationWorkThread.join(remaining(deadline));
 +      }
 +    } catch (InterruptedException e) {
 +      throw new IllegalStateException("Exception stopping replication 
workers", e);
 +    }
 +    var nullableReplServer = replServer.get();
 +    if (nullableReplServer != null) {
 +      nullableReplServer.stop();
 +    }
 +
 +    // Signal that we want it to stop, and wait for it to do so.
 +    if (authenticationTokenKeyManager != null) {
 +      authenticationTokenKeyManager.gracefulStop();
 +      try {
 +        if (null != authenticationTokenKeyManagerThread) {
 +          authenticationTokenKeyManagerThread.join(remaining(deadline));
 +        }
 +      } catch (InterruptedException e) {
 +        throw new IllegalStateException("Exception waiting on 
delegation-token key manager", e);
 +      }
 +    }
 +
 +    // quit, even if the tablet servers somehow jam up and the watchers
 +    // don't stop
 +    for (TabletGroupWatcher watcher : watchers) {
 +      try {
 +        watcher.join(remaining(deadline));
 +      } catch (InterruptedException e) {
 +        throw new IllegalStateException("Exception waiting on watcher", e);
 +      }
 +    }
 +    log.info("exiting");
 +  }
 +
 +  @Deprecated
 +  private void initializeZkForReplication(ZooReaderWriter zReaderWriter, 
String zroot) {
 +    try {
 +      org.apache.accumulo.server.replication.ZooKeeperInitialization
 +          .ensureZooKeeperInitialized(zReaderWriter, zroot);
 +    } catch (KeeperException | InterruptedException e) {
 +      throw new IllegalStateException("Exception while ensuring ZooKeeper is 
initialized", e);
 +    }
 +  }
 +
 +  /**
 +   * Allows property configuration to block manager start-up waiting for a 
minimum number of
 +   * tservers to register in zookeeper. It also accepts a maximum time to 
wait - if the time
 +   * expires, the start-up will continue with any tservers available. This 
check is only performed
 +   * at manager initialization, when the manager acquires the lock. The 
following properties are
 +   * used to control the behaviour:
 +   * <ul>
 +   * <li>MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT - when set to 0 or less, no 
blocking occurs
 +   * (default behaviour) otherwise will block until the number of tservers 
are available.</li>
 +   * <li>MANAGER_STARTUP_TSERVER_AVAIL_MAX_WAIT - time to wait in 
milliseconds. When set to 0 or
 +   * less, will block indefinitely.</li>
 +   * </ul>
 +   *
 +   * @throws InterruptedException if interrupted while blocking, propagated 
for caller to handle.
 +   */
 +  private void blockForTservers() throws InterruptedException {
 +    long waitStart = System.nanoTime();
 +
 +    long minTserverCount =
 +        
getConfiguration().getCount(Property.MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT);
 +
 +    if (minTserverCount <= 0) {
 +      log.info("tserver availability check disabled, continuing with-{} 
servers. To enable, set {}",
 +          tserverSet.size(), 
Property.MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT.getKey());
 +      return;
 +    }
 +    long userWait = MILLISECONDS.toSeconds(
 +        
getConfiguration().getTimeInMillis(Property.MANAGER_STARTUP_TSERVER_AVAIL_MAX_WAIT));
++
 +    // Setting retry values for defined wait timeouts
 +    long retries = 10;
 +    // Set these to the same value so the max possible wait time always 
matches the provided maxWait
 +    long initialWait = userWait / retries;
 +    long maxWaitPeriod = initialWait;
 +    long waitIncrement = 0;
 +
 +    if (userWait <= 0) {
 +      log.info("tserver availability check set to block indefinitely, To 
change, set {} > 0.",
-           Property.MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT.getKey());
++          Property.MANAGER_STARTUP_TSERVER_AVAIL_MAX_WAIT.getKey());
 +      userWait = Long.MAX_VALUE;
 +
 +      // If indefinitely blocking, change retry values to support incremental 
backoff and logging.
 +      retries = userWait;
 +      initialWait = 1;
 +      maxWaitPeriod = 30;
 +      waitIncrement = 5;
 +    }
 +
 +    Retry tserverRetry = 
Retry.builder().maxRetries(retries).retryAfter(initialWait, SECONDS)
 +        .incrementBy(waitIncrement, SECONDS).maxWait(maxWaitPeriod, 
SECONDS).backOffFactor(1)
 +        .logInterval(30, SECONDS).createRetry();
 +
 +    log.info("Checking for tserver availability - need to reach {} servers. 
Have {}",
 +        minTserverCount, tserverSet.size());
 +
 +    boolean needTservers = tserverSet.size() < minTserverCount;
 +
 +    while (needTservers && tserverRetry.canRetry()) {
 +
 +      tserverRetry.waitForNextAttempt(log, "block until minimum tservers 
reached");
 +
 +      needTservers = tserverSet.size() < minTserverCount;
 +
 +      // suppress last message once threshold reached.
 +      if (needTservers) {
 +        tserverRetry.logRetry(log, String.format(
 +            "Blocking for tserver availability - need to reach %s servers. 
Have %s Time spent blocking %s seconds.",
 +            minTserverCount, tserverSet.size(),
 +            NANOSECONDS.toSeconds(System.nanoTime() - waitStart)));
 +      }
 +      tserverRetry.useRetry();
 +    }
 +
 +    if (tserverSet.size() < minTserverCount) {
 +      log.warn(
 +          "tserver availability check time expired - continuing. Requested 
{}, have {} tservers on line. "
 +              + " Time waiting {} sec",
 +          tserverSet.size(), minTserverCount, 
NANOSECONDS.toSeconds(System.nanoTime() - waitStart));
 +
 +    } else {
 +      log.info(
 +          "tserver availability check completed. Requested {}, have {} 
tservers on line. "
 +              + " Time waiting {} sec",
 +          tserverSet.size(), minTserverCount, 
NANOSECONDS.toSeconds(System.nanoTime() - waitStart));
 +    }
 +  }
 +
  /**
   * Starts the deprecated data-replication services: the replication coordinator Thrift server,
   * the driver that scans the replication table to create work, and the assigner that hands work
   * to tservers. Also publishes the coordinator's address in ZooKeeper so peers can find it.
   * Called lazily (from a scheduled task in run()) once the replication name property is set.
   *
   * @return the running replication coordinator Thrift server, so the caller can stop it on
   *         shutdown
   * @throws UnknownHostException if the coordinator server cannot bind to this host
   * @throws KeeperException if publishing the coordinator address to ZooKeeper fails
   * @throws InterruptedException if interrupted while talking to ZooKeeper
   */
  @Deprecated
  private TServer setupReplication()
      throws UnknownHostException, KeeperException, InterruptedException {
    ServerContext context = getContext();
    // Start the replication coordinator which assigns tservers to service replication requests
    var impl = new org.apache.accumulo.manager.replication.ManagerReplicationCoordinator(this);
    ReplicationCoordinator.Iface haReplicationProxy =
        HighlyAvailableServiceWrapper.service(impl, this);

    var processor =
        ThriftProcessorTypes.getReplicationCoordinatorTProcessor(haReplicationProxy, getContext());

    ServerAddress replAddress = TServerUtils.startServer(context, getHostname(),
        Property.MANAGER_REPLICATION_COORDINATOR_PORT, processor, "Manager Replication Coordinator",
        "Replication Coordinator", null, Property.MANAGER_REPLICATION_COORDINATOR_MINTHREADS, null,
        Property.MANAGER_REPLICATION_COORDINATOR_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);

    log.info("Started replication coordinator service at " + replAddress.address);
    // Start the daemon to scan the replication table and make units of work
    replicationWorkThread = Threads.createThread("Replication Driver",
        new org.apache.accumulo.manager.replication.ReplicationDriver(this));
    replicationWorkThread.start();

    // Start the daemon to assign work to tservers to replicate to our peers
    var wd = new org.apache.accumulo.manager.replication.WorkDriver(this);
    replicationAssignerThread = Threads.createThread(wd.getName(), wd);
    replicationAssignerThread.start();

    // Advertise that port we used so peers don't have to be told what it is
    context.getZooReaderWriter().putPersistentData(
        getZooKeeperRoot() + Constants.ZMANAGER_REPLICATION_COORDINATOR_ADDR,
        replAddress.address.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE);
    return replAddress.server;
  }
 +
 +  private long remaining(long deadline) {
 +    return Math.max(1, deadline - System.currentTimeMillis());
 +  }
 +
  /** Returns the ZooKeeper service lock held (or being acquired) by this manager process. */
  public ServiceLock getManagerLock() {
    return managerLock;
  }
 +
 +  private static class ManagerLockWatcher implements 
ServiceLock.AccumuloLockWatcher {
 +
 +    boolean acquiredLock = false;
 +    boolean failedToAcquireLock = false;
 +
 +    @Override
 +    public void lostLock(LockLossReason reason) {
 +      Halt.halt("Manager lock in zookeeper lost (reason = " + reason + "), 
exiting!", -1);
 +    }
 +
 +    @Override
 +    public void unableToMonitorLockNode(final Exception e) {
 +      // ACCUMULO-3651 Changed level to error and added FATAL to message for 
slf4j compatibility
 +      Halt.halt(-1, () -> log.error("FATAL: No longer able to monitor manager 
lock node", e));
 +
 +    }
 +
 +    @Override
 +    public synchronized void acquiredLock() {
 +      log.debug("Acquired manager lock");
 +
 +      if (acquiredLock || failedToAcquireLock) {
 +        Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + 
failedToAcquireLock, -1);
 +      }
 +
 +      acquiredLock = true;
 +      notifyAll();
 +    }
 +
 +    @Override
 +    public synchronized void failedToAcquireLock(Exception e) {
 +      log.warn("Failed to get manager lock", e);
 +
 +      if (e instanceof NoAuthException) {
 +        String msg = "Failed to acquire manager lock due to incorrect 
ZooKeeper authentication.";
 +        log.error("{} Ensure instance.secret is consistent across Accumulo 
configuration", msg, e);
 +        Halt.halt(msg, -1);
 +      }
 +
 +      if (acquiredLock) {
 +        Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " + 
failedToAcquireLock,
 +            -1);
 +      }
 +
 +      failedToAcquireLock = true;
 +      notifyAll();
 +    }
 +
 +    public synchronized void waitForChange() {
 +      while (!acquiredLock && !failedToAcquireLock) {
 +        try {
 +          wait();
 +        } catch (InterruptedException e) {}
 +      }
 +    }
 +  }
 +
 +  private void getManagerLock(final ServiceLockPath zManagerLoc)
 +      throws KeeperException, InterruptedException {
 +    var zooKeeper = getContext().getZooReaderWriter().getZooKeeper();
 +    log.info("trying to get manager lock");
 +
 +    final String managerClientAddress =
 +        getHostname() + ":" + 
getConfiguration().getPort(Property.MANAGER_CLIENTPORT)[0];
 +
 +    UUID zooLockUUID = UUID.randomUUID();
 +    while (true) {
 +
 +      ManagerLockWatcher managerLockWatcher = new ManagerLockWatcher();
 +      managerLock = new ServiceLock(zooKeeper, zManagerLoc, zooLockUUID);
 +      managerLock.lock(managerLockWatcher, managerClientAddress.getBytes());
 +
 +      managerLockWatcher.waitForChange();
 +
 +      if (managerLockWatcher.acquiredLock) {
 +        break;
 +      }
 +
 +      if (!managerLockWatcher.failedToAcquireLock) {
 +        throw new IllegalStateException("manager lock in unknown state");
 +      }
 +
 +      managerLock.tryToCancelAsyncLockOrUnlock();
 +
 +      sleepUninterruptibly(TIME_TO_WAIT_BETWEEN_LOCK_CHECKS, MILLISECONDS);
 +    }
 +
 +    setManagerState(ManagerState.HAVE_LOCK);
 +  }
 +
  /**
   * Callback fired with the current live-tserver view plus the servers that disappeared
   * ({@code deleted}) and appeared ({@code added}) since the last update. Maintains the dead
   * server obituary list, the bad/shutdown server bookkeeping, and cancels migrations that
   * targeted a now-dead server.
   */
  @Override
  public void update(LiveTServerSet current, Set<TServerInstance> deleted,
      Set<TServerInstance> added) {
    // if we have deleted or added tservers, then adjust our dead server list
    if (!deleted.isEmpty() || !added.isEmpty()) {
      DeadServerList obit = new DeadServerList(getContext());
      if (!added.isEmpty()) {
        log.info("New servers: {}", added);
        // a server that came (back) up is no longer dead; clear its obituary
        for (TServerInstance up : added) {
          obit.delete(up.getHostPort());
        }
      }
      for (TServerInstance dead : deleted) {
        String cause = "unexpected failure";
        if (serversToShutdown.contains(dead)) {
          cause = "clean shutdown"; // maybe an incorrect assumption
        }
        // during a CLEAN_STOP every server goes away on purpose, so skip obituaries
        if (!getManagerGoalState().equals(ManagerGoalState.CLEAN_STOP)) {
          obit.post(dead.getHostPort(), cause);
        }
      }

      // warn only about servers that vanished without being asked to shut down
      Set<TServerInstance> unexpected = new HashSet<>(deleted);
      unexpected.removeAll(this.serversToShutdown);
      if (!unexpected.isEmpty()
          && (stillManager() && !getManagerGoalState().equals(ManagerGoalState.CLEAN_STOP))) {
        log.warn("Lost servers {}", unexpected);
      }
      // NOTE(review): these two removeAll calls run outside the synchronized blocks below —
      // presumably both collections are thread-safe wrappers; confirm against their declarations
      serversToShutdown.removeAll(deleted);
      badServers.keySet().removeAll(deleted);
      // clear out any bad server with the same host/port as a new server
      synchronized (badServers) {
        cleanListByHostAndPort(badServers.keySet(), deleted, added);
      }
      synchronized (serversToShutdown) {
        cleanListByHostAndPort(serversToShutdown, deleted, added);
      }

      // cancel any migration whose destination server is gone; Iterator.remove avoids
      // ConcurrentModificationException while pruning
      synchronized (migrations) {
        Iterator<Entry<KeyExtent,TServerInstance>> iter = migrations.entrySet().iterator();
        while (iter.hasNext()) {
          Entry<KeyExtent,TServerInstance> entry = iter.next();
          if (deleted.contains(entry.getValue())) {
            log.info("Canceling migration of {} to {}", entry.getKey(), entry.getValue());
            iter.remove();
          }
        }
      }
      nextEvent.event("There are now %d tablet servers", current.size());
    }

    // clear out any servers that are no longer current
    // this is needed when we are using a fate operation to shutdown a tserver as it
    // will continue to add the server to the serversToShutdown (ACCUMULO-4410)
    serversToShutdown.retainAll(current.getCurrentServers());
  }
 +
 +  private static void cleanListByHostAndPort(Collection<TServerInstance> 
badServers,
 +      Set<TServerInstance> deleted, Set<TServerInstance> added) {
 +    Iterator<TServerInstance> badIter = badServers.iterator();
 +    while (badIter.hasNext()) {
 +      TServerInstance bad = badIter.next();
 +      for (TServerInstance add : added) {
 +        if (bad.getHostPort().equals(add.getHostPort())) {
 +          badIter.remove();
 +          break;
 +        }
 +      }
 +      for (TServerInstance del : deleted) {
 +        if (bad.getHostPort().equals(del.getHostPort())) {
 +          badIter.remove();
 +          break;
 +        }
 +      }
 +    }
 +  }
 +
  /**
   * Callback for a table-state change recorded in ZooKeeper: broadcasts an event so waiting
   * threads re-evaluate, and abandons pending migrations for tables that went offline.
   */
  @Override
  public void stateChanged(TableId tableId, TableState state) {
    nextEvent.event("Table state in zookeeper changed for %s to %s", tableId, state);
    if (state == TableState.OFFLINE) {
      // migrating tablets of an offline table is pointless, so cancel them
      clearMigrations(tableId);
    }
  }
 +
  /** Intentionally a no-op. */
  @Override
  public void initialize() {}
 +
  /** Intentionally a no-op; lock loss is handled by the lock watcher, which halts the process. */
  @Override
  public void sessionExpired() {}
 +
 +  @Override
 +  public Set<TableId> onlineTables() {
 +    Set<TableId> result = new HashSet<>();
 +    if (getManagerState() != ManagerState.NORMAL) {
 +      if (getManagerState() != ManagerState.UNLOAD_METADATA_TABLETS) {
 +        result.add(MetadataTable.ID);
 +      }
 +      if (getManagerState() != ManagerState.UNLOAD_ROOT_TABLET) {
 +        result.add(RootTable.ID);
 +      }
 +      return result;
 +    }
 +    ServerContext context = getContext();
 +    TableManager manager = context.getTableManager();
 +
 +    for (TableId tableId : context.getTableIdToNameMap().keySet()) {
 +      TableState state = manager.getTableState(tableId);
 +      if ((state != null) && (state == TableState.ONLINE)) {
 +        result.add(tableId);
 +      }
 +    }
 +    return result;
 +  }
 +
  /** Returns the tablet servers currently considered live by the tserver set. */
  @Override
  public Set<TServerInstance> onlineTabletServers() {
    return tserverSet.getCurrentServers();
  }
 +
 +  @Override
 +  public Collection<MergeInfo> merges() {
 +    List<MergeInfo> result = new ArrayList<>();
 +    for (TableId tableId : getContext().getTableIdToNameMap().keySet()) {
 +      result.add(getMergeInfo(tableId));
 +    }
 +    return result;
 +  }
 +
  /**
   * Queues {@code server} for shutdown and fires an event so assignment logic reacts. Also used
   * to recover state from the persistent transaction to shutdown a server.
   */
  public void shutdownTServer(TServerInstance server) {
    nextEvent.event("Tablet Server shutdown requested for %s", server);
    serversToShutdown.add(server);
  }
 +
  /** Returns the coordinator used to broadcast and wait on cluster-state events. */
  public EventCoordinator getEventCoordinator() {
    return nextEvent;
  }
 +
  /** Returns the volume manager from the server context. */
  public VolumeManager getVolumeManager() {
    return getContext().getVolumeManager();
  }
 +
 +  public void assignedTablet(KeyExtent extent) {
 +    if (extent.isMeta() && 
getManagerState().equals(ManagerState.UNLOAD_ROOT_TABLET)) {
 +      setManagerState(ManagerState.UNLOAD_METADATA_TABLETS);
 +    }
 +    // probably too late, but try anyhow
 +    if (extent.isRootTablet() && getManagerState().equals(ManagerState.STOP)) 
{
 +      setManagerState(ManagerState.UNLOAD_ROOT_TABLET);
 +    }
 +  }
 +
  /**
   * Blocks the caller until the cluster looks balanced: no displayed unassigned tablets, no
   * pending migrations, and no events arrived while waiting. The thread wakes whenever
   * {@code balancedNotifier} is notified (presumably by the balancing loop — confirm) and
   * re-checks the exit conditions.
   */
  @SuppressFBWarnings(value = "UW_UNCOND_WAIT", justification = "TODO needs triage")
  public void waitForBalance() {
    synchronized (balancedNotifier) {
      long eventCounter;
      do {
        // snapshot the event count; if it differs after the wait, cluster state changed
        eventCounter = nextEvent.waitForEvents(0, 0);
        try {
          balancedNotifier.wait();
        } catch (InterruptedException e) {
          // interruption is logged and the wait loop continues until balance is reached
          log.debug(e.toString(), e);
        }
      } while (displayUnassigned() > 0 || !migrations.isEmpty()
          || eventCounter != nextEvent.waitForEvents(0, 0));
    }
  }
 +
  /**
   * Builds a point-in-time status snapshot for the monitor: per-tserver status, cluster-wide
   * per-table aggregates, unresponsive ("bad") servers, manager state/goal state, unassigned
   * tablet count, servers shutting down, dead-server obituaries, and bulk import status.
   */
  public ManagerMonitorInfo getManagerMonitorInfo() {
    final ManagerMonitorInfo result = new ManagerMonitorInfo();

    result.tServerInfo = new ArrayList<>();
    result.tableMap = new HashMap<>();
    // NOTE(review): tserverStatus is iterated without synchronization here — presumably it is a
    // concurrent map or an immutable snapshot; confirm against its declaration
    for (Entry<TServerInstance,TabletServerStatus> serverEntry : tserverStatus.entrySet()) {
      final TabletServerStatus status = serverEntry.getValue();
      result.tServerInfo.add(status);
      // fold each server's per-table stats into cluster-wide per-table totals
      for (Entry<String,TableInfo> entry : status.tableMap.entrySet()) {
        TableInfoUtil.add(result.tableMap.computeIfAbsent(entry.getKey(), k -> new TableInfo()),
            entry.getValue());
      }
    }
    result.badTServers = new HashMap<>();
    synchronized (badServers) {
      for (TServerInstance bad : badServers.keySet()) {
        result.badTServers.put(bad.getHostPort(), TabletServerState.UNRESPONSIVE.getId());
      }
    }
    result.state = getManagerState();
    result.goalState = getManagerGoalState();
    result.unassignedTablets = displayUnassigned();
    result.serversShuttingDown = new HashSet<>();
    synchronized (serversToShutdown) {
      for (TServerInstance server : serversToShutdown) {
        result.serversShuttingDown.add(server.getHostPort());
      }
    }
    DeadServerList obit = new DeadServerList(getContext());
    result.deadTabletServers = obit.getList();
    result.bulkImports = bulkImportStatus.getBulkLoadStatus();
    return result;
  }
 +
  /**
   * Can delegation tokens be generated for users
   *
   * @return whether delegation tokens can currently be generated
   */
  public boolean delegationTokensAvailable() {
    return delegationTokensAvailable;
  }
 +
 +  @Override
 +  public Set<KeyExtent> migrationsSnapshot() {
 +    Set<KeyExtent> migrationKeys;
 +    synchronized (migrations) {
 +      migrationKeys = new HashSet<>(migrations.keySet());
 +    }
 +    return Collections.unmodifiableSet(migrationKeys);
 +  }
 +
 +  @Override
 +  public Set<TServerInstance> shutdownServers() {
 +    synchronized (serversToShutdown) {
 +      return new HashSet<>(serversToShutdown);
 +    }
 +  }
 +
  /** Records {@code state} for the single bulk import working on {@code directory}. */
  public void updateBulkImportStatus(String directory, BulkImportState state) {
    bulkImportStatus.updateBulkImportStatus(Collections.singletonList(directory), state);
  }
 +
  /** Clears tracking for the bulk import working on {@code directory}. */
  public void removeBulkImportStatus(String directory) {
    bulkImportStatus.removeBulkImportStatus(Collections.singletonList(directory));
  }
 +
  /**
   * Return how long (in milliseconds) there has been a manager overseeing this cluster. This is an
   * approximately monotonic clock, which will be approximately consistent between different
   * managers or different runs of the same manager.
   *
   * @return elapsed manager-oversight time in milliseconds, per {@code timeKeeper}
   */
  public Long getSteadyTime() {
    return timeKeeper.getTime();
  }
 +
  /** Reports whether this manager has completed initialization (the managerInitialized flag). */
  @Override
  public boolean isActiveService() {
    return managerInitialized.get();
  }
 +
  /** Reports whether an upgrade is in progress (the managerUpgrading flag). */
  @Override
  public boolean isUpgrading() {
    return managerUpgrading.get();
  }
 +
  /**
   * (Re)creates the tablet balancer from the {@code MANAGER_TABLET_BALANCER} property, using
   * {@code SimpleLoadBalancer} as the fallback instance.
   */
  void initializeBalancer() {
    // construct and init into a local first so tabletBalancer never exposes an
    // uninitialized balancer to readers
    var localTabletBalancer = Property.createInstanceFromPropertyName(getConfiguration(),
        Property.MANAGER_TABLET_BALANCER, TabletBalancer.class, new SimpleLoadBalancer());
    localTabletBalancer.init(balancerEnvironment);
    tabletBalancer = localTabletBalancer;
  }
 +
  /** Returns the concrete class of the currently configured tablet balancer. */
  Class<?> getBalancerClass() {
    return tabletBalancer.getClass();
  }
 +
 +  void getAssignments(SortedMap<TServerInstance,TabletServerStatus> 
currentStatus,
 +      Map<KeyExtent,TServerInstance> unassigned, 
Map<KeyExtent,TServerInstance> assignedOut) {
 +    AssignmentParamsImpl params =
 +        AssignmentParamsImpl.fromThrift(currentStatus, unassigned, 
assignedOut);
 +    tabletBalancer.getAssignments(params);
 +  }
 +}

Reply via email to