http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/Master.java
index 9537f4e,0000000..83e83cf
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@@ -1,1315 -1,0 +1,1317 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.master;
 +
++import static com.google.common.base.Charsets.UTF_8;
++
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.SortedMap;
 +import java.util.TreeMap;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.impl.Namespaces;
 +import org.apache.accumulo.core.client.impl.Tables;
 +import org.apache.accumulo.core.client.impl.ThriftTransportPool;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 +import 
org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 +import 
org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.master.state.tables.TableState;
 +import org.apache.accumulo.core.master.thrift.MasterClientService.Iface;
 +import org.apache.accumulo.core.master.thrift.MasterClientService.Processor;
 +import org.apache.accumulo.core.master.thrift.MasterGoalState;
 +import org.apache.accumulo.core.master.thrift.MasterState;
 +import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.RootTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.security.Credentials;
 +import org.apache.accumulo.core.security.NamespacePermission;
 +import org.apache.accumulo.core.security.SecurityUtil;
 +import org.apache.accumulo.core.security.TablePermission;
 +import org.apache.accumulo.core.util.Daemon;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.AgeOffStore;
 +import org.apache.accumulo.fate.Fate;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 +import org.apache.accumulo.master.recovery.RecoveryManager;
 +import org.apache.accumulo.master.state.TableCounts;
 +import org.apache.accumulo.server.Accumulo;
 +import org.apache.accumulo.server.ServerConstants;
 +import org.apache.accumulo.server.ServerOpts;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.fs.VolumeManager.FileType;
 +import org.apache.accumulo.server.fs.VolumeManagerImpl;
 +import org.apache.accumulo.server.init.Initialize;
 +import org.apache.accumulo.server.master.LiveTServerSet;
 +import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 +import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer;
 +import org.apache.accumulo.server.master.balancer.TabletBalancer;
 +import org.apache.accumulo.server.master.state.CurrentState;
 +import org.apache.accumulo.server.master.state.DeadServerList;
 +import org.apache.accumulo.server.master.state.MergeInfo;
 +import org.apache.accumulo.server.master.state.MergeState;
 +import org.apache.accumulo.server.master.state.MetaDataStateStore;
 +import org.apache.accumulo.server.master.state.RootTabletStateStore;
 +import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.accumulo.server.master.state.TabletLocationState;
 +import org.apache.accumulo.server.master.state.TabletMigration;
 +import org.apache.accumulo.server.master.state.TabletState;
 +import org.apache.accumulo.server.master.state.ZooStore;
 +import org.apache.accumulo.server.master.state.ZooTabletStateStore;
 +import org.apache.accumulo.server.security.AuditedSecurityOperation;
 +import org.apache.accumulo.server.security.SecurityOperation;
 +import org.apache.accumulo.server.security.SystemCredentials;
 +import org.apache.accumulo.server.security.handler.ZKPermHandler;
 +import org.apache.accumulo.server.tables.TableManager;
 +import org.apache.accumulo.server.tables.TableObserver;
 +import org.apache.accumulo.server.util.DefaultMap;
 +import org.apache.accumulo.server.util.Halt;
 +import org.apache.accumulo.server.util.MetadataTableUtil;
 +import org.apache.accumulo.server.util.RpcWrapper;
 +import org.apache.accumulo.server.util.TServerUtils;
 +import org.apache.accumulo.server.util.TServerUtils.ServerAddress;
 +import org.apache.accumulo.server.util.time.SimpleTimer;
 +import org.apache.accumulo.server.zookeeper.ZooLock;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
 +import org.apache.accumulo.start.classloader.vfs.ContextManager;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.DataInputBuffer;
 +import org.apache.hadoop.io.DataOutputBuffer;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.server.TServer;
 +import org.apache.thrift.transport.TTransportException;
 +import org.apache.zookeeper.KeeperException;
 +import org.apache.zookeeper.WatchedEvent;
 +import org.apache.zookeeper.Watcher;
 +import org.apache.zookeeper.data.Stat;
 +
 +import com.google.common.collect.Iterables;
 +
 +/**
 + * The Master is responsible for assigning and balancing tablets to tablet 
servers.
 + *
 + * The master will also coordinate log recoveries and reports general status.
 + */
 +public class Master implements LiveTServerSet.Listener, TableObserver, 
CurrentState {
 +
 +  final static Logger log = Logger.getLogger(Master.class);
 +
 +  final static int ONE_SECOND = 1000;
 +  final private static Text METADATA_TABLE_ID = new Text(MetadataTable.ID);
 +  final private static Text ROOT_TABLE_ID = new Text(RootTable.ID);
 +  final static long TIME_TO_WAIT_BETWEEN_SCANS = 60 * ONE_SECOND;
 +  final private static long TIME_BETWEEN_MIGRATION_CLEANUPS = 5 * 60 * 
ONE_SECOND;
 +  final static long WAIT_BETWEEN_ERRORS = ONE_SECOND;
 +  final private static long DEFAULT_WAIT_FOR_WATCHER = 10 * ONE_SECOND;
 +  final private static int MAX_CLEANUP_WAIT_TIME = ONE_SECOND;
 +  final private static int TIME_TO_WAIT_BETWEEN_LOCK_CHECKS = ONE_SECOND;
 +  final static int MAX_TSERVER_WORK_CHUNK = 5000;
 +  final private static int MAX_BAD_STATUS_COUNT = 3;
 +
 +  final VolumeManager fs;
 +  final private Instance instance;
 +  final private String hostname;
 +  final LiveTServerSet tserverSet;
 +  final private List<TabletGroupWatcher> watchers = new 
ArrayList<TabletGroupWatcher>();
 +  final SecurityOperation security;
 +  final Map<TServerInstance,AtomicInteger> badServers = 
Collections.synchronizedMap(new DefaultMap<TServerInstance,AtomicInteger>(new 
AtomicInteger()));
 +  final Set<TServerInstance> serversToShutdown = 
Collections.synchronizedSet(new HashSet<TServerInstance>());
 +  final SortedMap<KeyExtent,TServerInstance> migrations = 
Collections.synchronizedSortedMap(new TreeMap<KeyExtent,TServerInstance>());
 +  final EventCoordinator nextEvent = new EventCoordinator();
 +  final private Object mergeLock = new Object();
 +  RecoveryManager recoveryManager = null;
 +
 +  ZooLock masterLock = null;
 +  private TServer clientService = null;
 +  TabletBalancer tabletBalancer;
 +
 +  private MasterState state = MasterState.INITIAL;
 +
 +  Fate<Master> fate;
 +
 +  volatile SortedMap<TServerInstance,TabletServerStatus> tserverStatus = 
Collections.unmodifiableSortedMap(new 
TreeMap<TServerInstance,TabletServerStatus>());
 +
 +  synchronized MasterState getMasterState() {
 +    return state;
 +  }
 +
 +  public boolean stillMaster() {
 +    return getMasterState() != MasterState.STOP;
 +  }
 +
 +  static final boolean X = true;
 +  static final boolean _ = false;
 +  // @formatter:off
 +  static final boolean transitionOK[][] = {
 +      //                              INITIAL HAVE_LOCK SAFE_MODE NORMAL 
UNLOAD_META UNLOAD_ROOT STOP
 +      /* INITIAL */                   {X,     X,        _,        _,      _,  
       _,          X},
 +      /* HAVE_LOCK */                 {_,     X,        X,        X,      _,  
       _,          X},
 +      /* SAFE_MODE */                 {_,     _,        X,        X,      X,  
       _,          X},
 +      /* NORMAL */                    {_,     _,        X,        X,      X,  
       _,          X},
 +      /* UNLOAD_METADATA_TABLETS */   {_,     _,        X,        X,      X,  
       X,          X},
 +      /* UNLOAD_ROOT_TABLET */        {_,     _,        _,        X,      X,  
       X,          X},
 +      /* STOP */                      {_,     _,        _,        _,      _,  
       X,          X}};
 +  //@formatter:on
 +  synchronized void setMasterState(MasterState newState) {
 +    if (state.equals(newState))
 +      return;
 +    if (!transitionOK[state.ordinal()][newState.ordinal()]) {
 +      log.error("Programmer error: master should not transition from " + 
state + " to " + newState);
 +    }
 +    MasterState oldState = state;
 +    state = newState;
 +    nextEvent.event("State changed from %s to %s", oldState, newState);
 +    if (newState == MasterState.STOP) {
 +      // Give the server a little time before shutdown so the client
 +      // thread requesting the stop can return
 +      SimpleTimer.getInstance().schedule(new Runnable() {
 +        @Override
 +        public void run() {
 +          // This frees the main thread and will cause the master to exit
 +          clientService.stop();
 +          Master.this.nextEvent.event("stopped event loop");
 +        }
 +
 +      }, 100l, 1000l);
 +    }
 +
 +    if (oldState != newState && (newState == MasterState.HAVE_LOCK)) {
 +      upgradeZookeeper();
 +    }
 +
 +    if (oldState != newState && (newState == MasterState.NORMAL)) {
 +      upgradeMetadata();
 +    }
 +  }
 +
 +  private void moveRootTabletToRootTable(IZooReaderWriter zoo) throws 
Exception {
 +    String dirZPath = ZooUtil.getRoot(instance) + RootTable.ZROOT_TABLET_PATH;
 +
 +    if (!zoo.exists(dirZPath)) {
 +      Path oldPath = fs.getFullPath(FileType.TABLE, "/" + MetadataTable.ID + 
"/root_tablet");
 +      if (fs.exists(oldPath)) {
 +        String newPath = fs.choose(ServerConstants.getBaseUris()) + 
Constants.HDFS_TABLES_DIR + Path.SEPARATOR + RootTable.ID;
 +        fs.mkdirs(new Path(newPath));
 +        if (!fs.rename(oldPath, new Path(newPath))) {
 +          throw new IOException("Failed to move root tablet from " + oldPath 
+ " to " + newPath);
 +        }
 +
 +        log.info("Upgrade renamed " + oldPath + " to " + newPath);
 +      }
 +
 +      Path location = null;
 +
 +      for (String basePath : ServerConstants.getTablesDirs()) {
 +        Path path = new Path(basePath + "/" + RootTable.ID + 
RootTable.ROOT_TABLET_LOCATION);
 +        if (fs.exists(path)) {
 +          if (location != null) {
 +            throw new IllegalStateException("Root table at multiple locations 
" + location + " " + path);
 +          }
 +
 +          location = path;
 +        }
 +      }
 +
 +      if (location == null)
 +        throw new IllegalStateException("Failed to find root tablet");
 +
 +      log.info("Upgrade setting root table location in zookeeper " + 
location);
 +      zoo.putPersistentData(dirZPath, location.toString().getBytes(), 
NodeExistsPolicy.FAIL);
 +    }
 +  }
 +
 +  private boolean haveUpgradedZooKeeper = false;
 +
 +  private void upgradeZookeeper() {
 +    // 1.5.1 and 1.6.0 both do some state checking after obtaining the 
zoolock for the
 +    // monitor and before starting up. It's not tied to the data version at 
all (and would
 +    // introduce unnecessary complexity to try to make the master do it), but 
be aware
 +    // that the master is not the only thing that may alter zookeeper before 
starting.
 +
 +    final int accumuloPersistentVersion = 
Accumulo.getAccumuloPersistentVersion(fs);
 +    if (Accumulo.persistentVersionNeedsUpgrade(accumuloPersistentVersion)) {
 +      // This Master hasn't started Fate yet, so any outstanding transactions 
must be from before the upgrade.
 +      // Change to Guava's Verify once we use Guava 17.
 +      if (null != fate) {
 +        throw new IllegalStateException(
 +            "Access to Fate should not have been initialized prior to the 
Master transitioning to active. Please save all logs and file a bug.");
 +      }
 +      Accumulo.abortIfFateTransactions();
 +      try {
 +        log.info("Upgrading zookeeper");
 +
 +        IZooReaderWriter zoo = ZooReaderWriter.getInstance();
 +        final String zooRoot = ZooUtil.getRoot(instance);
 +
 +        if (accumuloPersistentVersion == 
ServerConstants.TWO_DATA_VERSIONS_AGO) {
 +          log.debug("Handling updates for version " + 
ServerConstants.TWO_DATA_VERSIONS_AGO);
 +
 +          log.debug("Cleaning out remnants of logger role.");
 +          zoo.recursiveDelete(zooRoot + "/loggers", NodeMissingPolicy.SKIP);
 +          zoo.recursiveDelete(zooRoot + "/dead/loggers", 
NodeMissingPolicy.SKIP);
 +
 +          final byte[] zero = new byte[] {'0'};
 +          log.debug("Initializing recovery area.");
 +          zoo.putPersistentData(zooRoot + Constants.ZRECOVERY, zero, 
NodeExistsPolicy.SKIP);
 +
 +          for (String id : zoo.getChildren(zooRoot + Constants.ZTABLES)) {
 +            log.debug("Prepping table " + id + " for compaction 
cancellations.");
 +            zoo.putPersistentData(zooRoot + Constants.ZTABLES + "/" + id + 
Constants.ZTABLE_COMPACT_CANCEL_ID, zero, NodeExistsPolicy.SKIP);
 +          }
 +        }
 +
 +        // create initial namespaces
 +        String namespaces = ZooUtil.getRoot(instance) + Constants.ZNAMESPACES;
 +        zoo.putPersistentData(namespaces, new byte[0], NodeExistsPolicy.SKIP);
 +        for (Pair<String,String> namespace : Iterables.concat(
 +            Collections.singleton(new 
Pair<String,String>(Namespaces.ACCUMULO_NAMESPACE, 
Namespaces.ACCUMULO_NAMESPACE_ID)),
 +            Collections.singleton(new 
Pair<String,String>(Namespaces.DEFAULT_NAMESPACE, 
Namespaces.DEFAULT_NAMESPACE_ID)))) {
 +          String ns = namespace.getFirst();
 +          String id = namespace.getSecond();
 +          log.debug("Upgrade creating namespace \"" + ns + "\" (ID: " + id + 
")");
 +          if (!Namespaces.exists(instance, id))
 +            TableManager.prepareNewNamespaceState(instance.getInstanceID(), 
id, ns, NodeExistsPolicy.SKIP);
 +        }
 +
 +        // create root table
 +        log.debug("Upgrade creating table " + RootTable.NAME + " (ID: " + 
RootTable.ID + ")");
 +        TableManager.prepareNewTableState(instance.getInstanceID(), 
RootTable.ID, Namespaces.ACCUMULO_NAMESPACE_ID, RootTable.NAME, 
TableState.ONLINE,
 +            NodeExistsPolicy.SKIP);
 +        Initialize.initMetadataConfig();
 +        // ensure root user can flush root table
 +        
security.grantTablePermission(SystemCredentials.get().toThrift(instance), 
security.getRootUsername(), RootTable.ID, TablePermission.ALTER_TABLE,
 +            Namespaces.ACCUMULO_NAMESPACE_ID);
 +
 +        // put existing tables in the correct namespaces
 +        String tables = ZooUtil.getRoot(instance) + Constants.ZTABLES;
 +        for (String tableId : zoo.getChildren(tables)) {
 +          String targetNamespace = (MetadataTable.ID.equals(tableId) || 
RootTable.ID.equals(tableId)) ? Namespaces.ACCUMULO_NAMESPACE_ID
 +              : Namespaces.DEFAULT_NAMESPACE_ID;
-           log.debug("Upgrade moving table " + new String(zoo.getData(tables + 
"/" + tableId + Constants.ZTABLE_NAME, null), Constants.UTF8) + " (ID: "
++          log.debug("Upgrade moving table " + new String(zoo.getData(tables + 
"/" + tableId + Constants.ZTABLE_NAME, null), UTF_8) + " (ID: "
 +              + tableId + ") into namespace with ID " + targetNamespace);
-           zoo.putPersistentData(tables + "/" + tableId + 
Constants.ZTABLE_NAMESPACE, targetNamespace.getBytes(Constants.UTF8), 
NodeExistsPolicy.SKIP);
++          zoo.putPersistentData(tables + "/" + tableId + 
Constants.ZTABLE_NAMESPACE, targetNamespace.getBytes(UTF_8), 
NodeExistsPolicy.SKIP);
 +        }
 +
 +        // rename metadata table
 +        log.debug("Upgrade renaming table " + MetadataTable.OLD_NAME + " (ID: 
" + MetadataTable.ID + ") to " + MetadataTable.NAME);
-         zoo.putPersistentData(tables + "/" + MetadataTable.ID + 
Constants.ZTABLE_NAME, 
Tables.qualify(MetadataTable.NAME).getSecond().getBytes(Constants.UTF8),
++        zoo.putPersistentData(tables + "/" + MetadataTable.ID + 
Constants.ZTABLE_NAME, 
Tables.qualify(MetadataTable.NAME).getSecond().getBytes(UTF_8),
 +            NodeExistsPolicy.OVERWRITE);
 +
 +        moveRootTabletToRootTable(zoo);
 +
 +        // add system namespace permissions to existing users
 +        ZKPermHandler perm = new ZKPermHandler();
 +        perm.initialize(instance.getInstanceID(), true);
 +        String users = ZooUtil.getRoot(instance) + "/users";
 +        for (String user : zoo.getChildren(users)) {
 +          zoo.putPersistentData(users + "/" + user + "/Namespaces", new 
byte[0], NodeExistsPolicy.SKIP);
 +          perm.grantNamespacePermission(user, 
Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.READ);
 +        }
 +        perm.grantNamespacePermission("root", 
Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.ALTER_TABLE);
 +        haveUpgradedZooKeeper = true;
 +      } catch (Exception ex) {
 +        log.fatal("Error performing upgrade", ex);
 +        System.exit(1);
 +      }
 +    }
 +  }
 +
 +  private final AtomicBoolean upgradeMetadataRunning = new 
AtomicBoolean(false);
 +  private final CountDownLatch waitForMetadataUpgrade = new CountDownLatch(1);
 +
 +  private final ServerConfiguration serverConfig;
 +
 +  private void upgradeMetadata() {
 +    // we make sure we're only doing the rest of this method once so that we 
can signal to other threads that an upgrade wasn't needed.
 +    if (upgradeMetadataRunning.compareAndSet(false, true)) {
 +      final int accumuloPersistentVersion = 
Accumulo.getAccumuloPersistentVersion(fs);
 +      if (Accumulo.persistentVersionNeedsUpgrade(accumuloPersistentVersion)) {
 +        // sanity check that we passed the Fate verification prior to 
ZooKeeper upgrade, and that Fate still hasn't been started.
 +        // Change both to use Guava's Verify once we use Guava 17.
 +        if (!haveUpgradedZooKeeper) {
 +          throw new IllegalStateException(
 +              "We should only attempt to upgrade Accumulo's metadata table if 
we've already upgraded ZooKeeper. Please save all logs and file a bug.");
 +        }
 +        if (null != fate) {
 +          throw new IllegalStateException(
 +              "Access to Fate should not have been initialized prior to the 
Master finishing upgrades. Please save all logs and file a bug.");
 +        }
 +        Runnable upgradeTask = new Runnable() {
 +          @Override
 +          public void run() {
 +            try {
 +              log.info("Starting to upgrade metadata table.");
 +              if (accumuloPersistentVersion == 
ServerConstants.TWO_DATA_VERSIONS_AGO) {
 +                log.info("Updating Delete Markers in metadata table for 
version 1.4");
 +                MetadataTableUtil.moveMetaDeleteMarkersFrom14(instance, 
SystemCredentials.get());
 +              } else {
 +                log.info("Updating Delete Markers in metadata table.");
 +                MetadataTableUtil.moveMetaDeleteMarkers(instance, 
SystemCredentials.get());
 +              }
 +              log.info("Updating persistent data version.");
 +              Accumulo.updateAccumuloVersion(fs, accumuloPersistentVersion);
 +              log.info("Upgrade complete");
 +              waitForMetadataUpgrade.countDown();
 +            } catch (Exception ex) {
 +              log.fatal("Error performing upgrade", ex);
 +              System.exit(1);
 +            }
 +
 +          }
 +        };
 +
 +        // need to run this in a separate thread because a lock is held that 
prevents metadata tablets from being assigned and this task writes to the
 +        // metadata table
 +        new Thread(upgradeTask).start();
 +      } else {
 +        waitForMetadataUpgrade.countDown();
 +      }
 +    }
 +  }
 +
 +  private int assignedOrHosted(Text tableId) {
 +    int result = 0;
 +    for (TabletGroupWatcher watcher : watchers) {
 +      TableCounts count = watcher.getStats(tableId);
 +      result += count.hosted() + count.assigned();
 +    }
 +    return result;
 +  }
 +
 +  private int totalAssignedOrHosted() {
 +    int result = 0;
 +    for (TabletGroupWatcher watcher : watchers) {
 +      for (TableCounts counts : watcher.getStats().values()) {
 +        result += counts.assigned() + counts.hosted();
 +      }
 +    }
 +    return result;
 +  }
 +
 +  private int nonMetaDataTabletsAssignedOrHosted() {
 +    return totalAssignedOrHosted() - assignedOrHosted(new 
Text(MetadataTable.ID)) - assignedOrHosted(new Text(RootTable.ID));
 +  }
 +
 +  private int notHosted() {
 +    int result = 0;
 +    for (TabletGroupWatcher watcher : watchers) {
 +      for (TableCounts counts : watcher.getStats().values()) {
 +        result += counts.assigned() + counts.assignedToDeadServers();
 +      }
 +    }
 +    return result;
 +  }
 +
 +  // The number of unassigned tablets that should be assigned: displayed on 
the monitor page
 +  int displayUnassigned() {
 +    int result = 0;
 +    switch (getMasterState()) {
 +      case NORMAL:
 +        // Count offline tablets for online tables
 +        for (TabletGroupWatcher watcher : watchers) {
 +          TableManager manager = TableManager.getInstance();
 +          for (Entry<Text,TableCounts> entry : watcher.getStats().entrySet()) 
{
 +            Text tableId = entry.getKey();
 +            TableCounts counts = entry.getValue();
 +            TableState tableState = manager.getTableState(tableId.toString());
 +            if (tableState != null && tableState.equals(TableState.ONLINE)) {
 +              result += counts.unassigned() + counts.assignedToDeadServers() 
+ counts.assigned();
 +            }
 +          }
 +        }
 +        break;
 +      case SAFE_MODE:
 +        // Count offline tablets for the metadata table
 +        for (TabletGroupWatcher watcher : watchers) {
 +          result += watcher.getStats(METADATA_TABLE_ID).unassigned();
 +        }
 +        break;
 +      case UNLOAD_METADATA_TABLETS:
 +      case UNLOAD_ROOT_TABLET:
 +        for (TabletGroupWatcher watcher : watchers) {
 +          result += watcher.getStats(METADATA_TABLE_ID).unassigned();
 +        }
 +        break;
 +      default:
 +        break;
 +    }
 +    return result;
 +  }
 +
 +  public void mustBeOnline(final String tableId) throws 
ThriftTableOperationException {
 +    Tables.clearCache(instance);
 +    if (!Tables.getTableState(instance, tableId).equals(TableState.ONLINE))
 +      throw new ThriftTableOperationException(tableId, null, 
TableOperation.MERGE, TableOperationExceptionType.OFFLINE, "table is not 
online");
 +  }
 +
 +  public Connector getConnector() throws AccumuloException, 
AccumuloSecurityException {
 +    return instance.getConnector(SystemCredentials.get().getPrincipal(), 
SystemCredentials.get().getToken());
 +  }
 +
 +  private Master(ServerConfiguration config, VolumeManager fs, String 
hostname) throws IOException {
 +    this.serverConfig = config;
 +    this.instance = config.getInstance();
 +    this.fs = fs;
 +    this.hostname = hostname;
 +
 +    AccumuloConfiguration aconf = serverConfig.getConfiguration();
 +
 +    log.info("Version " + Constants.VERSION);
 +    log.info("Instance " + instance.getInstanceID());
 +    
ThriftTransportPool.getInstance().setIdleTime(aconf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
 +    security = AuditedSecurityOperation.getInstance();
 +    tserverSet = new LiveTServerSet(instance, config.getConfiguration(), 
this);
 +    this.tabletBalancer = 
aconf.instantiateClassProperty(Property.MASTER_TABLET_BALANCER, 
TabletBalancer.class, new DefaultLoadBalancer());
 +    this.tabletBalancer.init(serverConfig);
 +
 +    try {
 +      AccumuloVFSClassLoader.getContextManager().setContextConfig(new 
ContextManager.DefaultContextsConfig(new Iterable<Entry<String,String>>() {
 +        @Override
 +        public Iterator<Entry<String,String>> iterator() {
 +          return getSystemConfiguration().iterator();
 +        }
 +      }));
 +    } catch (IOException e) {
 +      throw new RuntimeException(e);
 +    }
 +
 +  }
 +
 +  public TServerConnection getConnection(TServerInstance server) {
 +    return tserverSet.getConnection(server);
 +  }
 +
 +  public MergeInfo getMergeInfo(Text tableId) {
 +    synchronized (mergeLock) {
 +      try {
 +        String path = ZooUtil.getRoot(instance.getInstanceID()) + 
Constants.ZTABLES + "/" + tableId.toString() + "/merge";
 +        if (!ZooReaderWriter.getInstance().exists(path))
 +          return new MergeInfo();
 +        byte[] data = ZooReaderWriter.getInstance().getData(path, new Stat());
 +        DataInputBuffer in = new DataInputBuffer();
 +        in.reset(data, data.length);
 +        MergeInfo info = new MergeInfo();
 +        info.readFields(in);
 +        return info;
 +      } catch (KeeperException.NoNodeException ex) {
 +        log.info("Error reading merge state, it probably just finished");
 +        return new MergeInfo();
 +      } catch (Exception ex) {
 +        log.warn("Unexpected error reading merge state", ex);
 +        return new MergeInfo();
 +      }
 +    }
 +  }
 +
 +  public void setMergeState(MergeInfo info, MergeState state) throws 
IOException, KeeperException, InterruptedException {
 +    synchronized (mergeLock) {
 +      String path = ZooUtil.getRoot(instance.getInstanceID()) + 
Constants.ZTABLES + "/" + info.getExtent().getTableId().toString() + "/merge";
 +      info.setState(state);
 +      if (state.equals(MergeState.NONE)) {
 +        ZooReaderWriter.getInstance().recursiveDelete(path, 
NodeMissingPolicy.SKIP);
 +      } else {
 +        DataOutputBuffer out = new DataOutputBuffer();
 +        try {
 +          info.write(out);
 +        } catch (IOException ex) {
 +          throw new RuntimeException("Unlikely", ex);
 +        }
 +        ZooReaderWriter.getInstance().putPersistentData(path, out.getData(),
 +            state.equals(MergeState.STARTED) ? ZooUtil.NodeExistsPolicy.FAIL 
: ZooUtil.NodeExistsPolicy.OVERWRITE);
 +      }
 +      mergeLock.notifyAll();
 +    }
 +    nextEvent.event("Merge state of %s set to %s", info.getExtent(), state);
 +  }
 +
 +  public void clearMergeState(Text tableId) throws IOException, 
KeeperException, InterruptedException {
 +    synchronized (mergeLock) {
 +      String path = ZooUtil.getRoot(instance.getInstanceID()) + 
Constants.ZTABLES + "/" + tableId.toString() + "/merge";
 +      ZooReaderWriter.getInstance().recursiveDelete(path, 
NodeMissingPolicy.SKIP);
 +      mergeLock.notifyAll();
 +    }
 +    nextEvent.event("Merge state of %s cleared", tableId);
 +  }
 +
 +  void setMasterGoalState(MasterGoalState state) {
 +    try {
 +      
ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + 
Constants.ZMASTER_GOAL_STATE, state.name().getBytes(),
 +          NodeExistsPolicy.OVERWRITE);
 +    } catch (Exception ex) {
 +      log.error("Unable to set master goal state in zookeeper");
 +    }
 +  }
 +
 +  MasterGoalState getMasterGoalState() {
 +    while (true)
 +      try {
 +        byte[] data = 
ZooReaderWriter.getInstance().getData(ZooUtil.getRoot(instance) + 
Constants.ZMASTER_GOAL_STATE, null);
 +        return MasterGoalState.valueOf(new String(data));
 +      } catch (Exception e) {
 +        log.error("Problem getting real goal state: " + e);
 +        UtilWaitThread.sleep(1000);
 +      }
 +  }
 +
 +  public boolean hasCycled(long time) {
 +    for (TabletGroupWatcher watcher : watchers) {
 +      if (watcher.stats.lastScanFinished() < time)
 +        return false;
 +    }
 +
 +    return true;
 +  }
 +
 +  public void clearMigrations(String tableId) {
 +    synchronized (migrations) {
 +      Iterator<KeyExtent> iterator = migrations.keySet().iterator();
 +      while (iterator.hasNext()) {
 +        KeyExtent extent = iterator.next();
 +        if (extent.getTableId().toString().equals(tableId)) {
 +          iterator.remove();
 +        }
 +      }
 +    }
 +  }
 +
 +  static enum TabletGoalState {
 +    HOSTED, UNASSIGNED, DELETED
 +  };
 +
 +  TabletGoalState getSystemGoalState(TabletLocationState tls) {
 +    switch (getMasterState()) {
 +      case NORMAL:
 +        return TabletGoalState.HOSTED;
 +      case HAVE_LOCK: // fall-through intended
 +      case INITIAL: // fall-through intended
 +      case SAFE_MODE:
 +        if (tls.extent.isMeta())
 +          return TabletGoalState.HOSTED;
 +        return TabletGoalState.UNASSIGNED;
 +      case UNLOAD_METADATA_TABLETS:
 +        if (tls.extent.isRootTablet())
 +          return TabletGoalState.HOSTED;
 +        return TabletGoalState.UNASSIGNED;
 +      case UNLOAD_ROOT_TABLET:
 +        return TabletGoalState.UNASSIGNED;
 +      case STOP:
 +        return TabletGoalState.UNASSIGNED;
 +      default:
 +        throw new IllegalStateException("Unknown Master State");
 +    }
 +  }
 +
 +  TabletGoalState getTableGoalState(KeyExtent extent) {
 +    TableState tableState = 
TableManager.getInstance().getTableState(extent.getTableId().toString());
 +    if (tableState == null)
 +      return TabletGoalState.DELETED;
 +    switch (tableState) {
 +      case DELETING:
 +        return TabletGoalState.DELETED;
 +      case OFFLINE:
 +      case NEW:
 +        return TabletGoalState.UNASSIGNED;
 +      default:
 +        return TabletGoalState.HOSTED;
 +    }
 +  }
 +
 +  TabletGoalState getGoalState(TabletLocationState tls, MergeInfo mergeInfo) {
 +    KeyExtent extent = tls.extent;
 +    // Shutting down?
 +    TabletGoalState state = getSystemGoalState(tls);
 +    if (state == TabletGoalState.HOSTED) {
 +      if (tls.current != null && serversToShutdown.contains(tls.current)) {
 +        return TabletGoalState.UNASSIGNED;
 +      }
 +      // Handle merge transitions
 +      if (mergeInfo.getExtent() != null) {
 +        log.debug("mergeInfo overlaps: " + extent + " " + 
mergeInfo.overlaps(extent));
 +        if (mergeInfo.overlaps(extent)) {
 +          switch (mergeInfo.getState()) {
 +            case NONE:
 +            case COMPLETE:
 +              break;
 +            case STARTED:
 +            case SPLITTING:
 +              return TabletGoalState.HOSTED;
 +            case WAITING_FOR_CHOPPED:
 +              if 
(tls.getState(onlineTabletServers()).equals(TabletState.HOSTED)) {
 +                if (tls.chopped)
 +                  return TabletGoalState.UNASSIGNED;
 +              } else {
 +                if (tls.chopped && tls.walogs.isEmpty())
 +                  return TabletGoalState.UNASSIGNED;
 +              }
 +
 +              return TabletGoalState.HOSTED;
 +            case WAITING_FOR_OFFLINE:
 +            case MERGING:
 +              return TabletGoalState.UNASSIGNED;
 +          }
 +        }
 +      }
 +
 +      // taking table offline?
 +      state = getTableGoalState(extent);
 +      if (state == TabletGoalState.HOSTED) {
 +        // Maybe this tablet needs to be migrated
 +        TServerInstance dest = migrations.get(extent);
 +        if (dest != null && tls.current != null && !dest.equals(tls.current)) 
{
 +          return TabletGoalState.UNASSIGNED;
 +        }
 +      }
 +    }
 +    return state;
 +  }
 +
 +  private class MigrationCleanupThread extends Daemon {
 +
 +    @Override
 +    public void run() {
 +      setName("Migration Cleanup Thread");
 +      while (stillMaster()) {
 +        if (!migrations.isEmpty()) {
 +          try {
 +            cleanupOfflineMigrations();
 +            cleanupNonexistentMigrations(getConnector());
 +          } catch (Exception ex) {
 +            log.error("Error cleaning up migrations", ex);
 +          }
 +        }
 +        UtilWaitThread.sleep(TIME_BETWEEN_MIGRATION_CLEANUPS);
 +      }
 +    }
 +
 +    /**
 +     * If a migrating tablet splits, and the tablet dies before sending the 
master a message, the migration will refer to a non-existing tablet, so it can 
never
 +     * complete. Periodically scan the metadata table and remove any 
migrating tablets that no longer exist.
 +     */
 +    private void cleanupNonexistentMigrations(final Connector connector) 
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +      Scanner scanner = connector.createScanner(MetadataTable.NAME, 
Authorizations.EMPTY);
 +      TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
 +      Set<KeyExtent> found = new HashSet<KeyExtent>();
 +      for (Entry<Key,Value> entry : scanner) {
 +        KeyExtent extent = new KeyExtent(entry.getKey().getRow(), 
entry.getValue());
 +        if (migrations.containsKey(extent)) {
 +          found.add(extent);
 +        }
 +      }
 +      migrations.keySet().retainAll(found);
 +    }
 +
 +    /**
 +     * If migrating a tablet for a table that is offline, the migration can 
never succeed because no tablet server will load the tablet. check for offline
 +     * tables and remove their migrations.
 +     */
 +    private void cleanupOfflineMigrations() {
 +      TableManager manager = TableManager.getInstance();
 +      for (String tableId : Tables.getIdToNameMap(instance).keySet()) {
 +        TableState state = manager.getTableState(tableId);
 +        if (TableState.OFFLINE == state) {
 +          clearMigrations(tableId);
 +        }
 +      }
 +    }
 +  }
 +
 +  private class StatusThread extends Daemon {
 +
 +    @Override
 +    public void run() {
 +      setName("Status Thread");
 +      EventCoordinator.Listener eventListener = nextEvent.getListener();
 +      while (stillMaster()) {
 +        long wait = DEFAULT_WAIT_FOR_WATCHER;
 +        try {
 +          switch (getMasterGoalState()) {
 +            case NORMAL:
 +              setMasterState(MasterState.NORMAL);
 +              break;
 +            case SAFE_MODE:
 +              if (getMasterState() == MasterState.NORMAL) {
 +                setMasterState(MasterState.SAFE_MODE);
 +              }
 +              if (getMasterState() == MasterState.HAVE_LOCK) {
 +                setMasterState(MasterState.SAFE_MODE);
 +              }
 +              break;
 +            case CLEAN_STOP:
 +              switch (getMasterState()) {
 +                case NORMAL:
 +                  setMasterState(MasterState.SAFE_MODE);
 +                  break;
 +                case SAFE_MODE: {
 +                  int count = nonMetaDataTabletsAssignedOrHosted();
 +                  log.debug(String.format("There are %d non-metadata tablets 
assigned or hosted", count));
 +                  if (count == 0)
 +                    setMasterState(MasterState.UNLOAD_METADATA_TABLETS);
 +                }
 +                  break;
 +                case UNLOAD_METADATA_TABLETS: {
 +                  int count = assignedOrHosted(METADATA_TABLE_ID);
 +                  log.debug(String.format("There are %d metadata tablets 
assigned or hosted", count));
 +                  if (count == 0)
 +                    setMasterState(MasterState.UNLOAD_ROOT_TABLET);
 +                }
 +                  break;
 +                case UNLOAD_ROOT_TABLET: {
 +                  int count = assignedOrHosted(METADATA_TABLE_ID);
 +                  if (count > 0) {
 +                    log.debug(String.format("%d metadata tablets online", 
count));
 +                    setMasterState(MasterState.UNLOAD_ROOT_TABLET);
 +                  }
 +                  int root_count = assignedOrHosted(ROOT_TABLE_ID);
 +                  if (root_count > 0)
 +                    log.debug("The root tablet is still assigned or hosted");
 +                  if (count + root_count == 0) {
 +                    Set<TServerInstance> currentServers = 
tserverSet.getCurrentServers();
 +                    log.debug("stopping " + currentServers.size() + " tablet 
servers");
 +                    for (TServerInstance server : currentServers) {
 +                      try {
 +                        serversToShutdown.add(server);
 +                        tserverSet.getConnection(server).fastHalt(masterLock);
 +                      } catch (TException e) {
 +                        // its probably down, and we don't care
 +                      } finally {
 +                        tserverSet.remove(server);
 +                      }
 +                    }
 +                    if (currentServers.size() == 0)
 +                      setMasterState(MasterState.STOP);
 +                  }
 +                }
 +                  break;
 +                default:
 +                  break;
 +              }
 +          }
 +          wait = updateStatus();
 +          eventListener.waitForEvents(wait);
 +        } catch (Throwable t) {
 +          log.error("Error balancing tablets", t);
 +          UtilWaitThread.sleep(WAIT_BETWEEN_ERRORS);
 +        }
 +      }
 +    }
 +
 +    private long updateStatus() throws AccumuloException, 
AccumuloSecurityException, TableNotFoundException {
 +      tserverStatus = 
Collections.synchronizedSortedMap(gatherTableInformation());
 +      checkForHeldServer(tserverStatus);
 +
 +      if (!badServers.isEmpty()) {
 +        log.debug("not balancing because the balance information is 
out-of-date " + badServers.keySet());
 +      } else if (notHosted() > 0) {
 +        log.debug("not balancing because there are unhosted tablets: " + 
notHosted());
 +      } else if (getMasterGoalState() == MasterGoalState.CLEAN_STOP) {
 +        log.debug("not balancing because the master is attempting to stop 
cleanly");
 +      } else if (!serversToShutdown.isEmpty()) {
 +        log.debug("not balancing while shutting down servers " + 
serversToShutdown);
 +      } else {
 +        return balanceTablets();
 +      }
 +      return DEFAULT_WAIT_FOR_WATCHER;
 +    }
 +
 +    private void 
checkForHeldServer(SortedMap<TServerInstance,TabletServerStatus> tserverStatus) 
{
 +      TServerInstance instance = null;
 +      int crazyHoldTime = 0;
 +      int someHoldTime = 0;
 +      final long maxWait = 
getSystemConfiguration().getTimeInMillis(Property.TSERV_HOLD_TIME_SUICIDE);
 +      for (Entry<TServerInstance,TabletServerStatus> entry : 
tserverStatus.entrySet()) {
 +        if (entry.getValue().getHoldTime() > 0) {
 +          someHoldTime++;
 +          if (entry.getValue().getHoldTime() > maxWait) {
 +            instance = entry.getKey();
 +            crazyHoldTime++;
 +          }
 +        }
 +      }
 +      if (crazyHoldTime == 1 && someHoldTime == 1 && tserverStatus.size() > 
1) {
 +        log.warn("Tablet server " + instance + " exceeded maximum hold time: 
attempting to kill it");
 +        try {
 +          TServerConnection connection = tserverSet.getConnection(instance);
 +          if (connection != null)
 +            connection.fastHalt(masterLock);
 +        } catch (TException e) {
 +          log.error(e, e);
 +        }
 +        tserverSet.remove(instance);
 +      }
 +    }
 +
 +    private long balanceTablets() {
 +      List<TabletMigration> migrationsOut = new ArrayList<TabletMigration>();
 +      Set<KeyExtent> migrationsCopy = new HashSet<KeyExtent>();
 +      synchronized (migrations) {
 +        migrationsCopy.addAll(migrations.keySet());
 +      }
 +      long wait = 
tabletBalancer.balance(Collections.unmodifiableSortedMap(tserverStatus), 
Collections.unmodifiableSet(migrationsCopy), migrationsOut);
 +
 +      for (TabletMigration m : 
TabletBalancer.checkMigrationSanity(tserverStatus.keySet(), migrationsOut)) {
 +        if (migrations.containsKey(m.tablet)) {
 +          log.warn("balancer requested migration more than once, skipping " + 
m);
 +          continue;
 +        }
 +        migrations.put(m.tablet, m.newServer);
 +        log.debug("migration " + m);
 +      }
 +      if (migrationsOut.size() > 0) {
 +        nextEvent.event("Migrating %d more tablets, %d total", 
migrationsOut.size(), migrations.size());
 +      }
 +      return wait;
 +    }
 +
 +  }
 +
 +  private SortedMap<TServerInstance,TabletServerStatus> 
gatherTableInformation() {
 +    long start = System.currentTimeMillis();
 +    SortedMap<TServerInstance,TabletServerStatus> result = new 
TreeMap<TServerInstance,TabletServerStatus>();
 +    Set<TServerInstance> currentServers = tserverSet.getCurrentServers();
 +    for (TServerInstance server : currentServers) {
 +      try {
 +        Thread t = Thread.currentThread();
 +        String oldName = t.getName();
 +        try {
 +          t.setName("Getting status from " + server);
 +          TServerConnection connection = tserverSet.getConnection(server);
 +          if (connection == null)
 +            throw new IOException("No connection to " + server);
 +          TabletServerStatus status = connection.getTableMap(false);
 +          result.put(server, status);
 +        } finally {
 +          t.setName(oldName);
 +        }
 +      } catch (Exception ex) {
 +        log.error("unable to get tablet server status " + server + " " + 
ex.toString());
 +        log.debug("unable to get tablet server status " + server, ex);
 +        if (badServers.get(server).incrementAndGet() > MAX_BAD_STATUS_COUNT) {
 +          log.warn("attempting to stop " + server);
 +          try {
 +            TServerConnection connection = tserverSet.getConnection(server);
 +            if (connection != null)
 +              connection.halt(masterLock);
 +          } catch (TTransportException e) {
 +            // ignore: it's probably down
 +          } catch (Exception e) {
 +            log.info("error talking to troublesome tablet server ", e);
 +          }
 +          badServers.remove(server);
 +          tserverSet.remove(server);
 +        }
 +      }
 +    }
 +    synchronized (badServers) {
 +      badServers.keySet().retainAll(currentServers);
 +      badServers.keySet().removeAll(result.keySet());
 +    }
 +    log.debug(String.format("Finished gathering information from %d servers 
in %.2f seconds", result.size(), (System.currentTimeMillis() - start) / 1000.));
 +    return result;
 +  }
 +
 +  public void run() throws IOException, InterruptedException, KeeperException 
{
 +    final String zroot = ZooUtil.getRoot(instance);
 +
 +    getMasterLock(zroot + Constants.ZMASTER_LOCK);
 +
 +    recoveryManager = new RecoveryManager(this);
 +
 +    TableManager.getInstance().addObserver(this);
 +
 +    StatusThread statusThread = new StatusThread();
 +    statusThread.start();
 +
 +    MigrationCleanupThread migrationCleanupThread = new 
MigrationCleanupThread();
 +    migrationCleanupThread.start();
 +
 +    tserverSet.startListeningForTabletServerChanges();
 +
 +    ZooReaderWriter.getInstance().getChildren(zroot + Constants.ZRECOVERY, 
new Watcher() {
 +      @Override
 +      public void process(WatchedEvent event) {
 +        nextEvent.event("Noticed recovery changes", event.getType());
 +        try {
 +          // watcher only fires once, add it back
 +          ZooReaderWriter.getInstance().getChildren(zroot + 
Constants.ZRECOVERY, this);
 +        } catch (Exception e) {
 +          log.error("Failed to add log recovery watcher back", e);
 +        }
 +      }
 +    });
 +
 +    Credentials systemCreds = SystemCredentials.get();
 +    watchers.add(new TabletGroupWatcher(this, new 
MetaDataStateStore(instance, systemCreds, this), null));
 +    watchers.add(new TabletGroupWatcher(this, new 
RootTabletStateStore(instance, systemCreds, this), watchers.get(0)));
 +    watchers.add(new TabletGroupWatcher(this, new ZooTabletStateStore(new 
ZooStore(zroot)), watchers.get(1)));
 +    for (TabletGroupWatcher watcher : watchers) {
 +      watcher.start();
 +    }
 +
 +    // Once we are sure the upgrade is complete, we can safely allow fate use.
 +    waitForMetadataUpgrade.await();
 +
 +    try {
 +      final AgeOffStore<Master> store = new AgeOffStore<Master>(new 
org.apache.accumulo.fate.ZooStore<Master>(ZooUtil.getRoot(instance) + 
Constants.ZFATE,
 +          ZooReaderWriter.getInstance()), 1000 * 60 * 60 * 8);
 +
 +      int threads = 
this.getConfiguration().getConfiguration().getCount(Property.MASTER_FATE_THREADPOOL_SIZE);
 +
 +      fate = new Fate<Master>(this, store, threads);
 +
 +      SimpleTimer.getInstance().schedule(new Runnable() {
 +
 +        @Override
 +        public void run() {
 +          store.ageOff();
 +        }
 +      }, 63000, 63000);
 +    } catch (KeeperException e) {
 +      throw new IOException(e);
 +    } catch (InterruptedException e) {
 +      throw new IOException(e);
 +    }
 +
 +    Processor<Iface> processor = new Processor<Iface>(RpcWrapper.service(new 
MasterClientServiceHandler(this)));
 +    ServerAddress sa = TServerUtils.startServer(getSystemConfiguration(), 
hostname, Property.MASTER_CLIENTPORT, processor, "Master",
 +        "Master Client Service Handler", null, Property.MASTER_MINTHREADS, 
Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
 +    clientService = sa.server;
 +    String address = sa.address.toString();
 +    log.info("Setting master lock data to " + address);
 +    masterLock.replaceLockData(address.getBytes());
 +
 +    while (!clientService.isServing()) {
 +      UtilWaitThread.sleep(100);
 +    }
 +    while (clientService.isServing()) {
 +      UtilWaitThread.sleep(500);
 +    }
 +    log.info("Shutting down fate.");
 +    fate.shutdown();
 +
 +    final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME;
 +    statusThread.join(remaining(deadline));
 +
 +    // quit, even if the tablet servers somehow jam up and the watchers
 +    // don't stop
 +    for (TabletGroupWatcher watcher : watchers) {
 +      watcher.join(remaining(deadline));
 +    }
 +    log.info("exiting");
 +  }
 +
 +  private long remaining(long deadline) {
 +    return Math.max(1, deadline - System.currentTimeMillis());
 +  }
 +
 +  public ZooLock getMasterLock() {
 +    return masterLock;
 +  }
 +
 +  private static class MasterLockWatcher implements ZooLock.AsyncLockWatcher {
 +
 +    boolean acquiredLock = false;
 +    boolean failedToAcquireLock = false;
 +
 +    @Override
 +    public void lostLock(LockLossReason reason) {
 +      Halt.halt("Master lock in zookeeper lost (reason = " + reason + "), 
exiting!", -1);
 +    }
 +
 +    @Override
 +    public void unableToMonitorLockNode(final Throwable e) {
 +      Halt.halt(-1, new Runnable() {
 +        @Override
 +        public void run() {
 +          log.fatal("No longer able to monitor master lock node", e);
 +        }
 +      });
 +
 +    }
 +
 +    @Override
 +    public synchronized void acquiredLock() {
 +      log.debug("Acquired master lock");
 +
 +      if (acquiredLock || failedToAcquireLock) {
 +        Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + 
failedToAcquireLock, -1);
 +      }
 +
 +      acquiredLock = true;
 +      notifyAll();
 +    }
 +
 +    @Override
 +    public synchronized void failedToAcquireLock(Exception e) {
 +      log.warn("Failed to get master lock " + e);
 +
 +      if (acquiredLock) {
 +        Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " + 
failedToAcquireLock, -1);
 +      }
 +
 +      failedToAcquireLock = true;
 +      notifyAll();
 +    }
 +
 +    public synchronized void waitForChange() {
 +      while (!acquiredLock && !failedToAcquireLock) {
 +        try {
 +          wait();
 +        } catch (InterruptedException e) {}
 +      }
 +    }
 +  }
 +
 +  private void getMasterLock(final String zMasterLoc) throws KeeperException, 
InterruptedException {
 +    log.info("trying to get master lock");
 +
 +    final String masterClientAddress = hostname + ":" + 
getSystemConfiguration().getPort(Property.MASTER_CLIENTPORT);
 +
 +    while (true) {
 +
 +      MasterLockWatcher masterLockWatcher = new MasterLockWatcher();
 +      masterLock = new ZooLock(zMasterLoc);
 +      masterLock.lockAsync(masterLockWatcher, masterClientAddress.getBytes());
 +
 +      masterLockWatcher.waitForChange();
 +
 +      if (masterLockWatcher.acquiredLock) {
 +        break;
 +      }
 +
 +      if (!masterLockWatcher.failedToAcquireLock) {
 +        throw new IllegalStateException("master lock in unknown state");
 +      }
 +
 +      masterLock.tryToCancelAsyncLockOrUnlock();
 +
 +      UtilWaitThread.sleep(TIME_TO_WAIT_BETWEEN_LOCK_CHECKS);
 +    }
 +
 +    setMasterState(MasterState.HAVE_LOCK);
 +  }
 +
 +  public static void main(String[] args) throws Exception {
 +    try {
 +      SecurityUtil.serverLogin(ServerConfiguration.getSiteConfiguration());
 +
 +      ServerOpts opts = new ServerOpts();
 +      final String app = "master";
 +      opts.parseArgs(app, args);
 +      String hostname = opts.getAddress();
 +      Accumulo.setupLogging(app);
 +      Instance instance = HdfsZooInstance.getInstance();
 +      ServerConfiguration conf = new ServerConfiguration(instance);
 +      VolumeManager fs = VolumeManagerImpl.get();
 +      Accumulo.init(fs, conf, app);
 +      Master master = new Master(conf, fs, hostname);
 +      Accumulo.enableTracing(hostname, app);
 +      master.run();
 +    } catch (Exception ex) {
 +      log.error("Unexpected exception, exiting", ex);
 +      System.exit(1);
 +    }
 +  }
 +
 +  @Override
 +  public void update(LiveTServerSet current, Set<TServerInstance> deleted, 
Set<TServerInstance> added) {
 +    DeadServerList obit = new DeadServerList(ZooUtil.getRoot(instance) + 
Constants.ZDEADTSERVERS);
 +    if (added.size() > 0) {
 +      log.info("New servers: " + added);
 +      for (TServerInstance up : added)
 +        obit.delete(up.hostPort());
 +    }
 +    for (TServerInstance dead : deleted) {
 +      String cause = "unexpected failure";
 +      if (serversToShutdown.contains(dead))
 +        cause = "clean shutdown"; // maybe an incorrect assumption
 +      if (!getMasterGoalState().equals(MasterGoalState.CLEAN_STOP))
 +        obit.post(dead.hostPort(), cause);
 +    }
 +
 +    Set<TServerInstance> unexpected = new HashSet<TServerInstance>(deleted);
 +    unexpected.removeAll(this.serversToShutdown);
 +    if (unexpected.size() > 0) {
 +      if (stillMaster() && 
!getMasterGoalState().equals(MasterGoalState.CLEAN_STOP)) {
 +        log.warn("Lost servers " + unexpected);
 +      }
 +    }
 +    serversToShutdown.removeAll(deleted);
 +    badServers.keySet().removeAll(deleted);
 +    // clear out any bad server with the same host/port as a new server
 +    synchronized (badServers) {
 +      cleanListByHostAndPort(badServers.keySet(), deleted, added);
 +    }
 +    synchronized (serversToShutdown) {
 +      cleanListByHostAndPort(serversToShutdown, deleted, added);
 +    }
 +
 +    synchronized (migrations) {
 +      Iterator<Entry<KeyExtent,TServerInstance>> iter = 
migrations.entrySet().iterator();
 +      while (iter.hasNext()) {
 +        Entry<KeyExtent,TServerInstance> entry = iter.next();
 +        if (deleted.contains(entry.getValue())) {
 +          log.info("Canceling migration of " + entry.getKey() + " to " + 
entry.getValue());
 +          iter.remove();
 +        }
 +      }
 +    }
 +    nextEvent.event("There are now %d tablet servers", current.size());
 +  }
 +
 +  private static void cleanListByHostAndPort(Collection<TServerInstance> 
badServers, Set<TServerInstance> deleted, Set<TServerInstance> added) {
 +    Iterator<TServerInstance> badIter = badServers.iterator();
 +    while (badIter.hasNext()) {
 +      TServerInstance bad = badIter.next();
 +      for (TServerInstance add : added) {
 +        if (bad.hostPort().equals(add.hostPort())) {
 +          badIter.remove();
 +          break;
 +        }
 +      }
 +      for (TServerInstance del : deleted) {
 +        if (bad.hostPort().equals(del.hostPort())) {
 +          badIter.remove();
 +          break;
 +        }
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public void stateChanged(String tableId, TableState state) {
 +    nextEvent.event("Table state in zookeeper changed for %s to %s", tableId, 
state);
 +    if (TableState.OFFLINE == state) {
 +      clearMigrations(tableId);
 +    }
 +  }
 +
 +  @Override
 +  public void initialize(Map<String,TableState> tableIdToStateMap) {}
 +
 +  @Override
 +  public void sessionExpired() {}
 +
 +  @Override
 +  public Set<String> onlineTables() {
 +    Set<String> result = new HashSet<String>();
 +    if (getMasterState() != MasterState.NORMAL) {
 +      if (getMasterState() != MasterState.UNLOAD_METADATA_TABLETS)
 +        result.add(MetadataTable.ID);
 +      if (getMasterState() != MasterState.UNLOAD_ROOT_TABLET)
 +        result.add(RootTable.ID);
 +      return result;
 +    }
 +    TableManager manager = TableManager.getInstance();
 +
 +    for (String tableId : Tables.getIdToNameMap(instance).keySet()) {
 +      TableState state = manager.getTableState(tableId);
 +      if (state != null) {
 +        if (state == TableState.ONLINE)
 +          result.add(tableId);
 +      }
 +    }
 +    return result;
 +  }
 +
 +  @Override
 +  public Set<TServerInstance> onlineTabletServers() {
 +    return tserverSet.getCurrentServers();
 +  }
 +
 +  @Override
 +  public Collection<MergeInfo> merges() {
 +    List<MergeInfo> result = new ArrayList<MergeInfo>();
 +    for (String tableId : Tables.getIdToNameMap(instance).keySet()) {
 +      result.add(getMergeInfo(new Text(tableId)));
 +    }
 +    return result;
 +  }
 +
 +  // recovers state from the persistent transaction to shutdown a server
 +  public void shutdownTServer(TServerInstance server) {
 +    nextEvent.event("Tablet Server shutdown requested for %s", server);
 +    serversToShutdown.add(server);
 +  }
 +
 +  public EventCoordinator getEventCoordinator() {
 +    return nextEvent;
 +  }
 +
 +  public Instance getInstance() {
 +    return this.instance;
 +  }
 +
 +  public AccumuloConfiguration getSystemConfiguration() {
 +    return serverConfig.getConfiguration();
 +  }
 +
 +  public ServerConfiguration getConfiguration() {
 +    return serverConfig;
 +  }
 +
 +  public VolumeManager getFileSystem() {
 +    return this.fs;
 +  }
 +
 +  public void assignedTablet(KeyExtent extent) {
 +    if (extent.isMeta()) {
 +      if (getMasterState().equals(MasterState.UNLOAD_ROOT_TABLET)) {
 +        setMasterState(MasterState.UNLOAD_METADATA_TABLETS);
 +      }
 +    }
 +    if (extent.isRootTablet()) {
 +      // probably too late, but try anyhow
 +      if (getMasterState().equals(MasterState.STOP)) {
 +        setMasterState(MasterState.UNLOAD_ROOT_TABLET);
 +      }
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
----------------------------------------------------------------------
diff --cc 
server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
index 0c2e1f0,0000000..5e478cb
mode 100644,000000..100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
@@@ -1,190 -1,0 +1,192 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.master.recovery;
 +
++import static com.google.common.base.Charsets.UTF_8;
++
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.util.Collection;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.ScheduledExecutorService;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.util.NamingThreadFactory;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.master.Master;
 +import org.apache.accumulo.server.ServerConstants;
 +import org.apache.accumulo.server.fs.VolumeManager.FileType;
 +import org.apache.accumulo.server.fs.VolumeUtil;
 +import org.apache.accumulo.server.log.SortedLogState;
 +import org.apache.accumulo.server.master.recovery.HadoopLogCloser;
 +import org.apache.accumulo.server.master.recovery.LogCloser;
 +import org.apache.accumulo.server.master.recovery.RecoveryPath;
 +import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 +import org.apache.accumulo.server.zookeeper.ZooCache;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.log4j.Logger;
 +import org.apache.zookeeper.KeeperException;
 +
 +public class RecoveryManager {
 +
 +  private static Logger log = Logger.getLogger(RecoveryManager.class);
 +
 +  private Map<String,Long> recoveryDelay = new HashMap<String,Long>();
 +  private Set<String> closeTasksQueued = new HashSet<String>();
 +  private Set<String> sortsQueued = new HashSet<String>();
 +  private ScheduledExecutorService executor;
 +  private Master master;
 +  private ZooCache zooCache;
 +
 +  public RecoveryManager(Master master) {
 +    this.master = master;
 +    executor = Executors.newScheduledThreadPool(4, new 
NamingThreadFactory("Walog sort starter "));
 +    zooCache = new ZooCache();
 +    try {
 +      List<String> workIDs = new 
DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + 
Constants.ZRECOVERY).getWorkQueued();
 +      sortsQueued.addAll(workIDs);
 +    } catch (Exception e) {
 +      log.warn(e, e);
 +    }
 +  }
 +
 +  private class LogSortTask implements Runnable {
 +    private String source;
 +    private String destination;
 +    private String sortId;
 +    private LogCloser closer;
 +
 +    public LogSortTask(LogCloser closer, String source, String destination, 
String sortId) {
 +      this.closer = closer;
 +      this.source = source;
 +      this.destination = destination;
 +      this.sortId = sortId;
 +    }
 +
 +    @Override
 +    public void run() {
 +      boolean rescheduled = false;
 +      try {
 +
 +        long time = 
closer.close(master.getConfiguration().getConfiguration(), 
master.getFileSystem(), new Path(source));
 +
 +        if (time > 0) {
 +          executor.schedule(this, time, TimeUnit.MILLISECONDS);
 +          rescheduled = true;
 +        } else {
 +          initiateSort(sortId, source, destination);
 +        }
 +      } catch (FileNotFoundException e) {
 +        log.debug("Unable to initate log sort for " + source + ": " + e);
 +      } catch (Exception e) {
 +        log.warn("Failed to initiate log sort " + source, e);
 +      } finally {
 +        if (!rescheduled) {
 +          synchronized (RecoveryManager.this) {
 +            closeTasksQueued.remove(sortId);
 +          }
 +        }
 +      }
 +    }
 +
 +  }
 +
 +  private void initiateSort(String sortId, String source, final String 
destination) throws KeeperException, InterruptedException, IOException {
 +    String work = source + "|" + destination;
-     new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + 
Constants.ZRECOVERY).addWork(sortId, work.getBytes(Constants.UTF8));
++    new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + 
Constants.ZRECOVERY).addWork(sortId, work.getBytes(UTF_8));
 +
 +    synchronized (this) {
 +      sortsQueued.add(sortId);
 +    }
 +
 +    final String path = ZooUtil.getRoot(master.getInstance()) + 
Constants.ZRECOVERY + "/" + sortId;
 +    log.info("Created zookeeper entry " + path + " with data " + work);
 +  }
 +
 +  public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> 
walogs) throws IOException {
 +    boolean recoveryNeeded = false;
 +
 +    for (Collection<String> logs : walogs) {
 +      for (String walog : logs) {
 +
 +        String switchedWalog = VolumeUtil.switchVolume(walog, FileType.WAL, 
ServerConstants.getVolumeReplacements());
 +        if (switchedWalog != null) {
 +          // replaces the volume used for sorting, but do not change entry in 
metadata table. When the tablet loads it will change the metadata table entry. 
If
 +          // the tablet has the same replacement config, then it will find 
the sorted log.
 +          log.info("Volume replaced " + walog + " -> " + switchedWalog);
 +          walog = switchedWalog;
 +        }
 +
 +        String parts[] = walog.split("/");
 +        String sortId = parts[parts.length - 1];
 +        String filename = master.getFileSystem().getFullPath(FileType.WAL, 
walog).toString();
 +        String dest = RecoveryPath.getRecoveryPath(master.getFileSystem(), 
new Path(filename)).toString();
 +        log.debug("Recovering " + filename + " to " + dest);
 +
 +        boolean sortQueued;
 +        synchronized (this) {
 +          sortQueued = sortsQueued.contains(sortId);
 +        }
 +
 +        if (sortQueued && zooCache.get(ZooUtil.getRoot(master.getInstance()) 
+ Constants.ZRECOVERY + "/" + sortId) == null) {
 +          synchronized (this) {
 +            sortsQueued.remove(sortId);
 +          }
 +        }
 +
 +        if 
(master.getFileSystem().exists(SortedLogState.getFinishedMarkerPath(dest))) {
 +          synchronized (this) {
 +            closeTasksQueued.remove(sortId);
 +            recoveryDelay.remove(sortId);
 +            sortsQueued.remove(sortId);
 +          }
 +          continue;
 +        }
 +
 +        recoveryNeeded = true;
 +        synchronized (this) {
 +          if (!closeTasksQueued.contains(sortId) && 
!sortsQueued.contains(sortId)) {
 +            AccumuloConfiguration aconf = 
master.getConfiguration().getConfiguration();
 +            LogCloser closer = 
aconf.instantiateClassProperty(Property.MASTER_WALOG_CLOSER_IMPLEMETATION, 
LogCloser.class, new HadoopLogCloser());
 +            Long delay = recoveryDelay.get(sortId);
 +            if (delay == null) {
 +              delay = 
master.getSystemConfiguration().getTimeInMillis(Property.MASTER_RECOVERY_DELAY);
 +            } else {
 +              delay = Math.min(2 * delay, 1000 * 60 * 5l);
 +            }
 +
 +            log.info("Starting recovery of " + filename + " (in : " + (delay 
/ 1000) + "s), tablet " + extent + " holds a reference");
 +
 +            executor.schedule(new LogSortTask(closer, filename, dest, 
sortId), delay, TimeUnit.MILLISECONDS);
 +            closeTasksQueued.add(sortId);
 +            recoveryDelay.put(sortId, delay);
 +          }
 +        }
 +      }
 +    }
 +    return recoveryNeeded;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/server/master/src/main/java/org/apache/accumulo/master/state/SetGoalState.java
----------------------------------------------------------------------
diff --cc 
server/master/src/main/java/org/apache/accumulo/master/state/SetGoalState.java
index f981bae,0000000..a674557
mode 100644,000000..100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/state/SetGoalState.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/state/SetGoalState.java
@@@ -1,49 -1,0 +1,51 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.master.state;
 +
++import static com.google.common.base.Charsets.UTF_8;
++
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.master.thrift.MasterGoalState;
 +import org.apache.accumulo.core.security.SecurityUtil;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 +import org.apache.accumulo.server.Accumulo;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.fs.VolumeManagerImpl;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +
 +public class SetGoalState {
 +  
 +  /**
 +   * Utility program that will change the goal state for the master from the 
command line.
 +   */
 +  public static void main(String[] args) throws Exception {
 +    if (args.length != 1 || MasterGoalState.valueOf(args[0]) == null) {
 +      System.err.println("Usage: accumulo " + SetGoalState.class.getName() + 
" [NORMAL|SAFE_MODE|CLEAN_STOP]");
 +      System.exit(-1);
 +    }
 +    SecurityUtil.serverLogin(ServerConfiguration.getSiteConfiguration());
 +
 +    VolumeManager fs = VolumeManagerImpl.get();
 +    Accumulo.waitForZookeeperAndHdfs(fs);
-     
ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(HdfsZooInstance.getInstance())
 + Constants.ZMASTER_GOAL_STATE, args[0].getBytes(Constants.UTF8),
++    
ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(HdfsZooInstance.getInstance())
 + Constants.ZMASTER_GOAL_STATE, args[0].getBytes(UTF_8),
 +        NodeExistsPolicy.OVERWRITE);
 +  }
 +  
 +}

Reply via email to