This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 3f47da1 Refactor TabletGroupWatcher (#1761) 3f47da1 is described below commit 3f47da1777f5d059d42d7706574ee3fd9882006b Author: Mike Miller <mmil...@apache.org> AuthorDate: Mon Nov 2 10:14:14 2020 -0500 Refactor TabletGroupWatcher (#1761) * Create TabletLists to hold the many different data structures being tracked in the run method of TabletGroupWatcher * Create methods for some of the logic in the switch case * Pass TabletLists to the flush method and break flush into methods * Rename TabletLocationState.getServer() to getLocation() since location is a more meaningful name than server * Move markDeadServerLogsAsClosed from Master and make private --- .../server/master/state/TabletLocationState.java | 2 +- .../master/state/TabletLocationStateTest.java | 8 +- .../java/org/apache/accumulo/master/Master.java | 13 - .../apache/accumulo/master/TabletGroupWatcher.java | 279 ++++++++++++--------- 4 files changed, 171 insertions(+), 131 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java index c3a96aa..c2111ef 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java @@ -87,7 +87,7 @@ public class TabletLocationState { return extent + "@(" + future + "," + current + "," + last + ")" + (chopped ? 
" chopped" : ""); } - public TServerInstance getServer() { + public TServerInstance getLocation() { TServerInstance result = null; if (current != null) { result = current; diff --git a/server/base/src/test/java/org/apache/accumulo/server/master/state/TabletLocationStateTest.java b/server/base/src/test/java/org/apache/accumulo/server/master/state/TabletLocationStateTest.java index 9b9d148..b31d138 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/master/state/TabletLocationStateTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/master/state/TabletLocationStateTest.java @@ -104,25 +104,25 @@ public class TabletLocationStateTest { @Test public void testGetServer_Current() throws Exception { tls = new TabletLocationState(keyExtent, null, current, last, null, walogs, true); - assertSame(current, tls.getServer()); + assertSame(current, tls.getLocation()); } @Test public void testGetServer_Future() throws Exception { tls = new TabletLocationState(keyExtent, future, null, last, null, walogs, true); - assertSame(future, tls.getServer()); + assertSame(future, tls.getLocation()); } @Test public void testGetServer_Last() throws Exception { tls = new TabletLocationState(keyExtent, null, null, last, null, walogs, true); - assertSame(last, tls.getServer()); + assertSame(last, tls.getLocation()); } @Test public void testGetServer_None() throws Exception { tls = new TabletLocationState(keyExtent, null, null, null, null, walogs, true); - assertNull(tls.getServer()); + assertNull(tls.getLocation()); } @Test diff --git a/server/manager/src/main/java/org/apache/accumulo/master/Master.java b/server/manager/src/main/java/org/apache/accumulo/master/Master.java index bacc583..45449f7 100644 --- a/server/manager/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/manager/src/main/java/org/apache/accumulo/master/Master.java @@ -100,8 +100,6 @@ import org.apache.accumulo.server.HighlyAvailableService; import 
org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.ServerOpts; import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.log.WalStateManager; -import org.apache.accumulo.server.log.WalStateManager.WalMarkerException; import org.apache.accumulo.server.master.LiveTServerSet; import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection; import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer; @@ -135,7 +133,6 @@ import org.apache.accumulo.server.util.TableInfoUtil; import org.apache.accumulo.server.util.time.SimpleTimer; import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader; import org.apache.accumulo.start.classloader.vfs.ContextManager; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.thrift.TException; @@ -1675,16 +1672,6 @@ public class Master extends AbstractServer } } - public void markDeadServerLogsAsClosed(Map<TServerInstance,List<Path>> logsForDeadServers) - throws WalMarkerException { - WalStateManager mgr = new WalStateManager(getContext()); - for (Entry<TServerInstance,List<Path>> server : logsForDeadServers.entrySet()) { - for (Path path : server.getValue()) { - mgr.closeWal(server.getKey(), path); - } - } - } - public void updateBulkImportStatus(String directory, BulkImportState state) { bulkImportStatus.updateBulkImportStatus(Collections.singletonList(directory), state); } diff --git a/server/manager/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java index 5647554..d12eb67 100644 --- a/server/manager/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java @@ -134,6 +134,37 @@ abstract class TabletGroupWatcher extends Daemon { return candidates.equals(lastScanServers); } + /** 
+ * Collection of data structures used to track Tablet assignments + */ + private static class TabletLists { + private final List<Assignment> assignments = new ArrayList<>(); + private final List<Assignment> assigned = new ArrayList<>(); + private final List<TabletLocationState> assignedToDeadServers = new ArrayList<>(); + private final List<TabletLocationState> suspendedToGoneServers = new ArrayList<>(); + private final Map<KeyExtent,TServerInstance> unassigned = new HashMap<>(); + private final Map<TServerInstance,List<Path>> logsForDeadServers = new TreeMap<>(); + // read only lists of tablet servers + private final SortedMap<TServerInstance,TabletServerStatus> currentTServers; + private final SortedMap<TServerInstance,TabletServerStatus> destinations; + + public TabletLists(Master m, SortedMap<TServerInstance,TabletServerStatus> curTServers) { + var destinationsMod = new TreeMap<>(curTServers); + // Don't move tablets to servers that are shutting down + destinationsMod.keySet().removeAll(m.serversToShutdown); + this.destinations = Collections.unmodifiableSortedMap(destinationsMod); + this.currentTServers = Collections.unmodifiableSortedMap(curTServers); + } + + public void reset() { + assignments.clear(); + assigned.clear(); + assignedToDeadServers.clear(); + suspendedToGoneServers.clear(); + unassigned.clear(); + } + } + @Override public void run() { Thread.currentThread().setName("Watching " + store.name()); @@ -172,16 +203,7 @@ abstract class TabletGroupWatcher extends Daemon { continue; } - // Don't move tablets to servers that are shutting down - SortedMap<TServerInstance,TabletServerStatus> destinations = new TreeMap<>(currentTServers); - destinations.keySet().removeAll(master.serversToShutdown); - - List<Assignment> assignments = new ArrayList<>(); - List<Assignment> assigned = new ArrayList<>(); - List<TabletLocationState> assignedToDeadServers = new ArrayList<>(); - List<TabletLocationState> suspendedToGoneServers = new ArrayList<>(); - 
Map<KeyExtent,TServerInstance> unassigned = new HashMap<>(); - Map<TServerInstance,List<Path>> logsForDeadServers = new TreeMap<>(); + TabletLists tLists = new TabletLists(master, currentTServers); MasterState masterState = master.getMasterState(); int[] counts = new int[TabletState.values().length]; @@ -200,15 +222,10 @@ abstract class TabletGroupWatcher extends Daemon { continue; // Don't overwhelm the tablet servers with work - if (unassigned.size() + unloaded + if (tLists.unassigned.size() + unloaded > Master.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) { - flushChanges(destinations, assignments, assigned, assignedToDeadServers, - logsForDeadServers, suspendedToGoneServers, unassigned); - assignments.clear(); - assigned.clear(); - assignedToDeadServers.clear(); - suspendedToGoneServers.clear(); - unassigned.clear(); + flushChanges(tLists, wals); + tLists.reset(); unloaded = 0; eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS); } @@ -220,7 +237,7 @@ abstract class TabletGroupWatcher extends Daemon { return mStats != null ? 
mStats : new MergeStats(new MergeInfo()); }); TabletGoalState goal = master.getGoalState(tls, mergeStats.getMergeInfo()); - TServerInstance server = tls.getServer(); + TServerInstance location = tls.getLocation(); TabletState state = tls.getState(currentTServers.keySet()); TabletLogger.missassigned(tls.extent, goal.toString(), state.toString(), tls.future, @@ -252,90 +269,45 @@ abstract class TabletGroupWatcher extends Daemon { } switch (state) { case HOSTED: - if (server.equals(master.migrations.get(tls.extent))) + if (location.equals(master.migrations.get(tls.extent))) master.migrations.remove(tls.extent); break; case ASSIGNED_TO_DEAD_SERVER: - assignedToDeadServers.add(tls); - if (server.equals(master.migrations.get(tls.extent))) - master.migrations.remove(tls.extent); - TServerInstance tserver = tls.futureOrCurrent(); - if (!logsForDeadServers.containsKey(tserver)) { - logsForDeadServers.put(tserver, wals.getWalsInUse(tserver)); - } + hostDeadTablet(tLists, tls, location, wals); break; case SUSPENDED: - if (master.getSteadyTime() - tls.suspend.suspensionTime - < tableConf.getTimeInMillis(Property.TABLE_SUSPEND_DURATION)) { - // Tablet is suspended. See if its tablet server is back. - TServerInstance returnInstance = null; - Iterator<TServerInstance> find = destinations - .tailMap(new TServerInstance(tls.suspend.server, " ")).keySet().iterator(); - if (find.hasNext()) { - TServerInstance found = find.next(); - if (found.getLocation().equals(tls.suspend.server)) { - returnInstance = found; - } - } - - // Old tablet server is back. Return this tablet to its previous owner. - if (returnInstance != null) { - assignments.add(new Assignment(tls.extent, returnInstance)); - } - // else - tablet server not back. Don't ask for a new assignment right now. - - } else { - // Treat as unassigned, ask for a new assignment. 
- unassigned.put(tls.extent, server); - } + hostSuspendedTablet(tLists, tls, location, tableConf); break; case UNASSIGNED: - // maybe it's a finishing migration - TServerInstance dest = master.migrations.get(tls.extent); - if (dest != null) { - // if destination is still good, assign it - if (destinations.containsKey(dest)) { - assignments.add(new Assignment(tls.extent, dest)); - } else { - // get rid of this migration - master.migrations.remove(tls.extent); - unassigned.put(tls.extent, server); - } - } else { - unassigned.put(tls.extent, server); - } + hostUnassignedTablet(tLists, tls.extent, location); break; case ASSIGNED: // Send another reminder - assigned.add(new Assignment(tls.extent, tls.future)); + tLists.assigned.add(new Assignment(tls.extent, tls.future)); break; } } else { switch (state) { case SUSPENDED: // Request a move to UNASSIGNED, so as to allow balancing to continue. - suspendedToGoneServers.add(tls); - cancelOfflineTableMigrations(tls); + tLists.suspendedToGoneServers.add(tls); + cancelOfflineTableMigrations(tls.extent); break; case UNASSIGNED: - cancelOfflineTableMigrations(tls); + cancelOfflineTableMigrations(tls.extent); break; case ASSIGNED_TO_DEAD_SERVER: - assignedToDeadServers.add(tls); - if (!logsForDeadServers.containsKey(tls.futureOrCurrent())) { - logsForDeadServers.put(tls.futureOrCurrent(), - wals.getWalsInUse(tls.futureOrCurrent())); - } + unassignDeadTablet(tLists, tls, wals); break; case HOSTED: - TServerConnection client = master.tserverSet.getConnection(server); + TServerConnection client = master.tserverSet.getConnection(location); if (client != null) { client.unloadTablet(master.masterLock, tls.extent, goal.howUnload(), master.getSteadyTime()); unloaded++; totalUnloaded++; } else { - Master.log.warn("Could not connect to server {}", server); + Master.log.warn("Could not connect to server {}", location); } break; case ASSIGNED: @@ -345,8 +317,7 @@ abstract class TabletGroupWatcher extends Daemon { counts[state.ordinal()]++; } 
- flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers, - suspendedToGoneServers, unassigned); + flushChanges(tLists, wals); // provide stats after flushing changes to avoid race conditions w/ delete table stats.end(masterState); @@ -397,11 +368,76 @@ abstract class TabletGroupWatcher extends Daemon { } } - private void cancelOfflineTableMigrations(TabletLocationState tls) { - TServerInstance dest = master.migrations.get(tls.extent); - TableState tableState = master.getTableManager().getTableState(tls.extent.tableId()); - if (dest != null && tableState == TableState.OFFLINE) { + private void unassignDeadTablet(TabletLists tLists, TabletLocationState tls, WalStateManager wals) + throws WalMarkerException { + tLists.assignedToDeadServers.add(tls); + if (!tLists.logsForDeadServers.containsKey(tls.futureOrCurrent())) { + tLists.logsForDeadServers.put(tls.futureOrCurrent(), + wals.getWalsInUse(tls.futureOrCurrent())); + } + } + + private void hostUnassignedTablet(TabletLists tLists, KeyExtent tablet, + TServerInstance location) { + // maybe it's a finishing migration + TServerInstance dest = master.migrations.get(tablet); + if (dest != null) { + // if destination is still good, assign it + if (tLists.destinations.containsKey(dest)) { + tLists.assignments.add(new Assignment(tablet, dest)); + } else { + // get rid of this migration + master.migrations.remove(tablet); + tLists.unassigned.put(tablet, location); + } + } else { + tLists.unassigned.put(tablet, location); + } + } + + private void hostSuspendedTablet(TabletLists tLists, TabletLocationState tls, + TServerInstance location, TableConfiguration tableConf) { + if (master.getSteadyTime() - tls.suspend.suspensionTime + < tableConf.getTimeInMillis(Property.TABLE_SUSPEND_DURATION)) { + // Tablet is suspended. See if its tablet server is back. 
+ TServerInstance returnInstance = null; + Iterator<TServerInstance> find = tLists.destinations + .tailMap(new TServerInstance(tls.suspend.server, " ")).keySet().iterator(); + if (find.hasNext()) { + TServerInstance found = find.next(); + if (found.getLocation().equals(tls.suspend.server)) { + returnInstance = found; + } + } + + // Old tablet server is back. Return this tablet to its previous owner. + if (returnInstance != null) { + tLists.assignments.add(new Assignment(tls.extent, returnInstance)); + } + // else - tablet server not back. Don't ask for a new assignment right now. + + } else { + // Treat as unassigned, ask for a new assignment. + tLists.unassigned.put(tls.extent, location); + } + } + + private void hostDeadTablet(TabletLists tLists, TabletLocationState tls, TServerInstance location, + WalStateManager wals) throws WalMarkerException { + tLists.assignedToDeadServers.add(tls); + if (location.equals(master.migrations.get(tls.extent))) master.migrations.remove(tls.extent); + TServerInstance tserver = tls.futureOrCurrent(); + if (!tLists.logsForDeadServers.containsKey(tserver)) { + tLists.logsForDeadServers.put(tserver, wals.getWalsInUse(tserver)); + } + } + + private void cancelOfflineTableMigrations(KeyExtent extent) { + TServerInstance dest = master.migrations.get(extent); + TableState tableState = master.getTableManager().getTableState(extent.tableId()); + if (dest != null && tableState == TableState.OFFLINE) { + master.migrations.remove(extent); } } @@ -811,50 +847,50 @@ abstract class TabletGroupWatcher extends Daemon { } } - private void flushChanges(SortedMap<TServerInstance,TabletServerStatus> currentTServers, - List<Assignment> assignments, List<Assignment> assigned, - List<TabletLocationState> assignedToDeadServers, - Map<TServerInstance,List<Path>> logsForDeadServers, - List<TabletLocationState> suspendedToGoneServers, Map<KeyExtent,TServerInstance> unassigned) - throws DistributedStoreException, TException, WalMarkerException { - boolean 
tabletsSuspendable = canSuspendTablets(); - if (!assignedToDeadServers.isEmpty()) { - int maxServersToShow = min(assignedToDeadServers.size(), 100); - Master.log.debug("{} assigned to dead servers: {}...", assignedToDeadServers.size(), - assignedToDeadServers.subList(0, maxServersToShow)); - Master.log.debug("logs for dead servers: {}", logsForDeadServers); - if (tabletsSuspendable) { - store.suspend(assignedToDeadServers, logsForDeadServers, master.getSteadyTime()); + private void handleDeadTablets(TabletLists tLists, WalStateManager wals) + throws WalMarkerException, DistributedStoreException { + var deadTablets = tLists.assignedToDeadServers; + var deadLogs = tLists.logsForDeadServers; + + if (!deadTablets.isEmpty()) { + int maxServersToShow = min(deadTablets.size(), 100); + Master.log.debug("{} assigned to dead servers: {}...", deadTablets.size(), + deadTablets.subList(0, maxServersToShow)); + Master.log.debug("logs for dead servers: {}", deadLogs); + if (canSuspendTablets()) { + store.suspend(deadTablets, deadLogs, master.getSteadyTime()); } else { - store.unassign(assignedToDeadServers, logsForDeadServers); + store.unassign(deadTablets, deadLogs); } - master.markDeadServerLogsAsClosed(logsForDeadServers); + markDeadServerLogsAsClosed(wals, deadLogs); master.nextEvent.event( "Marked %d tablets as suspended because they don't have current servers", - assignedToDeadServers.size()); + deadTablets.size()); } - if (!suspendedToGoneServers.isEmpty()) { - int maxServersToShow = min(assignedToDeadServers.size(), 100); - Master.log.debug(assignedToDeadServers.size() + " suspended to gone servers: " - + assignedToDeadServers.subList(0, maxServersToShow) + "..."); - store.unsuspend(suspendedToGoneServers); + if (!tLists.suspendedToGoneServers.isEmpty()) { + int maxServersToShow = min(deadTablets.size(), 100); + Master.log.debug(deadTablets.size() + " suspended to gone servers: " + + deadTablets.subList(0, maxServersToShow) + "..."); + 
store.unsuspend(tLists.suspendedToGoneServers); } + } - if (!currentTServers.isEmpty()) { + private void getAssignmentsFromBalancer(TabletLists tLists, + Map<KeyExtent,TServerInstance> unassigned) { + if (!tLists.currentTServers.isEmpty()) { Map<KeyExtent,TServerInstance> assignedOut = new HashMap<>(); - master.tabletBalancer.getAssignments(Collections.unmodifiableSortedMap(currentTServers), - Collections.unmodifiableMap(unassigned), assignedOut); + master.tabletBalancer.getAssignments(tLists.currentTServers, unassigned, assignedOut); for (Entry<KeyExtent,TServerInstance> assignment : assignedOut.entrySet()) { if (unassigned.containsKey(assignment.getKey())) { if (assignment.getValue() != null) { - if (!currentTServers.containsKey(assignment.getValue())) { + if (!tLists.currentTServers.containsKey(assignment.getValue())) { Master.log.warn( "balancer assigned {} to a tablet server that is not current {} ignoring", assignment.getKey(), assignment.getValue()); continue; } - assignments.add(new Assignment(assignment.getKey(), assignment.getValue())); + tLists.assignments.add(new Assignment(assignment.getKey(), assignment.getValue())); } } else { Master.log.warn( @@ -866,13 +902,22 @@ abstract class TabletGroupWatcher extends Daemon { if (!unassigned.isEmpty() && assignedOut.isEmpty()) Master.log.warn("Load balancer failed to assign any tablets"); } + } + + private void flushChanges(TabletLists tLists, WalStateManager wals) + throws DistributedStoreException, TException, WalMarkerException { + var unassigned = Collections.unmodifiableMap(tLists.unassigned); - if (!assignments.isEmpty()) { - Master.log.info(String.format("Assigning %d tablets", assignments.size())); - store.setFutureLocations(assignments); + handleDeadTablets(tLists, wals); + + getAssignmentsFromBalancer(tLists, unassigned); + + if (!tLists.assignments.isEmpty()) { + Master.log.info(String.format("Assigning %d tablets", tLists.assignments.size())); + store.setFutureLocations(tLists.assignments); } - 
assignments.addAll(assigned); - for (Assignment a : assignments) { + tLists.assignments.addAll(tLists.assigned); + for (Assignment a : tLists.assignments) { TServerConnection client = master.tserverSet.getConnection(a.server); if (client != null) { client.assignTablet(master.masterLock, a.tablet); @@ -883,4 +928,12 @@ abstract class TabletGroupWatcher extends Daemon { } } + private static void markDeadServerLogsAsClosed(WalStateManager mgr, + Map<TServerInstance,List<Path>> logsForDeadServers) throws WalMarkerException { + for (Entry<TServerInstance,List<Path>> server : logsForDeadServers.entrySet()) { + for (Path path : server.getValue()) { + mgr.closeWal(server.getKey(), path); + } + } + } }