ACCUMULO-3135 Modify server-side table-operations to throw ThriftTableOperationException when table doesn't exist
The server-side implementations of the table operations typically follow the pattern of: accept table name, get table id, check permission, run table operation. Fetching the table id does a (trusted) check of whether or not the table that was requested to operate upon actually exists or not (we don't want to blindly accept table IDs from users in most cases). However, there is a race condition in which a table may be deleted after we fetch the table ID and before we can check the permissions for the user on said table. SecurityOperation only throws ThriftSecurityExceptions. While this makes sense in the context of the SecurityOperation class, we have to translate a ThriftSecurityException for a nonexistent table into a ThriftTableOperationException so that the client implementation will throw a TableNotFoundException instead of an AccumuloSecurityException. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/2579d518 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/2579d518 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/2579d518 Branch: refs/heads/1.6.2-SNAPSHOT Commit: 2579d518bbd22def04bef9b145755312f4ca565f Parents: 7983b2f Author: Josh Elser <[email protected]> Authored: Wed Sep 24 13:58:09 2014 -0400 Committer: Josh Elser <[email protected]> Committed: Wed Sep 24 13:58:09 2014 -0400 ---------------------------------------------------------------------- .../apache/accumulo/server/master/Master.java | 663 +++++++++++-------- 1 file changed, 390 insertions(+), 273 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/2579d518/server/src/main/java/org/apache/accumulo/server/master/Master.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/master/Master.java b/server/src/main/java/org/apache/accumulo/server/master/Master.java index 12f8fed..5465ac8 100644 --- a/server/src/main/java/org/apache/accumulo/server/master/Master.java +++ b/server/src/main/java/org/apache/accumulo/server/master/Master.java @@ -176,13 +176,13 @@ import org.apache.zookeeper.data.Stat; /** * The Master is responsible for assigning and balancing tablets to tablet servers. - * + * * The master will also coordinate log recoveries and reports general status. */ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentState { - + final private static Logger log = Logger.getLogger(Master.class); - + final private static int ONE_SECOND = 1000; final private static Text METADATA_TABLE_ID = new Text(Constants.METADATA_TABLE_ID); final private static long TIME_TO_WAIT_BETWEEN_SCANS = 60 * ONE_SECOND; @@ -193,7 +193,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt final private static int TIME_TO_WAIT_BETWEEN_LOCK_CHECKS = 1000; final private static int MAX_TSERVER_WORK_CHUNK = 5000; final private static int MAX_BAD_STATUS_COUNT = 3; - + final private FileSystem fs; final private Instance instance; final private String hostname; @@ -206,28 +206,28 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt final private EventCoordinator nextEvent = new EventCoordinator(); final private Object mergeLock = new Object(); private RecoveryManager recoveryManager = null; - + private ZooLock masterLock = null; private TServer clientService = null; private TabletBalancer tabletBalancer; - + private MasterState state = MasterState.INITIAL; - + private Fate<Master> fate; - + volatile private SortedMap<TServerInstance,TabletServerStatus> tserverStatus = Collections .unmodifiableSortedMap(new TreeMap<TServerInstance,TabletServerStatus>()); - + private final Set<String> recoveriesInProgress = Collections.synchronizedSet(new HashSet<String>()); - + synchronized private MasterState getMasterState() { return state; } - + public boolean stillMaster() { return getMasterState() != MasterState.STOP; } - + static final boolean X = true; static final boolean _ = false; // @formatter:off @@ -260,14 +260,14 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt clientService.stop(); Master.this.nextEvent.event("stopped event loop"); } - + }, 100l, 1000l); } - + if (oldState != newState && (newState == MasterState.HAVE_LOCK)) { upgradeZookeeper(); } - + if (oldState != newState && (newState == MasterState.NORMAL)) { upgradeMetadata(); } @@ -315,7 +315,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt private final CountDownLatch waitForMetadataUpgrade = new CountDownLatch(1); private final ServerConfiguration serverConfig; - + private void upgradeMetadata() { // we make sure we're only doing the rest of this method once so that we can signal to other threads that an upgrade wasn't needed. if (upgradeMetadataRunning.compareAndSet(false, true)) { @@ -363,7 +363,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } return result; } - + private int totalAssignedOrHosted() { int result = 0; for (TabletGroupWatcher watcher : watchers) { @@ -373,11 +373,11 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } return result; } - + private int nonMetaDataTabletsAssignedOrHosted() { return totalAssignedOrHosted() - assignedOrHosted(new Text(Constants.METADATA_TABLE_ID)); } - + private int notHosted() { int result = 0; for (TabletGroupWatcher watcher : watchers) { @@ -387,7 +387,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } return result; } - + // The number of unassigned tablets that should be assigned: displayed on the monitor page private int displayUnassigned() { int result = 0; @@ -424,7 +424,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } return result; } - + private void checkNotMetadataTable(String tableName, TableOperation operation) throws ThriftTableOperationException { if (tableName.compareTo(Constants.METADATA_TABLE_NAME) == 0) { String why = "Table names cannot be == " + Constants.METADATA_TABLE_NAME; @@ -432,7 +432,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt throw new ThriftTableOperationException(null, tableName, operation, TableOperationExceptionType.OTHER, why); } } - + private void checkTableName(String tableName, TableOperation operation) throws ThriftTableOperationException { if (!tableName.matches(Constants.VALID_TABLE_NAME_REGEX)) { String why = "Table names must only contain word characters (letters, digits, and underscores): " + tableName; @@ -443,28 +443,28 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt String why = "Table name already exists: " + tableName; throw new ThriftTableOperationException(null, tableName, operation, TableOperationExceptionType.EXISTS, why); } - + } - + public void mustBeOnline(final String tableId) throws ThriftTableOperationException { Tables.clearCache(instance); if (!Tables.getTableState(instance, tableId).equals(TableState.ONLINE)) throw new ThriftTableOperationException(tableId, null, TableOperation.MERGE, TableOperationExceptionType.OFFLINE, "table is not online"); } - + public Connector getConnector() throws AccumuloException, AccumuloSecurityException { return instance.getConnector(SecurityConstants.SYSTEM_PRINCIPAL, SecurityConstants.getSystemToken()); } - + private void waitAround(EventCoordinator.Listener listener) { listener.waitForEvents(ONE_SECOND); } - + // TODO: maybe move this to Property? We do this in TabletServer, Master, TableLoadBalancer, etc. - ACCUMULO-1295 public static <T> T createInstanceFromPropertyName(AccumuloConfiguration conf, Property property, Class<T> base, T defaultInstance) { String clazzName = conf.get(property); T instance = null; - + try { Class<? extends T> clazz = AccumuloVFSClassLoader.loadClass(clazzName, base); instance = clazz.newInstance(); @@ -472,22 +472,22 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } catch (Exception e) { log.warn("Failed to load class ", e); } - + if (instance == null) { log.info("Using " + defaultInstance.getClass().getName()); instance = defaultInstance; } return instance; } - + public Master(ServerConfiguration config, FileSystem fs, String hostname) throws IOException { this.serverConfig = config; this.instance = config.getInstance(); this.fs = TraceFileSystem.wrap(fs); this.hostname = hostname; - + AccumuloConfiguration aconf = serverConfig.getConfiguration(); - + log.info("Version " + Constants.VERSION); log.info("Instance " + instance.getInstanceID()); ThriftTransportPool.getInstance().setIdleTime(aconf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)); @@ -496,7 +496,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt this.tabletBalancer = createInstanceFromPropertyName(aconf, Property.MASTER_TABLET_BALANCER, TabletBalancer.class, new DefaultLoadBalancer()); this.tabletBalancer.init(serverConfig); } - + public TServerConnection getConnection(TServerInstance server) { try { return tserverSet.getConnection(server); @@ -504,23 +504,23 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt return null; } } - + private class MasterClientServiceHandler implements MasterClientService.Iface { - + protected String checkTableId(String tableName, TableOperation operation) throws ThriftTableOperationException { final String tableId = Tables.getNameToIdMap(getConfiguration().getInstance()).get(tableName); if (tableId == null) throw new ThriftTableOperationException(null, tableName, operation, TableOperationExceptionType.NOTFOUND, null); return tableId; } - + @Override public long initiateFlush(TInfo tinfo, TCredentials c, String tableId) throws ThriftSecurityException, ThriftTableOperationException, TException { security.canFlush(c, tableId); - + String zTablePath = Constants.ZROOT + "/" + getConfiguration().getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_FLUSH_ID; - + IZooReaderWriter zoo = ZooReaderWriter.getInstance(); byte fid[]; try { @@ -540,20 +540,20 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } return Long.parseLong(new String(fid, Constants.UTF8)); } - + @Override public void waitForFlush(TInfo tinfo, TCredentials c, String tableId, ByteBuffer startRow, ByteBuffer endRow, long flushID, long maxLoops) throws ThriftSecurityException, ThriftTableOperationException, TException { security.canFlush(c, tableId); - + if (endRow != null && startRow != null && ByteBufferUtil.toText(startRow).compareTo(ByteBufferUtil.toText(endRow)) >= 0) throw new ThriftTableOperationException(tableId, null, TableOperation.FLUSH, TableOperationExceptionType.BAD_RANGE, "start row must be less than end row"); - + Set<TServerInstance> serversToFlush = new HashSet<TServerInstance>(tserverSet.getCurrentServers()); - + for (long l = 0; l < maxLoops; l++) { - + for (TServerInstance instance : serversToFlush) { try { final TServerConnection server = tserverSet.getConnection(instance); @@ -563,14 +563,14 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt log.error(ex.toString()); } } - + if (l == maxLoops - 1) break; - + UtilWaitThread.sleep(50); - + serversToFlush.clear(); - + try { Connector conn = getConnector(); Scanner scanner = new IsolatedScanner(conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS)); @@ -579,63 +579,63 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY); scanner.fetchColumnFamily(Constants.METADATA_LOG_COLUMN_FAMILY); scanner.setRange(new KeyExtent(new Text(tableId), null, ByteBufferUtil.toText(startRow)).toMetadataRange()); - + RowIterator ri = new RowIterator(scanner); - + int tabletsToWaitFor = 0; int tabletCount = 0; - + Text ert = ByteBufferUtil.toText(endRow); - + while (ri.hasNext()) { Iterator<Entry<Key,Value>> row = ri.next(); long tabletFlushID = -1; int logs = 0; boolean online = false; - + TServerInstance server = null; - + Entry<Key,Value> entry = null; while (row.hasNext()) { entry = row.next(); Key key = entry.getKey(); - + if (Constants.METADATA_FLUSH_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier())) { tabletFlushID = Long.parseLong(entry.getValue().toString()); } - + if (Constants.METADATA_LOG_COLUMN_FAMILY.equals(key.getColumnFamily())) logs++; - + if (Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY.equals(key.getColumnFamily())) { online = true; server = new TServerInstance(entry.getValue(), key.getColumnQualifier()); } - + } - + // when tablet is not online and has no logs, there is no reason to wait for it if ((online || logs > 0) && tabletFlushID < flushID) { tabletsToWaitFor++; if (server != null) serversToFlush.add(server); } - + tabletCount++; - + Text tabletEndRow = new KeyExtent(entry.getKey().getRow(), (Text) null).getEndRow(); if (tabletEndRow == null || (ert != null && tabletEndRow.compareTo(ert) >= 0)) break; } - + if (tabletsToWaitFor == 0) break; - + // TODO detect case of table offline AND tablets w/ logs? - ACCUMULO-1296 - + if (tabletCount == 0 && !Tables.exists(instance, tableId)) throw new ThriftTableOperationException(tableId, null, TableOperation.FLUSH, TableOperationExceptionType.NOTFOUND, null); - + } catch (AccumuloException e) { log.debug("Failed to scan !METADATA table to wait for flush " + tableId, e); } catch (TabletDeletedException tde) { @@ -648,13 +648,13 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt throw new ThriftTableOperationException(); } } - + } - + @Override public MasterMonitorInfo getMasterStats(TInfo info, TCredentials credentials) throws ThriftSecurityException, TException { final MasterMonitorInfo result = new MasterMonitorInfo(); - + result.tServerInfo = new ArrayList<TabletServerStatus>(); result.tableMap = new DefaultMap<String,TableInfo>(new TableInfo()); for (Entry<TServerInstance,TabletServerStatus> serverEntry : tserverStatus.entrySet()) { @@ -684,13 +684,13 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt result.deadTabletServers = obit.getList(); return result; } - + private void alterTableProperty(TCredentials c, String tableName, String property, String value, TableOperation op) throws ThriftSecurityException, ThriftTableOperationException { final String tableId = checkTableId(tableName, op); if (!security.canAlterTable(c, tableId)) throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); - + try { if (value == null || value.isEmpty()) { TablePropUtil.removeTableProperty(tableId, property); @@ -707,29 +707,29 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt throw new ThriftTableOperationException(tableId, tableName, op, TableOperationExceptionType.OTHER, "Problem altering table property"); } } - + @Override public void removeTableProperty(TInfo info, TCredentials credentials, String tableName, String property) throws ThriftSecurityException, ThriftTableOperationException, TException { alterTableProperty(credentials, tableName, property, null, TableOperation.REMOVE_PROPERTY); } - + @Override public void setTableProperty(TInfo info, TCredentials credentials, String tableName, String property, String value) throws ThriftSecurityException, ThriftTableOperationException, TException { alterTableProperty(credentials, tableName, property, value, TableOperation.SET_PROPERTY); } - + @Override public void shutdown(TInfo info, TCredentials c, boolean stopTabletServers) throws ThriftSecurityException, TException { security.canPerformSystemActions(c); Master.this.shutdown(stopTabletServers); } - + @Override public void shutdownTabletServer(TInfo info, TCredentials c, String tabletServer, boolean force) throws ThriftSecurityException, TException { security.canPerformSystemActions(c); - + final InetSocketAddress addr = AddressUtil.parseAddress(tabletServer, Property.TSERV_CLIENTPORT); final String addrString = org.apache.accumulo.core.util.AddressUtil.toString(addr); final TServerInstance doomed = tserverSet.find(addrString); @@ -740,13 +740,13 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt return; } } - + long tid = fate.startTransaction(); fate.seedTransaction(tid, new TraceRepo<Master>(new ShutdownTServer(doomed, force)), false); fate.waitForCompletion(tid); fate.delete(tid); } - + @Override public void reportSplitExtent(TInfo info, TCredentials credentials, String serverName, TabletSplit split) throws TException { KeyExtent oldTablet = new KeyExtent(split.oldTablet); @@ -761,11 +761,11 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } log.warn("Got a split from a server we don't recognize: " + serverName); } - + @Override public void reportTabletStatus(TInfo info, TCredentials credentials, String serverName, TabletLoadState status, TKeyExtent ttablet) throws TException { KeyExtent tablet = new KeyExtent(ttablet); - + switch (status) { case LOAD_FAILURE: log.error(serverName + " reports assignment failed for tablet " + tablet); @@ -789,14 +789,14 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt break; } } - + @Override public void setMasterGoalState(TInfo info, TCredentials c, MasterGoalState state) throws ThriftSecurityException, TException { security.canPerformSystemActions(c); - + Master.this.setMasterGoalState(state); } - + private void updatePlugins(String property) { if (property.equals(Property.MASTER_TABLET_BALANCER.getKey())) { TabletBalancer balancer = createInstanceFromPropertyName(instance.getConfiguration(), Property.MASTER_TABLET_BALANCER, TabletBalancer.class, @@ -806,11 +806,11 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt log.info("tablet balancer changed to " + tabletBalancer.getClass().getName()); } } - + @Override public void removeSystemProperty(TInfo info, TCredentials c, String property) throws ThriftSecurityException, TException { security.canPerformSystemActions(c); - + try { SystemPropUtil.removeSystemProperty(property); updatePlugins(property); @@ -819,11 +819,11 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt throw new TException(e.getMessage()); } } - + @Override public void setSystemProperty(TInfo info, TCredentials c, String property, String value) throws ThriftSecurityException, TException { security.canPerformSystemActions(c); - + try { SystemPropUtil.setSystemProperty(property, value); updatePlugins(property); @@ -832,24 +832,24 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt throw new TException(e.getMessage()); } } - + private void authenticate(TCredentials c) throws ThriftSecurityException { if (!security.authenticateUser(c, c)) throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.BAD_CREDENTIALS); - + } - + @Override public long beginTableOperation(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException { authenticate(credentials); return fate.startTransaction(); } - + @Override public void executeTableOperation(TInfo tinfo, TCredentials c, long opid, org.apache.accumulo.core.master.thrift.TableOperation op, List<ByteBuffer> arguments, Map<String,String> options, boolean autoCleanup) throws ThriftSecurityException, ThriftTableOperationException, TException { authenticate(c); - + switch (op) { case CREATE: { String tableName = ByteBufferUtil.toString(arguments.get(0)); @@ -857,66 +857,93 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); checkNotMetadataTable(tableName, TableOperation.CREATE); checkTableName(tableName, TableOperation.CREATE); - + org.apache.accumulo.core.client.admin.TimeType timeType = org.apache.accumulo.core.client.admin.TimeType.valueOf(ByteBufferUtil.toString(arguments .get(1))); fate.seedTransaction(opid, new TraceRepo<Master>(new CreateTable(c.getPrincipal(), tableName, timeType, options)), autoCleanup); - + break; } case RENAME: { String oldTableName = ByteBufferUtil.toString(arguments.get(0)); String newTableName = ByteBufferUtil.toString(arguments.get(1)); - + String tableId = checkTableId(oldTableName, TableOperation.RENAME); checkNotMetadataTable(oldTableName, TableOperation.RENAME); checkNotMetadataTable(newTableName, TableOperation.RENAME); checkTableName(newTableName, TableOperation.RENAME); - if (!security.canRenameTable(c, tableId)) + + final boolean canRename; + try { + canRename = security.canRenameTable(c, tableId); + } catch (ThriftSecurityException e) { + throwIfTableMissingSecurityException(e, tableId, oldTableName, TableOperation.RENAME); + throw e; + } + + if (!canRename) throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); - + fate.seedTransaction(opid, new TraceRepo<Master>(new RenameTable(tableId, oldTableName, newTableName)), autoCleanup); - + break; } case CLONE: { String srcTableId = ByteBufferUtil.toString(arguments.get(0)); String tableName = ByteBufferUtil.toString(arguments.get(1)); - + checkNotMetadataTable(tableName, TableOperation.CLONE); checkTableName(tableName, TableOperation.CLONE); - if (!security.canCloneTable(c, srcTableId)) + + final boolean canCloneTable; + try { + canCloneTable = security.canCloneTable(c, srcTableId); + } catch (ThriftSecurityException e) { + throwIfTableMissingSecurityException(e, srcTableId, null, TableOperation.CLONE); + throw e; + } + + if (!canCloneTable) throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); - + Map<String,String> propertiesToSet = new HashMap<String,String>(); Set<String> propertiesToExclude = new HashSet<String>(); - + for (Entry<String,String> entry : options.entrySet()) { if (entry.getKey().startsWith(TableOperationsImpl.CLONE_EXCLUDE_PREFIX)) { propertiesToExclude.add(entry.getKey().substring(TableOperationsImpl.CLONE_EXCLUDE_PREFIX.length())); continue; } - + if (!TablePropUtil.isPropertyValid(entry.getKey(), entry.getValue())) { throw new ThriftTableOperationException(null, tableName, TableOperation.CLONE, TableOperationExceptionType.OTHER, "Property or value not valid " + entry.getKey() + "=" + entry.getValue()); } - + propertiesToSet.put(entry.getKey(), entry.getValue()); } - + fate.seedTransaction(opid, new TraceRepo<Master>(new CloneTable(c.getPrincipal(), srcTableId, tableName, propertiesToSet, propertiesToExclude)), autoCleanup); - + break; } case DELETE: { String tableName = ByteBufferUtil.toString(arguments.get(0)); final String tableId = checkTableId(tableName, TableOperation.DELETE); checkNotMetadataTable(tableName, TableOperation.DELETE); - if (!security.canDeleteTable(c, tableId)) + + final boolean canDeleteTable; + try { + canDeleteTable = security.canDeleteTable(c, tableId); + } catch (ThriftSecurityException e) { + throwIfTableMissingSecurityException(e, tableId, tableName, TableOperation.DELETE); + throw e; + } + + if (!canDeleteTable) throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); - + fate.seedTransaction(opid, new TraceRepo<Master>(new DeleteTable(tableId)), autoCleanup); break; } @@ -924,10 +951,18 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt String tableName = ByteBufferUtil.toString(arguments.get(0)); final String tableId = checkTableId(tableName, TableOperation.ONLINE); checkNotMetadataTable(tableName, TableOperation.ONLINE); - - if (!security.canOnlineOfflineTable(c, tableId)) + + final boolean canOnlineOfflineTable; + try { + canOnlineOfflineTable = security.canOnlineOfflineTable(c, tableId); + } catch (ThriftSecurityException e) { + throwIfTableMissingSecurityException(e, tableId, tableName, TableOperation.ONLINE); + throw e; + } + + if (!canOnlineOfflineTable) throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); - + fate.seedTransaction(opid, new TraceRepo<Master>(new ChangeTableState(tableId, TableOperation.ONLINE)), autoCleanup); break; } @@ -935,10 +970,18 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt String tableName = ByteBufferUtil.toString(arguments.get(0)); final String tableId = checkTableId(tableName, TableOperation.OFFLINE); checkNotMetadataTable(tableName, TableOperation.OFFLINE); - - if (!security.canOnlineOfflineTable(c, tableId)) + + final boolean canOnlineOfflineTable; + try { + canOnlineOfflineTable = security.canOnlineOfflineTable(c, tableId); + } catch (ThriftSecurityException e) { + throwIfTableMissingSecurityException(e, tableId, tableName, TableOperation.OFFLINE); + throw e; + } + + if (!canOnlineOfflineTable) throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); - + fate.seedTransaction(opid, new TraceRepo<Master>(new ChangeTableState(tableId, TableOperation.OFFLINE)), autoCleanup); break; } @@ -956,10 +999,18 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } } log.debug("Creating merge op: " + tableId + " " + startRow + " " + endRow); - - if (!security.canMerge(c, tableId)) + + final boolean canMerge; + try { + canMerge = security.canMerge(c, tableId); + } catch (ThriftSecurityException e) { + throwIfTableMissingSecurityException(e, tableId, tableName, TableOperation.MERGE); + throw e; + } + + if (!canMerge) throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); - + fate.seedTransaction(opid, new TraceRepo<Master>(new TableRangeOp(MergeInfo.Operation.MERGE, tableId, startRow, endRow)), autoCleanup); break; } @@ -967,13 +1018,21 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt String tableName = ByteBufferUtil.toString(arguments.get(0)); Text startRow = ByteBufferUtil.toText(arguments.get(1)); Text endRow = ByteBufferUtil.toText(arguments.get(2)); - + final String tableId = checkTableId(tableName, TableOperation.DELETE_RANGE); checkNotMetadataTable(tableName, TableOperation.DELETE_RANGE); - - if (!security.canDeleteRange(c, tableId)) + + final boolean canDeleteRange; + try { + canDeleteRange = security.canDeleteRange(c, tableId); + } catch (ThriftSecurityException e) { + throwIfTableMissingSecurityException(e, tableId, tableName, TableOperation.DELETE_RANGE); + throw e; + } + + if (!canDeleteRange) throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); - + fate.seedTransaction(opid, new TraceRepo<Master>(new TableRangeOp(MergeInfo.Operation.DELETE, tableId, startRow, endRow)), autoCleanup); break; } @@ -982,13 +1041,21 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt String dir = ByteBufferUtil.toString(arguments.get(1)); String failDir = ByteBufferUtil.toString(arguments.get(2)); boolean setTime = Boolean.parseBoolean(ByteBufferUtil.toString(arguments.get(3))); - + final String tableId = checkTableId(tableName, TableOperation.BULK_IMPORT); checkNotMetadataTable(tableName, TableOperation.BULK_IMPORT); - - if (!security.canBulkImport(c, tableId)) + + final boolean canBulkImport; + try { + canBulkImport = security.canBulkImport(c, tableId); + } catch (ThriftSecurityException e) { + throwIfTableMissingSecurityException(e, tableId, tableName, TableOperation.BULK_IMPORT); + throw e; + } + + if (!canBulkImport) throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); - + fate.seedTransaction(opid, new TraceRepo<Master>(new BulkImport(tableId, dir, failDir, setTime)), autoCleanup); break; } @@ -997,61 +1064,111 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt byte[] startRow = ByteBufferUtil.toBytes(arguments.get(1)); byte[] endRow = ByteBufferUtil.toBytes(arguments.get(2)); List<IteratorSetting> iterators = IteratorUtil.decodeIteratorSettings(ByteBufferUtil.toBytes(arguments.get(3))); - - if (!security.canCompact(c, tableId)) + + final boolean canCompact; + try { + canCompact = security.canCompact(c, tableId); + } catch (ThriftSecurityException e) { + throwIfTableMissingSecurityException(e, tableId, null, TableOperation.COMPACT); + throw e; + } + + if (!canCompact) throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); - + fate.seedTransaction(opid, new TraceRepo<Master>(new CompactRange(tableId, startRow, endRow, iterators)), autoCleanup); break; } case COMPACT_CANCEL: { String tableId = ByteBufferUtil.toString(arguments.get(0)); - - if (!security.canCompact(c, tableId)) + + final boolean canCancelCompact; + try { + canCancelCompact = security.canCompact(c, tableId); + } catch (ThriftSecurityException e) { + throwIfTableMissingSecurityException(e, tableId, null, TableOperation.COMPACT_CANCEL); + throw e; + } + + if (!canCancelCompact) throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); - + fate.seedTransaction(opid, new TraceRepo<Master>(new CancelCompactions(tableId)), autoCleanup); break; } case IMPORT: { String tableName = ByteBufferUtil.toString(arguments.get(0)); String exportDir = ByteBufferUtil.toString(arguments.get(1)); - + if (!security.canImport(c)) throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); - + checkNotMetadataTable(tableName, TableOperation.CREATE); checkTableName(tableName, TableOperation.CREATE); - + fate.seedTransaction(opid, new TraceRepo<Master>(new ImportTable(c.getPrincipal(), tableName, exportDir)), autoCleanup); break; } case EXPORT: { String tableName = ByteBufferUtil.toString(arguments.get(0)); String exportDir = ByteBufferUtil.toString(arguments.get(1)); - + String tableId = checkTableId(tableName, TableOperation.EXPORT); - - if (!security.canExport(c, tableId)) + + final boolean canExport; + try { + canExport = security.canExport(c, tableId); + } catch (ThriftSecurityException e) { + throwIfTableMissingSecurityException(e, tableId, tableName, TableOperation.EXPORT); + throw e; + } + + if (!canExport) throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); - + checkNotMetadataTable(tableName, TableOperation.EXPORT); - + fate.seedTransaction(opid, new TraceRepo<Master>(new ExportTable(tableName, tableId, exportDir)), autoCleanup); break; } - + default: throw new UnsupportedOperationException(); } - + } - + + /** + * Inspects the {@link ThriftSecurityException} and throws a {@link ThriftTableOperationException} if the {@link SecurityErrorCode} on the + * {@link ThriftSecurityException} was {code}TABLE_DOESNT_EXIST{code}. If the {@link ThriftSecurityException} is thrown because a table doesn't exist + * anymore, clients will likely see an {@link AccumuloSecurityException} instead of a {@link TableNotFoundException} as expected. If the + * {@link ThriftSecurityException} has a different {@link SecurityErrorCode}, this method does nothing and expects the caller to properly handle the + * original exception. + * + * @param e + * A caught ThriftSecurityException + * @param tableId + * Table ID being operated on, or null + * @param tableName + * Table name being operated on, or null + * @param op + * The TableOperation the Master was attempting to perform + * @throws ThriftTableOperationException + * Thrown if {@link e} was thrown because {@link SecurityErrorCode#TABLE_DOESNT_EXIST} + */ + private void throwIfTableMissingSecurityException(ThriftSecurityException e, String tableId, String tableName, TableOperation op) + throws ThriftTableOperationException { + // ACCUMULO-3135 Table can be deleted after we get table ID but before we can check permission + if (e.isSetCode() && SecurityErrorCode.TABLE_DOESNT_EXIST == e.getCode()) { + throw new ThriftTableOperationException(tableId, tableName, op, TableOperationExceptionType.NOTFOUND, "Table no longer exists"); + } + } + @Override public String waitForTableOperation(TInfo tinfo, TCredentials credentials, long opid) throws ThriftSecurityException, ThriftTableOperationException, TException { authenticate(credentials); - + TStatus status = fate.waitForCompletion(opid); if (status == TStatus.FAILED) { Exception e = fate.getException(opid); @@ -1064,26 +1181,26 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt else throw new RuntimeException(e); } - + String ret = fate.getReturn(opid); if (ret == null) ret = ""; // thrift does not like returning null return ret; } - + @Override public void finishTableOperation(TInfo tinfo, TCredentials credentials, long opid) throws ThriftSecurityException, TException { authenticate(credentials); fate.delete(opid); } } - + public MergeInfo getMergeInfo(KeyExtent tablet) { if (tablet.isRootTablet()) return new MergeInfo(); return getMergeInfo(tablet.getTableId()); } - + public MergeInfo getMergeInfo(Text tableId) { synchronized (mergeLock) { try { @@ -1105,7 +1222,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } } } - + public void setMergeState(MergeInfo info, MergeState state) throws IOException, KeeperException, InterruptedException { synchronized (mergeLock) { String path = ZooUtil.getRoot(instance.getInstanceID()) + Constants.ZTABLES + "/" + info.getRange().getTableId().toString() + "/merge"; @@ -1126,7 +1243,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } nextEvent.event("Merge state of %s set to %s", info.getRange(), state); } - + public void clearMergeState(Text tableId) throws IOException, KeeperException, InterruptedException { synchronized (mergeLock) { String path = ZooUtil.getRoot(instance.getInstanceID()) + Constants.ZTABLES + "/" + tableId.toString() + "/merge"; @@ -1135,7 +1252,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } nextEvent.event("Merge state of %s cleared", tableId); } - + private void setMasterGoalState(MasterGoalState state) { try { ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZMASTER_GOAL_STATE, state.name().getBytes(Constants.UTF8), @@ -1144,7 +1261,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt log.error("Unable to set master goal state in zookeeper"); } } - + MasterGoalState getMasterGoalState() { while (true) try { @@ -1155,7 +1272,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt UtilWaitThread.sleep(1000); } } - + private void shutdown(boolean stopTabletServers) { if (stopTabletServers) { setMasterGoalState(MasterGoalState.CLEAN_STOP); @@ -1166,16 +1283,16 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } setMasterState(MasterState.STOP); } - + public boolean hasCycled(long time) { for (TabletGroupWatcher watcher : watchers) { if (watcher.stats.lastScanFinished() < time) return false; } - + return true; } - + public void clearMigrations(String tableId) { synchronized (migrations) { Iterator<KeyExtent> iterator = migrations.keySet().iterator(); @@ -1187,11 +1304,11 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } } } - + static enum TabletGoalState { HOSTED, UNASSIGNED, DELETED }; - + TabletGoalState getSystemGoalState(TabletLocationState tls) { switch (getMasterState()) { case NORMAL: @@ -1214,7 +1331,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt // unreachable return TabletGoalState.HOSTED; } - + TabletGoalState getTableGoalState(KeyExtent extent) { TableState tableState = TableManager.getInstance().getTableState(extent.getTableId().toString()); if (tableState == null) @@ -1229,7 +1346,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt return TabletGoalState.HOSTED; } } - + TabletGoalState getGoalState(TabletLocationState tls, MergeInfo mergeInfo) { KeyExtent extent = tls.extent; // Shutting down? @@ -1257,7 +1374,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt if (tls.chopped && tls.walogs.isEmpty()) return TabletGoalState.UNASSIGNED; } - + return TabletGoalState.HOSTED; case WAITING_FOR_OFFLINE: case MERGING: @@ -1265,7 +1382,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } } } - + // taking table offline? state = getTableGoalState(extent); if (state == TabletGoalState.HOSTED) { @@ -1278,34 +1395,34 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } return state; } - + private class TabletGroupWatcher extends Daemon { - + final TabletStateStore store; final TabletGroupWatcher dependentWatcher; - + final TableStats stats = new TableStats(); - + TabletGroupWatcher(TabletStateStore store, TabletGroupWatcher dependentWatcher) { this.store = store; this.dependentWatcher = dependentWatcher; } - + Map<Text,TableCounts> getStats() { return stats.getLast(); } - + TableCounts getStats(Text tableId) { return stats.getLast(tableId); } - + @Override public void run() { - + Thread.currentThread().setName("Watching " + store.name()); int[] oldCounts = new int[TabletState.values().length]; EventCoordinator.Listener eventListener = nextEvent.getListener(); - + while (stillMaster()) { int totalUnloaded = 0; int unloaded = 0; @@ -1317,27 +1434,27 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt currentMerges.put(merge.getRange().getTableId(), new MergeStats(merge)); } } - + // Get the current status for the current list of tservers SortedMap<TServerInstance,TabletServerStatus> currentTServers = new TreeMap<TServerInstance,TabletServerStatus>(); for (TServerInstance entry : tserverSet.getCurrentServers()) { currentTServers.put(entry, tserverStatus.get(entry)); } - + if (currentTServers.size() == 0) { eventListener.waitForEvents(TIME_TO_WAIT_BETWEEN_SCANS); continue; } - + // Don't move tablets to servers that are shutting down SortedMap<TServerInstance,TabletServerStatus> destinations = new TreeMap<TServerInstance,TabletServerStatus>(currentTServers); destinations.keySet().removeAll(serversToShutdown); - + List<Assignment> assignments = new ArrayList<Assignment>(); List<Assignment> assigned = new ArrayList<Assignment>(); List<TabletLocationState> assignedToDeadServers = new ArrayList<TabletLocationState>(); Map<KeyExtent,TServerInstance> unassigned = new HashMap<KeyExtent,TServerInstance>(); - + int[] counts = new int[TabletState.values().length]; stats.begin(); // Walk through the tablets in our store, and work tablets @@ -1349,7 +1466,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt // ignore entries for tables that do not exist in zookeeper if (TableManager.getInstance().getTableState(tls.extent.getTableId().toString()) == null) continue; - + // Don't overwhelm the tablet servers with work if (unassigned.size() + unloaded > MAX_TSERVER_WORK_CHUNK * currentTServers.size()) { flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned); @@ -1376,12 +1493,12 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt mergeStats.update(tls.extent, state, tls.chopped, !tls.walogs.isEmpty()); sendChopRequest(mergeStats.getMergeInfo(), state, tls); sendSplitRequest(mergeStats.getMergeInfo(), state, tls); - + // Always follow through with assignments if (state == TabletState.ASSIGNED) { goal = TabletGoalState.HOSTED; } - + // if we are shutting down all the tabletservers, we have to do it in order if (goal == TabletGoalState.UNASSIGNED && state == TabletState.HOSTED) { if (serversToShutdown.equals(currentTServers.keySet())) { @@ -1390,7 +1507,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } } } - + if (goal == TabletGoalState.HOSTED) { if (state != TabletState.HOSTED && !tls.walogs.isEmpty()) { if (recoveryManager.recoverLogs(tls.extent, tls.walogs)) @@ -1452,12 +1569,12 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } counts[state.ordinal()]++; } - + flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned); - + // provide stats after flushing changes to avoid race conditions w/ delete table stats.end(); - + // Report changes for (TabletState state : TabletState.values()) { int i = state.ordinal(); @@ -1470,14 +1587,14 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt if (totalUnloaded > 0) { nextEvent.event("[%s]: %d tablets unloaded", store.name(), totalUnloaded); } - + updateMergeState(mergeStatsCache); - + log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), TIME_TO_WAIT_BETWEEN_SCANS / 1000.)); eventListener.waitForEvents(TIME_TO_WAIT_BETWEEN_SCANS); } catch (Exception ex) { log.error("Error processing table state for store " + store.name(), ex); - if (ex.getCause() != null && ex.getCause() instanceof BadLocationStateException) { + if (ex.getCause() != null && ex.getCause() instanceof BadLocationStateException) { repairMetadata(((BadLocationStateException) ex.getCause()).getEncodedEndRow()); } else { UtilWaitThread.sleep(WAIT_BETWEEN_ERRORS); @@ -1485,7 +1602,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } } } - + private void repairMetadata(Text row) { Master.log.debug("Attempting repair on " + row); // ACCUMULO-2261 if a dying tserver writes a location before its lock information propagates, it may cause duplicate assignment. @@ -1542,7 +1659,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } return result; } - + private void sendSplitRequest(MergeInfo info, TabletState state, TabletLocationState tls) { // Already split? if (!info.getState().equals(MergeState.SPLITTING)) @@ -1582,7 +1699,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } } } - + private void sendChopRequest(MergeInfo info, TabletState state, TabletLocationState tls) { // Don't bother if we're in the wrong state if (!info.getState().equals(MergeState.WAITING_FOR_CHOPPED)) @@ -1609,7 +1726,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } } } - + private void updateMergeState(Map<Text,MergeStats> mergeStatsCache) { for (MergeStats stats : mergeStatsCache.values()) { try { @@ -1623,7 +1740,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt if (update != stats.getMergeInfo().getState()) { setMergeState(stats.getMergeInfo(), update); } - + if (update == MergeState.MERGING) { try { if (stats.getMergeInfo().isDelete()) { @@ -1641,7 +1758,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } } } - + private void deleteTablets(MergeInfo info) throws AccumuloException { KeyExtent range = info.getRange(); log.debug("Deleting tablets for " + range); @@ -1695,7 +1812,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } finally { bw.close(); } - + if (followingTablet != null) { log.debug("Updating prevRow of " + followingTablet + " to " + range.getPrevEndRow()); bw = conn.createBatchWriter(Constants.METADATA_TABLE_NAME, new BatchWriterConfig()); @@ -1718,7 +1835,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt throw new AccumuloException(ex); } } - + private void mergeMetadataRecords(MergeInfo info) throws AccumuloException { KeyExtent range = info.getRange(); log.debug("Merging metadata for " + range); @@ -1733,7 +1850,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt Range scanRange = new Range(KeyExtent.getMetadataEntry(range.getTableId(), start), false, stopRow, false); if (range.isMeta()) scanRange = scanRange.clip(Constants.METADATA_ROOT_TABLET_KEYSPACE); - + BatchWriter bw = null; try { long fileCount = 0; @@ -1764,7 +1881,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt bw.addMutation(MetadataTable.createDeleteMutation(range.getTableId().toString(), entry.getValue().toString())); } } - + // read the logical time from the last tablet in the merge range, it is not included in // the loop above scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS); @@ -1778,37 +1895,37 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, entry.getValue().toString()); } } - + if (maxLogicalTime != null) Constants.METADATA_TIME_COLUMN.put(m, new Value(maxLogicalTime.getBytes(Constants.UTF8))); - + if (!m.getUpdates().isEmpty()) { bw.addMutation(m); } - + bw.flush(); - + log.debug("Moved " + fileCount + " files to " + stop); - + if (firstPrevRowValue == null) { log.debug("tablet already merged"); return; } - + stop.setPrevEndRow(KeyExtent.decodePrevEndRow(firstPrevRowValue)); Mutation updatePrevRow = stop.getPrevRowUpdateMutation(); log.debug("Setting the prevRow for last tablet: " + stop); bw.addMutation(updatePrevRow); bw.flush(); - + deleteTablets(scanRange, bw, conn); - + // Clean-up the last chopped marker m = new Mutation(stopRow); Constants.METADATA_CHOPPED_COLUMN.putDelete(m); bw.addMutation(m); bw.flush(); - + } catch (Exception ex) { throw new AccumuloException(ex); } finally { @@ -1820,7 +1937,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } } } - + private void deleteTablets(Range scanRange, BatchWriter bw, Connector conn) throws TableNotFoundException, MutationsRejectedException { Scanner scanner; Mutation m; @@ -1838,19 +1955,19 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt while (row.hasNext()) { Entry<Key,Value> entry = row.next(); Key key = entry.getKey(); - + if (m == null) m = new Mutation(key.getRow()); - + m.putDelete(key.getColumnFamily(), key.getColumnQualifier()); log.debug("deleting entry " + key); } bw.addMutation(m); } - + bw.flush(); } - + private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException { try { Connector conn = getConnector(); @@ -1872,7 +1989,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt throw new AccumuloException("Unexpected failure finding the last tablet for a merge " + range, ex); } } - + private void flushChanges(SortedMap<TServerInstance,TabletServerStatus> currentTServers, List<Assignment> assignments, List<Assignment> assigned, List<TabletLocationState> assignedToDeadServers, Map<KeyExtent,TServerInstance> unassigned) throws DistributedStoreException, TException { if (!assignedToDeadServers.isEmpty()) { @@ -1881,7 +1998,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt store.unassign(assignedToDeadServers); nextEvent.event("Marked %d tablets as unassigned because they don't have current servers", assignedToDeadServers.size()); } - + if (!currentTServers.isEmpty()) { Map<KeyExtent,TServerInstance> assignedOut = new HashMap<KeyExtent,TServerInstance>(); tabletBalancer.getAssignments(Collections.unmodifiableSortedMap(currentTServers), Collections.unmodifiableMap(unassigned), assignedOut); @@ -1898,7 +2015,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt if (!unassigned.isEmpty() && assignedOut.isEmpty()) log.warn("Load balancer failed to assign any tablets"); } - + if (assignments.size() > 0) { log.info(String.format("Assigning %d tablets", assignments.size())); store.setFutureLocations(assignments); @@ -1913,11 +2030,11 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } } } - + } - + private class MigrationCleanupThread extends Daemon { - + @Override public void run() { setName("Migration Cleanup Thread"); @@ -1933,7 +2050,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt UtilWaitThread.sleep(TIME_BETWEEN_MIGRATION_CLEANUPS); } } - + /** * If a migrating tablet splits, and the tablet dies before sending the * master a message, the migration will refer to a non-existing tablet, @@ -1968,9 +2085,9 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } } } - + private class StatusThread extends Daemon { - + @Override public void run() { setName("Status Thread"); @@ -2044,11 +2161,11 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } } } - + private long updateStatus() throws AccumuloException, AccumuloSecurityException, TableNotFoundException { tserverStatus = Collections.synchronizedSortedMap(gatherTableInformation()); checkForHeldServer(tserverStatus); - + if (!badServers.isEmpty()) { log.debug("not balancing because the balance information is out-of-date " + badServers.keySet()); } else if (notHosted() > 0) { @@ -2062,7 +2179,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } return DEFAULT_WAIT_FOR_WATCHER; } - + private void checkForHeldServer(SortedMap<TServerInstance,TabletServerStatus> tserverStatus) { TServerInstance instance = null; int crazyHoldTime = 0; @@ -2089,7 +2206,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt tserverSet.remove(instance); } } - + private long balanceTablets() { List<TabletMigration> migrationsOut = new ArrayList<TabletMigration>(); Set<KeyExtent> migrationsCopy = new HashSet<KeyExtent>(); @@ -2097,7 +2214,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt migrationsCopy.addAll(migrations.keySet()); } long wait = tabletBalancer.balance(Collections.unmodifiableSortedMap(tserverStatus), Collections.unmodifiableSet(migrationsCopy), migrationsOut); - + for (TabletMigration m : TabletBalancer.checkMigrationSanity(tserverStatus.keySet(), migrationsOut)) { if (migrations.containsKey(m.tablet)) { log.warn("balancer requested migration more than once, skipping " + m); @@ -2111,9 +2228,9 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } return wait; } - + } - + private SortedMap<TServerInstance,TabletServerStatus> gatherTableInformation() { long start = System.currentTimeMillis(); SortedMap<TServerInstance,TabletServerStatus> result = new TreeMap<TServerInstance,TabletServerStatus>(); @@ -2158,24 +2275,24 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt log.debug(String.format("Finished gathering information from %d servers in %.2f seconds", result.size(), (System.currentTimeMillis() - start) / 1000.)); return result; } - + public void run() throws IOException, InterruptedException, KeeperException { final String zroot = ZooUtil.getRoot(instance); - + getMasterLock(zroot + Constants.ZMASTER_LOCK); - + recoveryManager = new RecoveryManager(this); - + TableManager.getInstance().addObserver(this); - + StatusThread statusThread = new StatusThread(); statusThread.start(); - + MigrationCleanupThread migrationCleanupThread = new MigrationCleanupThread(); migrationCleanupThread.start(); - + tserverSet.startListeningForTabletServerChanges(); - + ZooReaderWriter.getInstance().getChildren(zroot + Constants.ZRECOVERY, new Watcher() { @Override public void process(WatchedEvent event) { @@ -2188,10 +2305,10 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } } }); - + TCredentials systemAuths = SecurityConstants.getSystemCredentials(); final TabletStateStore stores[] = { - new ZooTabletStateStore(new ZooStore(zroot)), + new ZooTabletStateStore(new ZooStore(zroot)), new RootTabletStateStore(instance, systemAuths, this), new MetaDataStateStore(instance, systemAuths, this) }; @@ -2235,17 +2352,17 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt String address = org.apache.accumulo.core.util.AddressUtil.toString(sock); log.info("Setting master lock data to " + address); masterLock.replaceLockData(address.getBytes(Constants.UTF8)); - + while (!clientService.isServing()) { UtilWaitThread.sleep(100); } while (clientService.isServing()) { UtilWaitThread.sleep(500); } - + final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME; statusThread.join(remaining(deadline)); - + // quit, even if the tablet servers somehow jam up and the watchers // don't stop for (TabletGroupWatcher watcher : watchers) { @@ -2253,25 +2370,25 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } log.info("exiting"); } - + private long remaining(long deadline) { return Math.max(1, deadline - System.currentTimeMillis()); } - + public ZooLock getMasterLock() { return masterLock; } - + private static class MasterLockWatcher implements ZooLock.AsyncLockWatcher { - + boolean acquiredLock = false; boolean failedToAcquireLock = false; - + @Override public void lostLock(LockLossReason reason) { Halt.halt("Master lock in zookeeper lost (reason = " + reason + "), exiting!", -1); } - + @Override public void unableToMonitorLockNode(final Throwable e) { Halt.halt(-1, new Runnable() { @@ -2280,13 +2397,13 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt log.fatal("No longer able to monitor master lock node", e); } }); - + } - + @Override public synchronized void acquiredLock() { log.debug("Acquired master lock"); - + if (acquiredLock || failedToAcquireLock) { Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); } @@ -2294,11 +2411,11 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt acquiredLock = true; notifyAll(); } - + @Override public synchronized void failedToAcquireLock(Exception e) { log.warn("Failed to get master lock " + e); - + if (acquiredLock) { Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " + failedToAcquireLock, -1); } @@ -2306,7 +2423,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt failedToAcquireLock = true; notifyAll(); } - + public synchronized void waitForChange() { while (!acquiredLock && !failedToAcquireLock) { try { @@ -2321,19 +2438,19 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt final String masterClientAddress = org.apache.accumulo.core.util.AddressUtil.toString(new InetSocketAddress(hostname, getSystemConfiguration().getPort( Property.MASTER_CLIENTPORT))); - + while (true) { - + MasterLockWatcher masterLockWatcher = new MasterLockWatcher(); masterLock = new ZooLock(zMasterLoc); masterLock.lockAsync(masterLockWatcher, masterClientAddress.getBytes(Constants.UTF8)); masterLockWatcher.waitForChange(); - + if (masterLockWatcher.acquiredLock) { break; } - + if (!masterLockWatcher.failedToAcquireLock) { throw new IllegalStateException("master lock in unknown state"); } @@ -2345,11 +2462,11 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt setMasterState(MasterState.HAVE_LOCK); } - + public static void main(String[] args) throws Exception { try { SecurityUtil.serverLogin(); - + FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration()); String hostname = Accumulo.getLocalAddress(args); Instance instance = HdfsZooInstance.getInstance(); @@ -2363,9 +2480,9 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt System.exit(1); } } - + static final String I_DONT_KNOW_WHY = "unexpected failure"; - + @Override public void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added) { DeadServerList obit = new DeadServerList(ZooUtil.getRoot(instance) + Constants.ZDEADTSERVERS); @@ -2381,7 +2498,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt if (!getMasterGoalState().equals(MasterGoalState.CLEAN_STOP)) obit.post(dead.hostPort(), cause); } - + Set<TServerInstance> unexpected = new HashSet<TServerInstance>(deleted); unexpected.removeAll(this.serversToShutdown); if (unexpected.size() > 0) { @@ -2398,7 +2515,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt synchronized (serversToShutdown) { cleanListByHostAndPort(serversToShutdown, deleted, added); } - + synchronized (migrations) { Iterator<Entry<KeyExtent,TServerInstance>> iter = migrations.entrySet().iterator(); while (iter.hasNext()) { @@ -2431,7 +2548,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } } - + @Override public void stateChanged(String tableId, TableState state) { nextEvent.event("Table state in zookeeper changed for %s to %s", tableId, state); @@ -2439,13 +2556,13 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt clearMigrations(tableId); } } - + @Override public void initialize(Map<String,TableState> tableIdToStateMap) {} - + @Override public void sessionExpired() {} - + @Override public Set<String> onlineTables() { Set<String> result = new HashSet<String>(); @@ -2455,7 +2572,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt return result; } TableManager manager = TableManager.getInstance(); - + for (String tableId : Tables.getIdToNameMap(instance).keySet()) { TableState state = manager.getTableState(tableId); if (state != null) { @@ -2465,12 +2582,12 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } return result; } - + @Override public Set<TServerInstance> onlineTabletServers() { return tserverSet.getCurrentServers(); } - + @Override public Collection<MergeInfo> merges() { List<MergeInfo> result = new ArrayList<MergeInfo>(); @@ -2479,38 +2596,38 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } return result; } - + public void killTServer(TServerInstance server) { nextEvent.event("Forcing server down %s", server); serversToShutdown.add(server); } - + // recovers state from the persistent transaction to shutdown a server public void shutdownTServer(TServerInstance server) { nextEvent.event("Tablet Server shutdown requested for %s", server); serversToShutdown.add(server); } - + public EventCoordinator getEventCoordinator() { return nextEvent; } - + public Instance getInstance() { return this.instance; } - + public AccumuloConfiguration getSystemConfiguration() { return serverConfig.getConfiguration(); } - + public ServerConfiguration getConfiguration() { return serverConfig; } - + public FileSystem getFileSystem() { return this.fs; } - + public void updateRecoveryInProgress(String file) { recoveriesInProgress.add(file); }
